<div align="left">
   <img src="https://usfdons.com/images/responsive/footer_usfca.png" style="width:250px;">
</div>
<div align="center"><b>MSDS697 Distributed Data Systems</b></div>

**Authors:** Anush Kocharyan, Jyoti Prakash Maheswari, Viviana Márquez, Wenkun Xiao<br>
<hr>

# Running ML models on feature data on m3.xlarge instance

###  Load libraries

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor

from pyspark.ml.evaluation import RegressionEvaluator

# n-fold validation and the results.
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

### Getting data from Mongodb

In [2]:
import numpy as np
import pandas as pd
import os
pyspark_submit_args = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("myApp") \
  .config("spark.mongodb.input.uri", "mongodb://18.221.66.227/project.test")\
  .getOrCreate()

###  The feature data consists of 340000 rows

In [3]:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [4]:
df.count()

340000

###  Splitting the data into training and test sets

In [5]:
df_train, df_test = df.randomSplit([0.8,0.2])

In [6]:
df_train.show(1)

+--------------------+-------+-------+---------------+---------------+--------------+--------------+---------+-----+---------------+---------------+--------------+--------------+------------------+---------------+-----------------+--------------------+-----------------+----------------------------+----------------------------+---------------------------+---------------------------+------+---------------+---------------+--------------+--------------+---+---+---+---+---+---+---+----+----+-----------------+------------------+-----------------+------------------+-----------------+--------+------------------+
|                 _id|abs_max|abs_min|avg_first_10000|avg_first_50000|avg_last_10000|avg_last_50000|count_big| max1|max_first_10000|max_first_50000|max_last_10000|max_last_50000|        max_to_min|max_to_min_diff|             mean|     mean_change_abs| mean_change_rate|mean_change_rate_first_10000|mean_change_rate_first_50000|mean_change_rate_last_10000|mean_change_rate_last_50000|  mi

In [7]:
feature_names = df_train.columns[1:-1]
feature_names

['abs_max',
 'abs_min',
 'avg_first_10000',
 'avg_first_50000',
 'avg_last_10000',
 'avg_last_50000',
 'count_big',
 'max1',
 'max_first_10000',
 'max_first_50000',
 'max_last_10000',
 'max_last_50000',
 'max_to_min',
 'max_to_min_diff',
 'mean',
 'mean_change_abs',
 'mean_change_rate',
 'mean_change_rate_first_10000',
 'mean_change_rate_first_50000',
 'mean_change_rate_last_10000',
 'mean_change_rate_last_50000',
 'min1',
 'min_first_10000',
 'min_first_50000',
 'min_last_10000',
 'min_last_50000',
 'q60',
 'q65',
 'q70',
 'q75',
 'q80',
 'q85',
 'q90',
 'q95',
 'q99',
 'std',
 'std_first_10000',
 'std_first_50000',
 'std_last_10000',
 'std_last_50000',
 'sum1']

### Creating a feature vector assembler

In [8]:
va = VectorAssembler(outputCol="features", inputCols=df_train.columns[1:-1])
df_train = va.transform(df_train).select("features", "y_target").persist()
df_test = va.transform(df_test).select("features", "y_target").persist()

In [9]:
df_train.show(1)

