# Big Data Project 20211

## Data Collection

In [1]:
# Importing libraries

import pandas as pd 
import numpy as np

# Scikit-learn library: For SVM
from sklearn import preprocessing
from sklearn.metrics import confusion_matrix
from sklearn import svm

import itertools

# Matplotlib library to plot the charts
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab

# Library for statistical data visualization
import seaborn

%matplotlib inline

## Exploratory Data Analysis

In [None]:
# Bar chart of the number of rows per raw `feedback` category.
# `np` comes from the import cell at the top of the notebook; `df` is the Spark
# DataFrame with the raw data and must already exist when this cell runs.
from matplotlib import pyplot as plt

responses = df.groupBy('feedback').count().collect()
categories = [row[0] for row in responses]  # first field of each row: the feedback value
counts = [row[1] for row in responses]      # second field: the row count

ind = np.arange(len(categories))
width = 0.35
# Centre the bars on `ind` explicitly and put the ticks on `ind` as well.
# The previous `ind + width/2.` tick offset only lines up with edge-aligned bars
# and leaves the labels shifted when bars are centre-aligned.
plt.bar(ind, counts, width=width, color='r', align='center')

plt.ylabel('counts')
plt.title('Response distribution')
plt.xticks(ind, categories)
plt.show()

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def binarize(x):
    """Map the 'Neutral' feedback value to 'Negative'; return any other value unchanged."""
    return 'Negative' if x == 'Neutral' else x


udfValueToCategory = udf(binarize, StringType())
# Use the UDF defined on the line above. The original call referenced
# `udfConvertResponse`, a name that is never defined, and raised NameError.
df = df.withColumn("binary_response", udfValueToCategory("feedback"))

## Missing Values

In [None]:
# Columns carried forward; the first four are the numeric ones that get cast below.
cols_select = [
    'prod_price', 'prod_feat_1', 'prod_feat_2', 'cust_age',
    'prod_feat_3', 'cust_region', 'prod_type', 'cust_sex', 'cust_title',
    'feedback', 'binary_response',
]

# Target Spark type for each numeric column, in the same order as in cols_select.
numeric_casts = [
    ('prod_price', 'float'),
    ('prod_feat_1', 'float'),
    ('prod_feat_2', 'float'),
    ('cust_age', 'int'),
]
casted_cols = [df[col_name].cast(col_type) for col_name, col_type in numeric_casts]

# Cast numeric columns, then append the remaining columns untouched.
df = df.select(*(casted_cols + cols_select[4:]))

# Replace missing values in these string columns with the literal 'NA'.
na_fill = {col_name: 'NA' for col_name in ('cust_region', 'cust_title', 'prod_type')}
df = df.fillna(na_fill)

## Categorical Features

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# A prod_feat_3 value must occur more than this many times to keep its own category.
COUNT_THRESHOLD = 150


def convertMinority(originalCol, colCount):
    """Return originalCol when colCount exceeds COUNT_THRESHOLD, else 'MinorityCategory'."""
    return originalCol if colCount > COUNT_THRESHOLD else 'MinorityCategory'


createNewColFromTwo = udf(convertMinority, StringType())

# Per-value occurrence counts of prod_feat_3; joined back as a temporary "count" column.
prodFeat3Count = df.groupBy("prod_feat_3").count()

df = (
    df.join(prodFeat3Count, "prod_feat_3", "inner")
      .withColumn('prod_feat_3_reduced', createNewColFromTwo('prod_feat_3', 'count'))
      .drop('prod_feat_3')   # replaced by prod_feat_3_reduced
      .drop('count')         # helper column no longer needed
)

In [None]:
# one-hot encoding
# Imported here so the cell runs on a fresh kernel; the notebook's import cell
# does not bring these two classes into scope.
from pyspark.ml.feature import OneHotEncoder, StringIndexer

column_vec_in = ['prod_feat_3_reduced', 'cust_region', 'prod_type', 'cust_sex', 'cust_title']
# Output name of each encoded column: the input name plus a '_catVec' suffix.
column_vec_out = [name + '_catVec' for name in column_vec_in]

# Stage 1 per column: string value -> numeric index, written to '<col>_tmp'.
indexers = [StringIndexer(inputCol=x, outputCol=x + '_tmp')
            for x in column_vec_in]

# Stage 2 per column: '<col>_tmp' index -> one-hot vector in '<col>_catVec'.
encoders = [OneHotEncoder(dropLast=False, inputCol=x + '_tmp', outputCol=y)
            for x, y in zip(column_vec_in, column_vec_out)]

