In [4]:
import sys
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn
# NOTE(review): wildcard import — nothing visible in this notebook uses scipy.stats
# names, and `*` can silently shadow other names; consider importing specific functions.
from scipy.stats import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, IndexToString
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor
from pyspark.sql import SparkSession

In [5]:
# Start (or reuse, via getOrCreate) a local Spark session for this exercise.
spark = (
    SparkSession.builder
    .appName("Exercise3")
    .master("local")
    .getOrCreate()
)

In [6]:
# Load the raw CSV; the first row supplies the column names. No schema is
# inferred, so every column arrives as a string (see the schema/dtypes cells).
df = spark.read.option("header", True).csv('exampleData.csv')

In [7]:
# Inspect the schema: every column is read as StringType.
df.schema

StructType(List(StructField(_c0,StringType,true),StructField(dateTime,StringType,true),StructField(indicator_rain,StringType,true),StructField(precipitation,StringType,true),StructField(indicator_temp,StringType,true),StructField(air_temperature,StringType,true),StructField(indicator_wetb,StringType,true),StructField(wetb,StringType,true),StructField(dewpt,StringType,true),StructField(vappr,StringType,true),StructField(relative_humidity,StringType,true),StructField(msl,StringType,true),StructField(indicator_wdsp,StringType,true),StructField(wind_speed,StringType,true),StructField(indicator_wddir,StringType,true),StructField(wind_from_direction,StringType,true)))

In [8]:
# Same information as (column name, type) pairs.
df.dtypes

[('_c0', 'string'),
 ('dateTime', 'string'),
 ('indicator_rain', 'string'),
 ('precipitation', 'string'),
 ('indicator_temp', 'string'),
 ('air_temperature', 'string'),
 ('indicator_wetb', 'string'),
 ('wetb', 'string'),
 ('dewpt', 'string'),
 ('vappr', 'string'),
 ('relative_humidity', 'string'),
 ('msl', 'string'),
 ('indicator_wdsp', 'string'),
 ('wind_speed', 'string'),
 ('indicator_wddir', 'string'),
 ('wind_from_direction', 'string')]

In [9]:
# Peek at the first record, printed one field per line.
df.show(n=1, vertical=True)

-RECORD 0--------------------------------
 _c0                 | 14                
 dateTime            | 01-jan-1990 00:00 
 indicator_rain      | 0                 
 precipitation       | 0.3               
 indicator_temp      | 0                 
 air_temperature     | 9.1               
 indicator_wetb      | 0                 
 wetb                | 9.0               
 dewpt               | 8.9               
 vappr               | 11.4              
 relative_humidity   | 99                
 msl                 | 1006.7            
 indicator_wdsp      | 2                 
 wind_speed          | 7                 
 indicator_wddir     | 2                 
 wind_from_direction | 190               
only showing top 1 row



### String Indexer Example

In [10]:
indexer = StringIndexer(inputCol="dateTime" ,outputCol="indexedTime")

In [11]:
indexed = indexer.fit(df).transform(df) # return a df

In [12]:
indexed.show(2, vertical=True) 

-RECORD 0--------------------------------
 _c0                 | 14                
 dateTime            | 01-jan-1990 00:00 
 indicator_rain      | 0                 
 precipitation       | 0.3               
 indicator_temp      | 0                 
 air_temperature     | 9.1               
 indicator_wetb      | 0                 
 wetb                | 9.0               
 dewpt               | 8.9               
 vappr               | 11.4              
 relative_humidity   | 99                
 msl                 | 1006.7            
 indicator_wdsp      | 2                 
 wind_speed          | 7                 
 indicator_wddir     | 2                 
 wind_from_direction | 190               
 indexedTime         | 2667.0            
-RECORD 1--------------------------------
 _c0                 | 15                
 dateTime            | 01-jan-1990 01:00 
 indicator_rain      | 0                 
 precipitation       | 0.2               
 indicator_temp      | 0          

In [13]:
# The new indexedTime column is a double; all original columns are still strings.
indexed.dtypes

[('_c0', 'string'),
 ('dateTime', 'string'),
 ('indicator_rain', 'string'),
 ('precipitation', 'string'),
 ('indicator_temp', 'string'),
 ('air_temperature', 'string'),
 ('indicator_wetb', 'string'),
 ('wetb', 'string'),
 ('dewpt', 'string'),
 ('vappr', 'string'),
 ('relative_humidity', 'string'),
 ('msl', 'string'),
 ('indicator_wdsp', 'string'),
 ('wind_speed', 'string'),
 ('indicator_wddir', 'string'),
 ('wind_from_direction', 'string'),
 ('indexedTime', 'double')]

### Task - Predict air temperature from date/time, relative humidity, wind speed and mean sea-level pressure (msl)

