In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext("local[*]")
spark = SparkSession.builder.getOrCreate()
import pandas as pd  

23/02/26 16:36:02 WARN Utils: Your hostname, ubuntu-hadoop resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/02/26 16:36:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/26 16:36:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml import Pipeline
#evaluation
from pyspark.mllib.evaluation import MultilabelMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [5]:
def m_metrics_l(ml_model,test_data):
    predictions = ml_model.transform(test_data).cache()
    predictionAndLabels = predictions.select("label","prediction").rdd.map(lambda x: (float(x[0]), float(x[1]))).cache()
    # Print some predictions vs labels
    # print(predictionAndLabels.take(10))
    metrics = MulticlassMetrics(predictionAndLabels)
    # Overall statistics
    precision = metrics.precision(1.0)
    recall = metrics.recall(1.0)
    f1Score = metrics.fMeasure(1.0)
    print(f"Precision = {precision:.4f} Recall = {recall:.4f} F1 Score = {f1Score:.4f}")
    print("Confusion matrix \n", metrics.confusionMatrix().toArray().astype(int))
    return precision, recall, f1Score, metrics


In [6]:
# read the PySpark DataFrame from the Parquet file
df_training_m1 = spark.read.parquet("data/transformed_training.parquet")
df_testing_m1 = spark.read.parquet("data/transformed_test.parquet")
df_val_m1 = spark.read.parquet("data/transformed_val.parquet")
df_training_m1.cache()
df_testing_m1.cache()
df_val_m1.cache()
print(df_testing_m1.show(5))

                                                                                

