In [3]:
# Import Necessary Modules
sc.install_pypi_package("pandas==1.0.1") #Install pandas version 1.0.1 
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
import six
import pandas as pd
from pyspark.sql.types import DoubleType

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Read Data and Check Schema
df = spark.read.csv("s3://aws-emr-resources-809118582415-us-east-2/datasetProcessed/part-00000-ba44a9e5-7570-4521-b600-3ded87f71063-c000.csv", header=True)
df.printSchema()
df.take(2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Date: string (nullable = true)
 |-- CO: string (nullable = true)
 |-- NO2: string (nullable = true)
 |-- SO2: string (nullable = true)
 |-- Ozone: string (nullable = true)
 |-- PM10: string (nullable = true)
 |-- PM2: string (nullable = true)

[Row(Date='01/01/2016', CO='0.2', NO2='0.016300000000000002', SO2='0.0021000000000000003', Ozone='0.027', PM10='0.012', PM2='0.0114'), Row(Date='01/02/2016', CO='0.2', NO2='0.015099999999999999', SO2='0.0028', Ozone='0.027', PM10='0.008', PM2='0.006')]

In [5]:
# Convert Strings to Type Double
converted_df = df.withColumn("CO", df["CO"].cast(DoubleType())).withColumn("NO2", df["NO2"].cast(DoubleType())).withColumn("SO2", df["SO2"].cast(DoubleType())).withColumn("Ozone", df["Ozone"].cast(DoubleType())).withColumn("PM10", df["PM10"].cast(DoubleType())).withColumn("PM2", df["PM2"].cast(DoubleType()))
converted_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Date: string (nullable = true)
 |-- CO: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- Ozone: double (nullable = true)
 |-- PM10: double (nullable = true)
 |-- PM2: double (nullable = true)

In [6]:
# Describe Data
converted_df.describe().toPandas().transpose()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

             0                     1  ...           3                    4
summary  count                  mean  ...         min                  max
Date      1427                  None  ...  01/01/2016           12/31/2019
CO        1427    0.3908199018920808  ...         0.1                  1.9
NO2       1427   0.02103279607568324  ...     -0.0028                0.059
SO2       1427  0.001843658023826205  ...         0.0  0.07970000000000001
Ozone     1427   0.03740084092501743  ...       0.002                0.083
PM10      1427   0.01675543097407143  ...         0.0                0.072
PM2       1427  0.009270707778556427  ...     -0.0023               0.0626

[8 rows x 5 columns]

In [7]:
# Check for Correlation
for i in converted_df.columns:
    if not(isinstance(converted_df.select(i).take(1)[0][0], six.string_types)):
        print("Correlation to Ozone for ", i, converted_df.stat.corr("Ozone",i))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correlation to Ozone for  CO 0.10111684790089312
Correlation to Ozone for  NO2 0.17134671055782777
Correlation to Ozone for  SO2 0.08811528972667469
Correlation to Ozone for  Ozone 1.0
Correlation to Ozone for  PM10 0.38991830152065904
Correlation to Ozone for  PM2 0.07065515504245704

In [8]:
# Prepare Data For Linear Regression by defining features and labels
assembled_vector = VectorAssembler(inputCols = ["CO","NO2","SO2","PM10","PM2"], outputCol = "features")
vec_ozone_df = assembled_vector.transform(converted_df)
vec_ozone_df = vec_ozone_df.select(['features','Ozone'])
vec_ozone_df.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+
|            features|Ozone|
+--------------------+-----+
|[0.2,0.0163000000...|0.027|
|[0.2,0.0150999999...|0.027|
|[0.2,0.0153000000...|0.027|
|[0.2,0.0167,3.0E-...|0.029|
|[0.4,0.0355,0.001...|0.023|
|[0.4,0.0344,0.001...|0.019|
|[0.4,0.0394,0.003...|0.025|
|[0.3,0.0212,0.001...|0.016|
|[0.4,0.0238,6.0E-...|0.017|
|[0.2,0.0168000000...|0.027|
+--------------------+-----+
only showing top 10 rows

In [9]:
# Randomly Split Data for Training and Testing
split_df = vec_ozone_df.randomSplit([0.7, 0.3])
train_df = split_df[0]
test_df = split_df[1]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Create Linear Regression Model with Training Data
linear_reg = LinearRegression(featuresCol = 'features', labelCol = 'Ozone', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = linear_reg.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Coefficients: [0.0,0.0,0.0,0.0,0.0]
Intercept: 0.037045778229908406

In [11]:
# Check RMSE and R-Squared Values
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

RMSE: 0.012734
r2: 0.000000

In [12]:
# Describe Training Set
train_df.describe().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+
|summary|               Ozone|
+-------+--------------------+
|  count|                 983|
|   mean|0.037045778229908406|
| stddev|0.012740594549318196|
|    min|               0.005|
|    max|               0.074|
+-------+--------------------+