In [109]:
import os
import sys

# Point PySpark at the local Spark installation.
os.environ['SPARK_HOME'] = "C:/spark"

# Extend the module search path (in this exact order) with the Spark
# directories, the bundled pyspark/py4j archives and C:/java/bin so that
# `import pyspark` can be resolved from this kernel.
sys.path.extend([
    "C:/spark/bin",
    "C:/spark/python",
    "C:/spark/python/pyspark",
    "C:/spark/python/lib",
    "C:/spark/python/lib/pyspark.zip",
    "C:/spark/python/lib/py4j-0.10.4-src.zip",
    "C:/java/bin",
])

In [111]:
from __future__ import print_function
import pandas as panda
import numpy as nump
from matplotlib import pyplot as matplt
import seaborn as sb
%matplotlib inline

In [112]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import rdd
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml import pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [113]:
# Create (or reuse) the notebook's SparkSession: local master, application
# name "DenAI", and a default-constructed SparkConf.
spark = (
    SparkSession.builder
    .master("local")
    .appName("DenAI")
    .config(conf=SparkConf())
    .getOrCreate()
)

In [114]:
# Each CSV is read with its first row as the header and with column types
# inferred from the data.
# NOTE(review): the paths are absolute and machine-specific.
path1 = "/Users/dheer/Documents/DengAI/dengue_features_train.csv"
train_features = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(path1)
)

path2 = "/Users/dheer/Documents/DengAI/dengue_labels_train.csv"
train_labels = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(path2)
)

path3 = "/Users/dheer/Documents/DengAI/dengue_features_test.csv"
test_features = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(path3)
)

In [115]:
train_labels = train_labels.select(train_labels.city.alias("city_l"),
                            train_labels.year.alias("year_l"),
                            train_labels.weekofyear.alias("weekofyear_l"),
                            train_labels.total_cases.alias("total_cases"))

In [116]:
train_features = train_features.withColumn("month", month("week_start_date"))

In [117]:
train = train_features.join(train_labels,
                            ((train_features.city == train_labels.city_l) &
                             (train_features.year == train_labels.year_l) &
                             (train_features.weekofyear == train_labels.weekofyear_l)), 'inner')

In [118]:
train = train.drop("city_l", "year_l", "weekofyear_l", "week_start_date")

In [119]:
train_pd = train.toPandas()

In [120]:
train_pd.fillna(method = 'ffill', inplace = True)

In [121]:
train = spark.createDataFrame(train_pd)

In [122]:
sj_trainData = train.filter(train.city == 'sj')

iq_trainData = train.filter(train.city == 'iq')

In [123]:
sj = sj_trainData.select('ndvi_se', 'month','reanalysis_specific_humidity_g_per_kg', 'station_min_temp_c', 
                         'reanalysis_tdtr_k', 'reanalysis_max_air_temp_k', col('total_cases').alias('label'))

In [124]:
iq = iq_trainData.select('ndvi_se', 'month', 'reanalysis_specific_humidity_g_per_kg','station_min_temp_c', 
                         'reanalysis_tdtr_k', 'reanalysis_max_air_temp_k',col('total_cases').alias('label'))

In [125]:
assembler = VectorAssembler().setInputCols([
    'ndvi_se', 'month', 'reanalysis_specific_humidity_g_per_kg',
    'station_min_temp_c', 'reanalysis_tdtr_k', 'reanalysis_max_air_temp_k']).setOutputCol('features')

In [126]:
rf = RandomForestRegressor(labelCol = 'label', featuresCol="features", numTrees=10, maxDepth=3, maxBins=10)

In [127]:
sj = assembler.transform(sj).select('label', 'features')

iq = assembler.transform(iq).select('label', 'features')

In [128]:
sj_training, sj_validation = sj.randomSplit([0.75,0.25], seed=0L)
iq_training, iq_validation = iq.randomSplit([0.75,0.25], seed=0L)

In [132]:
sjModel = rf.fit(sj_training)
iqModel = rf.fit(iq_training)

In [133]:
sj_valid_pred = sjModel.transform(sj_validation)
iq_valid_pred = iqModel.transform(iq_validation)

In [134]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")