In [14]:
# Cast the string columns to numeric types and keep only the model inputs;
# air_temperature becomes the regression target, renamed "label".
# NOTE(review): values that fail to cast presumably become null (non-ANSI
# casting); the VectorAssembler below is configured to skip such rows.
parsed_df = indexed.selectExpr(
    "cast(dateTime as string) dateTime",
    "cast(relative_humidity as float) relative_humidity",
    "cast(wind_speed as float) wind_speed",
    "cast(indexedTime as double) indexedTime",
    "cast(msl as float) msl",
    "cast(air_temperature as float) label",
)

In [15]:
# Put all discreted data/features into 1D vector (as 1 column in the df)
vectorAssembler = VectorAssembler(inputCols = ['relative_humidity', 'wind_speed', 'indexedTime', 'msl'], handleInvalid="skip",outputCol ='features')

In [16]:
vectorized_df = vectorAssembler.transform(parsed_df)

In [17]:
vectorized_df.show(1)

+-----------------+-----------------+----------+-----------+------+-----+--------------------+
|         dateTime|relative_humidity|wind_speed|indexedTime|   msl|label|            features|
+-----------------+-----------------+----------+-----------+------+-----+--------------------+
|01-jan-1990 00:00|             99.0|       7.0|     2667.0|1006.7|  9.1|[99.0,7.0,2667.0,...|
+-----------------+-----------------+----------+-----------+------+-----+--------------------+
only showing top 1 row



In [18]:
dataset = vectorized_df.select("features", "label") # remove unnecessary cols

In [19]:
dataset.show(1)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[99.0,7.0,2667.0,...|  9.1|
+--------------------+-----+
only showing top 1 row



### Model - Gradient-Boosted Trees

In [20]:
# Split data
(trainData, testData) = dataset.randomSplit([0.8, 0.2])

In [21]:
print("Train sample: {}".format(trainData.count()))
print("Test sample: {}".format(testData.count()))

Train sample: 151636
Test sample: 37512


In [22]:
# First row of the held-out split.
testData.show(n=1)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,0.0,232493.0...| 15.0|
+--------------------+-----+
only showing top 1 row



In [23]:
#### NORMAL — fit the GBT directly on the assembled "features" column (no Pipeline)

In [24]:
gbt = GBTRegressor(featuresCol="features", maxIter=30, maxDepth=11)
# Measure the fit with time.perf_counter(): a monotonic, high-resolution clock
# meant for elapsed-time measurement (time.time() follows the wall clock and can
# jump if the system clock is adjusted). Prints elapsed seconds.
s = time.perf_counter()
model_direct = gbt.fit(trainData)
print(time.perf_counter() - s)

168.6327097415924


In [25]:
pred_direct = model_direct.transform(testData)

In [26]:
pred_direct.show(5)

+--------------------+-----+-----------------+
|            features|label|       prediction|
+--------------------+-----+-----------------+
|[0.0,0.0,232493.0...| 15.0| 12.4332721600491|
|[0.0,0.0,242550.0...| 13.1|17.03589734558741|
|[0.0,1.0,167197.0...|  6.2|9.563238967033106|
|[0.0,1.0,175267.0...|  8.8|6.457641089715971|
|[0.0,1.0,175268.0...|  9.6|6.457641089715971|
+--------------------+-----+-----------------+
only showing top 5 rows



In [27]:
#### WITH PIPELINE — same GBT hyperparameters, wrapped in a Pipeline with a VectorIndexer stage

In [28]:
# Transform step - Index features (if needed). Fitted on the training split only,
# so test rows do not influence preprocessing; handleInvalid="keep" puts category
# values unseen during fitting into an extra bucket instead of silently dropping
# those test rows.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", handleInvalid="keep").fit(trainData)

# Transform step - Index labels (only meaningful for a classifier; the label here
# is a continuous temperature). Fitted on the training split; "keep" maps label
# values that occur only in the test split to an extra index instead of raising.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel", handleInvalid="keep").fit(trainData)

# Transform step - Map indexed predictions back to the original label strings
# (only needed when the labels were indexed).
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

In [29]:
# Same hyperparameters as the direct fit, but reading the indexed feature vector.
gbt = GBTRegressor(maxDepth=11, maxIter=30, featuresCol="indexedFeatures")

In [30]:
# To look up what an estimator parameter means, uncomment one of these:
# gbt.explainParam("maxIter") # explain specific param
# gbt.explainParams() # explain all params

In [31]:
# featureIndexer was already fitted (it is a VectorIndexerModel), so it acts as a
# plain transformer stage; gbt is the only estimator trained by pipeline.fit().
# Label indexing/converting stages (labelIndexer, labelConverter) only make
# sense for classifiers and are therefore not used with this regressor.
pipeline = Pipeline(stages=[
    featureIndexer,
    gbt,
])

In [32]:
# Time the pipeline fit with a monotonic, high-resolution clock
# (time.perf_counter) rather than the adjustable wall clock; prints seconds.
s = time.perf_counter()
model_gbt = pipeline.fit(trainData)
print(time.perf_counter() - s)

169.29224920272827


In [33]:
pred = model_gbt.transform(testData) # return a df

In [34]:
pred.show(5)

+--------------------+-----+--------------------+-----------------+
|            features|label|     indexedFeatures|       prediction|
+--------------------+-----+--------------------+-----------------+
|[0.0,0.0,232493.0...| 15.0|[0.0,0.0,232493.0...| 12.4332721600491|
|[0.0,0.0,242550.0...| 13.1|[0.0,0.0,242550.0...|17.03589734558741|
|[0.0,1.0,167197.0...|  6.2|[0.0,1.0,167197.0...|9.563238967033106|
|[0.0,1.0,175267.0...|  8.8|[0.0,1.0,175267.0...|6.457641089715971|
|[0.0,1.0,175268.0...|  9.6|[0.0,1.0,175268.0...|6.457641089715971|
+--------------------+-----+--------------------+-----------------+
only showing top 5 rows



### Model - Random Forest

In [35]:
rforest = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=40, maxDepth=7)

In [36]:
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rforest, labelConverter])

In [37]:
# Time the random-forest pipeline fit with a monotonic, high-resolution clock
# (time.perf_counter) rather than the adjustable wall clock; prints seconds.
s = time.perf_counter()
model_forest = pipeline.fit(trainData)
print(time.perf_counter() - s)

28.511860132217407


In [38]:
pred_forest = model_forest.transform(testData).select("features", "label", "predictedLabel")

In [39]:
pred_forest.show(10)

+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|[0.0,0.0,232493.0...| 15.0|          16.5|
|[0.0,0.0,242550.0...| 13.1|          15.3|
|[0.0,1.0,167197.0...|  6.2|           4.8|
|[0.0,1.0,175267.0...|  8.8|          16.0|
|[0.0,1.0,175268.0...|  9.6|          16.0|
|[0.0,1.0,220927.0...| 12.9|          16.0|
|[0.0,2.0,172589.0...| 16.3|          16.0|
|[0.0,2.0,177269.0...|  6.3|          16.0|
|[0.0,2.0,180641.0...| 15.8|          16.0|
|[0.0,2.0,220926.0...| 13.3|          16.0|
+--------------------+-----+--------------+
only showing top 10 rows



### Evaluation metrics

In [40]:
rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse") # root mean square err

In [None]:
rmse_result = rmse.evaluate(pred)

In [None]:
rmse_result

In [None]:
rsquare = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2") # r-squared

In [None]:
rsquare_result = rsquare.evaluate(pred)

In [None]:
rsquare_result

In [None]:
mae = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae") # mean absolute err

In [None]:
mae_result = mae.evaluate(pred)

In [None]:
mae_result

### Plotting

In [None]:
pred_pd = pred.toPandas()

In [None]:
seaborn.set(style="whitegrid", font_scale = 1.9)

In [None]:
# Using Seaborn
# Configure the theme in ONE call and BEFORE creating the figure:
#  - every seaborn.set() call resets style/context parameters it is not given to
#    seaborn's defaults, so separate set(color_codes=...) / set(rc=...) calls
#    would undo style="whitegrid" and font_scale=1.9;
#  - rc "figure.figsize" only applies to figures created afterwards, so it must
#    be set before plt.subplots() to take effect on a fresh kernel.
seaborn.set(style="whitegrid", font_scale=1.9, color_codes=True, rc={'figure.figsize': (15, 10)})
fig, ax = plt.subplots()
# Scatter of actual vs. predicted values, then the fitted regression line on the same axes.
seaborn.regplot(x="label", y="prediction", fit_reg=False, ax=ax, data=pred_pd, scatter_kws={"color": "green"})
seaborn.regplot(x="label", y="prediction", scatter=False, ax=ax, data=pred_pd, line_kws={"color": "red"})
ax.set_title("GBT pipeline: prediction vs. label");

In [None]:
# Using matplotlib
fig_, ax_ = plt.subplots(figsize=(15, 10))
ax_.scatter(pred_pd.label, pred_pd.prediction, color='green', marker='o')
# Regression line: least-squares fit prediction ≈ m * label + b. A straight line
# only needs its two endpoints, instead of a polyline through every (unsorted)
# test point.
m, b = np.polyfit(pred_pd.label, pred_pd.prediction, 1)
x_line = np.array([pred_pd.label.min(), pred_pd.label.max()])
ax_.plot(x_line, m * x_line + b, color='red')
ax_.set_xlabel('label', fontsize=20)
ax_.set_ylabel('prediction', fontsize=20)
ax_.set_title('GBT pipeline: prediction vs. label', fontsize=20)
# Style tick labels through the explicit Axes instead of the pyplot state machine.
ax_.tick_params(axis='both', labelsize=15)
plt.show()