# Interleave as [indexer_0, encoder_0, indexer_1, encoder_1, ...] so each
# encoder directly follows the indexer that produces its input column.
tmp = [stage for pair in zip(indexers, encoders) for stage in pair]

In [None]:
# prepare labeled sets
# Imported here so the cell runs on a fresh kernel; the notebook's import cell
# does not bring these classes into scope.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Numeric columns plus the one-hot encoded categorical columns.
cols_now = ['prod_price',
            'prod_feat_1',
            'prod_feat_2',
            'cust_age',
            'prod_feat_3_reduced_catVec',
            'cust_region_catVec',
            'prod_type_catVec',
            'cust_sex_catVec',
            'cust_title_catVec']
# NOTE(review): only string columns are null-filled earlier in the notebook; if the
# numeric columns can contain nulls the assembler may need handling for them - confirm.
assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features')
labelIndexer = StringIndexer(inputCol='binary_response', outputCol="label")

# Build the stage list without mutating `tmp`: the previous `tmp += [...]`
# appended the assembler and label indexer again on every re-run of this cell.
pipeline = Pipeline(stages=tmp + [assembler_features, labelIndexer])

## Training and Test

In [None]:
# Fit every pipeline stage on df, then apply the fitted stages to the same df.
fitted_pipeline = pipeline.fit(df)
allData = fitted_pipeline.transform(df)
allData.cache()

# 80/20 split; the seed is fixed so the split is the same on every run.
split_weights = [0.8, 0.2]
trainingData, testData = allData.randomSplit(split_weights, seed=0)

training_label_counts = trainingData.groupBy("label").count().take(3)
print("Distribution of Pos and Neg in trainingData is: ", training_label_counts)

## Prediction

In [None]:
# `RF` is not defined anywhere else in the notebook; bind it to Spark ML's
# random-forest classifier so this cell runs on a fresh kernel.
from pyspark.ml.classification import RandomForestClassifier as RF

# Train a 200-tree random forest on the training split and score the test split.
rf = RF(labelCol='label', featuresCol='features', numTrees=200)
fit = rf.fit(trainingData)
transformed = fit.transform(testData)

## AUC

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

results = transformed.select(['probability', 'label'])

## prepare score-label set
# Each collected row is (probability, label). The score is the first component
# of the probability vector and the label is flipped (1.0 - label).
results_collect = results.collect()
results_list = [
    (float(probability[0]), 1.0 - float(label))
    for probability, label in results_collect
]

# `sc` is the SparkContext; it must already exist when this cell runs.
scoreAndLabels = sc.parallelize(results_list)
metrics = metric(scoreAndLabels)

print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)

In [None]:
# Plot AUC
from sklearn.metrics import roc_curve, auc

# results_list holds (score, label) pairs; split it into two parallel lists.
# (The former `fpr = dict()`, `tpr = dict()`, `roc_auc = dict()` placeholders were
# dead stores, overwritten below before ever being read, and have been removed.)
y_test = [pair[1] for pair in results_list]
y_score = [pair[0] for pair in results_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

# Inline plotting is already enabled by the notebook's first cell.
plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')  # diagonal reference line
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()

## Down Sampling

In [None]:
from numpy.random import randint
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

RATIO_ADJUST = 2.0 ## ratio of pos to neg in the df_subsample

counts = trainingData.select('binary_response').groupBy('binary_response').count().collect()
# The rows returned by groupBy().count().collect() come in no guaranteed order, so
# look the class counts up by name instead of assuming counts[0] is 'Positive'.
countsByClass = {row[0]: row[1] for row in counts}
higherBound = countsByClass['Positive']
# Keep on average RATIO_ADJUST positives per negative: a positive row survives when
# its random index in [0, higherBound) falls below this threshold.
TRESHOLD_TO_FILTER = int(RATIO_ADJUST * countsByClass['Negative'])


def randGen(x):
    """Return a random index in [0, higherBound) for 'Positive' rows and -1 otherwise.

    -1 is below the threshold whenever the threshold is non-negative, so every
    non-positive row is kept by the filter below.
    """
    return int(randint(0, higherBound)) if x == 'Positive' else -1


udfRandGen = udf(randGen, IntegerType())
# Attach the random index to a separate frame so `trainingData` itself is left
# unchanged and this cell can be re-run without side effects.
# NOTE(review): the random draw is unseeded, so the subsample differs between runs.
trainingDataIndexed = trainingData.withColumn("randIndex", udfRandGen("binary_response"))
df_subsample = trainingDataIndexed.filter(trainingDataIndexed['randIndex'] < TRESHOLD_TO_FILTER)
df_subsample = df_subsample.drop('randIndex')

print("Distribution of Pos and Neg cases of the down-sampled training data are: \n", df_subsample.groupBy("label").count().take(3))

In [None]:
## training and prediction
# `RF` is not defined anywhere else in the notebook; bind it to Spark ML's
# random-forest classifier so this cell runs on a fresh kernel.
from pyspark.ml.classification import RandomForestClassifier as RF

# Same 200-tree forest as before, trained on the down-sampled training data
# and scored on the untouched test split.
rf = RF(labelCol='label', featuresCol='features', numTrees=200)
fit = rf.fit(df_subsample)
transformed = fit.transform(testData)

In [None]:
## results and evaluation
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

results = transformed.select(['probability', 'label'])
results_collect = results.collect()

# Build (score, label) pairs: the score is the first component of the probability
# vector and the label is flipped (1.0 - label).
results_list = []
for scored_row in results_collect:
    first_class_prob = float(scored_row[0][0])
    flipped_label = 1.0 - float(scored_row[1])
    results_list.append((first_class_prob, flipped_label))

# `sc` is the SparkContext; it must already exist when this cell runs.
scoreAndLabels = sc.parallelize(results_list)
metrics = metric(scoreAndLabels)

print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)

