In [1]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import Word2Vec, CountVectorizer, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np

# 1. Read Data

In [2]:
# Read Data to pyspark dataframe
# Since we want to further investigate precision and recall of negative reviews,
# we change the label 0 to 1 and label 4 to 0
PATH = 'gs://bucket-ljy/data.csv'
data = spark.read.format('com.databricks.spark.csv').options(header='false', inferschema='true').load(PATH)
data = data.selectExpr("_c0 as label", "_c1 as id", "_c2 as date", "_c3 as flag", "_c4 as user", "_c5 as text")\
            .select(['text', 'label']).replace(0, 1).replace(4, 0)

                                                                                

In [15]:
data.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [4]:
data.show(5)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|@switchfoot http:...|    1|
|is upset that he ...|    1|
|@Kenichan I dived...|    1|
|my whole body fee...|    1|
|@nationwideclass ...|    1|
+--------------------+-----+
only showing top 5 rows



In [3]:
train_data, test_data = data.randomSplit([0.7, 0.3], seed = 100)
train_data.cache()
test_data.cache()

DataFrame[text: string, label: int]

In [5]:
train_data.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1|559308|
|    0|559842|
+-----+------+



In [6]:
test_data.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1|240692|
|    0|240158|
+-----+------+



In [68]:
train_data.show()

[Stage 153:>                                                        (0 + 1) / 1]

+--------------------+-----+
|                text|label|
+--------------------+-----+
|                 ...|    1|
|                 ...|    1|
|                 ...|    1|
|               I ...|    1|
|               ju...|    1|
|             i ju...|    1|
|            Miss ...|    1|
|          .. Omga...|    1|
|           FUCK YOU!|    1|
|         or i jus...|    1|
|        my head f...|    1|
|       FS keeps c...|    1|
|       Orlando Lo...|    1|
|       Sunny Agai...|    1|
|      My current ...|    1|
|      i think i'm...|    1|
|      rinitis suc...|    1|
|      this weeken...|    1|
|            #canucks|    1|
|     &gt;( &gt;( ...|    1|
+--------------------+-----+
only showing top 20 rows



                                                                                

# 2. Feature Engineering

In [4]:
# preprocess data
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# three different feature engineering methods

# word2vec
word2Vec = Word2Vec(vectorSize=100, minCount=2, inputCol="filtered_words", outputCol="features")
# count vectorizer
count_vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="features")
# TF-IDF
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")

# 3. Models

In [5]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier

In [6]:
# Six different models for binary classification
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
svm = LinearSVC(featuresCol="features", labelCol="label", maxIter=10)
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50, maxDepth=6,minInstancesPerNode=2)
# gbm = GBTClassifier(featuresCol="features", labelCol="label", maxIter=10)

# 4. Training and Evaluation

In [7]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [8]:
def train(train_data, pipelineFit, model):
    
    # feature engineering
    processed_data = pipelineFit.transform(train_data)
    
    # Fit the model with processed text
    model = model.fit(processed_data)
    
    return model

In [9]:
def evaluate(test_data, pipeline_feat, model):
    
    # feature engineering
    processed_data = pipeline_feat.transform(test_data)
    # make prediction
    predictions = model.transform(processed_data)
    
    # evaluators
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
    evaluator_auc = BinaryClassificationEvaluator()
    
    # accuracy
    evaluator.setMetricName('accuracy')
    accuracy = evaluator.evaluate(predictions)
    # precision
    evaluator.setMetricName('precisionByLabel')
    precision = evaluator.evaluate(predictions)
    # recall
    evaluator.setMetricName('recallByLabel')
    recall = evaluator.evaluate(predictions)
    # auc
    auc = evaluator_auc.evaluate(predictions)
    
    return [accuracy, precision, recall, auc]

