In [1]:
import time

time_start = time.time()

# Load the raw train and test text files (each with at least 2 partitions) and
# concatenate them into one RDD of lines; a fresh train/test split is drawn
# later with randomSplit.
train, test = (
  sc.textFile(path, minPartitions=2)
  for path in ("/FileStore/tables/train_ft-bfd00.txt", "/FileStore/tables/test_ft-41af5.txt")
)
df = sc.union([train, test])

print('Time elapsed: {:.4f} seconds.'.format(time.time() - time_start))

In [2]:
# peek at the first two raw lines of the combined RDD
df.take(num=2)

In [3]:
# Parse each raw line of the form "__label__<n> <review text>" into [label, review].
def _parse_labeled_line(line):
  """Split one labelled line into [label, review].

  Text before the first "__label__" is discarded. The label is the token right
  after it; everything after the first following space is the review ("" when
  the line carries no review text, so every row has exactly two fields).
  A line without "__label__" raises IndexError, as before, so malformed input
  is not silently dropped.
  """
  rest = line.split('__label__', 1)[1]
  label, _, review = rest.partition(" ")
  return [label, review]

# Rebuild from the raw `train`/`test` RDDs instead of mapping `df` onto itself,
# so re-running this cell does not try to re-parse already-parsed rows.
df = sc.union([train, test]).map(_parse_labeled_line)
print("Size of dataset: ", df.count())
df.take(2)

In [4]:
# distinct values of the label field (first element of each parsed row)
df.keys().distinct().collect()

In [5]:
# Turn the RDD of [label, review] rows (a PipelinedRDD) into a two-column DataFrame.
df1 = df.toDF(schema=["Label", "Review"])
df1.show(n=5)

In [6]:
# class balance over the full dataset
df1.groupBy("Label").count().show()

In [7]:
# 90/10 train/test split; the fixed seed makes the split reproducible.
train_set, test_set = df1.randomSplit(weights=[0.9, 0.1], seed=5)

# Debug aid: set `size` to an int to cap each split at that many rows.
size = None
if size is not None:
  train_set, test_set = train_set.limit(size), test_set.limit(size)

# peek at the training split and its label distribution
train_set.show(2)
train_set.groupBy("Label").count().show()

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StandardScaler, StopWordsRemover, StringIndexer

# Feature pipeline (see https://spark.apache.org/docs/latest/ml-features.html):
#   Review -> tokens -> tokens minus stop words -> hashed term counts (2^16 buckets) -> TF-IDF
#   Label  -> numeric index
time_start = time.time()

tokenizer = Tokenizer(inputCol="Review", outputCol="Tokenizer")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="StopWordsRemover")
hashtf = HashingTF(inputCol=remover.getOutputCol(), outputCol="HashingTF", numFeatures=1 << 16)
# terms seen in fewer than 5 training documents get zero IDF weight
idf = IDF(inputCol=hashtf.getOutputCol(), outputCol="IDF", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol="Label", outputCol="StringIndexer")

# Fit every stage on the training split only, then apply the fitted stages to both splits.
pipeline = Pipeline(stages=[tokenizer, remover, hashtf, idf, label_stringIdx])
pipelineFit = pipeline.fit(train_set)
train_df, test_df = (pipelineFit.transform(frame) for frame in (train_set, test_set))

print('Time elapsed: {:.4f} seconds.'.format(time.time() - time_start))

train_df.show(2)

In [9]:
from pyspark.ml.classification import LogisticRegression, NaiveBayes

# Column wiring shared by both estimators (labelCol is reused by the evaluation cell).
featuresCol = "IDF"
labelCol = "StringIndexer"
predictionCol = "prediction"

lr = LogisticRegression(
  featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol,
  maxIter=100, regParam=0.01,
)
nb = NaiveBayes(
  featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol,
  smoothing=1.0, modelType="multinomial",
)
models = [lr, nb]

