# Import Library and Load Data From Mongodb

In [2]:
from pyspark.sql import Row
from datetime import datetime
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql.window import Window
from pyspark.storagelevel import StorageLevel
import pyspark.sql.functions as f

In [3]:
import os
pyspark_submit_args = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [4]:
spark = SparkSession \
    .builder \
    .appName("aqi") \
    .config("spark.mongodb.input.uri", "mongodb://54.186.55.238/mydb4.air")\
    .getOrCreate()

## Default type is wrong, cast to right type

In [5]:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df2 = df.withColumn("aqi_co", df["aqi_co"].cast(DoubleType()))\
        .withColumn("aqi_no2", df["aqi_no2"].cast(DoubleType()))\
        .withColumn("aqi_o3", df["aqi_o3"].cast(DoubleType()))\
        .withColumn("aqi_so2", df["aqi_so2"].cast(DoubleType()))\
        .withColumn("aqi_pm10", df["aqi_pm10"].cast(DoubleType()))\
        .withColumn("arithmetic_mean_co", df["arithmetic_mean_co"].cast(DoubleType()))\
        .withColumn("arithmetic_mean_no2", df["arithmetic_mean_no2"].cast(DoubleType()))\
        .withColumn("arithmetic_mean_o3", df["arithmetic_mean_o3"].cast(DoubleType()))\
        .withColumn("max_aqi", df["max_aqi"].cast(DoubleType()))\
        .withColumn("state_code", df["state_code"].cast(StringType()))\
        .withColumn("aqi_pm25_frm", df["aqi_pm25_frm"].cast(DoubleType()))\
        .withColumn("aqi_pm25_nonfrm", df["aqi_pm25_nonfrm"].cast(DoubleType()))\
        .withColumn("aqi_pm25_speciation", df["aqi_pm25_speciation"].cast(DoubleType()))\
        .withColumn("arithmetic_mean_pressure", df["arithmetic_mean_pressure"].cast(DoubleType()))\
        .withColumn("arithmetic_mean_wind", df["arithmetic_mean_wind"].cast(DoubleType()))\
        .withColumn("arithmetic_mean_so2", df["arithmetic_mean_so2"].cast(DoubleType()))\
        .withColumn("arithmetic_mean_pm10", df["arithmetic_mean_pm10"].cast(DoubleType()))\
        .withColumn("arithmetic_mean_pm25_frm", df["arithmetic_mean_pm25_frm"].cast(DoubleType()))\
        .withColumn("arithmetic_mean_pm25_nonfrm", df["arithmetic_mean_pm25_nonfrm"].cast(DoubleType()))\
        .withColumn("arithmetic_mean_pm25_speciation", df["arithmetic_mean_pm25_speciation"].cast(DoubleType()))\
        .withColumn("arithmetic_mean_temperature", df["arithmetic_mean_temperature"].cast(DoubleType()))\
        .withColumn("date_local", df["date_local"].cast(DateType()))