In [10]:
def crossvalidation(train_data, test_data, model, paramGrid, evaluator, pipelineFit):
    processed_train = pipelineFit.transform(train_data)
    processed_test = pipelineFit.transform(test_data)
    #evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
    
    cv = CrossValidator(estimator=model,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)
    cvModel = cv.fit(processed_train)
    
    predictions = cvModel.transform(processed_test)
    
    # average metric(use accuracy)
    print(cvModel.avgMetrics)
    # parameter map
    print(cvModel.getEstimatorParamMaps())
    # parameter with highest average metric
    print(cvModel.getEstimatorParamMaps()[np.argmax(cvModel.avgMetrics)])
    
    
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
    evaluator_auc = BinaryClassificationEvaluator()
    
    # accuracy
    
    evaluator.setMetricName('accuracy')
    accuracy = evaluator.evaluate(predictions)
    # precision
    evaluator.setMetricName('precisionByLabel')
    precision = evaluator.evaluate(predictions)
    # recall
    evaluator.setMetricName('recallByLabel')
    recall = evaluator.evaluate(predictions)
    # auc
    auc = evaluator_auc.evaluate(predictions)
    
    return [accuracy, precision, recall, auc]

In [11]:
cv_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")

In [12]:
nb_model = NaiveBayes(modelType="multinomial")
svm_model = LinearSVC(featuresCol="features", labelCol="label")
lr_model = LogisticRegression(featuresCol="features", labelCol="label")
dt_model = DecisionTreeClassifier(featuresCol="features", labelCol="label")

In [16]:
nb_grid = ParamGridBuilder() \
    .addGrid(nb_model.smoothing, [0.0, 0.5, 1.0, 2.0, 3.0, 5.0]) \
    .build()
svm_grid = ParamGridBuilder() \
    .addGrid(svm_model.maxIter, [5,10,20])\
    .addGrid(svm_model.regParam, [0.0, 0.1, 0.2])\
    .build()
# .addGrid(svm_model.regParam, [0.1, 0.3, 0.5])\

lr_grid = (ParamGridBuilder()
#          .addGrid(lr_model.regParam, [0.0, 0.1, 0.2]) # regularization parameter
           .addGrid(lr_model.elasticNetParam, [0.0, 0.1, 0.5])
           .addGrid(lr.maxIter, [10, 20, 50])
           .build())
#             .addGrid(lr_model.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#             .addGrid(lr.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features


In [17]:
metrics = ['accuracy', 'precision', 'recall', 'auc']

## 4.1 Count Vectorizer

In [23]:
feature_cv = [count_vectorizer]
# define pipeline that does feature engineering
pipeline_feat = Pipeline(stages=[tokenizer, remover]+feature_cv)
    
# Fit the pipeline to training text
pipeline_cv = pipeline_feat.fit(train_data)

                                                                                

### 4.1.1 Naive Bayes

In [17]:
model_nb = train(train_data, pipeline_cv, nb)
res = evaluate(test_data, pipeline_cv, model_nb)
for metric, score in zip(metrics, res):
    print(metric, score)

23/04/16 21:50:50 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/16 21:51:12 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/16 21:51:14 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 7.0 MiB
23/04/16 21:51:29 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 7.0 MiB
23/04/16 21:51:37 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 7.0 MiB
23/04/16 21:51:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 7.0 MiB
[Stage 25:>                                                         (0 + 2) / 2]

accuracy 0.7706577993349502
precision 0.7828976864648399
recall 0.7492283147059681
auc 0.5030065611725745


                                                                                

In [18]:
# cross validation
res = crossvalidation(train_data, test_data, nb_model, nb_grid, cv_evaluator, pipeline_cv)
for metric, score in zip(metrics, res):
    print(metric, score)

23/04/16 21:52:00 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/16 21:52:20 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/16 21:52:21 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 7.0 MiB
23/04/16 21:52:35 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/16 21:52:37 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/16 21:52:38 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 7.0 MiB
23/04/16 21:52:39 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/16 21:52:41 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/16 21:52:42 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

