In [0]:
import warnings
warnings.filterwarnings('ignore')
import pandas as pd


from pyspark.ml import *
from pyspark.ml.classification import *
from pyspark.ml.feature import *
from pyspark.ml.param import *
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import *
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import rand 
from sklearn.metrics import classification_report
from time import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row

In [0]:
# Load the full training table and cache it, because the cells below scan it
# many times. For a quicker run, query default.reviews_train_sample instead.
df = spark.sql("select * from default.reviews_train").cache()

# Shape as (rows, columns).
print((df.count(), len(df.columns)))

In [0]:
df.first()  # first Row of the training table (equivalent to df.head())

In [0]:
# Fit a label indexer that maps the distinct values of "label" to numeric
# indices in "indexedLabel"; the tree-based classifiers below train on it.
labelIndexer = StringIndexer(inputCol="label",
                             outputCol="indexedLabel").fit(df)

# Split review text on runs of non-word characters (\W). The raw string keeps
# the backslash from being treated as a (invalid) Python escape sequence.
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words",
                           pattern=r"\W")
df_tokenized = tokenizer.transform(df)

# Remove stop words (StopWordsRemover's default stop-word list).
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df_removed = remover.transform(df_tokenized)

# Hash tokens into a term-frequency vector (default numFeatures).
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
df_TF = hashingTF.transform(df_removed)

# Re-weight term frequencies by inverse document frequency (TF*IDF).
# NOTE: idfModel is fitted on every row of df for this preview only. Modelling
# pipelines should use the unfitted `idf` estimator, so IDF is learnt inside
# each training fold with the same numFeatures that hashingTF is tuned to.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(df_TF)
df_idf = idfModel.transform(df_TF)
for features_label in df_idf.select("features", "label").take(3):
    print(features_label)

In [0]:
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window

# Time-based split: the oldest ~80% of reviews (by unixReviewTime) are used for
# training and the newest ~20% are held out. The helper "rank" column lives in
# a separate frame, so the cached `df` is not overwritten and this cell can be
# re-run safely.
# NOTE: a window with no partitioning moves all rows into a single partition.
time_window = Window.partitionBy().orderBy("unixReviewTime")
df_ranked = df.withColumn("rank", percent_rank().over(time_window))
trainingData = df_ranked.where("rank <= .8").drop("rank")
testData = df_ranked.where("rank > .8").drop("rank")

print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count:     " + str(testData.count()))

In [0]:
# Gradient-boosted trees on TF-IDF features, trained on the indexed label.
gbt = GBTClassifier(labelCol="indexedLabel")
# Use the unfitted `idf` estimator, not the pre-fitted idfModel. That way IDF
# is learnt only on each training fold, and its weights match the
# hashingTF.numFeatures chosen by the grid.
pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, gbt])

paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [1000])
             .addGrid(gbt.maxDepth, [30])
             .addGrid(gbt.minInstancesPerNode, [2])
             .build())

# Score rawPrediction against the same label column the classifier was
# trained on (the evaluator's default labelCol is "label").
evaluator = BinaryClassificationEvaluator(labelCol="indexedLabel",
                                          metricName="areaUnderROC")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=4)

########  Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(trainingData)

In [0]:
pred = cvModel.transform(dataset=df)  # every row of df (both splits), scored lazily by the tuned model

In [0]:
# Separate test table; it is scored with cvModel further down.
test_df = spark.sql("select * from default.reviews_test")
display(test_df)

In [0]:
print((test_df.count(), len(test_df.schema.fields)))  # (rows, columns) of the test table

In [0]:
########  Score the test table with the best cross-validated model.
prediction = cvModel.transform(dataset=test_df)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Pull the probability assigned to class index 1 out of the probability vector.
# `udf` is imported here because the notebook's import cell does not provide it.
# NOTE(review): the model trains on indexedLabel, and StringIndexer orders
# labels by frequency by default. Check labelIndexer.labels to confirm that
# index 1 is the class intended for "Predicted".
secondelement = udf(lambda v: float(v[1]), FloatType())

prediction = prediction.withColumn("Predicted", secondelement('probability'))

display(prediction.select(["id", "rawPrediction", 'probability', 'prediction', 'Predicted']))