# Fit each estimator on the training split; keep (fitted model, train preds, test preds).
model_results = []
for model in models:
  time_start = time.time()
  _fit = model.fit(train_df)
  model_results.append((_fit, _fit.transform(train_df), _fit.transform(test_df)))
  print('Time elapsed: {:.4f} seconds.'.format(time.time() - time_start))

In [10]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Display names, in the same order as `models` in the training cell.
model_names = ["Logistic", "NaiveBayes"]

# AUC is scored on the `probability` column rather than `rawPrediction`:
# NaiveBayes' rawPrediction holds unnormalised per-class log-likelihoods, so its
# class-1 entry alone is not a valid ranking score. The class-1 probability is a
# proper score for both models (for LogisticRegression it is a monotone function
# of the raw margin, so its AUC is unchanged).
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol=labelCol)

def _evaluate(predictions):
  """Return (ROC AUC, accuracy) for a predictions DataFrame.

  Accuracy is the fraction of rows in `predictions` whose `labelCol` equals
  `predictionCol`, using the same frame for numerator and denominator.
  """
  roc_auc = evaluator.evaluate(predictions)
  n_rows = predictions.count()
  n_correct = predictions.filter(predictions[labelCol] == predictions[predictionCol]).count()
  return roc_auc, n_correct / float(n_rows)

for name, (_, predictions_train, predictions_test) in zip(model_names, model_results):
  for split_label, predictions in (("train data ", predictions_train), ("test data  ", predictions_test)):
    time_start = time.time()
    roc_auc, accuracy = _evaluate(predictions)
    print("{}: model: {} roc: {:.4f} acc: {:.4f}".format(split_label, name, roc_auc, accuracy))
    print('Time elapsed: {:.4f} seconds.'.format(time.time() - time_start))

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
  HashingTF, IDF, StandardScaler, StopWordsRemover, StringIndexer, Tokenizer,
)

# TF-IDF pipeline with an extra StandardScaler stage on the IDF output.
# https://spark.apache.org/docs/latest/ml-features.html
time_start = time.time()

# features: Review -> Tokenizer -> StopWordsRemover -> HashingTF -> IDF -> StandardScaler
tokenizer = Tokenizer(inputCol="Review", outputCol="Tokenizer")
remover = StopWordsRemover(inputCol="Tokenizer", outputCol="StopWordsRemover")
hashtf = HashingTF(inputCol="StopWordsRemover", outputCol="HashingTF", numFeatures=65536)
idf = IDF(inputCol="HashingTF", outputCol="IDF", minDocFreq=5)
# scale to unit standard deviation; withMean=False skips centering, which would
# turn the sparse TF-IDF vectors dense
scaler = StandardScaler(inputCol="IDF", outputCol="StandardScaler", withMean=False, withStd=True)

# label: string label -> numeric index
label_stringIdx = StringIndexer(inputCol="Label", outputCol="StringIndexer")

# fit on the training split, then transform both splits with the fitted stages
pipeline = Pipeline(stages=[tokenizer, remover, hashtf, idf, scaler, label_stringIdx])
pipelineFit = pipeline.fit(train_set)
train_df, test_df = map(pipelineFit.transform, (train_set, test_set))

print('Time elapsed: {:.4f} seconds.'.format(time.time() - time_start))

train_df.show(2)

In [12]:
from pyspark.ml.classification import LinearSVC, LogisticRegression, NaiveBayes

# Column wiring: the models now read the standardized TF-IDF features.
featuresCol, labelCol, predictionCol = "StandardScaler", "StringIndexer", "prediction"

lr = LogisticRegression(featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol,
                        maxIter=100, regParam=0.01)
nb = NaiveBayes(featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol,
                smoothing=1.0, modelType="multinomial")
# NOTE: LinearSVC is imported but not trained; a disabled experiment used
# LinearSVC(maxIter=5, regParam=0.01, featuresCol=featuresCol, labelCol=labelCol).
models = [lr, nb]