In [135]:
sj_MeanAbsError = evaluator.evaluate(sj_valid_pred)
iq_MeanAbsError = evaluator.evaluate(iq_valid_pred)

In [136]:
print("Mean Absolute Error (MAE) on sj data = " + str(sj_MeanAbsError))

Mean Absolute Error (MAE) on sj data = 25.1474411653


In [137]:
# Report the validation MAE stored in iq_MeanAbsError.
print("Mean Absolute Error (MAE) on iq data = " + str(iq_MeanAbsError))

Mean Absolute Error (MAE) on iq data = 7.96109714168


In [138]:
trainData = assembler.transform(train).select("city", col("total_cases").alias("label"), "features")

In [139]:
training, validation = trainData.randomSplit([0.7,0.3], seed=0L)

In [140]:
sj_training = training.filter(training.city == "sj")
iq_training = training.filter(training.city == "iq")

In [141]:
sj_validation = validation.filter(validation.city == "sj")
iq_validation = validation.filter(validation.city == "iq")

In [143]:
sjModel = rf.fit(sj_training)
iqModel = rf.fit(iq_training)

In [144]:
sj_valid_pred = sjModel.transform(sj_validation)
iq_valid_pred = iqModel.transform(iq_validation)

In [145]:
pred = sj_valid_pred.unionAll(iq_valid_pred).select("label", "prediction")

In [146]:
eval = evaluator.evaluate(pred)

In [147]:
print("Mean Absolute Error (MAE) on Validation data = " + str(eval))

Mean Absolute Error (MAE) on Validation data = 19.4348778419


In [148]:
from pyspark.ml.feature import PCA

In [149]:
features_assembler = VectorAssembler().setInputCols(train.columns[3:24]).setOutputCol('features')

In [150]:
train_pca = features_assembler.transform(train).select("city", col("total_cases").alias("label"), "features")

In [151]:
dimpca = PCA(k=10, inputCol="features", outputCol="pca_features")

In [152]:
train_pca_model = dimpca.fit(train_pca)

In [153]:
train_pca_data = train_pca_model.transform(train_pca)

In [154]:
train_pca_data = train_pca_data.select("city", "label", col("pca_features").alias("features"))

In [155]:
pca_training, pca_validation = train_pca_data.randomSplit([0.7, 0.3], seed=0L)

In [156]:
sj_pca_training = pca_training.filter(pca_training.city == "sj")
iq_pca_training = pca_training.filter(pca_training.city == "iq")

In [157]:
sj_pca_validation = pca_validation.filter(pca_validation.city == "sj")
iq_pca_validation = pca_validation.filter(pca_validation.city == "iq")

In [158]:
sj_pca_model = rf.fit(sj_pca_training)
iq_pca_model = rf.fit(iq_pca_training)

In [159]:
sj_pca_pred = sj_pca_model.transform(sj_pca_validation)
iq_pca_pred = iq_pca_model.transform(iq_pca_validation)

In [160]:
pca_pred = sj_pca_pred.unionAll(iq_pca_pred)

In [161]:
pca_MAE = evaluator.evaluate(pca_pred)

In [162]:
print("Mean Absolute Error (MAE) on PCA transformed data = " + str(pca_MAE))

Mean Absolute Error (MAE) on PCA transformed data = 19.5247280502


In [163]:
MAE = []

In [164]:
for i in range(1,22):
    dimpca = PCA(k=i, inputCol="features", outputCol="pca_features")
    
    train_pca_model = dimpca.fit(train_pca)
    train_pca_data = train_pca_model.transform(train_pca)
    
    train_pca_data = train_pca_data.select("city", "label", col("pca_features").alias("features"))
    pca_training, pca_validation = train_pca_data.randomSplit([0.7, 0.3], seed=0L)
    
    sj_pca_training = pca_training.filter(pca_training.city == "sj")
    iq_pca_training = pca_training.filter(pca_training.city == "iq")
    sj_pca_validation = pca_validation.filter(pca_validation.city == "sj")
    iq_pca_validation = pca_validation.filter(pca_validation.city == "iq")
    
    sj_pca_model = rf.fit(sj_pca_training)
    iq_pca_model = rf.fit(iq_pca_training)
    
    sj_pca_pred = sj_pca_model.transform(sj_pca_validation)
    iq_pca_pred = iq_pca_model.transform(iq_pca_validation)
    
    pca_pred = sj_pca_pred.unionAll(iq_pca_pred)
    MAE.append(evaluator.evaluate(pca_pred))