In [0]:
def Data_modeling(train, test, pipeline, paramGrid, label_col=None):
    """Tune `pipeline` with 4-fold cross-validation, then report metrics on `test`.

    Prints:
      - the best average cross-validation accuracy,
      - test-set accuracy and F1,
      - an sklearn classification report.

    Args:
        train: Spark DataFrame used for cross-validation and the final fit.
        test: Spark DataFrame scored by the best cross-validated model.
        pipeline: pyspark.ml Pipeline whose last stage is the classifier.
        paramGrid: list of ParamMaps (output of ParamGridBuilder.build()).
        label_col: ground-truth column to score against. Defaults to the
            labelCol of the pipeline's last stage, so predictions are always
            compared with the label encoding the classifier was trained on.

    Returns:
        None; results are printed.
    """
    if label_col is None:
        # In this notebook, LogisticRegression/NaiveBayes train on "label" and
        # the tree models train on "indexedLabel". Scoring against the other
        # column would compare values from two different encodings.
        label_col = pipeline.getStages()[-1].getLabelCol()

    accuracy_evaluator = MulticlassClassificationEvaluator(
        labelCol=label_col, predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=accuracy_evaluator,
                              numFolds=4)

    ########  Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(train)

    ########  Make predictions on the test data
    prediction = cvModel.transform(test)
    # Accuracy is larger-is-better, so the selected grid point has the max average.
    print('average cross-validation accuracy = {}'.format(max(cvModel.avgMetrics)))

    ######## Accuracy of the predictions on the test data
    accuracy_score = accuracy_evaluator.evaluate(prediction)
    print('Accuracy in the test data = {}'.format(accuracy_score))

    ######## F1 score, measured against the same label column as accuracy
    f1_evaluator = MulticlassClassificationEvaluator(
        labelCol=label_col, predictionCol="prediction", metricName="f1")
    f1_score = f1_evaluator.evaluate(prediction)
    print('F1 score in the test data = {}'.format(f1_score))

    ######## Print classification_report
    y_true = []
    y_pred = []
    for row in prediction.select(label_col, "prediction").collect():
        try:
            # sklearn's argument order is (y_true, y_pred): ground truth first.
            true_label = int(row[0])
            predicted = int(row[1])
        except (TypeError, ValueError):
            # Skip rows whose label or prediction is missing or non-numeric.
            continue
        y_true.append(true_label)
        y_pred.append(predicted)

    # NOTE(review): these names assume 0 = negative and 1 = positive. When
    # label_col is a StringIndexer output, index 0 is the most frequent label.
    target_names = ['neg 0', 'pos 1']
    print(classification_report(y_true, y_pred, target_names=target_names))
    return

In [0]:
# Logistic regression on TF-IDF features (trains on its default "label" column).
lr = LogisticRegression()
# Unfitted `idf`, so IDF is refit inside each fold on hashingTF's output size
# instead of reusing idfModel (fitted on all rows, at the default numFeatures).
pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover,
                            hashingTF, idf, lr])

# Single-point parameter grid. Data_modeling wraps this pipeline and grid in
# its own accuracy-based 4-fold CrossValidator.
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [100])
             .addGrid(lr.regParam, [0.09])
             .addGrid(lr.elasticNetParam, [0.1])
             .addGrid(lr.maxIter, [9])
             .build())

########  Run cross-validation, choose the best parameters, and report test metrics.
Data_modeling(trainingData, testData, pipeline, paramGrid)

In [0]:
# `prediction` is the global GBT test-table scoring that gained the "Predicted"
# column earlier; Data_modeling does not expose its own predictions.
# Column name matches the "id" used when "Predicted" was added.
display(prediction.select(["id", "Predicted"]))