+--------------------+-------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|emotion|label|              text_c|               words|            filtered|         rawFeatures|         featuresIDF|
+--------------------+-------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|im feeling rather...|sadness|    1|  feel rather rot...|[, , feel, rather...|[, , feel, rather...|(1000,[0,1,42,102...|(1000,[0,1,42,102...|
|im updating my bl...|sadness|    1|  update   blog b...|[, , update, , , ...|[, , update, , , ...|(1000,[0,1,117,34...|(1000,[0,1,117,34...|
|i never make her ...|sadness|    1|  never make she ...|[, , never, make,...|[, , never, make,...|(1000,[0,1,2,4,9,...|(1000,[0,1,2,4,9,...|
|i left with my bo...|    joy|    0|  leave with   bo...|[, , leave, with,...|[, , leave, , , b...|(1000,[0,1,39,249...|(1000,[0,1,39,249...|
|i was

Linear SVM - OneVSrest

In [6]:
import time
def liniearSVMMaker(df, df_val):
    classifier = LinearSVC(maxIter=10, regParam=0.1, featuresCol = "featuresIDF", weightCol="weight", labelCol="label")
    # Define OneVsRest strategy
    ovr = OneVsRest(classifier=classifier, labelCol="label", featuresCol="featuresIDF", weightCol="weight")
    start = time.time()
    pipeline = Pipeline(stages=[ovr])
    model = pipeline.fit(df)
    training_time = time.time()-start
    precision_svm, recall_svm , f1Score_svm,  metrics = m_metrics_l(model,df_val)
    return precision_svm, recall_svm, f1Score_svm, metrics, training_time

In [7]:
precision_svm, recall_svm, f1Score_svm, metrics, training_time_svm = liniearSVMMaker(df= df_training_m1, df_val=df_val_m1)

[Stage 8:>                                                          (0 + 1) / 1]

23/02/26 15:32:29 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/02/26 15:32:29 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

23/02/26 15:32:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/02/26 15:32:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


[Stage 164:>                                                        (0 + 1) / 1]

Precision = 0.8836 Recall = 0.8556 F1 Score = 0.8694
Confusion matrix 
 [[671  47  43  30  79  23]
 [ 22 486  25  19   6  10]
 [  6   8 205   6   3   2]
 [  3   7   2 157   1  13]
 [  2   2   0   0  88   0]
 [  0   0   0   0   1  33]]


                                                                                

In [8]:
training_time_svm

42.2072913646698

In [9]:
print(type(training_time_svm))
print(type(precision_svm))

<class 'float'>
<class 'float'>


In [10]:
results_df = spark.createDataFrame([("countVectorizer+iDF",
                                     "svm", 
                                    round(training_time_svm,2),
                                    round(precision_svm,2), 
                                     round(recall_svm,2), 
                                     round(f1Score_svm,2))], 
                                     ["dataModel", "modelName", "trainingTime", "precision","recall", "f1Score"])

results_df.show()

+-------------------+---------+------------+---------+------+-------+
|          dataModel|modelName|trainingTime|precision|recall|f1Score|
+-------------------+---------+------------+---------+------+-------+
|countVectorizer+iDF|      svm|       42.21|     0.88|  0.86|   0.87|
+-------------------+---------+------------+---------+------+-------+



                                                                                

In [11]:
from pyspark.ml.classification import LogisticRegression
def logRegmaker(df, df_val):
    cassifier = LogisticRegression(maxIter=10, regParam=0.1, featuresCol = "featuresIDF", weightCol="weight")
    start = time.time()
    pipeline = Pipeline(stages=[cassifier])
    print(f"Training started.")
    model = pipeline.fit(df)
    training_time = time.time()-start
    precision, recall , f1Score,  metrics = m_metrics_l(model,df_val)
    return precision, recall, f1Score, metrics, training_time
    

In [12]:
precision_lr, recall_lr , f1Score_lr , metrics_lr, training_time_lr = logRegmaker(df= df_training_m1, df_val=df_val_m1)

Training started.


                                                                                

Precision = 0.8964 Recall = 0.8016 F1 Score = 0.8464
Confusion matrix 
 [[678  47  47  32  74  31]
 [ 21 493  39  28  18  16]
 [  3   4 188   6   4   1]
 [  1   4   1 146   1  10]
 [  1   2   0   0  80   0]
 [  0   0   0   0   1  23]]


In [13]:
df= results_df.toPandas()

In [14]:
df.loc[len(df.index)]= ["countVectorizer+iDF",
                                     "lr", training_time_lr, precision_lr, recall_lr, f1Score_lr]

In [15]:
df

Unnamed: 0,dataModel,modelName,trainingTime,precision,recall,f1Score
0,countVectorizer+iDF,svm,42.21,0.88,0.86,0.87
1,countVectorizer+iDF,lr,3.403402,0.896364,0.801626,0.846352


In [16]:
from pyspark.ml.classification import NaiveBayes
def nBMaker(df, df_val):
    nb = NaiveBayes(smoothing=1.0, modelType="multinomial", featuresCol = "featuresIDF", weightCol="weight")
    # nb_model = nb.fit(df_training_m1)
    start = time.time()
    pipeline = Pipeline(stages=[nb])
    print(f"Training started.")
    model = pipeline.fit(df)
    training_time = time.time()-start
    precision, recall , f1Score,  metrics = m_metrics_l(model,df_val)
    return precision, recall, f1Score, metrics, training_time
    

In [17]:
precision_nb, recall_nb, f1Score_nb, metrics_nb, training_time_nb = nBMaker(df=df_training_m1, df_val=df_val_m1)

Training started.




Precision = 0.8527 Recall = 0.8257 F1 Score = 0.8390
Confusion matrix 
 [[586  23  21   6  30   7]
 [ 34 469  29  17  11   8]
 [ 14  15 209   8   5   2]
 [ 23  16  10 169   3  13]
 [ 35  16   3   4 128   0]
 [ 12  11   3   8   1  51]]


In [18]:
df.loc[len(df.index)]= ["countVectorizer+iDF",
                                     "nb", training_time_nb, precision_nb, recall_nb, f1Score_nb]

In [19]:
df

Unnamed: 0,dataModel,modelName,trainingTime,precision,recall,f1Score
0,countVectorizer+iDF,svm,42.21,0.88,0.86,0.87
1,countVectorizer+iDF,lr,3.403402,0.896364,0.801626,0.846352
2,countVectorizer+iDF,nb,2.511171,0.852727,0.825704,0.838998


In [None]:
# # import time
#     classifier = LinearSVC(maxIter=10, regParam=0.1, featuresCol = "featuresIDF", weightCol="weight", labelCol="label")
#     # Define OneVsRest strategy
#     ovr = OneVsRest(classifier=classifier, labelCol="label", featuresCol="featuresIDF", weightCol="weight")
#     pipeline = Pipeline(stages=[ovr])
#     model = pipeline.fit(df_training_m1)
#     m_metrics_l(model,df_val_m1)

#     evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="f1")

#     # Define the hyperparameter grid
#     param_grid = ParamGridBuilder() \
#      .addGrid(classifier.maxIter, [10, 50, 100]) \
#      .addGrid(classifier.regParam,  [0.01, 0.1, 1.0])\
#         .build()
    
#     # Define the cross-validator
#     cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

#     # Fit the cross-validator to the training set
#     cv_model = cv.fit(df_training_m1)

#     # Evaluate the best model on the validation set
#     best_model = cv_model.bestModel
#     predictions = best_model.transform(df_testing_m1)
#     f1 = evaluator.evaluate(predictions)
#     print("f1 on validation set for best model:", f1)

data cleaning model 2

In [9]:
df_training_m2 = spark.read.parquet("data/transformed_training_p2.parquet")
df_testing_m2 = spark.read.parquet("data/transformed_test_p2.parquet")
df_val_m2 = spark.read.parquet("data/transformed_val_p2.parquet")
df_training_m2.cache()
df_testing_m2.cache()
df_val_m2.cache()
print(df_testing_m2.label)

23/02/26 17:17:23 WARN CacheManager: Asked to cache already cached data.
23/02/26 17:17:23 WARN CacheManager: Asked to cache already cached data.
23/02/26 17:17:23 WARN CacheManager: Asked to cache already cached data.
Column<'label'>


In [12]:
classifier = LinearSVC(maxIter=10, regParam=0.1, featuresCol = "features", weightCol="weight", labelCol="label")
# Define OneVsRest strategy
ovr = OneVsRest(classifier=classifier, labelCol="label", featuresCol="features", weightCol="weight")

pipeline = Pipeline(stages=[ovr])
model = pipeline.fit(df_training_m2)

# m_metrics_l(model,df_val_m2)

                                                                                

In [13]:
m_metrics_l(model,df_val_m2)

                                                                                

Precision = 0.1527 Recall = 0.5091 F1 Score = 0.2350
Confusion matrix 
 [[653 439 153  85 146  39]
 [ 20  84  22  20   5  14]
 [ 11   8  97   7   5   3]
 [ 17  16   3 100   1  13]
 [  3   2   0   0  21   0]
 [  0   1   0   0   0  12]]


(0.15272727272727274,
 0.509090909090909,
 0.234965034965035,
 <pyspark.mllib.evaluation.MulticlassMetrics at 0x7fc0b88cbf40>)