In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import findspark

In [2]:
findspark.find()

'/usr/local/opt/spark'

In [3]:
findspark.init()

In [4]:
spark = SparkSession.builder \
      .master("local[*]") \
      .appName("CaseStudySpark.com") \
      .getOrCreate()

Let’s start by downloading the dataset and creating a dataframe. This dataset can be found on DAX, the IBM Data Asset Exchange and can be downloaded for free.

https://developer.ibm.com/exchanges/data/all/jfk-weather-data/

In [5]:
# delete files from previous runs
!rm -f jfk_weather*

# download the file containing the data in CSV format
!wget http://max-training-data.s3-api.us-geo.objectstorage.softlayer.net/noaa-weather/jfk_weather.tar.gz

# extract the data
!tar xvfz jfk_weather.tar.gz
    
# create a dataframe out of it by using the first row as field names and trying to infer a schema based on contents
df = spark.read.option("header", "true").option("inferSchema","true").csv('jfk_weather.csv')

# register a corresponding query table
df.createOrReplaceTempView('df')

--2020-08-16 10:32:20--  http://max-training-data.s3-api.us-geo.objectstorage.softlayer.net/noaa-weather/jfk_weather.tar.gz
Resolving max-training-data.s3-api.us-geo.objectstorage.softlayer.net (max-training-data.s3-api.us-geo.objectstorage.softlayer.net)... 67.228.254.196
Connecting to max-training-data.s3-api.us-geo.objectstorage.softlayer.net (max-training-data.s3-api.us-geo.objectstorage.softlayer.net)|67.228.254.196|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2575759 (2.5M) [application/x-tar]
Saving to: ‘jfk_weather.tar.gz’


2020-08-16 10:32:26 (717 KB/s) - ‘jfk_weather.tar.gz’ saved [2575759/2575759]

x ./._jfk_weather.csv
x jfk_weather.csv


The dataset contains some null values, therefore schema inference didn’t work properly for all columns, in addition, a column contained trailing characters, so we need to clean up the data set first.

In [14]:
df.dtypes

