In [1]:
# Initialise findspark ahead of the pyspark imports in the next cell.
# NOTE(review): presumably init() locates the local Spark installation and makes
# the pyspark package importable — confirm against the findspark documentation.
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# Obtain the session used by every later cell: getOrCreate() hands back an
# existing session when there is one, otherwise it builds a new one with this name.
spark = (
    SparkSession.builder
    .appName("Test SparkSession")
    .getOrCreate()
)

In [3]:
# Bare expression: shows the SparkSession object as this cell's output.
spark

## Data Preprocessing

In [4]:
%%time
# Read every CSV matching the two-level glob in the aws-gsod bucket, taking the
# column names from the header row. No schema is supplied and schema inference is
# not enabled, so every column is loaded as a string (see the schema output of the
# following cells). The %%time magic has to remain the first line of the cell;
# the recorded run took roughly 18 minutes of wall time.
df = spark.read.option("header", "true").csv("s3://aws-gsod/*/*.csv")

CPU times: user 105 ms, sys: 16.1 ms, total: 121 ms
Wall time: 17min 54s


In [5]:
# Mark the raw DataFrame for caching. cache() returns the DataFrame, which is why
# this cell displays the column/type summary as its output.
df.cache()

DataFrame[ID: string, USAF: string, WBAN: string, Elevation: string, Country_Code: string, Latitude: string, Longitude: string, Date: string, Year: string, Month: string, Day: string, Mean_Temp: string, Mean_Temp_Count: string, Mean_Dewpoint: string, Mean_Dewpoint_Count: string, Mean_Sea_Level_Pressure: string, Mean_Sea_Level_Pressure_Count: string, Mean_Station_Pressure: string, Mean_Station_Pressure_Count: string, Mean_Visibility: string, Mean_Visibility_Count: string, Mean_Windspeed: string, Mean_Windspeed_Count: string, Max_Windspeed: string, Max_Gust: string, Max_Temp: string, Max_Temp_Quality_Flag: string, Min_Temp: string, Min_Temp_Quality_Flag: string, Precipitation: string, Precip_Flag: string, Snow_Depth: string, Fog: string, Rain_or_Drizzle: string, Snow_or_Ice: string, Hail: string, Thunder: string, Tornado: string]

In [6]:
# Preview the first 5 rows to sanity-check the load.
df.show(5)

+------------+------+-----+---------+------------+--------+---------+----------+----+-----+---+---------+---------------+-------------+-------------------+-----------------------+-----------------------------+---------------------+---------------------------+---------------+---------------------+--------------+--------------------+-------------+--------+--------+---------------------+--------+---------------------+-------------+-----------+----------+---+---------------+-----------+----+-------+-------+
|          ID|  USAF| WBAN|Elevation|Country_Code|Latitude|Longitude|      Date|Year|Month|Day|Mean_Temp|Mean_Temp_Count|Mean_Dewpoint|Mean_Dewpoint_Count|Mean_Sea_Level_Pressure|Mean_Sea_Level_Pressure_Count|Mean_Station_Pressure|Mean_Station_Pressure_Count|Mean_Visibility|Mean_Visibility_Count|Mean_Windspeed|Mean_Windspeed_Count|Max_Windspeed|Max_Gust|Max_Temp|Max_Temp_Quality_Flag|Min_Temp|Min_Temp_Quality_Flag|Precipitation|Precip_Flag|Snow_Depth|Fog|Rain_or_Drizzle|Snow_or_Ice|Hail

In [7]:
# Print column names and types; at this point every column is still a string.
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- USAF: string (nullable = true)
 |-- WBAN: string (nullable = true)
 |-- Elevation: string (nullable = true)
 |-- Country_Code: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Mean_Temp: string (nullable = true)
 |-- Mean_Temp_Count: string (nullable = true)
 |-- Mean_Dewpoint: string (nullable = true)
 |-- Mean_Dewpoint_Count: string (nullable = true)
 |-- Mean_Sea_Level_Pressure: string (nullable = true)
 |-- Mean_Sea_Level_Pressure_Count: string (nullable = true)
 |-- Mean_Station_Pressure: string (nullable = true)
 |-- Mean_Station_Pressure_Count: string (nullable = true)
 |-- Mean_Visibility: string (nullable = true)
 |-- Mean_Visibility_Count: string (nullable = true)
 |-- Mean_Windspeed: string (nullable = true)
 |-- Mean_Windspeed_C

In [8]:
from pyspark.sql import functions

# Re-type every column as float while keeping the original column names.
# NOTE(review): this also covers identifier/date columns such as ID and Date;
# presumably those cast to null — only numeric columns are selected afterwards.
float_columns = [
    functions.col(name).cast("float").alias(name)
    for name in df.columns
]
df = df.select(*float_columns)