In [0]:
def grid_search(p1, p2):
    """Cross-validate a TF-IDF + Naive Bayes pipeline for one parameter setting.

    Later cells redefine ``grid_search`` for other classifiers.

    Args:
        p1: value for hashingTF.numFeatures.
        p2: value for NaiveBayes.smoothing.

    Returns:
        Average 4-fold cross-validation accuracy on the global ``trainingData``.
    """
    nb = NaiveBayes()
    # Unfitted `idf`, so IDF is learnt per fold on p1-dimensional TF vectors.
    pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, nb])

    # Create ParamGrid for Cross Validation
    paramGrid = (ParamGridBuilder()
                 .addGrid(hashingTF.numFeatures, [p1])
                 .addGrid(nb.smoothing, [p2])
                 .build())
    # NaiveBayes() trains on its default label column, so accuracy must be
    # measured against that column rather than "indexedLabel".
    evaluator = MulticlassClassificationEvaluator(labelCol=nb.getLabelCol(), predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    ########  Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(trainingData)
    # average cross-validation accuracy metric/s on all folds
    average_score = cvModel.avgMetrics
    print ('average cross-validation accuracy = {}'.format(average_score[0]))
    return average_score[0]

In [0]:
nb = NaiveBayes()

# Build a pipeline. The unfitted `idf` estimator is refit on each training fold
# at the grid's numFeatures; the pre-fitted idfModel was fitted on all rows at
# the default numFeatures.
pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, nb])

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [40000])
             .addGrid(nb.smoothing, [1.0])
             .build())

# Execute 4-fold cross validation for hyperparameter tuning, model prediction and model evaluation.
Data_modeling(trainingData, testData, pipeline, paramGrid)

In [0]:
def grid_search(p1, p2, p3):
    """Cross-validate a TF-IDF + decision-tree pipeline for one parameter setting.

    This redefines the earlier ``grid_search`` in the notebook.

    Args:
        p1: value for hashingTF.numFeatures.
        p2: value for DecisionTreeClassifier.maxDepth.
        p3: value for DecisionTreeClassifier.minInstancesPerNode.

    Returns:
        Average 4-fold cross-validation accuracy on the global ``trainingData``.
    """
    # trained by a Decision Tree on the indexed label
    dt = DecisionTreeClassifier(labelCol="indexedLabel", impurity="entropy")
    # Unfitted `idf`, so IDF is learnt per fold on p1-dimensional TF vectors.
    pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, dt])

    # Create ParamGrid for Cross Validation
    paramGrid = (ParamGridBuilder()
                 .addGrid(hashingTF.numFeatures, [p1])
                 .addGrid(dt.maxDepth, [p2])
                 .addGrid(dt.minInstancesPerNode, [p3])
                 .build())
    evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    ########  Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(trainingData)
    # average cross-validation accuracy metric/s on all folds
    average_score = cvModel.avgMetrics
    print ('average cross-validation accuracy = {}'.format(average_score[0]))
    return average_score[0]


In [0]:
dt = DecisionTreeClassifier(labelCol="indexedLabel", impurity="entropy")

# Build a pipeline. The unfitted `idf` estimator is refit on each training fold
# at the grid's numFeatures; the pre-fitted idfModel was fitted on all rows at
# the default numFeatures.
pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, dt])

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [70000])
             .addGrid(dt.maxDepth, [25])
             .addGrid(dt.minInstancesPerNode, [4])
             .build())

# Execute 4-fold cross validation for hyperparameter tuning, model prediction and model evaluation.
Data_modeling(trainingData, testData, pipeline, paramGrid)

In [0]:
def grid_search(p1, p2, p3, p4):
    """Cross-validate a TF-IDF + random-forest pipeline for one parameter setting.

    This redefines the earlier ``grid_search`` in the notebook.

    Args:
        p1: value for hashingTF.numFeatures.
        p2: value for RandomForestClassifier.numTrees.
        p3: value for RandomForestClassifier.maxDepth.
        p4: value for RandomForestClassifier.minInstancesPerNode.

    Returns:
        Average 4-fold cross-validation accuracy on the global ``trainingData``.
    """
    rf = RandomForestClassifier(labelCol="indexedLabel", impurity="entropy", seed=5043)
    # Unfitted `idf`, so IDF is learnt per fold on p1-dimensional TF vectors.
    pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, rf])

    # Create ParamGrid for Cross Validation
    paramGrid = (ParamGridBuilder()
                 .addGrid(hashingTF.numFeatures, [p1])
                 .addGrid(rf.numTrees, [p2])
                 .addGrid(rf.maxDepth, [p3])
                 .addGrid(rf.minInstancesPerNode, [p4])
                 .build())
    evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    ########  Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(trainingData)
    # average cross-validation accuracy metric/s on all folds
    average_score = cvModel.avgMetrics
    print ('average cross-validation accuracy = {}'.format(average_score[0]))
    return average_score[0]