In [52]:
# Show the collected MAE values; entry i corresponds to k = i + 1 components.
# NOTE(review): this cell's execution count (52) is out of sequence with the
# surrounding cells, so the output shown may come from an earlier run.
MAE

[21.939140878284103,
 21.63323181077667,
 21.589274828398974,
 21.587911462607373,
 20.616462184966732,
 19.683459151290602,
 18.981344766286174,
 19.18494246753609,
 19.091503999986976,
 19.52472805018948,
 19.40112005001866,
 19.323960124060505,
 19.3999706509447,
 19.248330946845467,
 19.407245624578813,
 19.511210364709754,
 19.440189834479682,
 19.46659648379723,
 19.68984212491797,
 19.22605004248523,
 19.392898552092838]

In [165]:
# Index 6 is the seventh entry, i.e. the run with k = 7 components (the sweep
# starts at k = 1); it is the smallest value in the list displayed above.
MAE[6]

18.981344766286174

In [166]:
from pyspark.ml.regression import GeneralizedLinearRegression

In [167]:
# Poisson-family GLM with a square-root link, at most 5 iterations and
# regParam 0.3, fitted per city.
# sj_training / iq_training and the matching validation frames are whatever
# those names currently hold; in top-to-bottom order that is the six-feature
# 70/30 split derived from `training` / `validation`.
glr = GeneralizedLinearRegression(family="poisson", link="sqrt", maxIter=5, regParam=0.3)

sj_glmmodel = glr.fit(sj_training)
iq_glmmodel = glr.fit(iq_training)

sj_glmPred = sj_glmmodel.transform(sj_validation)
iq_glmPred = iq_glmmodel.transform(iq_validation)

# `union` replaces `unionAll`, which is deprecated in Spark 2.x.
glmPred = sj_glmPred.union(iq_glmPred)

glmMae = evaluator.evaluate(glmPred)

print("Mean Absolute Error (MAE) on GLM model data = " + str(glmMae))

Mean Absolute Error (MAE) on GLM model data = 20.8204295766


In [168]:
glm_MAE = []

In [169]:
for i in [0.1,0.3,0.5,0.7,0.9]:
    glr = GeneralizedLinearRegression(family="poisson", link="sqrt", maxIter=5, regParam=i)
    
    sj_glmmodel = glr.fit(sj_training)
    iq_glmmodel = glr.fit(iq_training)
    
    sj_glmPred = sj_glmmodel.transform(sj_validation)
    iq_glmPred = iq_glmmodel.transform(iq_validation)
    
    glmPred = sj_glmPred.unionAll(iq_glmPred)
    glm_MAE.append(evaluator.evaluate(glmPred))

In [170]:
# Show the MAE per regParam, in the order 0.1, 0.3, 0.5, 0.7, 0.9.
glm_MAE

[20.834510978879216,
 20.8204295765504,
 20.801766427710938,
 20.779624283328385,
 20.76202379948141]

In [171]:
# x=glmPred.select("city", "prediction")

In [172]:
# x.write.csv("result")

In [173]:
# Rebuild the PCA-projected train/validation splits with k = 7 components.
dimpca = PCA(k=7, inputCol="features", outputCol="pca_features")

train_pca_model = dimpca.fit(train_pca)
train_pca_data = train_pca_model.transform(train_pca)

train_pca_data = train_pca_data.select("city", "label", col("pca_features").alias("features"))
# Plain int seed: the former `0L` literal is Python-2-only syntax and fails
# to parse on Python 3.
pca_training, pca_validation = train_pca_data.randomSplit([0.7, 0.3], seed=0)

sj_pca_training = pca_training.filter(pca_training.city == "sj")
iq_pca_training = pca_training.filter(pca_training.city == "iq")
sj_pca_validation = pca_validation.filter(pca_validation.city == "sj")
iq_pca_validation = pca_validation.filter(pca_validation.city == "iq")