[('STATION', 'string'),
 ('STATION_NAME', 'string'),
 ('ELEVATION', 'double'),
 ('LATITUDE', 'double'),
 ('LONGITUDE', 'double'),
 ('DATE', 'string'),
 ('REPORTTPYE', 'string'),
 ('HOURLYSKYCONDITIONS', 'string'),
 ('HOURLYVISIBILITY', 'string'),
 ('HOURLYPRSENTWEATHERTYPE', 'string'),
 ('HOURLYDRYBULBTEMPF', 'string'),
 ('HOURLYDRYBULBTEMPC', 'string'),
 ('HOURLYWETBULBTEMPF', 'string'),
 ('HOURLYWETBULBTEMPC', 'string'),
 ('HOURLYDewPointTempF', 'string'),
 ('HOURLYDewPointTempC', 'string'),
 ('HOURLYRelativeHumidity', 'string'),
 ('HOURLYWindSpeed', 'string'),
 ('HOURLYWindDirection', 'string'),
 ('HOURLYWindGustSpeed', 'int'),
 ('HOURLYStationPressure', 'string'),
 ('HOURLYPressureTendency', 'int'),
 ('HOURLYPressureChange', 'string'),
 ('HOURLYSeaLevelPressure', 'string'),
 ('HOURLYPrecip', 'string'),
 ('HOURLYAltimeterSetting', 'string'),
 ('DAILYMaximumDryBulbTemp', 'int'),
 ('DAILYMinimumDryBulbTemp', 'int'),
 ('DAILYAverageDryBulbTemp', 'int'),
 ('DAILYDeptFromNormalAverageTem

In [34]:
from pyspark.sql import functions as F

missing = df.select([F.count(F.when(F.isnan(i) | \
                          F.col(i).contains('NA') | \
                          F.col(i).contains('NULL') | \
                          F.col(i).isNull(), i)).alias(i) \
                    for i in df.columns]) 

In [37]:
import pixiedust
display(missing)

STATION,STATION_NAME,ELEVATION,LATITUDE,LONGITUDE,DATE,REPORTTPYE,HOURLYSKYCONDITIONS,HOURLYVISIBILITY,HOURLYPRSENTWEATHERTYPE,HOURLYDRYBULBTEMPF,HOURLYDRYBULBTEMPC,HOURLYWETBULBTEMPF,HOURLYWETBULBTEMPC,HOURLYDewPointTempF,HOURLYDewPointTempC,HOURLYRelativeHumidity,HOURLYWindSpeed,HOURLYWindDirection,HOURLYWindGustSpeed,HOURLYStationPressure,HOURLYPressureTendency,HOURLYPressureChange,HOURLYSeaLevelPressure,HOURLYPrecip,HOURLYAltimeterSetting,DAILYMaximumDryBulbTemp,DAILYMinimumDryBulbTemp,DAILYAverageDryBulbTemp,DAILYDeptFromNormalAverageTemp,DAILYAverageRelativeHumidity,DAILYAverageDewPointTemp,DAILYAverageWetBulbTemp,DAILYHeatingDegreeDays,DAILYCoolingDegreeDays,DAILYSunrise,DAILYSunset,DAILYWeather,DAILYPrecip,DAILYSnowfall,DAILYSnowDepth,DAILYAverageStationPressure,DAILYAverageSeaLevelPressure,DAILYAverageWindSpeed,DAILYPeakWindSpeed,PeakWindDirection,DAILYSustainedWindSpeed,DAILYSustainedWindDirection,MonthlyMaximumTemp,MonthlyMinimumTemp,MonthlyMeanTemp,MonthlyAverageRH,MonthlyDewpointTemp,MonthlyWetBulbTemp,MonthlyAvgHeatingDegreeDays,MonthlyAvgCoolingDegreeDays,MonthlyStationPressure,MonthlySeaLevelPressure,MonthlyAverageWindSpeed,MonthlyTotalSnowfall,MonthlyDeptFromNormalMaximumTemp,MonthlyDeptFromNormalMinimumTemp,MonthlyDeptFromNormalAverageTemp,MonthlyDeptFromNormalPrecip,MonthlyTotalLiquidPrecip,MonthlyGreatestPrecip,MonthlyGreatestPrecipDate,MonthlyGreatestSnowfall,MonthlyGreatestSnowfallDate,MonthlyGreatestSnowDepth,MonthlyGreatestSnowDepthDate,MonthlyDaysWithGT90Temp,MonthlyDaysWithLT32Temp,MonthlyDaysWithGT32Temp,MonthlyDaysWithLT0Temp,MonthlyDaysWithGT001Precip,MonthlyDaysWithGT010Precip,MonthlyDaysWithGT1Snow,MonthlyMaxSeaLevelPressureValue,MonthlyMaxSeaLevelPressureDate,MonthlyMaxSeaLevelPressureTime,MonthlyMinSeaLevelPressureValue,MonthlyMinSeaLevelPressureDate,MonthlyMinSeaLevelPressureTime,MonthlyTotalHeatingDegreeDays,MonthlyTotalCoolingDegreeDays,MonthlyDeptFromNormalHeatingDD,MonthlyDeptFromNormalCoolingDD,MonthlyTotalSeasonToDateHeatingDD,MonthlyTotalSeasonToDateCoolingDD
0,114545,0,0,0,0,0,21210,18081,94571,3140,3140,3298,3298,3148,3148,3148,3187,3308,99069,3289,66507,85375,15023,34782,27667,103673,103611,104852,104858,112013,111905,111905,104852,104852,0,0,113066,109411,111415,109752,111418,111908,111433,111488,111488,111434,111434,114445,114446,114446,114545,114545,114545,114544,114544,114455,114455,114544,114255,114445,114446,114446,114452,114452,114545,114545,114456,114507,114456,114516,114448,114448,114448,114448,114545,114545,114544,114545,0,0,114545,0,0,114444,114444,114444,114444,114544,114544


In [38]:
import random
random.seed(42)

from pyspark.sql.functions import translate, col

df_cleaned = df \
    .withColumn("HOURLYWindSpeed", df.HOURLYWindSpeed.cast('double')) \
    .withColumn("HOURLYWindDirection", df.HOURLYWindDirection.cast('double')) \
    .withColumn("HOURLYStationPressure", translate(col("HOURLYStationPressure"), "s,", "")) \
    .withColumn("HOURLYPrecip", translate(col("HOURLYPrecip"), "s,", "")) \
    .withColumn("HOURLYRelativeHumidity", translate(col("HOURLYRelativeHumidity"), "*", "")) \
    .withColumn("HOURLYDRYBULBTEMPC", translate(col("HOURLYDRYBULBTEMPC"), "*", "")) \

df_cleaned =   df_cleaned \
                    .withColumn("HOURLYStationPressure", df_cleaned.HOURLYStationPressure.cast('double')) \
                    .withColumn("HOURLYPrecip", df_cleaned.HOURLYPrecip.cast('double')) \
                    .withColumn("HOURLYRelativeHumidity", df_cleaned.HOURLYRelativeHumidity.cast('double')) \
                    .withColumn("HOURLYDRYBULBTEMPC", df_cleaned.HOURLYDRYBULBTEMPC.cast('double')) \

df_filtered = df_cleaned.filter("""
    HOURLYWindSpeed <> 0
    and HOURLYWindSpeed IS NOT NULL
    and HOURLYWindDirection IS NOT NULL
    and HOURLYStationPressure IS NOT NULL
    and HOURLYPressureTendency IS NOT NULL
    and HOURLYPrecip IS NOT NULL
    and HOURLYRelativeHumidity IS NOT NULL
    and HOURLYDRYBULBTEMPC IS NOT NULL
""")

We want to predict the value of one column based of some others. It is sometimes helpful to print a correlation matrix. 

In [39]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYWindDirection","HOURLYStationPressure"],
                                  outputCol="features")
df_pipeline = vectorAssembler.transform(df_filtered)

from pyspark.ml.stat import Correlation
Correlation.corr(df_pipeline,"features").head()[0].toArray()

array([[ 1.        ,  0.25478863, -0.26171147],
       [ 0.25478863,  1.        , -0.13377466],
       [-0.26171147, -0.13377466,  1.        ]])

As we can see, HOURLYWindSpeed and HOURLYWindDirection correlate with 0.25478863 whereas HOURLYWindSpeed  and HOURLYStationPressure correlate with -0.26171147, this is a good sign if we want to predict HOURLYWindSpeed from HOURLYWindDirection and HOURLYStationPressure. Note that the numbers can change over time as we are always working with the latest data.
Since this is supervised learning, let’s split our data into train (80%) and test (20%) set.

In [51]:
vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYPressureTendency"],
                                  outputCol="features")
