## Model Training

### Spark Session Setup

In [1]:
# findspark locates the local Spark installation and makes `pyspark` importable;
# it must run before any of the pyspark imports in the cells below.
import findspark

findspark.init()

In [2]:
from pyspark.sql import SparkSession

# Get (or create) the Spark session shared by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName('modelTraining')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/16 19:31:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/16 19:31:35 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
# Load the prepped dataset from the local working directory.
# TODO: switch this read to the Hadoop-hosted copy once the data lives there.
from pyspark import SparkFiles  # NOTE(review): currently unused in this notebook

df = spark.read.parquet('prepped_data.parquet')

### Train/Test Split

In [4]:
# 70/30 train/test split; the fixed seed keeps the split repeatable across runs.
train_data, test_data = df.randomSplit(weights=[0.7, 0.3], seed=101)

### Model Importing and Setup

In [5]:
# Candidate classifiers, all constructed with default settings.
# The two tree ensembles get a fixed seed so their results are repeatable.
from pyspark.ml.classification import (
    GBTClassifier,
    LogisticRegression,
    NaiveBayes,
    RandomForestClassifier,
)

nb = NaiveBayes()
lr = LogisticRegression()
rf = RandomForestClassifier(seed=101)
gb = GBTClassifier(seed=101)

In [6]:
# Metric used both for cross-validation model selection and for test-set scoring.
# NOTE: despite the `acc_` prefix this is NOT accuracy. BinaryClassificationEvaluator
# scores area under the ROC curve (its default metric, spelled out here).
from pyspark.ml.evaluation import BinaryClassificationEvaluator

acc_evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')

### Hyperparameter Tuning

In [7]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Every search below shares the same cross-validation settings:
# 5 folds scored with acc_evaluator, parallelism=4, and a fixed seed for the fold split.
shared_cv_args = dict(evaluator=acc_evaluator, numFolds=5, parallelism=4, seed=101)

# Naive Bayes: 7 smoothing values
grid = (
    ParamGridBuilder()
    .addGrid(nb.smoothing, [0.0, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5])
    .build()
)
nb_cv = CrossValidator(estimator=nb, estimatorParamMaps=grid, **shared_cv_args)

# Random forest: numTrees 10..50 (step 10) x maxDepth 1..10 = 50 candidates
grid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, range(10, 60, 10))
    .addGrid(rf.maxDepth, range(1, 11))
    .build()
)
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=grid, **shared_cv_args)

# Logistic regression: regParam x elasticNetParam = 12 candidates
grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)
lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, **shared_cv_args)

# Gradient-boosted trees: maxDepth x maxIter x stepSize = 27 candidates
grid = (
    ParamGridBuilder()
    .addGrid(gb.maxDepth, [3, 5, 7])
    .addGrid(gb.maxIter, [10, 20, 30])
    .addGrid(gb.stepSize, [0.05, 0.1, 0.2])
    .build()
)
gb_cv = CrossValidator(estimator=gb, estimatorParamMaps=grid, **shared_cv_args)

In [8]:
# Run the Naive Bayes grid search on the training split, then score the test split.
nb_cv_model = nb_cv.fit(dataset=train_data)
nb_cv_pred = nb_cv_model.transform(dataset=test_data)

25/04/16 19:56:42 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/04/16 19:56:42 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

In [13]:
# Run the random-forest grid search (50 candidates) on train, then score the test split.
rf_cv_model = rf_cv.fit(dataset=train_data)
rf_cv_pred = rf_cv_model.transform(dataset=test_data)

25/04/16 20:08:57 WARN DAGScheduler: Broadcasting large task binary with size 1107.0 KiB
25/04/16 20:09:07 WARN DAGScheduler: Broadcasting large task binary with size 1030.9 KiB
25/04/16 20:09:09 WARN DAGScheduler: Broadcasting large task binary with size 1030.9 KiB
25/04/16 20:09:10 WARN DAGScheduler: Broadcasting large task binary with size 1393.0 KiB
25/04/16 20:09:22 WARN DAGScheduler: Broadcasting large task binary with size 1292.7 KiB
25/04/16 20:09:24 WARN DAGScheduler: Broadcasting large task binary with size 1292.7 KiB
25/04/16 20:09:24 WARN DAGScheduler: Broadcasting large task binary with size 1737.9 KiB
25/04/16 20:09:24 WARN DAGScheduler: Broadcasting large task binary with size 1160.4 KiB
25/04/16 20:09:55 WARN DAGScheduler: Broadcasting large task binary with size 1141.7 KiB
25/04/16 20:10:04 WARN DAGScheduler: Broadcasting large task binary with size 1058.0 KiB
25/04/16 20:10:05 WARN DAGScheduler: Broadcasting large task binary with size 1058.0 KiB
25/04/16 20:10:06 WAR

In [16]:
# Run the logistic-regression grid search on train, then score the test split.
lr_cv_model = lr_cv.fit(dataset=train_data)
lr_cv_pred = lr_cv_model.transform(dataset=test_data)

In [19]:
# Run the gradient-boosted-trees grid search (27 candidates) on train, then score the test split.
gb_cv_model = gb_cv.fit(dataset=train_data)
gb_cv_pred = gb_cv_model.transform(dataset=test_data)

25/04/16 20:39:51 WARN BlockManager: Asked to remove block broadcast_36734_piece0, which does not exist
25/04/16 20:41:58 WARN BlockManager: Asked to remove block broadcast_45033, which does not exist


In [9]:
# Results: the best Naive Bayes model picked by cross-validation, scored on the test split.
print(nb_cv_model.bestModel)

# acc_evaluator is a BinaryClassificationEvaluator (area under ROC unless configured
# otherwise), so this score is not an accuracy. Label it with the evaluator's actual
# metric name. The variable name is kept so any later references still work.
# NOTE(review): the recorded NB score (~0.44) is below 0.5, i.e. worse-than-random
# ranking. Multinomial NB on unscaled features (e.g. unix_time) is a plausible cause; investigate.
nb_accuracy = acc_evaluator.evaluate(nb_cv_pred)
print(f"Naive Bayes {acc_evaluator.getMetricName()}: {nb_accuracy}")

NaiveBayesModel: uid=NaiveBayes_be54f111b6de, modelType=multinomial, numClasses=2, numFeatures=4
Naive Bayes Accuracy: 0.440370162795473


In [14]:
# Best random-forest model picked by cross-validation, scored on the test split.
print(rf_cv_model.bestModel)

# acc_evaluator is a BinaryClassificationEvaluator (area under ROC unless configured
# otherwise), so this score is not an accuracy. Label it with the evaluator's actual
# metric name. The variable name is kept so any later references still work.
rf_accuracy = acc_evaluator.evaluate(rf_cv_pred)
print(f"Random Forest {acc_evaluator.getMetricName()}: {rf_accuracy}")

RandomForestClassificationModel: uid=RandomForestClassifier_eafe4c4451a0, numTrees=50, numClasses=2, numFeatures=4


25/04/16 20:15:14 WARN DAGScheduler: Broadcasting large task binary with size 1189.2 KiB


Random Forest Accuracy: 0.9921085607065917


In [17]:
# Best logistic-regression model picked by cross-validation, scored on the test split.
print(lr_cv_model.bestModel)

# acc_evaluator is a BinaryClassificationEvaluator (area under ROC unless configured
# otherwise), so this score is not an accuracy. Label it with the evaluator's actual
# metric name. The variable name is kept so any later references still work.
lr_accuracy = acc_evaluator.evaluate(lr_cv_pred)
print(f"Logistic Regression {acc_evaluator.getMetricName()}: {lr_accuracy}")

LogisticRegressionModel: uid=LogisticRegression_59d371609159, numClasses=2, numFeatures=4
Logistic Regression Accuracy: 0.8421164330874442


In [20]:
# Best gradient-boosted-trees model picked by cross-validation, scored on the test split.
print(gb_cv_model.bestModel)

# acc_evaluator is a BinaryClassificationEvaluator (area under ROC unless configured
# otherwise), so this score is not an accuracy. Label it with the evaluator's actual
# metric name. The variable name is kept so any later references still work.
gb_accuracy = acc_evaluator.evaluate(gb_cv_pred)
print(f"Gradient Boosted {acc_evaluator.getMetricName()}: {gb_accuracy}")

GBTClassificationModel: uid = GBTClassifier_3bea227d58d4, numTrees=30, numClasses=2, numFeatures=4
Gradient Boosted Accuracy: 0.9922825767164476


In [10]:
# Export predictions for the Gradio app.

# Helpers for the export cells below. One UDF flattens the `features` vector into a
# plain array column; another (next cells) picks the probability of the predicted class.
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType


def _vector_as_list(vec):
    """Return the entries of a Spark ML vector as a Python list."""
    return vec.toArray().tolist()


vector_to_array_udf = udf(_vector_as_list, ArrayType(DoubleType()))

def get_confidence(probability_vector, prediction):
    """Return the probability the model assigned to the class it predicted.

    probability_vector: indexable per-class probabilities (the `probability` column).
    prediction: predicted class label as a number (the `prediction` column); it is
        cast to int so it can be used as an index.
    Returns the selected probability as a Python float.
    """
    class_index = int(prediction)
    return float(probability_vector[class_index])
# Spark UDF wrapper around get_confidence, producing a double column.
confidence_udf = udf(get_confidence, returnType=DoubleType())

In [11]:
# Save Naive Bayes test-set predictions as CSV for the Gradio app:
# one column per feature plus the probability of the predicted class.
nb_pred_expanded = nb_cv_pred.withColumn('features', vector_to_array_udf(col('features')))

# Vector length is read from the first row and assumed identical for every row.
feature_size = len(nb_pred_expanded.select('features').first()['features'])
feature_columns = [col('features')[i].alias(f"feature_{i}") for i in range(feature_size)]
nb_pred_expanded = nb_pred_expanded.select('*', *feature_columns)

nb_pred_final = (
    nb_pred_expanded
    .withColumn('confidence', confidence_udf(col('probability'), col('prediction')))
    .drop('features', 'probability', 'rawPrediction')
)

# Spark writes this path as a directory holding a single part file (coalesce(1)).
nb_pred_final.coalesce(1).write.csv('nb_predictions.csv', header=True, mode='overwrite')

                                                                                

In [15]:
# Save random-forest test-set predictions as CSV for the Gradio app:
# one column per feature plus the probability of the predicted class.
rf_pred_expanded = rf_cv_pred.withColumn('features', vector_to_array_udf(col('features')))

# Vector length is read from the first row and assumed identical for every row.
feature_size = len(rf_pred_expanded.select('features').first()['features'])
feature_columns = [col('features')[i].alias(f"feature_{i}") for i in range(feature_size)]
rf_pred_expanded = rf_pred_expanded.select('*', *feature_columns)

rf_pred_final = (
    rf_pred_expanded
    .withColumn('confidence', confidence_udf(col('probability'), col('prediction')))
    .drop('features', 'probability', 'rawPrediction')
)

# Spark writes this path as a directory holding a single part file (coalesce(1)).
rf_pred_final.coalesce(1).write.csv('rf_predictions.csv', header=True, mode='overwrite')

25/04/16 20:15:19 WARN DAGScheduler: Broadcasting large task binary with size 1415.3 KiB
                                                                                

In [18]:
# Save logistic-regression test-set predictions as CSV for the Gradio app:
# one column per feature plus the probability of the predicted class.
lr_pred_expanded = lr_cv_pred.withColumn('features', vector_to_array_udf(col('features')))

# Vector length is read from the first row and assumed identical for every row.
feature_size = len(lr_pred_expanded.select('features').first()['features'])
feature_columns = [col('features')[i].alias(f"feature_{i}") for i in range(feature_size)]
lr_pred_expanded = lr_pred_expanded.select('*', *feature_columns)

lr_pred_final = (
    lr_pred_expanded
    .withColumn('confidence', confidence_udf(col('probability'), col('prediction')))
    .drop('features', 'probability', 'rawPrediction')
)

# Spark writes this path as a directory holding a single part file (coalesce(1)).
lr_pred_final.coalesce(1).write.csv('lr_predictions.csv', header=True, mode='overwrite')

                                                                                

In [21]:
# Save gradient-boosted-trees test-set predictions as CSV for the Gradio app:
# one column per feature plus the probability of the predicted class.
gb_pred_expanded = gb_cv_pred.withColumn('features', vector_to_array_udf(col('features')))

# Vector length is read from the first row and assumed identical for every row.
feature_size = len(gb_pred_expanded.select('features').first()['features'])
feature_columns = [col('features')[i].alias(f"feature_{i}") for i in range(feature_size)]
gb_pred_expanded = gb_pred_expanded.select('*', *feature_columns)

gb_pred_final = (
    gb_pred_expanded
    .withColumn('confidence', confidence_udf(col('probability'), col('prediction')))
    .drop('features', 'probability', 'rawPrediction')
)

# Spark writes this path as a directory holding a single part file (coalesce(1)).
gb_pred_final.coalesce(1).write.csv('gb_predictions.csv', header=True, mode='overwrite')

                                                                                

In [12]:
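# All resulting CSV outputs were renamed and moved to the root project folder by hand.
# Spark writes each `*_predictions.csv` path as a directory holding one part file
# (a single part because of `coalesce(1)`). The cells below expect a plain CSV file
# at that name, so this manual step must be redone before re-running them.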

In [1]:
import pandas as pd

# The remaining cells use pandas only. Each file is one hand-renamed Spark part file.
gb_pred_df, lr_pred_df, nb_pred_df, rf_pred_df = (
    pd.read_csv(path)
    for path in (
        'gb_predictions.csv',
        'lr_predictions.csv',
        'nb_predictions.csv',
        'rf_predictions.csv',
    )
)

In [2]:
gb_pred_df.head()

Unnamed: 0,label,prediction,feature_0,feature_1,feature_2,feature_3,confidence
0,0,0.0,1.08,95035.0,1354386000.0,5.0,0.978845
1,0,0.0,1.08,165556.0,1347217000.0,7.0,0.980035
2,0,0.0,1.15,653.0,1363595000.0,5.0,0.973805
3,0,0.0,1.17,2836.0,1354976000.0,3.0,0.975929
4,0,0.0,1.18,190249.0,1336133000.0,3.0,0.975929


In [3]:
# Give the exported columns readable names. Rename by *source* column name rather than
# by position, so the labels cannot silently shift if the CSV column order changes.
# Selecting `new_cols` afterwards fixes the column order and raises KeyError if any
# expected column is missing. The cell is also safe to re-run.
# NOTE(review): feature_0..feature_3 -> amt, city_pop, unix_time, category_index assumes
# that was the feature-vector assembly order during data prep; confirm.
source_to_readable = {
    'label': 'true',
    'feature_0': 'amt',
    'feature_1': 'city_pop',
    'feature_2': 'unix_time',
    'feature_3': 'category_index',
}
new_cols = ['true', 'prediction', 'amt', 'city_pop', 'unix_time', 'category_index', 'confidence']

gb_pred_df = gb_pred_df.rename(columns=source_to_readable)[new_cols]
lr_pred_df = lr_pred_df.rename(columns=source_to_readable)[new_cols]
nb_pred_df = nb_pred_df.rename(columns=source_to_readable)[new_cols]
rf_pred_df = rf_pred_df.rename(columns=source_to_readable)[new_cols]

In [4]:
# Combine the four models' test-set predictions into one wide frame with
# prediction_<model> / confidence_<model> columns. The CSVs carry no row id,
# so rows are matched on the label plus the feature values.
join_cols = ['true', 'amt', 'city_pop', 'unix_time', 'category_index']


def trim_predictions(pred_frame, suffix):
    """Keep the join keys plus this model's prediction/confidence, suffixed by model.

    Also adds a `_dup` counter that numbers repeated (label, feature) combinations.
    Merging on it matches rows one-to-one instead of producing a cross product when
    several test rows share identical values. Rows with identical features get
    identical predictions from a given model, so pairing them by counter is safe.
    """
    trimmed = pred_frame[join_cols + ['prediction', 'confidence']].rename(columns={
        'prediction': f'prediction_{suffix}',
        'confidence': f'confidence_{suffix}',
    })
    return trimmed.assign(_dup=trimmed.groupby(join_cols, dropna=False).cumcount())


gb_trimmed = trim_predictions(gb_pred_df, 'gb')
lr_trimmed = trim_predictions(lr_pred_df, 'lr')
nb_trimmed = trim_predictions(nb_pred_df, 'nb')
rf_trimmed = trim_predictions(rf_pred_df, 'rf')

merge_keys = join_cols + ['_dup']
pred_df = (
    gb_trimmed
    .merge(lr_trimmed, on=merge_keys, validate='one_to_one')
    .merge(nb_trimmed, on=merge_keys, validate='one_to_one')
    .merge(rf_trimmed, on=merge_keys, validate='one_to_one')
    .drop(columns='_dup')
)

# Every model scored the same test split, so the inner joins must not lose any rows.
assert len(pred_df) == len(gb_trimmed) == len(lr_trimmed) == len(nb_trimmed) == len(rf_trimmed), \
    "prediction files do not contain the same test rows"

In [5]:
# Persist the combined predictions for the Gradio app.
# NOTE: pickles are pandas-version-specific and unsafe to load from untrusted sources.
combined_predictions_path = 'combined_predictions.pkl'
pred_df.to_pickle(combined_predictions_path)

In [7]:
# Preview the first rows of the combined frame.
pred_df.head(n=5)

Unnamed: 0,true,amt,city_pop,unix_time,category_index,prediction_gb,confidence_gb,prediction_lr,confidence_lr,prediction_nb,confidence_nb,prediction_rf,confidence_rf
0,0,1.08,95035.0,1354386000.0,5.0,0.0,0.978845,0.0,0.684297,0.0,1.0,0.0,0.993297
1,0,1.08,165556.0,1347217000.0,7.0,0.0,0.980035,0.0,0.699808,1.0,1.0,0.0,0.970565
2,0,1.15,653.0,1363595000.0,5.0,0.0,0.973805,0.0,0.690551,0.0,1.0,0.0,0.98056
3,0,1.17,2836.0,1354976000.0,3.0,0.0,0.975929,0.0,0.66421,0.0,1.0,0.0,0.98681
4,0,1.18,190249.0,1336133000.0,3.0,0.0,0.975929,0.0,0.650739,1.0,1.0,0.0,0.98447
