In [1]:
import pyspark
sc = pyspark.SparkContext()
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
from statistics import mean
import numpy as np
import re
import os
files = os.listdir('WeatherDatasets/')
files

['state05_CO.txt.gz',
 'state06_CT.txt.gz',
 'state03_AR.txt.gz',
 'state04_CA.txt.gz',
 'state01_AL.txt.gz',
 'state02_AZ.txt.gz']

In [3]:
def datacleaner(row):
    row = row.replace('-9999','')
    return row[:16]+re.sub(r'[A-Za-z]','',row[16:])

In [4]:
combinedRdd = sc.emptyRDD()
for file in files:
    textRdd = sc.textFile(os.path.join('WeatherDatasets',files[0])).map(datacleaner)
    def getRow(X):
        return [file[8:10],int(X[6:10]),int(X[10:12]),X[12:16],mean(map(float,X[16:].split()))]
    newRdd = textRdd.map(getRow)
    combinedRdd = combinedRdd.union(newRdd)
combinedRdd.take(5)

[['CO', 1893, 10, 'TMAX', 35.0],
 ['CO', 1893, 10, 'TMIN', 22.532258064516128],
 ['CO', 1893, 10, 'PRCP', 4.870967741935484],
 ['CO', 1893, 11, 'TMAX', 28.316666666666666],
 ['CO', 1893, 11, 'TMIN', 17.15]]

In [5]:
df_data = sqlContext.createDataFrame(combinedRdd, ['STATE','YEAR',"MONTH","METRIC","VALUE"])
df_data.show(20)

+-----+----+-----+------+------------------+
|STATE|YEAR|MONTH|METRIC|             VALUE|
+-----+----+-----+------+------------------+
|   CO|1893|   10|  TMAX|              35.0|
|   CO|1893|   10|  TMIN|22.532258064516128|
|   CO|1893|   10|  PRCP| 4.870967741935484|
|   CO|1893|   11|  TMAX|28.316666666666666|
|   CO|1893|   11|  TMIN|             17.15|
|   CO|1893|   11|  PRCP| 4.333333333333333|
|   CO|1893|   11|  SNOW|               4.5|
|   CO|1893|   12|  TMAX|27.366666666666667|
|   CO|1893|   12|  TMIN|              17.0|
|   CO|1893|   12|  PRCP| 3.629032258064516|
|   CO|1893|   12|  SNOW|11.333333333333334|
|   CO|1894|    1|  TMAX|24.541666666666668|
|   CO|1894|    1|  TMIN|13.977272727272727|
|   CO|1894|    1|  PRCP|3.2758620689655173|
|   CO|1894|    1|  SNOW|3.3225806451612905|
|   CO|1894|    2|  TMAX|              21.5|
|   CO|1894|    2|  TMIN|             11.25|
|   CO|1894|    2|  PRCP| 4.464285714285714|
|   CO|1894|    2|  SNOW|              16.0|
|   CO|189

In [6]:
from pyspark.sql.functions import rand
data = df_data.groupBy(['STATE','YEAR','MONTH']).pivot('METRIC').avg('VALUE').fillna(0)
data.orderBy(rand()).show(10)

+-----+----+-----+------------------+------------------+------------------+------------------+------------------+
|STATE|YEAR|MONTH|              PRCP|              SNOW|              SNWD|              TMAX|              TMIN|
+-----+----+-----+------------------+------------------+------------------+------------------+------------------+
|   AR|1923|    4| 3.163492063492064|            1.9625|3.9681972789115645|31.694614729672203|15.805469544139493|
|   CA|1948|    1|  2.86231884057971| 4.699169110459432| 4.790610405126535| 20.47539738195418| 5.215109179281327|
|   AL|1987|    1|1.0657504937458855| 1.818440352596359|1.0299999999999998|20.021582181259603|  5.32347670250896|
|   CA|1973|    1|0.6438172043010754| 1.207807386629266|2.8924541434135422|17.141129032258068|3.2896505376344085|
|   AR|1896|    1|3.8064516129032255| 7.204637096774194|               0.0| 25.82459677419355| 9.578629032258064|
|   AZ|1940|    9| 5.613079918374147|              1.25|               1.2| 39.691849462

In [7]:
firstYear = data.agg({"YEAR": "min"}).collect()[0][0]
data = data.withColumn('YEAR',data.YEAR-firstYear)
print(firstYear)
data.show(5)