In [174]:
# Poisson GLM (sqrt link, 5 iterations, regParam 0.9) fitted per city on the
# PCA-projected training split and scored on the PCA-projected validation split.
glr = GeneralizedLinearRegression(family="poisson", link="sqrt", maxIter=5, regParam=0.9)

sj_glmmodel = glr.fit(sj_pca_training)
iq_glmmodel = glr.fit(iq_pca_training)

sj_glmPred = sj_glmmodel.transform(sj_pca_validation)
iq_glmPred = iq_glmmodel.transform(iq_pca_validation)

# `union` replaces `unionAll`, which is deprecated in Spark 2.x.
glmPred = sj_glmPred.union(iq_glmPred)

glmMae = evaluator.evaluate(glmPred)

print("Mean Absolute Error (MAE) on GLM model data = " + str(glmMae))

Mean Absolute Error (MAE) on GLM model data = 19.64218437


In [175]:
training = features_assembler.transform(train).select("city", col("total_cases").alias("label"), "features")

In [176]:
dimpca = PCA(k=7, inputCol="features", outputCol="pca_features")

In [177]:
train_pca_model = dimpca.fit(training)

In [178]:
train_pca_data = train_pca_model.transform(training)

In [179]:
train_pca_data = train_pca_data.select("city", "label", col("pca_features").alias("features"))

In [180]:
sj_pca_training = train_pca_data.filter(train_pca_data.city == "sj")
iq_pca_training = train_pca_data.filter(train_pca_data.city == "iq")

In [181]:
test_features = test_features.withColumn("month", month("week_start_date"))

In [182]:
test_features = test_features.drop("week_start_date")

In [183]:
test_pd = test_features.toPandas()

In [184]:
test_pd.fillna(method = 'ffill', inplace = True)

In [185]:
test_features = spark.createDataFrame(test_pd)

In [186]:
testing = features_assembler.transform(test_features).select("city", "year", "weekofyear", "features")

In [187]:
# Project the test vectors with the PCA model fitted on the training data, so
# train and test share the same component space. A second PCA used to be
# fitted on the test set here, but its result was never used; that redundant
# fit has been removed.
test_pca_data = train_pca_model.transform(testing)

In [188]:
test_pca_data = test_pca_data.select("city", "year", "weekofyear",col("pca_features").alias("features"))

In [189]:
sj_pca_testing = test_pca_data.filter(test_pca_data.city == "sj")
iq_pca_testing = test_pca_data.filter(test_pca_data.city == "iq")

In [190]:
sj_pca_model = rf.fit(sj_pca_training)
iq_pca_model = rf.fit(iq_pca_training)

In [191]:
sj_pca_pred = sj_pca_model.transform(sj_pca_testing)
iq_pca_pred = iq_pca_model.transform(iq_pca_testing)

In [192]:
pca_predict = sj_pca_pred.unionAll(iq_pca_pred)

In [193]:
y = pca_predict.withColumn("total_cases", round(col("prediction"))).select("city", "year", "weekofyear", "total_cases")

In [194]:
# y.coalesce(1).write.csv("final")

In [195]:
# y.coalesce(1).write.option("header", "true").csv("result")

In [196]:
y_pd = y.toPandas()

In [197]:
y_pd.total_cases = y_pd.total_cases.astype(int)

In [198]:
y_pd.to_csv("RFresult.csv", index=False)

In [199]:
sj_pca_glmmodel = glr.fit(sj_pca_training)
iq_pca_glmmodel = glr.fit(iq_pca_training)

In [200]:
sj_pca_glmpred = sj_pca_glmmodel.transform(sj_pca_testing)
iq_pca_glmpred = iq_pca_glmmodel.transform(iq_pca_testing)

In [201]:
pca_glmpredict = sj_pca_glmpred.unionAll(iq_pca_glmpred)

In [202]:
z = pca_glmpredict.withColumn("total_cases", round(col("prediction"))).select("city", "year", "weekofyear", "total_cases")

In [203]:
z_pd = z.toPandas()

In [204]:
z_pd.total_cases = z_pd.total_cases.astype(int)

In [205]:
z_pd.to_csv("GLMresult.csv", index=False)