# Machine Learning Methods with Apache Spark

In this notebook we’ll analyze a real-world dataset and apply machine learning to it using Apache Spark.

Let’s start by downloading the dataset and creating a DataFrame. The dataset is available for free on DAX, the IBM Data Asset Exchange, at the following URL:

https://developer.ibm.com/exchanges/data/all/jfk-weather-data/

## Import data and libraries

In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
# local[K]: run Spark locally with K worker threads (ideally, set K to the number of cores on your machine).
conf = pyspark.SparkConf().setAppName('S_AppName').setMaster('local[4]')
conf.set('spark.driver.cores',4)
conf.get('spark.driver.cores','None')
sc = pyspark.SparkContext(conf=conf)
sc

In [2]:
spark = pyspark.sql.SparkSession(sc)

In [3]:
df = spark.read.option("header", "true").option("inferSchema","true")\
.csv('C:/Users/Sotiris/Documents/Coursera/IBM/IBM AI Engineering Professional Certificate/2. Scalable Machine Learning on Big Data using Apache Spark/noaa-weather-data-jfk-airport/jfk_weather.csv')

In [4]:
df.createOrReplaceTempView('df')
df.show()


In [5]:
import random
random.seed(42)  # seeds Python's RNG only; Spark operations such as randomSplit take their own seed argument

from pyspark.sql.functions import translate, col

# Cast the wind columns to double, and strip the non-numeric markers ("s", ",", "*")
# from the remaining columns so that they can be cast in the next step.
df_cleaned = df \
    .withColumn("HOURLYWindSpeed", df.HOURLYWindSpeed.cast('double')) \
    .withColumn("HOURLYWindDirection", df.HOURLYWindDirection.cast('double')) \
    .withColumn("HOURLYStationPressure", translate(col("HOURLYStationPressure"), "s,", "")) \
    .withColumn("HOURLYPrecip", translate(col("HOURLYPrecip"), "s,", "")) \
    .withColumn("HOURLYRelativeHumidity", translate(col("HOURLYRelativeHumidity"), "*", "")) \
    .withColumn("HOURLYDRYBULBTEMPC", translate(col("HOURLYDRYBULBTEMPC"), "*", ""))

# Cast the cleaned string columns to double.
df_cleaned = df_cleaned \
    .withColumn("HOURLYStationPressure", df_cleaned.HOURLYStationPressure.cast('double')) \
    .withColumn("HOURLYPrecip", df_cleaned.HOURLYPrecip.cast('double')) \
    .withColumn("HOURLYRelativeHumidity", df_cleaned.HOURLYRelativeHumidity.cast('double')) \
    .withColumn("HOURLYDRYBULBTEMPC", df_cleaned.HOURLYDRYBULBTEMPC.cast('double'))

# Keep only rows where every column of interest is non-zero.
# A comparison against null is never true, so rows with nulls are dropped as well.
df_filtered = df_cleaned.filter("""
    HOURLYWindSpeed <> 0
    and HOURLYWindDirection <> 0
    and HOURLYStationPressure <> 0
    and HOURLYPressureTendency <> 0
    and HOURLYPrecip <> 0
    and HOURLYRelativeHumidity <> 0
    and HOURLYDRYBULBTEMPC <> 0
""")

In [6]:
df_filtered.columns

['STATION',
 'STATION_NAME',
 'ELEVATION',
 'LATITUDE',
 'LONGITUDE',
 'DATE',
 'REPORTTPYE',
 'HOURLYSKYCONDITIONS',
 'HOURLYVISIBILITY',
 'HOURLYPRSENTWEATHERTYPE',
 'HOURLYDRYBULBTEMPF',
 'HOURLYDRYBULBTEMPC',
 'HOURLYWETBULBTEMPF',
 'HOURLYWETBULBTEMPC',
 'HOURLYDewPointTempF',
 'HOURLYDewPointTempC',
 'HOURLYRelativeHumidity',
 'HOURLYWindSpeed',
 'HOURLYWindDirection',
 'HOURLYWindGustSpeed',
 'HOURLYStationPressure',
 'HOURLYPressureTendency',
 'HOURLYPressureChange',
 'HOURLYSeaLevelPressure',
 'HOURLYPrecip',
 'HOURLYAltimeterSetting',
 'DAILYMaximumDryBulbTemp',
 'DAILYMinimumDryBulbTemp',
 'DAILYAverageDryBulbTemp',
 'DAILYDeptFromNormalAverageTemp',
 'DAILYAverageRelativeHumidity',
 'DAILYAverageDewPointTemp',
 'DAILYAverageWetBulbTemp',
 'DAILYHeatingDegreeDays',
 'DAILYCoolingDegreeDays',
 'DAILYSunrise',
 'DAILYSunset',
 'DAILYWeather',
 'DAILYPrecip',
 'DAILYSnowfall',
 'DAILYSnowDepth',
 'DAILYAverageStationPressure',
 'DAILYAverageSeaLevelPressure',
 'DAILYAverageWind

What is the correlation between HOURLYWindSpeed and HOURLYPressureTendency?

In [7]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYWindDirection","HOURLYStationPressure","HOURLYPressureTendency"],
                                  outputCol="features")
df_pipeline = vectorAssembler.transform(df_filtered)
from pyspark.ml.stat import Correlation
Correlation.corr(df_pipeline,"features").head()[0].toArray()

array([[ 1.        ,  0.06306013, -0.4204518 ,  0.06432947],
       [ 0.06306013,  1.        , -0.19199348, -0.27956253],
       [-0.4204518 , -0.19199348,  1.        ,  0.00581104],
       [ 0.06432947, -0.27956253,  0.00581104,  1.        ]])
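
The rows and columns of this matrix follow the order of `inputCols` in the `VectorAssembler`, so the answer to the question is the entry in row `HOURLYWindSpeed` and column `HOURLYPressureTendency`. The sketch below copies the matrix from the output above and looks the entry up by name. `lookup` and `pearson` are hypothetical helper names, and `pearson` is only a plain-Python illustration of the Pearson coefficient (the default method of `Correlation.corr`), not Spark's implementation.

```python
import math

# Column order matches the inputCols passed to the VectorAssembler.
columns = [
    "HOURLYWindSpeed",
    "HOURLYWindDirection",
    "HOURLYStationPressure",
    "HOURLYPressureTendency",
]

# Correlation matrix copied from the notebook output.
corr_matrix = [
    [1.0, 0.06306013, -0.4204518, 0.06432947],
    [0.06306013, 1.0, -0.19199348, -0.27956253],
    [-0.4204518, -0.19199348, 1.0, 0.00581104],
    [0.06432947, -0.27956253, 0.00581104, 1.0],
]


def lookup(a, b):
    """Return the correlation between two named columns."""
    return corr_matrix[columns.index(a)][columns.index(b)]


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


print(lookup("HOURLYWindSpeed", "HOURLYPressureTendency"))  # 0.06432947
```

A value of about 0.064 indicates almost no linear relationship between the two columns.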

## Pipelines and Regression

Let's try to predict "HOURLYWindSpeed" from "HOURLYWindDirection", "ELEVATION" and "HOURLYStationPressure".

Since this is supervised learning, let’s split our data into a training set (80%) and a test set (20%).

In [8]:
splits = df_filtered.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]
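
`randomSplit` is called without a seed here, so the two sets, and therefore the metrics reported in this notebook, will differ from run to run; the method does accept an optional `seed` argument. The following plain-Python sketch illustrates the idea of a seeded, weighted random split. It is not Spark's implementation, and `random_split` is a hypothetical helper. Each row is assigned independently, so the 80/20 ratio is only approximate.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any rounding slack in the bounds.
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))  # roughly 800 and 200; exact counts depend on the seed
```

Calling the helper again with the same seed returns the same two lists, which is what makes later results comparable across runs.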

In [9]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline

vectorAssembler = VectorAssembler(inputCols=[
                                    "HOURLYWindDirection",
                                    "ELEVATION",
                                    "HOURLYStationPressure"],
                                  outputCol="features")

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
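
With `p=1.0`, the `Normalizer` rescales each row vector so that the absolute values of its entries sum to 1. This works per row, not per column, so it is different from feature scaling such as standardization. A minimal plain-Python sketch of that operation, using an example row of the form [wind direction, elevation, station pressure] (`l1_normalize` is a hypothetical helper, not part of Spark):

```python
def l1_normalize(vector):
    """Scale a vector so that the absolute values of its entries sum to 1."""
    norm = sum(abs(v) for v in vector)
    if norm == 0:
        return list(vector)  # leave an all-zero vector unchanged
    return [v / norm for v in vector]


row = [70.0, 3.4, 29.84]  # wind direction, elevation, station pressure
normalized = l1_normalize(row)
print(normalized)
print(sum(abs(v) for v in normalized))  # approximately 1.0
```

Because the wind direction is much larger than the other two entries, it dominates the normalized row.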

Now we define a function for evaluating the performance of our regression predictions. We use RMSE (Root Mean Squared Error) here; the smaller the value, the better.

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator
def regression_metrics(prediction):
    evaluator = RegressionEvaluator(labelCol="HOURLYWindSpeed", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(prediction)
    print("RMSE on test data = %g" % rmse)
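
RMSE is the square root of the mean of the squared differences between labels and predictions, so it is expressed in the same unit as the label. A plain-Python sketch with illustrative values (the numbers and the `rmse` helper are made up for this example and are not taken from any model in this notebook):

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


labels = [20.0, 23.0, 32.0]       # illustrative wind speeds
predictions = [14.0, 17.0, 23.0]  # illustrative model outputs
# Errors are 6, 6 and 9, so the mean squared error is 51.
print("RMSE = %g" % rmse(labels, predictions))  # RMSE = 7.14143
```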

In [11]:
#Linear Regression

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="HOURLYWindSpeed", featuresCol='features', maxIter=100, regParam=0.0, elasticNetParam=0.0)
pipeline = Pipeline(stages=[vectorAssembler, normalizer,lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 6.18918


In [12]:
prediction[['HOURLYWindSpeed','features','prediction']].show()

+---------------+-----------------+------------------+
|HOURLYWindSpeed|         features|        prediction|
+---------------+-----------------+------------------+
|           20.0| [70.0,3.4,29.84]| 13.83868514154625|
|           23.0| [40.0,3.4,29.58]| 16.51819466174169|
|           32.0|[270.0,3.4,28.91]|23.042196952840072|
|           26.0|[260.0,3.4,28.96]|22.546451582496616|
|           18.0| [80.0,3.4,30.08]|11.387220187364221|
|           23.0| [70.0,3.4,29.99]| 12.31426600817099|
|           13.0| [20.0,3.4,29.47]| 17.66089070778014|
|           20.0|[140.0,3.4,29.68]| 15.37797183167504|
|            9.0| [180.0,3.4,29.6]|  16.1414180063486|
|            9.0|[200.0,3.4,29.64]|15.710117555885233|
|           14.0| [90.0,3.4,29.66]|15.643199420033284|
|           10.0|[120.0,3.4,29.32]|19.061366433339003|
|            7.0|[310.0,3.4,29.73]| 14.65912832726201|
|           13.0|[160.0,3.4,29.84]|13.727136074511463|
|           14.0|[190.0,3.4,29.84]|13.689953052166572|

In [13]:
#Linear Regression normalized

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="HOURLYWindSpeed", featuresCol='features_norm', maxIter=100, regParam=0.0, elasticNetParam=0.0)
pipeline = Pipeline(stages=[vectorAssembler, normalizer,lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 6.55505


In [14]:
#GBTRegressor

from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(labelCol="HOURLYWindSpeed", maxIter=100)
pipeline = Pipeline(stages=[vectorAssembler, normalizer,gbt])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 7.20118


## Classification

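Now let’s switch gears. Previously, we tried to predict "HOURLYWindSpeed"; now we predict "HOURLYWindDirection". To turn this into a classification problem, we discretize the value using the Bucketizer. The new column is called "HOURLYWindDirectionBucketized" and takes the value 0 for directions in [0, 180) degrees and 1 for directions of 180 degrees and above. (With the usual compass convention of 0° = north and 90° = east, these two classes correspond to winds from the eastern and western halves of the compass rather than to south and north.)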

In [15]:
from pyspark.ml.feature import Bucketizer, OneHotEncoder

bucketizer = Bucketizer(splits=[ 0, 180, float('Inf') ],inputCol="HOURLYWindDirection", outputCol="HOURLYWindDirectionBucketized")
encoder = OneHotEncoder(inputCol="HOURLYWindDirectionBucketized", outputCol="HOURLYWindDirectionOHE")
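
With splits `[0, 180, Inf]`, the `Bucketizer` produces two buckets: bucket 0 for values in [0, 180) and bucket 1 for values of 180 and above. The sketch below mimics that mapping in plain Python. `bucketize` is a hypothetical helper, and Spark's handling of invalid or out-of-range values is reduced here to raising an error.

```python
import bisect

SPLITS = [0.0, 180.0, float("inf")]


def bucketize(value, splits=SPLITS):
    """Return the index of the bucket [splits[i], splits[i+1]) containing value.

    The last bucket also includes its upper bound.
    """
    if value < splits[0] or value > splits[-1]:
        raise ValueError("value %r is outside the split range" % value)
    if value == splits[-1]:
        return len(splits) - 2
    return bisect.bisect_right(splits, value) - 1


for direction in (70.0, 180.0, 270.0):
    print(direction, "->", bucketize(direction))
```

A direction of exactly 180 degrees falls into bucket 1, because each bucket includes its lower bound and excludes its upper bound.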

Again, we define a function to assess how well we perform. Here we use accuracy, the fraction of correctly classified examples, which ranges from 0 (every prediction wrong) to 1 (every prediction correct).

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
def classification_metrics(prediction):
    mcEval = MulticlassClassificationEvaluator(
        metricName="accuracy",
        predictionCol="prediction",
        labelCol="HOURLYWindDirectionBucketized")
    accuracy = mcEval.evaluate(prediction)
    print("Accuracy on test data = %g" % accuracy)
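
Accuracy is simply the number of matching label/prediction pairs divided by the total number of pairs. A plain-Python sketch with illustrative values (the labels, predictions and the `accuracy` helper are made up for this example):

```python
def accuracy(labels, predictions):
    """Fraction of predictions that match their labels."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


labels = [0.0, 1.0, 1.0, 0.0, 1.0]       # illustrative bucket labels
predictions = [0.0, 1.0, 0.0, 0.0, 1.0]  # illustrative predictions
# Four of the five predictions match their labels.
print("Accuracy = %g" % accuracy(labels, predictions))  # Accuracy = 0.8
```

For a two-class problem, always predicting the more frequent class already reaches an accuracy of at least 0.5, which is a useful reference point when reading the results.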

As a baseline, we use LogisticRegression.

In [17]:
#LogisticRegression

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="HOURLYWindDirectionBucketized", maxIter=10)
#,"ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.589226


Let’s try some other algorithms and see if model performance improves. It’s also important to tune the hyperparameters of the individual algorithms (e.g. the number of trees for RandomForest) as well as the settings of the feature engineering pipeline (e.g. train/test split ratio, normalization, bucketing).

In [18]:
#RandomForestClassifier

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="HOURLYWindDirectionBucketized", numTrees=30)

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,rf])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.649832


In [19]:
#GBTClassifier

from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="HOURLYWindDirectionBucketized", maxIter=100)

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,gbt])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.612795


In [20]:
#RandomForestClassifier (fewer trees)

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="HOURLYWindDirectionBucketized", numTrees=10)

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,rf])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.646465
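
To compare the classifiers at a glance, the accuracies printed above can be collected and ranked. The sketch below only restates the numbers reported in this notebook. Because the train/test split was not seeded, a rerun may give somewhat different values, so the small gap between the two random forests should not be over-interpreted.

```python
# Test accuracies as printed in the notebook output.
accuracies = {
    "LogisticRegression": 0.589226,
    "RandomForestClassifier (numTrees=30)": 0.649832,
    "GBTClassifier": 0.612795,
    "RandomForestClassifier (numTrees=10)": 0.646465,
}

# Rank from highest to lowest accuracy.
ranking = sorted(accuracies.items(), key=lambda item: item[1], reverse=True)
for name, acc in ranking:
    print("%-40s %.6f" % (name, acc))

best_name, best_acc = ranking[0]
```

Note that the logistic regression baseline was trained on two features while the tree-based models used six, so the comparison is not strictly like for like.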