In [7]:
df2.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- aqi_co: double (nullable = true)
 |-- aqi_no2: double (nullable = true)
 |-- aqi_o3: double (nullable = true)
 |-- aqi_pm10: double (nullable = true)
 |-- aqi_pm25_frm: double (nullable = true)
 |-- aqi_pm25_nonfrm: double (nullable = true)
 |-- aqi_pm25_speciation: double (nullable = true)
 |-- aqi_so2: double (nullable = true)
 |-- arithmetic_mean_co: double (nullable = true)
 |-- arithmetic_mean_no2: double (nullable = true)
 |-- arithmetic_mean_o3: double (nullable = true)
 |-- arithmetic_mean_pm10: double (nullable = true)
 |-- arithmetic_mean_pm25_frm: double (nullable = true)
 |-- arithmetic_mean_pm25_nonfrm: double (nullable = true)
 |-- arithmetic_mean_pm25_speciation: double (nullable = true)
 |-- arithmetic_mean_pressure: double (nullable = true)
 |-- arithmetic_mean_so2: double (nullable = true)
 |-- arithmetic_mean_temperature: double (nullable = true)
 |-- arithmetic_mean_wind: double (nu

# Site-Level Prediction : Random Forest Model (timing with 10,50,100 tree)

In [8]:
common_key = ['county_name','county_code','site_num']

In [9]:
df4 = df2.withColumn('lag_1_aqi',lag('max_aqi',1).over(Window.partitionBy(common_key).orderBy('date_local'))).persist(StorageLevel.DISK_ONLY)

In [24]:
df5 = df4.select(lead('max_aqi',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('label'),
                'county_name','county_code','site_num','date_local','max_aqi','latitude','longitude',
                  'aqi_co',
                 'aqi_no2',
                 'aqi_o3',
                 'aqi_pm10',
                 'aqi_pm25_frm',
                 'aqi_pm25_nonfrm',
                 'aqi_pm25_speciation',
                 'aqi_so2',
                 'arithmetic_mean_co',
                 'arithmetic_mean_no2',
                 'arithmetic_mean_o3',
                 'arithmetic_mean_pm10',
                 'arithmetic_mean_pm25_frm',
                 'arithmetic_mean_pm25_nonfrm',
                 'arithmetic_mean_pm25_speciation',
                 'arithmetic_mean_pressure',
                 'arithmetic_mean_so2',
                 'arithmetic_mean_temperature',
                 'arithmetic_mean_wind',
                 lag('max_aqi',2).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_2_max_aqi'),
                 lag('max_aqi',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_max_aqi'),
                 lag('aqi_co',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_aqi_co'),
                 lag('aqi_no2',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_aqi_no2'),
                 lag('aqi_o3',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_aqi_o3'),
                 lag('aqi_pm10',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_aqi_pm10'),
                 lag('aqi_pm25_frm',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_aqi_pm25_frm'),
                 lag('aqi_pm25_nonfrm',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_aqi_pm25_nonfrm'),
                 lag('aqi_pm25_speciation',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_aqi_pm25_speciation'),
                 lag('aqi_so2',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_aqi_so2'),
                 lag('arithmetic_mean_co',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_arithmetic_mean_co'),
                 lag('arithmetic_mean_no2',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_arithmetic_mean_no2'),
                 lag('arithmetic_mean_o3',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_arithmetic_mean_o3'),
                 lag('arithmetic_mean_pm10',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_arithmetic_mean_pm10'),
                 lag('arithmetic_mean_pm25_frm',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_arithmetic_mean_pm25_frm'),
                 lag('arithmetic_mean_pm25_nonfrm',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_arithmetic_mean_pm25_nonfrm'),
                 lag('arithmetic_mean_pm25_speciation',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_arithmetic_mean_pm25_speciation'),
                 lag('arithmetic_mean_pressure',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_arithmetic_mean_pressure'),
                 lag('arithmetic_mean_so2',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_arithmetic_mean_so2'),
                 lag('arithmetic_mean_temperature',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_arithmetic_mean_temperature'),
                 lag('arithmetic_mean_wind',1).over(Window.partitionBy(common_key).orderBy('date_local')).alias('lag_1_arithmetic_mean_wind'),
                ).persist(StorageLevel.DISK_ONLY)

In [25]:
df5.select('county_name','county_code','site_num','max_aqi','date_local','lag_2_max_aqi','lag_1_max_aqi','label').show(100)

+-----------+-----------+--------+-------+----------+-------------+-------------+-----+
|county_name|county_code|site_num|max_aqi|date_local|lag_2_max_aqi|lag_1_max_aqi|label|
+-----------+-----------+--------+-------+----------+-------------+-------------+-----+
|      Allen|          3|       2|   35.0|2017-03-01|         null|         null| 37.0|
|      Allen|          3|       2|   37.0|2017-03-02|         null|         35.0| 41.0|
|      Allen|          3|       2|   41.0|2017-03-03|         35.0|         37.0| 34.0|
|      Allen|          3|       2|   34.0|2017-03-04|         37.0|         41.0| 38.0|
|      Allen|          3|       2|   38.0|2017-03-05|         41.0|         34.0| 36.0|
|      Allen|          3|       2|   36.0|2017-03-06|         34.0|         38.0| 38.0|
|      Allen|          3|       2|   38.0|2017-03-07|         38.0|         36.0| 42.0|
|      Allen|          3|       2|   42.0|2017-03-08|         36.0|         38.0| 39.0|
|      Allen|          3|       

In [26]:
df6 = df5.withColumn("trend1", col("max_aqi")/col("lag_1_max_aqi")).withColumn("trend2",col("max_aqi")/col("lag_2_max_aqi"))

In [27]:
df7 = df6.filter('label > 0').na.fill(0)

In [29]:
va = VectorAssembler(outputCol="features", inputCols=df7.columns[6:]) 
df8 = va.transform(df7).select("features", "label")

In [38]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [39]:
%%time

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = df8.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol='features', numTrees=100)

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

#rfModel = model.stages[1]
#print(rfModel)  # summary only

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
| 33.08112398116898| 23.0|(44,[0,1,3,4,11,1...|
| 32.03715781569065| 31.0|(44,[0,1,3,4,11,1...|
|33.703216778991056| 41.0|(44,[0,1,3,4,11,1...|
|31.880766624880106| 27.0|(44,[0,1,3,4,11,1...|
|28.045838637613528| 26.0|(44,[0,1,3,4,11,1...|
+------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 17.0039
CPU times: user 113 ms, sys: 8.16 ms, total: 121 ms
Wall time: 9min 31s


In [40]:
%%time


# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = df8.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol='features', numTrees=10)

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

#rfModel = model.stages[1]
#print(rfModel)  # summary only

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|35.391528548542006| 47.0|(44,[0,1,3,4,11,1...|
| 27.74044564019846| 16.0|(44,[0,1,3,4,11,1...|
|27.876046629876203| 27.0|(44,[0,1,3,4,11,1...|
|  31.0133962057185| 31.0|(44,[0,1,3,4,11,1...|
|29.349164688496195| 26.0|(44,[0,1,3,4,11,1...|
+------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 18.1925
CPU times: user 30.9 ms, sys: 17 ms, total: 47.8 ms
Wall time: 1min 15s


In [41]:
%%time


# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = df8.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol='features', numTrees=50)

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

#rfModel = model.stages[1]
#print(rfModel)  # summary only

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
| 32.58379812283956| 23.0|(44,[0,1,3,4,11,1...|
| 25.57251823927943| 16.0|(44,[0,1,3,4,11,1...|
|27.743214949548197| 27.0|(44,[0,1,3,4,11,1...|
|28.793958275885394| 19.0|(44,[0,1,3,4,11,1...|
|28.571704653482993| 34.0|(44,[0,1,3,4,11,1...|
+------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 16.7988
CPU times: user 64.6 ms, sys: 12.3 ms, total: 77 ms
Wall time: 3min 4s


## Model: GBoost


In [43]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [45]:
%%time

# Split the data into training and test sets (30% held out for testing)
(trainingData3, testData3) = df8.randomSplit([0.7, 0.3])

# Train a RandomForest model.
gbt = GBTRegressor(featuresCol='features')

# Chain indexer and forest in a Pipeline
pipeline3 = Pipeline(stages=[gbt])

# Train model.  This also runs the indexer.
model3 = pipeline3.fit(trainingData)

# Make predictions.
predictions3 = model3.transform(testData)

# Select example rows to display.
predictions3.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator3 = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse3 = evaluator3.evaluate(predictions3)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse3)

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
| 31.32354748679522| 23.0|(44,[0,1,3,4,11,1...|
|26.079270957812813| 16.0|(44,[0,1,3,4,11,1...|
|24.593950597865202| 27.0|(44,[0,1,3,4,11,1...|
| 24.45541347385461| 19.0|(44,[0,1,3,4,11,1...|
| 25.98360308059669| 34.0|(44,[0,1,3,4,11,1...|
+------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 15.6425
CPU times: user 103 ms, sys: 17.3 ms, total: 121 ms
Wall time: 9min 13s