[0.7243932314077098, 0.7648518420823351, 0.7687884132408596, 0.770805248242312, 0.7710811446992919, 0.7704748621389808]
[{Param(parent='NaiveBayes_dec1da326641', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 0.0}, {Param(parent='NaiveBayes_dec1da326641', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 0.5}, {Param(parent='NaiveBayes_dec1da326641', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 1.0}, {Param(parent='NaiveBayes_dec1da326641', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 2.0}, {Param(parent='NaiveBayes_dec1da326641', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 3.0}, {Param(parent='NaiveBayes_dec1da326641', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 5.0}]
{Param(parent='NaiveBayes_dec1da326641', name='smoothing', doc='The smoothing parameter, should be >= 0, 

23/04/16 21:56:35 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 7.0 MiB
23/04/16 21:56:42 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 7.0 MiB
23/04/16 21:56:48 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 7.0 MiB
23/04/16 21:56:55 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 7.0 MiB
                                                                                

accuracy 0.772341258927753
precision 0.7883819074203918
recall 0.7447294206840819
auc 0.5016353180349614


                                                                                

### 4.1.2 SVM

In [61]:
model_svm = train(train_data, pipeline_cv, svm)
res = evaluate(test_data, pipeline_cv, model_svm)
for metric, score in zip(metrics, res):
    print(metric, score)

23/04/17 10:11:22 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:11:33 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:11:44 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:11:44 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:11:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:11:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:11:46 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:11:46 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:11:47 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

accuracy 0.7673846371811219
precision 0.7675896675029799
recall 0.7672155595082876
auc 0.8386128521695796


                                                                                

In [None]:
# cross validation
res = crossvalidation(train_data, test_data, svm_model, svm_grid, cv_evaluator, pipeline_cv)
for metric, score in zip(metrics, res):
    print(metric, score)

23/04/17 10:12:20 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:12:34 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:12:35 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:12:35 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:12:36 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:12:36 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:12:37 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:12:37 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 10:12:37 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

[0.7568825524333986, 0.7552343956032055, 0.7547227189969574, 0.7582795097839823, 0.7561861119401861, 0.7561119924053147, 0.7585768213130752, 0.7578092564961432, 0.7570430958556912]
[{Param(parent='LinearSVC_27e283c066f1', name='maxIter', doc='max number of iterations (>= 0).'): 5, Param(parent='LinearSVC_27e283c066f1', name='regParam', doc='regularization parameter (>= 0).'): 0.0}, {Param(parent='LinearSVC_27e283c066f1', name='maxIter', doc='max number of iterations (>= 0).'): 5, Param(parent='LinearSVC_27e283c066f1', name='regParam', doc='regularization parameter (>= 0).'): 0.1}, {Param(parent='LinearSVC_27e283c066f1', name='maxIter', doc='max number of iterations (>= 0).'): 5, Param(parent='LinearSVC_27e283c066f1', name='regParam', doc='regularization parameter (>= 0).'): 0.2}, {Param(parent='LinearSVC_27e283c066f1', name='maxIter', doc='max number of iterations (>= 0).'): 10, Param(parent='LinearSVC_27e283c066f1', name='regParam', doc='regularization parameter (>= 0).'): 0.0}, {Para

23/04/17 10:25:02 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.0 MiB
23/04/17 10:25:07 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.0 MiB
23/04/17 10:25:12 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.0 MiB
23/04/17 10:25:18 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.0 MiB
                                                                                

accuracy 0.7668325124384329
precision 0.7674286596741948
recall 0.7659325415835274
auc 0.8377534582636721


                                                                                

### 4.1.3 Logistic Regression

In [73]:
model_lr = train(train_data, pipeline_cv, lr)
res = evaluate(test_data, pipeline_cv, model_lr)
for metric, score in zip(metrics, res):
    print(metric, score)

23/04/17 12:19:05 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 12:19:15 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 12:19:27 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 12:19:28 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 12:19:28 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 12:19:29 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 12:19:29 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 12:19:30 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 12:19:30 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

accuracy 0.749368702652699
precision 0.7467763404613058
recall 0.7548644291611646
auc 0.8089913684604183


In [None]:
# cross validation
res = crossvalidation(train_data, test_data, lr_model, lr_grid, cv_evaluator, pipeline_cv)
for metric, score in zip(metrics, res):
    print(metric, score)

23/04/17 11:29:14 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 11:29:28 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 11:29:30 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 11:29:30 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 11:29:31 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 11:29:31 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 11:29:32 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 11:29:32 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/17 11:29:33 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

[0.7185075099456844, 0.7185075099456844, 0.7185075099456844, 0.7185075099456844, 0.7185075099456844, 0.7185075099456844, 0.7185075099456844, 0.7185075099456844, 0.7185075099456844]
[{Param(parent='LogisticRegression_fae272cd68a7', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_b81f07f04667', name='maxIter', doc='max number of iterations (>= 0).'): 10}, {Param(parent='LogisticRegression_fae272cd68a7', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_b81f07f04667', name='maxIter', doc='max number of iterations (>= 0).'): 20}, {Param(parent='LogisticRegression_fae272cd68a7', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 

23/04/17 12:09:16 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.0 MiB
23/04/17 12:09:21 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.0 MiB
23/04/17 12:09:27 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.0 MiB
23/04/17 12:09:32 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.0 MiB
                                                                                

accuracy 0.7410826645331733
precision 0.7384195960860541
recall 0.7469247143410578
auc 0.7780068951615077


### 4.1.4 Decision Tree

In [None]:
model_dt = train(train_data, pipeline_cv, dt)
res = evaluate(test_data, pipeline_cv, model_dt)
for metric, score in zip(metrics, res):
    print(metric, score)

23/04/25 16:04:02 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/25 16:04:10 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/25 16:04:31 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.5 MiB
23/04/25 16:06:15 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1034.3 KiB
23/04/25 16:06:16 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 6.5 MiB
23/04/25 16:12:58 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 6.5 MiB
23/04/25 16:36:05 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 6.5 MiB
23/04/25 17:00:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 6.5 MiB
23/04/25 17:15:50 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binar

accuracy 0.5484411666107945
precision 0.7544827586206897
recall 0.14351932831904846
auc 0.5244815822254475


### 4.1.5 Random Forest

In [None]:
model_rf = train(train_data, pipeline_cv, rf)
res = evaluate(test_data, pipeline_cv, model_rf)
for metric, score in zip(metrics, res):
    print(metric, score)

23/04/25 18:35:18 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/25 18:35:20 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/04/25 18:35:35 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.5 MiB
23/04/25 18:37:11 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1034.3 KiB
23/04/25 18:37:12 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 6.6 MiB
23/04/25 19:29:34 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 6.7 MiB
23/04/25 19:44:48 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 6.8 MiB
23/04/25 20:00:16 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/04/25 20:15:57 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binar

KeyboardInterrupt: 

## 4.2 TF-IDF

In [20]:
feature_tfidf = [hashing_tf, idf]
# define pipeline that does feature engineering
pipeline_feat = Pipeline(stages=[tokenizer, remover]+feature_tfidf)
    
# Fit the pipeline to training text
pipeline_tfidf = pipeline_feat.fit(train_data)

                                                                                

### 4.2.1 Naive Bayes

In [20]:
model_nb = train(train_data, pipeline_tfidf, NaiveBayes(smoothing=2.0, modelType="multinomial"))
res = evaluate(test_data, pipeline_tfidf, model_nb)
for metric, score in zip(metrics, res):
    print(metric, score)

                                                                                

accuracy 0.725648173613021
precision 0.7248361131856278
recall 0.7277377644662354
auc 0.5162181392990665


                                                                                

In [58]:
# cross validation
res = crossvalidation(train_data, test_data, nb_model, nb_grid, cv_evaluator, pipeline_tfidf)
for metric, score in zip(metrics, res):
    print(metric, score)

                                                                                

[0.7235561146479446, 0.7235561235626111, 0.7235614779567637, 0.7235757669304754, 0.7235704361727662, 0.7235998786096178]
[{Param(parent='NaiveBayes_b04f27b8850e', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 0.0}, {Param(parent='NaiveBayes_b04f27b8850e', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 0.5}, {Param(parent='NaiveBayes_b04f27b8850e', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 1.0}, {Param(parent='NaiveBayes_b04f27b8850e', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 2.0}, {Param(parent='NaiveBayes_b04f27b8850e', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 3.0}, {Param(parent='NaiveBayes_b04f27b8850e', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 5.0}]
{Param(parent='NaiveBayes_b04f27b8850e', name='smoothing', doc='The smoothing parameter, should be >= 0,

                                                                                

accuracy 0.7256856764257319
precision 0.7249275819423809
recall 0.7276544516139782
auc 0.5162080762348076


### 4.2.2 SVM

In [18]:
model_svm = train(train_data, pipeline_tfidf, svm)
res = evaluate(test_data, pipeline_tfidf, model_svm)
for metric, score in zip(metrics, res):
    print(metric, score)

                                                                                

accuracy 0.738276204048637
precision 0.7321666943922617
recall 0.7516985407753927
auc 0.8107737877241844


                                                                                

In [None]:
# cross validation
res = crossvalidation(train_data, test_data, svm_model, svm_grid, cv_evaluator, pipeline_tfidf)
for metric, score in zip(metrics, res):
    print(metric, score)

                                                                                

[0.7350326692746728, 0.7348291076077214, 0.7346505204953356, 0.7363621699088638, 0.7360717010200827, 0.735457429592322, 0.7361673100101628, 0.7355629429348551, 0.7352718474494369]
[{Param(parent='LinearSVC_27e283c066f1', name='maxIter', doc='max number of iterations (>= 0).'): 5, Param(parent='LinearSVC_27e283c066f1', name='regParam', doc='regularization parameter (>= 0).'): 0.0}, {Param(parent='LinearSVC_27e283c066f1', name='maxIter', doc='max number of iterations (>= 0).'): 5, Param(parent='LinearSVC_27e283c066f1', name='regParam', doc='regularization parameter (>= 0).'): 0.1}, {Param(parent='LinearSVC_27e283c066f1', name='maxIter', doc='max number of iterations (>= 0).'): 5, Param(parent='LinearSVC_27e283c066f1', name='regParam', doc='regularization parameter (>= 0).'): 0.2}, {Param(parent='LinearSVC_27e283c066f1', name='maxIter', doc='max number of iterations (>= 0).'): 10, Param(parent='LinearSVC_27e283c066f1', name='regParam', doc='regularization parameter (>= 0).'): 0.0}, {Param

                                                                                

accuracy 0.738276204048637
precision 0.7321666943922617
recall 0.7516985407753927
auc 0.8107732264680664


                                                                                

### 4.2.3 Logistic Regression

In [74]:
model_lr = train(train_data, pipeline_tfidf, lr)
res = evaluate(test_data, pipeline_tfidf, model_lr)
for metric, score in zip(metrics, res):
    print(metric, score)

                                                                                

accuracy 0.7381699460792893
precision 0.7302517693094258
recall 0.7556309074019304
auc 0.8124305440290557


                                                                                

In [None]:
# cross validation
res = crossvalidation(train_data, test_data, lr_model, lr_grid, cv_evaluator, pipeline_tfidf)
for metric, score in zip(metrics, res):
    print(metric, score)

                                                                                

[0.7368459249301867, 0.7368459249301867, 0.7368459249301867, 0.7368459249301867, 0.7368459249301867, 0.7368459249301867, 0.7368459249301867, 0.7368459249301867, 0.7368459249301867]
[{Param(parent='LogisticRegression_fae272cd68a7', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_b81f07f04667', name='maxIter', doc='max number of iterations (>= 0).'): 10}, {Param(parent='LogisticRegression_fae272cd68a7', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_b81f07f04667', name='maxIter', doc='max number of iterations (>= 0).'): 20}, {Param(parent='LogisticRegression_fae272cd68a7', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 

                                                                                

accuracy 0.7381407772249585
precision 0.7300772449308014
recall 0.7559308336700561
auc 0.8124262914202638


                                                                                

### 4.2.4 Decision Tree

In [21]:
model_dt = train(train_data, pipeline_tfidf, dt)
res = evaluate(test_data, pipeline_tfidf, model_dt)
for metric, score in zip(metrics, res):
    print(metric, score)

                                                                                

accuracy 0.5481621528204338
precision 0.7498324577901723
recall 0.1444522185295318
auc 0.5242643112330101


[Stage 23:>                                                         (0 + 2) / 2]

### 4.2.5 Random Forest

In [22]:
model_rf = train(train_data, pipeline_tfidf, rf)
res = evaluate(test_data, pipeline_tfidf, model_rf)
for metric, score in zip(metrics, res):
    print(metric, score)

                                                                                

accuracy 0.6658726572732024
precision 0.6330232325088103
recall 0.7892292827573569
auc 0.7340144358999441


[Stage 23:>                                                         (0 + 2) / 2]