# Final for IBM's Scalable Machine Learning Using Spark

<a id = 'toc'></a>

## Table of Contents:

[1. Loading Libraries and Downloading Spark 2.4.5](#s1)

[2. Loading Data](#s2)

[3. Cleaning Data](#s3)

[4. Correlations](#s4)

[5. Creating Train/Test Split](#s5)

[6. Creating Feature Engineering Pipeline Objects and Convenience Functions](#s6)

[7. Linear Regression](#s7)

[8. Gradient Boosted Tree Regression](#s8)

[9. Logistic Regression Classifier](#s9)

[10. Random Forest Classifier](#s10)

[11. Gradient Boosted Tree Classifier](#s11)

<a id = 's1'></a>
## Loading Libraries and Downloading Spark

[Back to Table of Contents](#toc)

In [1]:
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown('# <span style="color:red">'+string+'</span>'))


if ('sc' in locals() or 'sc' in globals()):
    printmd('Do not run this in a Spark Notebook')


In [2]:
# Downloading pyspark
!pip install pyspark==2.4.5

Collecting pyspark==2.4.5
  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
Collecting py4j==0.10.7 (from pyspark==2.4.5)
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Stored in directory: /home/dsxuser/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


In [6]:
#Loading Spark Libraries

try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    printmd('If you are getting this error, restart your Kernel')

In [9]:
# Setting SparkConf to Local Machine
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# Creating the SparkSession (entry point for Spark SQL and DataFrames)
spark = SparkSession.builder.getOrCreate()

<a id = 's2'></a>
## Loading Data

[Back to Table of Contents](#toc)

In [10]:
## Getting Data
# delete files from previous runs
!rm -f jfk_weather*

# download the file containing the data in CSV format
!wget http://max-training-data.s3-api.us-geo.objectstorage.softlayer.net/noaa-weather/jfk_weather.tar.gz

# extract the data
!tar xvfz jfk_weather.tar.gz
    
# create a dataframe out of it by using the first row as field names and trying to infer a schema based on contents
df = spark.read.option("header", "true").option("inferSchema","true").csv('jfk_weather.csv')

# register a corresponding query table
df.createOrReplaceTempView('df')

--2020-05-05 20:27:48--  http://max-training-data.s3-api.us-geo.objectstorage.softlayer.net/noaa-weather/jfk_weather.tar.gz
Resolving max-training-data.s3-api.us-geo.objectstorage.softlayer.net (max-training-data.s3-api.us-geo.objectstorage.softlayer.net)... 67.228.254.196
Connecting to max-training-data.s3-api.us-geo.objectstorage.softlayer.net (max-training-data.s3-api.us-geo.objectstorage.softlayer.net)|67.228.254.196|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2575759 (2.5M) [application/x-tar]
Saving to: ‘jfk_weather.tar.gz’


2020-05-05 20:27:49 (68.3 MB/s) - ‘jfk_weather.tar.gz’ saved [2575759/2575759]

./._jfk_weather.csv
jfk_weather.csv


<a id= 's3'></a>
## Cleaning Data
[Back to Table of Contents](#toc)

In [11]:
import random
random.seed(42)

from pyspark.sql.functions import translate, col

# Casting the wind columns to double and deleting stray flag characters
# ('s', ',' and '*') wherever they occur in the remaining columns
df_cleaned = (
    df
    .withColumn("HOURLYWindSpeed", df.HOURLYWindSpeed.cast('double'))
    .withColumn("HOURLYWindDirection", df.HOURLYWindDirection.cast('double'))
    .withColumn("HOURLYStationPressure", translate(col("HOURLYStationPressure"), "s,", ""))
    .withColumn("HOURLYPrecip", translate(col("HOURLYPrecip"), "s,", ""))
    .withColumn("HOURLYRelativeHumidity", translate(col("HOURLYRelativeHumidity"), "*", ""))
    .withColumn("HOURLYDRYBULBTEMPC", translate(col("HOURLYDRYBULBTEMPC"), "*", ""))
)

# Casting the cleaned string columns as doubles
df_cleaned = (
    df_cleaned
    .withColumn("HOURLYStationPressure", df_cleaned.HOURLYStationPressure.cast('double'))
    .withColumn("HOURLYPrecip", df_cleaned.HOURLYPrecip.cast('double'))
    .withColumn("HOURLYRelativeHumidity", df_cleaned.HOURLYRelativeHumidity.cast('double'))
    .withColumn("HOURLYDRYBULBTEMPC", df_cleaned.HOURLYDRYBULBTEMPC.cast('double'))
)

df_filtered = df_cleaned.filter("""
    HOURLYWindSpeed <> 0
    and HOURLYWindSpeed IS NOT NULL
    and HOURLYWindDirection IS NOT NULL
    and HOURLYStationPressure IS NOT NULL
    and HOURLYPressureTendency IS NOT NULL
    and HOURLYPrecip IS NOT NULL
    and HOURLYRelativeHumidity IS NOT NULL
    and HOURLYDRYBULBTEMPC IS NOT NULL
""")

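To see what the `translate(..., "s,", "")` calls in the cleaning cell do, here is a minimal pure-Python sketch of the same idea. The helper name `strip_chars` and the sample strings are hypothetical and are not taken from the dataset. Every character listed in the matching string is deleted wherever it occurs, after which the value can be parsed as a number.

```python
def strip_chars(value, chars):
    """Delete every occurrence of each character in `chars` from `value`."""
    return value.translate(str.maketrans("", "", chars))

# Hypothetical raw readings; the real column values may look different.
samples = ["30.01s", "0.02s", "29.95", "1,013"]

cleaned = [strip_chars(s, "s,") for s in samples]
as_doubles = [float(s) for s in cleaned]

print(cleaned)
print(as_doubles)
```
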
<a id = 's4'></a>
## Correlations

[Back to Table of Contents](#toc)

In [12]:
from pyspark.ml.feature import VectorAssembler
import seaborn as sns
vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYPressureTendency"],
                                  outputCol="features")

df_pipeline = vectorAssembler.transform(df_filtered)

from pyspark.ml.stat import Correlation

corr = Correlation.corr(df_pipeline,"features").head()[0].toArray()
corr

array([[ 1.        , -0.01324305],
       [-0.01324305,  1.        ]])
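
`Correlation.corr` uses the Pearson coefficient by default, so the off-diagonal value of about -0.013 indicates almost no linear relationship between `HOURLYWindSpeed` and `HOURLYPressureTendency`. As a rough illustration of how that coefficient is computed, here is a small pure-Python sketch. The function name `pearson` and the sample values are made up for this example.

```python
import math

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of the spreads."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Made-up values, only to show the two extremes of the coefficient.
speed = [1.0, 2.0, 3.0, 4.0]
rising = [2.0, 4.0, 6.0, 8.0]
falling = [8.0, 6.0, 4.0, 2.0]

print(pearson(speed, rising))   # perfectly increasing linear relationship
print(pearson(speed, falling))  # perfectly decreasing linear relationship
```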

<a id = 's5'></a>
## Creating Train/Test Split

[Back to Table of Contents](#toc)

In [17]:
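# random.seed(42) only seeds Python's own generator, so the seed is passed to Spark directly
splits = df_filtered.randomSplit([.8,.2], seed=42)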

df_train = splits[0]
df_test = splits[1]
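
A weighted random split assigns each row to a split independently, so the resulting sizes are only approximately 80/20, and the result is repeatable only when the generator is seeded. The sketch below illustrates that idea in plain Python. It is not Spark's implementation, and `random_split` is a hypothetical helper written for this example.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. roughly [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point leftovers
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))
```

Calling `random_split` again with the same seed returns the same two lists, while a different seed generally yields a different assignment.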

<a id ='s6'></a>
## Creating Feature Engineering Pipeline Objects / Convenience Functions

[Back to Table of Contents](#toc)

In [29]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Bucketizer, Normalizer, OneHotEncoderEstimator, VectorAssembler

def create_ml_pipeline(vector_assembler=None, normalizer_=False, onehotencoder=None, bucketizer=None, model=None):
    """Assemble a Pipeline from the requested feature stages plus an optional model.

    vector_assembler: list of column names to assemble into "features"
    normalizer_:      if True, add an L1 Normalizer that writes "features_norm"
    onehotencoder:    column name used to build the encoder's output column (name + "OHE")
    bucketizer:       column name to split into the buckets [0, 180) and [180, Inf]
    model:            estimator appended as the final stage
    """
    stages = []

    if vector_assembler:
        stages.append(VectorAssembler(inputCols=vector_assembler, outputCol="features"))
        print('Vectors Assembled\n')
        if normalizer_:
            # The model only uses this column if its featuresCol is set to "features_norm"
            stages.append(Normalizer(inputCol="features", outputCol="features_norm", p=1.0))
            print('Normalized\n')

    if bucketizer:
        bucketized_col = bucketizer + "Bucketized"
        stages.append(Bucketizer(splits=[0, 180, float('Inf')], inputCol=bucketizer, outputCol=bucketized_col))
        print('Bucketized!')

    if onehotencoder:
        # Encode the bucketized column when there is one, otherwise the named column itself.
        # OneHotEncoderEstimator takes lists of columns (inputCols / outputCols).
        ohe_input = bucketized_col if bucketizer else onehotencoder
        stages.append(OneHotEncoderEstimator(inputCols=[ohe_input], outputCols=[onehotencoder + 'OHE']))
        print('...And Hot Encoded\n' if bucketizer else 'One Hot Encoded\n')

    if model is not None:
        stages.append(model)
        print('Your model is ready to be put in the pipe!\n')

    print('Here is the Pipeline:')
    for stage in stages:
        print(stage)

    return Pipeline(stages=stages)
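
The two feature transforms used in this pipeline are simple enough to illustrate without Spark. The sketch below is a plain-Python approximation, not the library code: `l1_normalize` mirrors what a `Normalizer` with `p=1.0` does to one row, and `bucketize` mirrors a `Bucketizer` with splits `[0, 180, Inf]`. Both function names are hypothetical, and leaving an all-zero row unchanged is a choice made in this sketch to avoid dividing by zero.

```python
from bisect import bisect_right

def l1_normalize(vector):
    """Scale a row so the absolute values of its entries sum to 1."""
    norm = sum(abs(v) for v in vector)
    if norm == 0:
        return list(vector)  # sketch assumption: leave an all-zero row as it is
    return [v / norm for v in vector]

def bucketize(value, splits):
    """Return the index of the bucket [splits[i], splits[i+1]) that holds value."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError("value %r is outside the splits" % (value,))
    if value == splits[-1]:
        return float(len(splits) - 2)  # the last bucket includes its upper bound
    return float(bisect_right(splits, value) - 1)

SPLITS = [0, 180, float("inf")]

print(l1_normalize([1.0, -2.0, 1.0]))
print([bucketize(d, SPLITS) for d in (0, 90, 180, 350)])
```

With these splits, wind directions below 180 degrees fall in bucket 0.0 and directions of 180 degrees or more fall in bucket 1.0, which is the binary label the classifiers in this notebook predict.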




In [14]:
def regression_metrics(prediction):
    from pyspark.ml.evaluation import RegressionEvaluator
    evaluator = RegressionEvaluator(
        labelCol="HOURLYWindSpeed", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(prediction)
    print("\nRMSE on test data = %g" % rmse)
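
RMSE is the square root of the mean squared difference between the label and the prediction, so it is expressed in the same units as `HOURLYWindSpeed`. A minimal pure-Python sketch of the metric follows; the function name `rmse` and the sample numbers are invented for illustration.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    squared_errors = [(y - y_hat) ** 2 for y, y_hat in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Invented wind speeds and predictions, not values from the notebook.
labels = [10.0, 12.0, 8.0, 15.0]
predictions = [11.0, 10.0, 8.0, 18.0]

print("RMSE = %g" % rmse(labels, predictions))
```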

In [15]:
def classification_metrics(prediction):
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    mcEval = (MulticlassClassificationEvaluator()
              .setMetricName("accuracy")
              .setPredictionCol("prediction")
              .setLabelCol("HOURLYWindDirectionBucketized"))
    accuracy = mcEval.evaluate(prediction)
    print("Accuracy on test data = %g" % accuracy)
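
Accuracy is the share of rows whose predicted bucket equals the true bucket. It is easier to interpret next to the share of the most frequent label, because a model that always predicts that label already reaches this baseline. The sketch below shows both quantities in plain Python; the function names and the sample labels are made up for this example.

```python
from collections import Counter

def accuracy(labels, predictions):
    """Fraction of predictions that match their label exactly."""
    correct = sum(1 for y, y_hat in zip(labels, predictions) if y == y_hat)
    return correct / len(labels)

def majority_baseline(labels):
    """Accuracy obtained by always predicting the most frequent label."""
    most_common_count = Counter(labels).most_common(1)[0][1]
    return most_common_count / len(labels)

# Made-up bucket labels and predictions, not values from the notebook.
labels = [0.0, 1.0, 1.0, 0.0, 1.0]
predictions = [0.0, 1.0, 0.0, 0.0, 1.0]

print("Accuracy = %g" % accuracy(labels, predictions))
print("Majority baseline = %g" % majority_baseline(labels))
```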

<a id = 's7'></a>
## Linear Regression
[Back to Table of Contents](#toc)

In [19]:
from pyspark.ml.regression import LinearRegression

vec = ["HOURLYWindDirection","ELEVATION","HOURLYStationPressure"]
lr = LinearRegression(labelCol="HOURLYWindSpeed", featuresCol='features', maxIter=100, regParam=0.0, elasticNetParam=0.0) 

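# LR1 - trained on the unnormalized "features" column (the Normalizer stage still runs, but its output is unused)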
pipeline = create_ml_pipeline(vector_assembler= vec , normalizer_=True, model = lr)

model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

Vectors Assembled

Normalized

Your model is ready to be put in the pipe!

Here is the Pipeline:
VectorAssembler_488261704753
Normalizer_2d354caf5ecb
LinearRegression_45c7bb426ac9

RMSE on test data = 5.35498


In [18]:
# LR2 - trained on the L1-normalized "features_norm" column

vec = ["HOURLYWindDirection","ELEVATION","HOURLYStationPressure"]
lr = LinearRegression(labelCol="HOURLYWindSpeed", featuresCol='features_norm', maxIter=100, regParam=0.0, elasticNetParam=0.0) 

pipeline = create_ml_pipeline(vector_assembler= vec , normalizer_=True, model = lr)

model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

Vectors Assembled

Normalized

Your model is ready to be put in the pipe!

Here is the Pipeline:
VectorAssembler_48d4a38c95db
Normalizer_b243060d6bf6
LinearRegression_0efc1e827425

RMSE on test data = 5.58542


<a id = 's8'></a>
## Gradient Boosted Tree Regression
[Back to Table of Contents](#toc)

In [20]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(labelCol="HOURLYWindSpeed", maxIter=100)
vec = ["HOURLYWindDirection","ELEVATION","HOURLYStationPressure"]

pipeline = create_ml_pipeline(vector_assembler= vec , normalizer_=True, model = gbt)

model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

Vectors Assembled

Normalized

Your model is ready to be put in the pipe!

Here is the Pipeline:
VectorAssembler_e36364e35ce7
Normalizer_a107557c3595
GBTRegressor_98b5c0d4fa7a

RMSE on test data = 5.1381


<a id = 's9'></a>
## Logistic Regression Classifier
[Back to Table of Contents](#toc)

In [31]:
#LGReg1

from pyspark.ml.classification import LogisticRegression

vec = ["HOURLYWindSpeed","HOURLYDRYBULBTEMPC"]
lr =  LogisticRegression(labelCol="HOURLYWindDirectionBucketized", maxIter=10)
pipeline = create_ml_pipeline(vector_assembler= vec , bucketizer='HOURLYWindDirection', normalizer_=True, model = lr)

model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Vectors Assembled

Normalized

Bucketized!
Your model is ready to be put in the pipe!

Here is the Pipeline:
VectorAssembler_f638c2f61704
Normalizer_b711cb005012
Bucketizer_f01e531de411
LogisticRegression_618b39389237
Accuracy on test data = 0.688165


<a id = 's10'></a>
## Random Forest Classifier
[Back to Table of Contents](#toc)

In [None]:
from pyspark.ml.classification import RandomForestClassifier
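# Same feature columns as the logistic regression cell, repeated so this cell runs on its own
vec = ["HOURLYWindSpeed","HOURLYDRYBULBTEMPC"]
rf = RandomForestClassifier(labelCol="HOURLYWindDirectionBucketized", numTrees=10)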
pipeline = create_ml_pipeline(vector_assembler= vec , bucketizer='HOURLYWindDirection', normalizer_=True, model = rf)

model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)


<a id = 's11'></a>
## Gradient Boosted Tree Classifier
[Back to Table of Contents](#toc)

In [None]:
from pyspark.ml.classification import GBTClassifier
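# Same feature columns as the logistic regression cell, repeated so this cell runs on its own
vec = ["HOURLYWindSpeed","HOURLYDRYBULBTEMPC"]
gbt = GBTClassifier(labelCol="HOURLYWindDirectionBucketized", maxIter=100)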

pipeline = create_ml_pipeline(vector_assembler= vec , bucketizer='HOURLYWindDirection', normalizer_=True, model = gbt)

model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)


Vectors Assembled

Normalized

Bucketized!
Your model is ready to be put in the pipe!

Here is the Pipeline:
VectorAssembler_25b92d73de42
Normalizer_fefa51e8a517
Bucketizer_fc8aeae1efb5
GBTClassifier_79b34ff73182