+--------------------+------------------+
|            features|          y_target|
+--------------------+------------------+
|[116.0,0.0,4.7647...|12.911298210999998|
+--------------------+------------------+
only showing top 1 row



### Set one evaluator for all algrithms, metric: RMSE on validation set

In [10]:
evaluator=RegressionEvaluator(labelCol="y_target",predictionCol="prediction", metricName="rmse")

### Decision Tree Regressor 

In [26]:
dt_reg = DecisionTreeRegressor(featuresCol = 'features', labelCol='y_target')
dt_regModel = dt_reg.fit(df_train)
dt_reg_preds = dt_regModel.transform(df_test)

In [13]:
dt_reg_feature_importance = zip([feature_names[i] for i in dt_regModel.featureImportances.indices],\
                                dt_regModel.featureImportances.values)
dt_reg_feature_importance.sort(key = lambda x: abs(x[1]),reverse=True)
[(x[0],round(x[1],4))for x in dt_reg_feature_importance]

[('q99', 0.6989),
 ('q95', 0.1568),
 ('min1', 0.0498),
 ('mean', 0.0467),
 ('max1', 0.0101),
 ('q85', 0.008),
 ('std', 0.0078),
 ('q90', 0.005),
 ('max_to_min_diff', 0.0041),
 ('abs_max', 0.0039),
 ('q80', 0.0032),
 ('max_first_50000', 0.0024),
 ('max_last_50000', 0.0019),
 ('mean_change_rate', 0.0013)]

In [23]:
dt_reg_RMSE_test = evaluator.evaluate(dt_reg_preds)
print("Decision tree model RMSE on test set = "+ '{:.4f}'.format(dt_reg_RMSE_test))

Decision tree model RMSE on test set = 2.2827


### Linear Regression

In [27]:
lr = LinearRegression(featuresCol = 'features', labelCol='y_target',
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = lr.fit(df_train)
lr_preds = lrModel.transform(df_test)

In [16]:
# feature_names x coefficients
beta0 = [('intercept',round(lrModel.intercept,4))]
beta1 = [(x[0],round(x[1],4)) for x in zip(feature_names,lrModel.coefficients) if abs(x[1])>0.0001]
beta1.sort(key = lambda x: abs(x[1]),reverse=True)
beta0+beta1

[('intercept', 25.6455),
 ('q80', -3.067),
 ('mean', 2.1892),
 ('q85', -1.0524),
 ('q75', -0.6852),
 ('q90', -0.6503),
 ('avg_last_50000', 0.4187),
 ('avg_first_50000', 0.3158),
 ('count_big', 0.0095),
 ('std_first_50000', 0.0071),
 ('std_last_50000', -0.0013),
 ('max_last_10000', -0.0011),
 ('min_last_50000', 0.0004),
 ('std_last_10000', -0.0004),
 ('max_first_50000', 0.0003),
 ('min_last_10000', 0.0003),
 ('min_first_50000', -0.0002),
 ('max_last_50000', -0.0001)]

In [22]:
lr_preds = lrModel.transform(df_test)
lr_RMSE_test = evaluator.evaluate(lr_preds)
print("Linear model RMSE on test set = "+ '{:.4f}'.format(lr_RMSE_test))

Linear model RMSE on test set = 2.9032


### Random Forest Regression

In [28]:
rf_reg = RandomForestRegressor(featuresCol = 'features', labelCol='y_target')
rf_regModel = rf_reg.fit(df_train)
rf_reg_preds = rf_regModel.transform(df_test)

In [24]:
rf_reg_feature_importance = zip([feature_names[i] for i in rf_regModel.featureImportances.indices],\
                                rf_regModel.featureImportances.values)
rf_reg_feature_importance.sort(key = lambda x: abs(x[1]),reverse=True)
[(x[0],round(x[1],4)) for x in rf_reg_feature_importance if x[1]>0.001]



[('q99', 0.2708),
 ('q95', 0.2352),
 ('q90', 0.2287),
 ('std', 0.0506),
 ('q80', 0.036),
 ('q85', 0.0298),
 ('sum1', 0.0277),
 ('mean', 0.0274),
 ('std_first_50000', 0.0195),
 ('std_first_10000', 0.0142),
 ('min1', 0.0102),
 ('max_to_min_diff', 0.01),
 ('abs_max', 0.0082),
 ('max1', 0.0069),
 ('avg_first_50000', 0.0066),
 ('max_to_min', 0.0047),
 ('avg_last_50000', 0.0045),
 ('max_last_50000', 0.0011),
 ('avg_first_10000', 0.0011),
 ('avg_last_10000', 0.0011),
 ('q75', 0.001)]

In [25]:
rf_reg_preds = rf_regModel.transform(df_test)
rf_reg_RMSE_test = evaluator.evaluate(rf_reg_preds)
print("Random Forest model RMSE on test set = "+ '{:.4f}'.format(rf_reg_RMSE_test))

Random Forest model RMSE on test set = 2.2273