df_pipeline = vectorAssembler.transform(df_filtered)
Correlation.corr(df_pipeline,"features").head()[0].toArray()

array([[ 1.        , -0.01324305],
       [-0.01324305,  1.        ]])

In [40]:
splits = df_filtered.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

Again, we can re-use our feature engineering pipeline

In [41]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from pyspark.ml import Pipeline

vectorAssembler = VectorAssembler(inputCols=[
                                    "HOURLYWindDirection",
                                    "ELEVATION",
                                    "HOURLYStationPressure"],
                                  outputCol="features")

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

Now we define a function for evaluating our regression prediction performance. We’re using RMSE (Root Mean Squared Error) here, the smaller the better…



In [43]:
def regression_metrics(prediction):
    from pyspark.ml.evaluation import RegressionEvaluator
    evaluator = RegressionEvaluator(
    labelCol="HOURLYWindSpeed", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(prediction)
    print("RMSE on test data = %g" % rmse)

Let’s run a linear regression model first for building a baseline.



In [44]:
#LR1
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="HOURLYWindSpeed", featuresCol='features', maxIter=100, regParam=0.0, elasticNetParam=0.0)
pipeline = Pipeline(stages=[vectorAssembler, normalizer, lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 5.39056


Let us use features_norm over features and check out the RMSE

In [52]:
lr2 = LinearRegression(labelCol="HOURLYWindSpeed", featuresCol='features_norm', maxIter=100, regParam=0.0, elasticNetParam=0.0)
pipeline = Pipeline(stages=[vectorAssembler, normalizer, lr2])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 5.10297


Now we’ll try a Gradient Boosted Tree Regressor

In [45]:
#GBT1
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(labelCol="HOURLYWindSpeed", maxIter=100)
pipeline = Pipeline(stages=[vectorAssembler, normalizer, gbt])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 5.18352


Now let’s switch gears. Previously, we tried to predict HOURLYWindSpeed, but now we predict HOURLYWindDirection. In order to turn this into a classification problem we discretize the value using the Bucketizer. The new feature is called HOURLYWindDirectionBucketized.

In [46]:
from pyspark.ml.feature import Bucketizer, OneHotEncoder
bucketizer = Bucketizer(splits=[0, 180, float('Inf') ], inputCol="HOURLYWindDirection", outputCol="HOURLYWindDirectionBucketized")
encoder = OneHotEncoder(inputCol="HOURLYWindDirectionBucketized", outputCol="HOURLYWindDirectionOHE")


Again, we define a function in order to assess how we perform. Here we just use the accuracy measure which gives us the fraction of correctly classified examples. Again, 0 is bad, 1 is good.

In [47]:
def classification_metrics(prediction):
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    mcEval = MulticlassClassificationEvaluator().setMetricName("accuracy").setPredictionCol("prediction").setLabelCol("HOURLYWindDirectionBucketized")
    accuracy = mcEval.evaluate(prediction)
    print("Accuracy on test data = %g" % accuracy)

Again, for baselining we use LogisticRegression.

In [48]:
#LGReg1

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="HOURLYWindDirectionBucketized", maxIter=10)
#,"ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer, vectorAssembler, normalizer, lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.685767


Let’s try some other Algorithms and see if model performance increases. It’s also important to tweak other parameters like parameters of individual algorithms (e.g. number of trees for RandomForest) or parameters in the feature engineering pipeline, e.g. train/test split ratio, normalization, bucketing, …

In [49]:
#RF1

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="HOURLYWindDirectionBucketized", numTrees=30)

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,rf])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.726793


In [53]:
# try with 10 trees
rf2 = RandomForestClassifier(labelCol="HOURLYWindDirectionBucketized", numTrees=10)

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,rf2])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.727023


In [50]:
#GBT2

from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="HOURLYWindDirectionBucketized", maxIter=100)

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,gbt])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.734815
