In [2]:
# Section must be included at the beginning of each new notebook. Remember to change the app name.
# If you're using VirtualBox, change the below to '/home/user/spark-2.1.1-bin-hadoop2.7'
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('linear_regression_adv').getOrCreate()

# If you're getting an error with numpy, please type 'sudo pip3 install numpy --user' into the console.
# If you're getting an error with another package, type 'sudo pip3 install PACKAGENAME --user'. 
# Replace PACKAGENAME with the relevant package (such as pandas, etc).
from pyspark.ml.regression import LinearRegression

# First, let's import the data. Note that we can infer the schema as it's a CSV file.
df = spark.read.csv("Dataset/Bank.csv",inferSchema=True,header=True)

In [3]:
# Let's explore. Here's the first row of the data.
print(df.head())

# And the entire data structure. 
df.printSchema()

Row(age=56, job='housemaid', marital='married', education='basic.4y', default='no', housing='no', loan='no', contact='telephone', month='may', day='mon', duration=261, campaign=1, pdays=999, previous=0, poutcome='nonexistent', emp var rate=1.1, cons price idx=93.994, cons conf idx=-36.4, euribor3m=4.857, nr employed=5191.0, Deposit='no')
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp var rate: double (nullable = true)
 |-- cons price idx: double (nulla

In [4]:
# Now that we understand the data's features, let's use a Python package to neatly describe the data.
import pandas as pd
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
age,41188,40.02406040594348,10.421249980934043,17,98
job,41188,,,admin.,unknown
marital,41188,,,divorced,unknown
education,41188,,,basic.4y,unknown
default,41188,,,no,yes
housing,41188,,,no,yes
loan,41188,,,no,yes
contact,41188,,,cellular,telephone
month,41188,,,apr,sep


In [5]:
Pdf=df.toPandas()

In [6]:
Pdf.head()

Unnamed: 0,age,job,marital,education,default,housing,loan,contact,month,day,...,campaign,pdays,previous,poutcome,emp var rate,cons price idx,cons conf idx,euribor3m,nr employed,Deposit
0,56,housemaid,married,basic.4y,no,no,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
1,57,services,married,high.school,unknown,no,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
2,37,services,married,high.school,no,yes,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
3,40,admin.,married,basic.6y,no,no,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
4,56,services,married,high.school,no,no,yes,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no


In [7]:
Pdf['campaign']=Pdf['campaign'].fillna(Pdf['campaign'].mean())

In [8]:
Pdf.head(20)

Unnamed: 0,age,job,marital,education,default,housing,loan,contact,month,day,...,campaign,pdays,previous,poutcome,emp var rate,cons price idx,cons conf idx,euribor3m,nr employed,Deposit
0,56,housemaid,married,basic.4y,no,no,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
1,57,services,married,high.school,unknown,no,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
2,37,services,married,high.school,no,yes,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
3,40,admin.,married,basic.6y,no,no,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
4,56,services,married,high.school,no,no,yes,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
5,45,services,married,basic.9y,unknown,no,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
6,59,admin.,married,professional.course,no,no,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
7,41,blue-collar,married,unknown,unknown,no,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
8,24,technician,single,professional.course,no,yes,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
9,25,services,single,high.school,no,yes,no,telephone,may,mon,...,1.0,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no


In [9]:

Pdf.drop(['job','marital','education','cons conf idx','housing'],axis=1)

Unnamed: 0,age,default,loan,contact,month,day,duration,campaign,pdays,previous,poutcome,emp var rate,cons price idx,euribor3m,nr employed,Deposit
0,56,no,no,telephone,may,mon,261,1.000000,999,0,nonexistent,1.1,93.994,4.857,5191.0,no
1,57,unknown,no,telephone,may,mon,149,1.000000,999,0,nonexistent,1.1,93.994,4.857,5191.0,no
2,37,no,no,telephone,may,mon,226,1.000000,999,0,nonexistent,1.1,93.994,4.857,5191.0,no
3,40,no,no,telephone,may,mon,151,1.000000,999,0,nonexistent,1.1,93.994,4.857,5191.0,no
4,56,no,yes,telephone,may,mon,307,1.000000,999,0,nonexistent,1.1,93.994,4.857,5191.0,no
5,45,unknown,no,telephone,may,mon,198,1.000000,999,0,nonexistent,1.1,93.994,4.857,5191.0,no
6,59,no,no,telephone,may,mon,139,1.000000,999,0,nonexistent,1.1,93.994,4.857,5191.0,no
7,41,unknown,no,telephone,may,mon,217,1.000000,999,0,nonexistent,1.1,93.994,4.857,5191.0,no
8,24,no,no,telephone,may,mon,380,1.000000,999,0,nonexistent,1.1,93.994,4.857,5191.0,no
9,25,no,no,telephone,may,mon,50,1.000000,999,0,nonexistent,1.1,93.994,4.857,5191.0,no


In [10]:
sdf = spark.createDataFrame(Pdf)

In [11]:
sdf=sdf.drop('job','marital','education','cons conf idx','housing')

In [12]:
sdf.show()

+---+-------+----+---------+-----+---+--------+-----------------+-----+--------+-----------+------------+--------------+---------+-----------+-------+
|age|default|loan|  contact|month|day|duration|         campaign|pdays|previous|   poutcome|emp var rate|cons price idx|euribor3m|nr employed|Deposit|
+---+-------+----+---------+-----+---+--------+-----------------+-----+--------+-----------+------------+--------------+---------+-----------+-------+
| 56|     no|  no|telephone|  may|mon|     261|              1.0|  999|       0|nonexistent|         1.1|        93.994|    4.857|     5191.0|     no|
| 57|unknown|  no|telephone|  may|mon|     149|              1.0|  999|       0|nonexistent|         1.1|        93.994|    4.857|     5191.0|     no|
| 37|     no|  no|telephone|  may|mon|     226|              1.0|  999|       0|nonexistent|         1.1|        93.994|    4.857|     5191.0|     no|
| 40|     no|  no|telephone|  may|mon|     151|              1.0|  999|       0|nonexistent|  

In [13]:
# Import VectorAssembler and Vectors
from pyspark.ml.feature import VectorAssembler

# The input columns are the feature column names, and the output column is what you'd like the new column to be named. 
vector_assembler = VectorAssembler(inputCols =  ['age', 'campaign','duration', 'previous', 'pdays', 'emp var rate', 'euribor3m', 'nr employed'], outputCol = 'TermDeposit')


# Now that we've created the assembler variable, let's actually transform the data.
vector_output = vector_assembler.transform(df)

# Using print schema, you see that the features output column has been added. 
vector_output.printSchema()

# You can see that the features column is a DenseVector that combines the various features as expected.
vector_output.head(1)

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp var rate: double (nullable = true)
 |-- cons price idx: double (nullable = true)
 |-- cons conf idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr employed: double (nullable = true)
 |-- Deposit: string (nullable = true)
 |-- TermDeposit: vector (nullable = true)



[Row(age=56, job='housemaid', marital='married', education='basic.4y', default='no', housing='no', loan='no', contact='telephone', month='may', day='mon', duration=261, campaign=1, pdays=999, previous=0, poutcome='nonexistent', emp var rate=1.1, cons price idx=93.994, cons conf idx=-36.4, euribor3m=4.857, nr employed=5191.0, Deposit='no', TermDeposit=DenseVector([56.0, 1.0, 261.0, 0.0, 999.0, 1.1, 4.857, 5191.0]))]

SyntaxError: positional argument follows keyword argument (<ipython-input-14-a30f42703da0>, line 2)

In [20]:
sdf.printSchema()

root
 |-- age: long (nullable = true)
 |-- default: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- duration: long (nullable = true)
 |-- campaign: double (nullable = true)
 |-- pdays: long (nullable = true)
 |-- previous: long (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp var rate: double (nullable = true)
 |-- cons price idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr employed: double (nullable = true)
 |-- Deposit: integer (nullable = true)



In [15]:
# Because the features have been combined into one vector, we no longer need them. Below we select the features and label.
vector_output = vector_output.select(['Deposit', 'TermDeposit'])

# You can see that the dataframe now only contains two columns. 
print(vector_output.head(1))
vector_output.show(3)

[Row(Deposit='no', TermDeposit=DenseVector([56.0, 1.0, 261.0, 0.0, 999.0, 1.1, 4.857, 5191.0]))]
+-------+--------------------+
|Deposit|         TermDeposit|
+-------+--------------------+
|     no|[56.0,1.0,261.0,0...|
|     no|[57.0,1.0,149.0,0...|
|     no|[37.0,1.0,226.0,0...|
+-------+--------------------+
only showing top 3 rows



In [17]:
# Let's do a randomised 70/30 split. Remember, you should explain why you chose a particular split. 
train_data,test_data = vector_output.randomSplit([0.7,0.3])


In [18]:
# Importing the LR package.
from pyspark.ml.regression import LinearRegression

# Instantiate the instance.
lr = LinearRegression(featuresCol='TermDeposit', labelCol='Deposit')

# Fit the training data.
lr_model = lr.fit(train_data)

# Print the coefficients.
print("Coefficients: " + str(lr_model.coefficients))

# Print the intercept.
print("Intercept: " + str(lr_model.intercept) + "\n")

# Summarise the model and print out some evaluation metrics.
training_summary = lr_model.summary

# Print RMSE. 
print("RMSE: " + str(training_summary.rootMeanSquaredError))

# Print R2.
print("R2: " + str(training_summary.r2))

IllegalArgumentException: 'requirement failed: Column Deposit must be of type NumericType but was actually of type StringType.'

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import IntegerType
sdf = sdf.withColumn("Deposit", sdf["Deposit"].cast(IntegerType()))
sdf.columns

In [None]:
# The input columns are the feature column names, and the output column is what you'd like the new column to be named. 
#vector_assembler = VectorAssembler(inputCols = ['age', 'loan', 'month', 'default', 'contact', 'month' 'campaign', 'previous', 'pdays', 'poutcome', 'emp var rate', 'cons price index', 'euribor3m', 'nr employed'], outputCol = 'TermDeposit'

In [None]:
sdf.printSchema()

In [None]:
# The input columns are the feature column names, and the output column is what you'd like the new column to be named. 
assembler = VectorAssembler(inputCols = ['age', 'campaign','duration', 'previous', 'pdays', 'emp var rate', 'euribor3m', 'nr employed'], outputCol = 'TermDeposit')




In [None]:
output = assembler.transform(sdf)


In [None]:
output.take(1)

In [None]:
output.printSchema()

In [None]:
final_data = output.select(['TermDeposit', 'Deposit'])
final_data.show(3)

In [None]:
final_data.printSchema()

In [None]:

train, test = final_data.randomSplit([0.7, 0.3])
train.describe().show()

In [None]:
output.take(1)

In [None]:
# These are the default values:
# featuresCol: What is the features column named? 
# labelCol: What is the label column named?
lr = LinearRegression(featuresCol='TermDeposit', labelCol='Deposit')

In [None]:
# Fit/train the model. Fit the model using the training data. Note that there is no training/testing split so far.
lrModel = lr.fit(train)

In [None]:
# Summarize the model over the training set and print out some metrics.
trainingSummary = lrModel.summary

# Let's see some examples from the documentation:
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# There is some other, potentially more interesting, information that we can see.
trainingSummary.residuals.show()

# Print Root Mean Squared Error. 
print("RMSE: {}".format(trainingSummary.rootMeanSquaredError))

# Print R-Squared.
print("R2: {}".format(trainingSummary.r2))