# fit each model, keeping (fitted model, train predictions, test predictions)
model_results = []
for model in models:
  time_start = time.time()
  _fit = model.fit(train_df)
  _predictions_train, _predictions_test = _fit.transform(train_df), _fit.transform(test_df)
  model_results.append((_fit, _predictions_train, _predictions_test))
  print('Time elapsed: {:.4f} seconds.'.format(time.time() - time_start))

In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Display names, in the same order as `models` in the training cell.
model_names = ["Logistic", "NaiveBayes"]

# AUC is scored on the `probability` column rather than `rawPrediction`:
# NaiveBayes' rawPrediction holds unnormalised per-class log-likelihoods, so its
# class-1 entry alone is not a valid ranking score. The class-1 probability is a
# proper score for both models (for LogisticRegression it is a monotone function
# of the raw margin, so its AUC is unchanged).
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol=labelCol)

def _evaluate(predictions):
  """Return (ROC AUC, accuracy) for a predictions DataFrame.

  Accuracy is the fraction of rows in `predictions` whose `labelCol` equals
  `predictionCol`, using the same frame for numerator and denominator.
  """
  roc_auc = evaluator.evaluate(predictions)
  n_rows = predictions.count()
  n_correct = predictions.filter(predictions[labelCol] == predictions[predictionCol]).count()
  return roc_auc, n_correct / float(n_rows)

for name, (_, predictions_train, predictions_test) in zip(model_names, model_results):
  for split_label, predictions in (("train data ", predictions_train), ("test data  ", predictions_test)):
    time_start = time.time()
    roc_auc, accuracy = _evaluate(predictions)
    print("{}: model: {} roc: {:.4f} acc: {:.4f}".format(split_label, name, roc_auc, accuracy))
    print('Time elapsed: {:.4f} seconds.'.format(time.time() - time_start))

In [14]:
# Import only what is used: `import *` from pyspark.sql.functions shadows Python
# builtins such as sum, min, max, abs and round for the rest of the notebook.
from pyspark.sql.functions import col

# Logistic regression is treated as the best model; it is the first entry of
# `model_results` because it was first in `models` when training.
best_model = model_results[0][0]

# ROC curve on the test split via the notebook's display() (Databricks).
# The feature column is passed under its own name and also aliased as "features";
# NOTE(review): presumably display() looks for a "features" column — confirm.
display(best_model, test_df.select(col("StandardScaler"), col("StandardScaler").alias("features"), col("StringIndexer")), 'ROC')

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.9999352600302102
0.0,0.0161290322580645,0.9999352600302102
0.0,0.032258064516129,0.9998652601457332
0.0,0.0483870967741935,0.9998206422532102
0.0,0.064516129032258,0.99944852938922
0.0,0.0806451612903225,0.9994364689089832
0.0,0.0967741935483871,0.9993897791570268
0.0,0.1129032258064516,0.9992944860033892
0.0,0.1290322580645161,0.999282263137504
0.0,0.1451612903225806,0.9985250605257556


In [15]:
# Fitted values vs. residuals for the best model on the test split.
# NOTE(review): the feature column is also aliased as "features", presumably
# because display() looks for that name — confirm.
display(
  best_model,
  test_df.select(
    col("StandardScaler"),
    col("StandardScaler").alias("features"),
    col("StringIndexer"),
  ),
)

fitted values,residuals
-0.5903940936076832,0.6434555639580033
1.8366971962379184,0.1374423778067899
-8.620425483704837,-0.00018035095580142552
-2.54230816716807,-0.0729449320574752
-4.313898428881058,-0.0132045874974526
1.8388357815712224,-0.8628109591747265
-2.2002484940291334,-0.0997281764934244
7.255878184601409,0.0007055139966106738
-0.8480607975268599,-0.2998398076517808
-5.166274566158245,-0.0056734145129952