1893
+-----+----+-----+------------------+-------------------+--------------------+------------------+------------------+
|STATE|YEAR|MONTH|              PRCP|               SNOW|                SNWD|              TMAX|              TMIN|
+-----+----+-----+------------------+-------------------+--------------------+------------------+------------------+
|   CA|  49|   10| 6.141084296561395| 1.8779803646563815|  2.7615865701119158|33.491367219615995| 17.91606616046816|
|   AZ| 102|    9|2.9816228790366717| 0.2616666666666666|0.015892857142857143|37.771653373993274| 21.46551724137931|
|   CA| 107|   10|2.5755904607072573|0.11303123399897592|0.001536098310291...|32.155666790260774|16.682343256175294|
|   AR| 100|    2| 2.178679261559697| 2.7653059903059902|  3.0509509009509004|19.093890484515484| 6.775484931734931|
|   CT| 117|    3|2.5027697441601777| 1.8899481566820282|   1.391853689228828|25.633870967741935| 11.10725806451613|
+-----+----+-----+------------------+-------------------+--

In [8]:
from pyspark.ml.feature import CountVectorizer
from pyspark.sql.functions import col, split
data = data.withColumn("STATE", split(col("STATE")," "))
stateVectorizer = CountVectorizer(inputCol="STATE", outputCol="STATE_OHE", vocabSize=6, minDF=1.0)
stateVectorizer_model = stateVectorizer.fit(data)
ohe_data = stateVectorizer_model.transform(data)

In [9]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols=['STATE_OHE','YEAR','MONTH','PRCP','SNOW','SNWD'],
                                  outputCol='features')
X = vectorAssembler.transform(ohe_data)
X = X.select(['features','TMAX','TMIN'])
X.show(5)

+--------------------+------------------+------------------+
|            features|              TMAX|              TMIN|
+--------------------+------------------+------------------+
|(11,[3,6,7,8,9,10...|33.491367219615995| 17.91606616046816|
|(11,[2,6,7,8,9,10...|37.771653373993274| 21.46551724137931|
|(11,[3,6,7,8,9,10...|32.155666790260774|16.682343256175294|
|(11,[1,6,7,8,9,10...|19.093890484515484| 6.775484931734931|
|(11,[5,6,7,8,9,10...|25.633870967741935| 11.10725806451613|
+--------------------+------------------+------------------+
only showing top 5 rows



In [10]:
#train test split
(X_train,X_val) = X.randomSplit([0.7,0.3])

## Train on Maximum temperature

In [11]:
from pyspark.ml.regression import RandomForestRegressor
rf_max = RandomForestRegressor(featuresCol='features',labelCol='TMAX')
rf_max_model = rf_max.fit(X_train)
predictions = rf_max_model.transform(X_val)
predictions.show(5)

+--------------------+------------------+------------------+------------------+
|            features|              TMAX|              TMIN|        prediction|
+--------------------+------------------+------------------+------------------+
|(11,[0,6,7,8,9,10...|26.820138888888895|10.126173371647512|25.715864729074827|
|(11,[1,6,7,8,9,10...| 36.82249351863658|  21.3342431545713| 39.61775247674409|
|(11,[1,6,7,8,9,10...|22.096496106785313| 7.093614116695317|24.102587754612358|
|(11,[2,6,7,8,9,10...| 39.67297851074463|21.075180966064586|37.302877916617085|
|(11,[3,6,7,8,9,10...| 34.18022977758433|17.782702499556674|36.598142006856385|
+--------------------+------------------+------------------+------------------+
only showing top 5 rows



In [12]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction",labelCol="TMAX",metricName="mae")
lr_evaluator.evaluate(predictions)

2.1904977078212995

## Train on Minimum temperature

In [13]:
from pyspark.ml.regression import RandomForestRegressor
rf_min = RandomForestRegressor(featuresCol='features',labelCol='TMIN')
rf_min_model = rf_min.fit(X_train)
predictions = rf_min_model.transform(X_val)
predictions.show(5)

+--------------------+------------------+------------------+------------------+
|            features|              TMAX|              TMIN|        prediction|
+--------------------+------------------+------------------+------------------+
|(11,[0,6,7,8,9,10...|26.820138888888895|10.126173371647512| 9.990956619569086|
|(11,[1,6,7,8,9,10...| 36.82249351863658|  21.3342431545713|23.099525048749722|
|(11,[1,6,7,8,9,10...|22.096496106785313| 7.093614116695317|  9.61731307202829|
|(11,[2,6,7,8,9,10...| 39.67297851074463|21.075180966064586|20.727313527185125|
|(11,[3,6,7,8,9,10...| 34.18022977758433|17.782702499556674| 19.74247482058521|
+--------------------+------------------+------------------+------------------+
only showing top 5 rows



In [14]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction",labelCol="TMIN",metricName="mae")
lr_evaluator.evaluate(predictions)

1.7878727616183605

### Results
#### Using Year, State, Month, PRCP, SNOW, SNWD as independent variables
1. Mean Absolute Error on TMAX : **2.1904**
2. Mean Absolute Error on TMIN : **1.7878**