<a href="https://colab.research.google.com/github/roitraining/SparkforDataScientists/blob/Development/Ch05_RegressionAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Initialize the Spark environment and load the helper functions we have provided.

In [0]:
import os
import sys

# Course root directory. Override it with the SPARK_COURSE_ROOT environment variable on
# machines where the course files are not under /home/student; the default is unchanged.
# os.path.join(..., '') guarantees a trailing separator so the f-string below stays valid.
rootpath = os.path.join(os.environ.get('SPARK_COURSE_ROOT', '/home/student/ROI/Spark/'), '')
datapath = f'{rootpath}datasets/'
sys.path.append(rootpath)  # makes pyspark_helpers importable
import pyspark_helpers as pyh
from pyspark_helpers import *
# initspark is not defined in this notebook; it comes from the pyspark_helpers * import above.
sc, spark, conf = initspark()

import pandas as pd
import matplotlib as mp
import numpy
from matplotlib import pyplot as plt

# Explicit import so `display` resolves to the helper's version
# (presumably shadowing IPython's built-in display — confirm in pyspark_helpers).
from pyspark_helpers import display

### Read in a simple dataset of Boston housing prices.

In [0]:
import os

filename = 'boston.csv'
# datapath already ends with '/', so join the pieces instead of concatenating another '/'.
df = spark.read.csv(os.path.join(datapath, 'finance', filename), header=True, inferSchema=True)
display(df)
df.printSchema()

# Save a reference to the raw (unfiltered, unselected) data.
dfRaw = df



### Let's look at the output of StringIndexer to understand how it works.

In [0]:
from pyspark.ml.feature import StringIndexer

# Fit a StringIndexer on the TOWN column and show the distinct string -> index mapping.
col = 'TOWN'
index_col = col + '_Index'
indexer = StringIndexer(inputCol=col, outputCol=index_col)
indexed = indexer.fit(df).transform(df)
x1 = indexed.select(col, index_col).distinct()

# Show the same mapping twice: sorted by town name, then sorted by assigned index.
for sort_col in (col, index_col):
    display(x1.orderBy(sort_col))



### Now try it with a convenient helper function we wrote to encode a list of multiple columns automatically.

In [0]:
# Index several categorical columns with a single helper call (see pyspark_helpers for details).
columns_to_index = ['TOWN', 'TRACT']
x2 = pyh.StringIndexEncode(df, columns_to_index)
display(x2)


### Let's take a look at how OneHotEncoder works.

In [0]:
col = 'TOWN'
try:
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    # Spark 3.0 removed OneHotEncoderEstimator; OneHotEncoder now provides the same
    # estimator API (inputCols/outputCols), so alias it to keep this cell portable.
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator

# One-hot encode the numeric TOWN_Index column that x2 carries from the indexing step.
encoder = OneHotEncoderEstimator(inputCols=[col + '_Index'], outputCols=[col + '_Vector'])
display(encoder.fit(x2).transform(x2).orderBy(col + '_Index'))


### Now try our convenient helper function. Note that it automatically calls StringIndexer first, so it can work on the raw string version of each column.

In [0]:
# The helper runs StringIndexer itself, so give it the raw string columns in `df`.
# NOTE(review): x2 already has *_Index columns; if the helper reuses those output names,
# StringIndexer would reject them as already existing — confirm against pyspark_helpers.
x = pyh.OneHotEncode(df, ['TOWN', 'TRACT'])
display(x)

### Let's have a look at the Median Value, which is the target we want to predict.
Spark does not have plotting of its own; instead, we bring the data back to the driver to plot it. So we need to make sure not to bring back more than the driver can handle.
The .toPandas method acts like collect, but returns the data as a Pandas DataFrame, which is easily plotted.

In [0]:
%matplotlib inline
import pandas as pd
import seaborn as sns
#sns.distplot(df.toPandas()['MEDV'])

sns.distplot(df.toPandas()['MEDV'])
plt.show()



### There are some outliers at 48 and above, so let's just eliminate them for now.

In [0]:
# Rows with MEDV at or above this value are treated as outliers and dropped.
MEDV_MAX = 48

# Collect only the MEDV column to the driver for plotting.
ax = sns.distplot(df.where(f'MEDV < {MEDV_MAX}').select('MEDV').toPandas()['MEDV'])
ax.set_title(f'Distribution of MEDV < {MEDV_MAX}')
plt.show()
print(df.columns)

# Filter the outliers out of the raw data that the feature-selection step selects from.
dfRaw = dfRaw.where(f'MEDV < {MEDV_MAX}')