In [9]:
# Columns kept for modelling: the target (Mean_Temp) plus its candidate predictors.
cols = [
    'Elevation',
    'Latitude',
    'Longitude',
    'Month',
    'Mean_Temp',
    'Mean_Dewpoint',
    'Mean_Sea_Level_Pressure',
    'Mean_Station_Pressure',
    'Mean_Visibility',
    'Mean_Windspeed',
    'Precipitation',
]
df2 = df.select(*cols)

In [10]:
# Confirm that the selected columns are now float-typed.
df2.printSchema()

root
 |-- Elevation: float (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Longitude: float (nullable = true)
 |-- Month: float (nullable = true)
 |-- Mean_Temp: float (nullable = true)
 |-- Mean_Dewpoint: float (nullable = true)
 |-- Mean_Sea_Level_Pressure: float (nullable = true)
 |-- Mean_Station_Pressure: float (nullable = true)
 |-- Mean_Visibility: float (nullable = true)
 |-- Mean_Windspeed: float (nullable = true)
 |-- Precipitation: float (nullable = true)



In [11]:
# Discard every row that has a null in any selected column, then cache the
# cleaned frame because the modelling cells read it more than once.
df2_complete_rows = df2.dropna()
df2 = df2_complete_rows.cache()

## LASSO Regression

In [12]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, Model

In [13]:
# Predictor columns for the regression (Mean_Temp is the label and is left out).
features = [
    'Elevation',
    'Latitude',
    'Longitude',
    'Month',
    'Mean_Dewpoint',
    'Mean_Sea_Level_Pressure',
    'Mean_Station_Pressure',
    'Mean_Visibility',
    'Mean_Windspeed',
    'Precipitation',
]

# Pack the predictor columns into a single vector column named 'features'.
assembler = VectorAssembler(outputCol='features', inputCols=features)

In [20]:
# Scaler stage: reads the assembled 'features' vector and writes 'scaledFeatures'.
# NOTE(review): the regression cell is configured with featuresCol='features', so
# the scaled column does not appear to be consumed — confirm that this is intended.
standardizer = StandardScaler(
    outputCol="scaledFeatures",
    inputCol="features",
)

In [26]:
# Chain the assembler and the scaler, fit them on the cleaned data, and apply the
# fitted stages to that same data to obtain the modelling frame.
pipeline = Pipeline(stages=[assembler, standardizer])
pipeline_model = pipeline.fit(df2)
df3 = pipeline_model.transform(df2)

In [28]:
# 80/20 train/test split. The fixed seed keeps the split — and therefore the
# fitted coefficients and the reported R² — reproducible when the notebook is
# re-run from a fresh kernel; without it every run samples a different split.
train, test = df3.randomSplit([0.8, 0.2], seed=42)

In [29]:
# Linear regression of Mean_Temp on the assembled 'features' vector.
# elasticNetParam=1 selects a pure L1 penalty (the LASSO of the section heading),
# with regParam=0.5 as the penalty strength.
LR = LinearRegression(
    featuresCol='features',
    labelCol='Mean_Temp',
    regParam=0.5,
    elasticNetParam=1,
)
LR_model = LR.fit(train)

In [30]:
# Display the fitted coefficients; the following cell indexes `features` by
# coefficient position, i.e. there is one coefficient per entry of `features`.
LR_model.coefficients

DenseVector([0.0051, -0.0591, 0.0, 0.0, 0.9464, -0.066, 0.0, 0.1136, 0.0, -2.3775])

In [31]:
import numpy as np

# Positions of the coefficients that the L1 penalty did not shrink to exactly
# zero, mapped back to the corresponding predictor names.
nonzero_positions = np.nonzero(LR_model.coefficients)[0]
selected_features = [features[pos] for pos in nonzero_positions]
selected_features

['Elevation',
 'Latitude',
 'Mean_Dewpoint',
 'Mean_Sea_Level_Pressure',
 'Mean_Visibility',
 'Precipitation']

In [32]:
# Score the held-out split with the fitted model.
# NOTE(review): presumably this appends a `prediction` column, which is what the
# evaluator below reads by default — confirm against the pyspark.ml docs.
predictions = LR_model.transform(test)

In [33]:
# Evaluator that compares predictions against the Mean_Temp label; the metric is
# chosen when evaluate() is called in the next cell.
evaluator_LR = RegressionEvaluator(labelCol='Mean_Temp')

In [34]:
# Compute R² on the held-out predictions; the param map overrides the evaluator's
# metric for this call only, leaving the evaluator object itself unchanged.
r2_param_map = {evaluator_LR.metricName: "r2"}
evaluator_LR.evaluate(predictions, r2_param_map)

0.8739024059654438

In [35]:
# Shut down the Spark session now that the analysis is finished.
spark.stop()