In [0]:
rf = RandomForestClassifier(labelCol="indexedLabel", impurity="entropy", seed=5043)

# Build a pipeline. The unfitted `idf` estimator is refit on each training fold
# at the grid's numFeatures; the pre-fitted idfModel was fitted on all rows at
# the default numFeatures.
pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, rf])

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [50000])
             .addGrid(rf.numTrees, [31])
             .addGrid(rf.maxDepth, [29])
             .addGrid(rf.minInstancesPerNode, [1])
             .build())

# Execute 4-fold cross validation for hyperparameter tuning, model prediction and model evaluation.
Data_modeling(trainingData, testData, pipeline, paramGrid)

In [0]:
def grid_search(p1, p2, p3, p4):
    """Cross-validate a TF-IDF + gradient-boosted-tree pipeline for one setting.

    This redefines the earlier ``grid_search`` in the notebook.

    Args:
        p1: value for hashingTF.numFeatures.
        p2: unused. It presumably keeps the signature aligned with the
            random-forest version, where p2 is numTrees.
        p3: value for GBTClassifier.maxDepth.
        p4: value for GBTClassifier.minInstancesPerNode.

    Returns:
        Average 4-fold cross-validation accuracy on the global ``trainingData``.
    """
    gbt = GBTClassifier(labelCol="indexedLabel")
    # Unfitted `idf`, so IDF is learnt per fold on p1-dimensional TF vectors.
    pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, gbt])

    # Create ParamGrid for Cross Validation
    paramGrid = (ParamGridBuilder()
                 .addGrid(hashingTF.numFeatures, [p1])
                 .addGrid(gbt.maxDepth, [p3])
                 .addGrid(gbt.minInstancesPerNode, [p4])
                 .build())
    evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    ########  Run cross-validation, and choose the best set of parameters.
    # Fit on the notebook's training split; `train` is not defined anywhere.
    cvModel = crossval.fit(trainingData)
    # average cross-validation accuracy metric/s on all folds
    average_score = cvModel.avgMetrics
    print ('average cross-validation accuracy = {}'.format(average_score[0]))
    return average_score[0]

In [0]:
gbt = GBTClassifier(labelCol="indexedLabel")
# Build a pipeline. The unfitted `idf` estimator is refit on each training fold
# at the grid's numFeatures; the pre-fitted idfModel was fitted on all rows at
# the default numFeatures.
pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, gbt])
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [1000])
             .addGrid(gbt.maxDepth, [30])
             .addGrid(gbt.minInstancesPerNode, [2])
             .build())
# Execute 4-fold cross validation for hyperparameter tuning, model prediction and model evaluation.
Data_modeling(trainingData, testData, pipeline, paramGrid)

In [0]:
import matplotlib.pyplot as plt

# Hard-coded results, presumably copied from the runs above (not recomputed here).
classifier_names = ['Logistic_Regression', 'Naive_Bayes', 'Decision_Tree', 'Random_Forest', 'Gradient_Boosted_Tree']
# Named train_time_sec rather than `time` so the imported time() is not shadowed.
train_time_sec = [6.41, 2.893, 141, 133, 3179]
accuracy = [0.8385, 0.8177, 0.78125, 0.8125, 0.8229]
# One bar per classifier. barh centres each bar on its y position, so the
# ticks must sit on the same integer positions for the labels to line up.
y_pos = list(range(len(classifier_names)))

fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(18, 5), facecolor='white')
ax[0].barh(y_pos, train_time_sec)
ax[0].set_yticks(y_pos)
ax[0].set_yticklabels(classifier_names)
ax[0].grid(color='b', linestyle='--', linewidth=1)
ax[0].set_title('Model training time')
ax[0].set_xlabel('Time (sec)')
# Times span several orders of magnitude, so use a log scale.
ax[0].set_xscale('log')

ax[1].barh(y_pos, accuracy)
ax[1].set_xlim([0.75, 0.85])
ax[1].set_title('Model evaluation on the test set')
ax[1].set_xlabel('Accuracy')
# Same bar rows as the left panel; names are shown only once, on the left.
ax[1].set_yticks(y_pos)
ax[1].set_yticklabels([])
ax[1].grid(color='b', linestyle='--', linewidth=1)
display(fig)