### Let's put it all together now. Identify the categorical and numeric features and target.


In [0]:
# Feature configuration for boston.csv: which columns are categorical inputs,
# which are numeric inputs, and which column is the regression target.
numeric_features = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO']
categorical_features = ['TOWN']  # alternative: ['TOWN', 'TRACT']
target_label = 'MEDV'

# Keep only the modelling columns (categoricals first, then numerics, then the target).
df = dfRaw.select(categorical_features + numeric_features + [target_label])
df.printSchema()

print('******')
display(df.describe())

print('******')
display(df)

### Turn the DataFrame into vectors.
Use our MakeMLDataFrame helper function to automatically encode the categorical columns and bundle everything into a features vector and a target column, as needed for ML training.


In [0]:
# Encode the categoricals and assemble features/target with the helper.
# NOTE(review): the meaning of the trailing positional False is defined in pyspark_helpers — confirm.
dfML = pyh.MakeMLDataFrame(
    df,
    categorical_features,
    numeric_features,
    target_label,
    False,
)
display(dfML)
dfML.printSchema()



### Split the dataset into train and test.

In [0]:
# 70/30 random split; the fixed seed makes the split reproducible across runs.
split_weights = [.7, .3]
train, test = dfML.randomSplit(split_weights, seed=1000)

for split_name, split_df in (('Training', train), ('Testing', test)):
    print(f'{split_name} set row count {split_df.count()}')



### Run Linear Regression.

In [0]:
from pyspark.ml.regression import LinearRegression

# Regularised linear regression; regParam/elasticNetParam control strength and L1/L2 mix (per Spark docs).
lr_params = dict(
    featuresCol='features',
    labelCol='target',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr = LinearRegression(**lr_params)
lrModel = lr.fit(train)

print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# Training-set metrics from the fitted model's summary.
lr_summary = lrModel.summary
print("Root Mean Squared Error: {}\nR Squared (R2) {}".format(lr_summary.rootMeanSquaredError, lr_summary.r2))


In [0]:
# Optional: persist the fitted model and reload it. A fitted model is reloaded with the
# model class (LinearRegressionModel), not with the LinearRegression estimator.
#lrModel.write().overwrite().save('lrModel')
#from pyspark.ml.regression import LinearRegressionModel
#lrModel2 = LinearRegressionModel.load('lrModel')


### Evaluate the model on the test data.


In [0]:
lrPredictions = lrModel.transform(test)
display(lrPredictions.select("prediction", "target", "features"), 30)

from pyspark.ml.evaluation import RegressionEvaluator
lrEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="target", metricName="r2")

# lrModel.evaluate() returns a summary computed on the given (test) data.
testResult = lrModel.evaluate(test)
print("Root Mean Squared Error on Test set: {}".format(testResult.rootMeanSquaredError))
# Use the r2 evaluator defined above on the predictions it was built for.
print("R Squared (R2) on Test set: {}".format(lrEvaluator.evaluate(lrPredictions)))

### Try Decision Tree Regression.

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

dt = DecisionTreeRegressor(featuresCol='features', labelCol='target')
dtModel = dt.fit(train)
dtPredictions = dtModel.transform(test)
display(dtPredictions.select("prediction", "target", "features"), 30)

# featureImportances is a Spark Vector with one weight per position of the `features` vector.
important = dtModel.featureImportances
print(important)
# Rank the feature positions the tree actually used, most important first.
# NOTE(review): mapping positions back to column names depends on how MakeMLDataFrame
# assembled the vector — confirm before labelling them.
ranked_importances = sorted(
    ((idx, float(weight)) for idx, weight in enumerate(important.toArray()) if weight > 0),
    key=lambda pair: pair[1],
    reverse=True,
)
print(ranked_importances)

dtEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="target", metricName="rmse")
testResult = dtEvaluator.evaluate(dtPredictions)
print("Root Mean Squared Error: {}".format(testResult))

### Try Gradient-Boosted Tree Regression.

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted trees (maxIter=10) on the same features/target columns as the other models.
gbt = GBTRegressor(featuresCol='features', labelCol='target', maxIter=10)
gbtModel = gbt.fit(train)
gbtPredictions = gbtModel.transform(test)

preview_cols = ['prediction', 'target', 'features']
display(gbtPredictions.select(*preview_cols), 20)

gbtEvaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="target",
    metricName="rmse",
)
testResult = gbtEvaluator.evaluate(gbtPredictions)
print("Root Mean Squared Error: {}".format(testResult))