## Ensemble of Down-samplings

In [None]:
from numpy.random import randint
from pyspark.ml.classification import RandomForestClassifier as RF  # `RF` is not defined elsewhere
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

RATIO_ADJUST = 3.0 ## ratio of pos to neg in the df_subsample
TOTAL_MODELS = 10
total_results = None
final_result = None

# Recompute the class counts here instead of relying on a `counts` variable left
# over from another cell, and look them up by class name: the rows returned by
# groupBy().count().collect() come in no guaranteed order.
counts = trainingData.select('binary_response').groupBy('binary_response').count().collect()
countsByClass = {row[0]: row[1] for row in counts}
highestBound = countsByClass['Positive']
# A positive row is kept when its random index in [0, highestBound) is below this
# value, i.e. on average RATIO_ADJUST positives are kept per negative.
TRESHOLD_TO_FILTER = int(RATIO_ADJUST * countsByClass['Negative'])


## UDF
def randGen(x):
    """Return a random index in [0, highestBound) for 'Positive' rows and -1 otherwise."""
    return int(randint(0, highestBound)) if x == 'Positive' else -1


udfRandGen = udf(randGen, IntegerType())

## ensembling
result_pair = None
for N in range(TOTAL_MODELS):
    print("Round: ", N)
    # Draw a fresh random subsample of the positive rows for this round.
    trainingDataIndexed = trainingData.withColumn("randIndex", udfRandGen("binary_response"))
    df_subsample = trainingDataIndexed.filter(trainingDataIndexed['randIndex'] < TRESHOLD_TO_FILTER).drop('randIndex')
    ## training and prediction
    rf = RF(labelCol='label', featuresCol='features', numTrees=200)
    fit = rf.fit(df_subsample)
    transformed = fit.transform(testData)
    result_pair = transformed.select(['probability', 'label']).collect()
    # Second component of the probability vector for every test row ...
    this_result = np.array([float(i[0][1]) for i in result_pair])
    # ... converted to a normalised rank (argsort of argsort gives each row's rank),
    # so rounds are averaged on a common scale.
    this_result = this_result.argsort().argsort() / float(len(this_result) + 1)

    ## sum up all the predictions; the average is taken after the loop
    # NOTE(review): the element-wise sum assumes collect() returns the test rows in
    # the same order in every round - confirm.
    total_results = this_result if total_results is None else total_results + this_result

if total_results is None:
    # Without this guard the cell would silently report `metrics` left over from
    # an earlier cell when no round was executed.
    raise ValueError("TOTAL_MODELS must be at least 1")

# Average of the per-round normalised ranks.
final_result = (total_results / TOTAL_MODELS).tolist()

# Pair each averaged score with the label of the matching test row. This is done
# once after the loop, because only the final ensemble is reported.
results_list = [(float(i), float(j[1])) for i, j in zip(final_result, result_pair)]
scoreAndLabels = sc.parallelize(results_list)

metrics = metric(scoreAndLabels)
print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)