# Text classification with pyspark

import warnings
warnings.filterwarnings('ignore')

In [1]:
 #load packages to initiate spark session
    
from pyspark import SparkContext


In [2]:
sc=SparkContext(master="local[2]")

25/04/11 11:39:22 WARN Utils: Your hostname, Ubuntu-Linux-YvonneMusinguzi resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/04/11 11:39:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/11 11:39:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
#launch UI 
sc

In [4]:
#create a spark session
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("NETFLIX_DESCRIPTION_CLASSIFICATION").getOrCreate()

In [14]:
# PySpark imports
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes, DecisionTreeClassifier, GBTClassifier, LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, lower, regexp_replace

# Scikit-learn and other libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc as sklearn_auc, confusion_matrix
import itertools


In [7]:
#load the dataset 
df = spark.read.csv('file:///home/ymusinguzi/Desktop/netflix_titles.csv', header=True, inferSchema=True)
df.show(5)

df.columns

df.printSchema()


                                                                                

+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|       director|                cast|      country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|Kirsten Johnson|                NULL|United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|           NULL|Ama Qamata, Khosi...| South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglands|Julien Leclercq|Sami Bouajila, Tr...|         NULL|Septem

In [8]:
df.select("type","description")
df.select("type","description").show()

+-------+--------------------+
|   type|         description|
+-------+--------------------+
|  Movie|As her father nea...|
|TV Show|After crossing pa...|
|TV Show|To protect his fa...|
|TV Show|Feuds, flirtation...|
|TV Show|In a city of coac...|
|TV Show|The arrival of a ...|
|  Movie|Equestria's divid...|
|  Movie|On a photo shoot ...|
|TV Show|A talented batch ...|
|  Movie|A woman adjusting...|
|TV Show|"Sicily boasts a ...|
|TV Show|Struggling to ear...|
|  Movie|After most of her...|
|  Movie|When the clever b...|
|TV Show|Cameras following...|
|TV Show|"Students of colo...|
|  Movie|Declassified docu...|
|TV Show|Strangers Diego a...|
|  Movie|After a deadly ho...|
|TV Show|In the 1960s, a H...|
+-------+--------------------+
only showing top 20 rows



In [9]:
print(f"Number of rows before cleaning: {df.count()}")
print(f"Number of columns before cleaning: {len(df.columns)}")

                                                                                

Number of rows before cleaning: 8809
Number of columns before cleaning: 12


In [10]:
#value counts for TV Shows and Movies 
df.groupBy('type').count().show()

[Stage 7:>                                                          (0 + 1) / 1]

+-------------+-----+
|         type|count|
+-------------+-----+
|         NULL|    1|
|      TV Show| 2676|
|        Movie| 6131|
|William Wyler|    1|
+-------------+-----+



                                                                                

In [11]:
# Clean Dataset: Keep rows where 'description' and 'type' are not null
df_cleaned = df.filter(df['description'].isNotNull() & df['type'].isNotNull())
df_cleaned.groupBy('type').count().show()

[Stage 10:>                                                         (0 + 1) / 1]

+-------+-----+
|   type|count|
+-------+-----+
|TV Show| 2676|
|  Movie| 6130|
+-------+-----+



                                                                                

In [12]:

# Check the distribution of classes in the target column
class_counts = df_cleaned.groupBy('type').count().orderBy('count', ascending=False)

# Show the class distribution
class_counts.show()



[Stage 13:>                                                         (0 + 1) / 1]

+-------+-----+
|   type|count|
+-------+-----+
|  Movie| 6130|
|TV Show| 2676|
+-------+-----+



                                                                                

feature engineering

~feature extraction
~countVectorizer
~TFIDF
~WordEmbedding


In [15]:
# Convert description to lowercase
df_cleaned = df_cleaned.withColumn("description", lower(col("description")))

# Remove non-alphanumeric characters except spaces (keeps spaces)
df_cleaned = df_cleaned.withColumn("description", regexp_replace(col("description"), "[^a-zA-Z0-9\\s]", ""))

# Display the cleaned dataframe
df_cleaned.select("type", "description").show(truncate=False)

# Tokenization, stopword removal, and feature extraction stages for the pipeline
tokenizer = Tokenizer(inputCol="description", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="new_tokens")
vectorizer = CountVectorizer(inputCol="new_tokens", outputCol="vectorized_features")
idf = IDF(inputCol="vectorized_features", outputCol="final_features")

# Label encoding for the 'type' column
labelEncoder = StringIndexer(inputCol="type", outputCol="labels")

# Define the preprocessing pipeline (no model here)
preprocessing_stages = [
    tokenizer,
    stopword_remover,
    vectorizer,
    idf,
    labelEncoder
]

# Create the preprocessing pipeline
preprocessing_pipeline = Pipeline(stages=preprocessing_stages)

# Fit and transform the preprocessing pipeline on the cleaned data
preprocessing_model = preprocessing_pipeline.fit(df_cleaned)
df_cleaned_transformed = preprocessing_model.transform(df_cleaned)

# Check if the final_features column exists now
df_cleaned_transformed.select("final_features", "labels").show(5)


                                                                                

+-------+------------------------------------------------------------------------------------------------------------------------------------------------------+
|type   |description                                                                                                                                           |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------+
|Movie  |as her father nears the end of his life filmmaker kirsten johnson stages his death in inventive and comical ways to help them both face the inevitable|
|TV Show|after crossing paths at a party a cape town teen sets out to prove whether a privateschool swimming star is her sister who was abducted at birth      |
|TV Show|to protect his family from a powerful drug lord skilled thief mehdi and his expert team of robbers are pulled into a violent and deadly turf war      |
|TV Show|feuds flirtations and toi

[Stage 25:>                                                         (0 + 1) / 1]

+--------------------+------+
|      final_features|labels|
+--------------------+------+
|(20690,[1,19,25,4...|   0.0|
|(20690,[31,49,88,...|   1.0|
|(20690,[4,35,42,1...|   1.0|
|(20690,[3,12,55,1...|   1.0|
|(20690,[1,9,38,71...|   1.0|
+--------------------+------+
only showing top 5 rows



                                                                                

In [16]:
# dict of labels
labels_dict={'Movie': 0.0,
'TV Show':1.0}

In [17]:
# Split the dataset first (before oversampling)
(df_train, df_test) = df_cleaned_transformed.randomSplit([0.8, 0.2], seed=42)

# Show class distribution before oversampling
print("Class distribution in original training set:")
df_train.groupBy("type").count().show()


Class distribution in original training set:


[Stage 26:>                                                         (0 + 1) / 1]

+-------+-----+
|   type|count|
+-------+-----+
|TV Show| 2172|
|  Movie| 4939|
+-------+-----+



                                                                                

In [18]:
# Handle class imbalance in the training set
movie_count = df_train.filter(df_train.type == "Movie").count()
tv_show_df = df_train.filter(df_train.type == "TV Show")
tv_show_count = tv_show_df.count()

# Calculate how many additional samples are needed
oversample_count = movie_count - tv_show_count

# Perform oversampling (repeat TV Show rows)
oversampled_tv_show_df = tv_show_df.sample(withReplacement=True, fraction=(oversample_count / tv_show_count + 1), seed=42).limit(oversample_count)

# Combine original training set with oversampled TV Shows
df_train_balanced = df_train.union(oversampled_tv_show_df)

# Show class distribution after oversampling
print("Class distribution in training set AFTER oversampling:")
df_train_balanced.groupBy("type").count().show()

# Optional: Show test set class distribution (should remain untouched)
print("Class distribution in test set:")
df_test.groupBy("type").count().show()

                                                                                

Class distribution in training set AFTER oversampling:


                                                                                

+-------+-----+
|   type|count|
+-------+-----+
|TV Show| 4939|
|  Movie| 4939|
+-------+-----+

Class distribution in test set:


[Stage 41:>                                                         (0 + 1) / 1]

+-------+-----+
|   type|count|
+-------+-----+
|TV Show|  504|
|  Movie| 1191|
+-------+-----+



                                                                                

In [19]:
#train the models 
# Create classifiers
lr = LogisticRegression(featuresCol="final_features", labelCol="labels")
rf = RandomForestClassifier(featuresCol="final_features", labelCol="labels")
nb = NaiveBayes(featuresCol="final_features", labelCol="labels")
dt = DecisionTreeClassifier(featuresCol="final_features", labelCol="labels")
gbt = GBTClassifier(featuresCol="final_features", labelCol="labels")
svc = LinearSVC(featuresCol="final_features", labelCol="labels")

# Define Pipelines for each model (no need to repeat preprocessing stages)
pipeline_lr = Pipeline(stages=[lr])
pipeline_rf = Pipeline(stages=[rf])
pipeline_nb = Pipeline(stages=[nb])
pipeline_dt = Pipeline(stages=[dt])
pipeline_gbt = Pipeline(stages=[gbt])
pipeline_svc = Pipeline(stages=[svc])

# Train the models using the balanced training data (df_train_balanced)
lr_model = pipeline_lr.fit(df_train_balanced)
rf_model = pipeline_rf.fit(df_train_balanced)
nb_model = pipeline_nb.fit(df_train_balanced)
dt_model = pipeline_dt.fit(df_train_balanced)
gbt_model = pipeline_gbt.fit(df_train_balanced)
svc_model = pipeline_svc.fit(df_train_balanced)


25/04/11 11:53:09 WARN DAGScheduler: Broadcasting large task binary with size 1194.8 KiB
25/04/11 11:53:15 WARN MemoryStore: Not enough space to cache rdd_253_0 in memory! (computed 75.2 MiB so far)
25/04/11 11:53:15 WARN BlockManager: Persisting block rdd_253_0 to disk instead.
25/04/11 11:53:27 WARN MemoryStore: Not enough space to cache rdd_253_0 in memory! (computed 113.5 MiB so far)
25/04/11 11:53:32 WARN DAGScheduler: Broadcasting large task binary with size 1209.5 KiB
25/04/11 11:53:33 WARN MemoryStore: Not enough space to cache rdd_253_0 in memory! (computed 113.5 MiB so far)
25/04/11 11:53:35 WARN DAGScheduler: Broadcasting large task binary with size 1224.4 KiB
25/04/11 11:53:37 WARN MemoryStore: Not enough space to cache rdd_253_0 in memory! (computed 113.5 MiB so far)
25/04/11 11:53:39 WARN DAGScheduler: Broadcasting large task binary with size 1243.8 KiB
25/04/11 11:53:41 WARN MemoryStore: Not enough space to cache rdd_253_0 in memory! (computed 113.5 MiB so far)
25/04/11 

In [20]:

# Define evaluators for accuracy, precision, recall, and F1-score
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="labels", predictionCol="prediction", metricName="accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="labels", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="labels", predictionCol="prediction", metricName="weightedRecall")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="labels", predictionCol="prediction", metricName="f1")

# Function to evaluate models
def evaluate_model(model, model_name, train_data, test_data):
    # Training predictions
    train_predictions = model.transform(train_data)
    print(f"\n{model_name} - Training Set Metrics:")
    train_accuracy = evaluator_accuracy.evaluate(train_predictions)
    train_precision = evaluator_precision.evaluate(train_predictions)
    train_recall = evaluator_recall.evaluate(train_predictions)
    train_f1_score = evaluator_f1.evaluate(train_predictions)

    print(f"Training Accuracy: {train_accuracy:.4f}")
    print(f"Training Precision: {train_precision:.4f}")
    print(f"Training Recall: {train_recall:.4f}")
    print(f"Training F1 Score: {train_f1_score:.4f}")
    
    # Testing predictions
    test_predictions = model.transform(test_data)
    print(f"\n{model_name} - Test Set Metrics:")
    test_accuracy = evaluator_accuracy.evaluate(test_predictions)
    test_precision = evaluator_precision.evaluate(test_predictions)
    test_recall = evaluator_recall.evaluate(test_predictions)
    test_f1_score = evaluator_f1.evaluate(test_predictions)

    print(f"Testing Accuracy: {test_accuracy:.4f}")
    print(f"Testing Precision: {test_precision:.4f}")
    print(f"Testing Recall: {test_recall:.4f}")
    print(f"Testing F1 Score: {test_f1_score:.4f}")

# Example for Logistic Regression
evaluate_model(lr_model, "Logistic Regression", df_train_balanced, df_test)

# Example for Random Forest
evaluate_model(rf_model, "Random Forest", df_train_balanced, df_test)

# Example for Naive Bayes
evaluate_model(nb_model, "Naive Bayes", df_train_balanced, df_test)

# Example for Decision Tree
evaluate_model(dt_model, "Decision Tree", df_train_balanced, df_test)

# Example for Gradient Boosting
evaluate_model(gbt_model, "Gradient Boosting", df_train_balanced, df_test)

# Example for Support Vector Classifier
evaluate_model(svc_model, "Support Vector Classifier", df_train_balanced, df_test)



Logistic Regression - Training Set Metrics:


                                                                                

Training Accuracy: 0.9999
Training Precision: 0.9999
Training Recall: 0.9999
Training F1 Score: 0.9999

Logistic Regression - Test Set Metrics:


                                                                                

Testing Accuracy: 0.6985
Testing Precision: 0.6953
Testing Recall: 0.6985
Testing F1 Score: 0.6968

Random Forest - Training Set Metrics:


                                                                                

Training Accuracy: 0.6180
Training Precision: 0.6493
Training Recall: 0.6180
Training F1 Score: 0.5970

Random Forest - Test Set Metrics:


                                                                                

Testing Accuracy: 0.6826
Testing Precision: 0.6568
Testing Recall: 0.6826
Testing F1 Score: 0.6643

Naive Bayes - Training Set Metrics:


25/04/11 12:18:05 WARN DAGScheduler: Broadcasting large task binary with size 1004.6 KiB
25/04/11 12:18:11 WARN DAGScheduler: Broadcasting large task binary with size 1004.6 KiB
25/04/11 12:18:17 WARN DAGScheduler: Broadcasting large task binary with size 1004.6 KiB
25/04/11 12:18:23 WARN DAGScheduler: Broadcasting large task binary with size 1004.6 KiB
                                                                                

Training Accuracy: 0.9758
Training Precision: 0.9758
Training Recall: 0.9758
Training F1 Score: 0.9758

Naive Bayes - Test Set Metrics:


                                                                                

Testing Accuracy: 0.7357
Testing Precision: 0.7228
Testing Recall: 0.7357
Testing F1 Score: 0.7265

Decision Tree - Training Set Metrics:


                                                                                

Training Accuracy: 0.5951
Training Precision: 0.6946
Training Recall: 0.5951
Training F1 Score: 0.5357

Decision Tree - Test Set Metrics:


                                                                                

Testing Accuracy: 0.7333
Testing Precision: 0.7154
Testing Recall: 0.7333
Testing F1 Score: 0.6842

Gradient Boosting - Training Set Metrics:


                                                                                

Training Accuracy: 0.7114
Training Precision: 0.7520
Training Recall: 0.7114
Training F1 Score: 0.6993

Gradient Boosting - Test Set Metrics:


                                                                                

Testing Accuracy: 0.7233
Testing Precision: 0.7013
Testing Recall: 0.7233
Testing F1 Score: 0.7034

Support Vector Classifier - Training Set Metrics:


                                                                                

Training Accuracy: 0.9999
Training Precision: 0.9999
Training Recall: 0.9999
Training F1 Score: 0.9999

Support Vector Classifier - Test Set Metrics:


[Stage 1074:>                                                       (0 + 1) / 1]

Testing Accuracy: 0.7257
Testing Precision: 0.7132
Testing Recall: 0.7257
Testing F1 Score: 0.7173


                                                                                

In [None]:

# Initialize Logistic Regression model
lr = LogisticRegression(featuresCol="final_features", labelCol="labels")

# Create ParamGridBuilder for hyperparameter tuning for Logistic Regression
paramGrid_lr = (ParamGridBuilder()
                .addGrid(lr.regParam, [0.01, 0.1, 1.0])  
                .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  
                .addGrid(lr.maxIter, [10, 50, 100]) 
                .build())

# Create CrossValidator for Logistic Regression
cv_lr = CrossValidator(estimator=lr, 
                       estimatorParamMaps=paramGrid_lr, 
                       evaluator=evaluator_accuracy,  
                       numFolds=10,  # 10-fold Stratified Cross Validation
                       seed=42)


cvModel_lr = cv_lr.fit(df_train_balanced)  

# Best model and parameters
best_lr_model = cvModel_lr.bestModel
best_params_lr = best_lr_model.extractParamMap()

# Print the best parameters for Logistic Regression
print("Best Parameters for Logistic Regression:")
for param, value in best_params_lr.items():
    print(f"{param.name}: {value}")


In [None]:

# Initialize Naive Bayes model
nb = NaiveBayes(featuresCol="final_features", labelCol="labels")

# Create ParamGridBuilder for hyperparameter tuning for Naive Bayes
paramGrid_nb = (ParamGridBuilder()
                .addGrid(nb.smoothing, [1.0, 1.5, 2.0])  
                .build())

# Create CrossValidator for Naive Bayes using the already defined evaluator
cv_nb = CrossValidator(estimator=nb, 
                       estimatorParamMaps=paramGrid_nb, 
                       evaluator=evaluator_accuracy, 
                       numFolds=10,  
                       seed=42)

# Train the model using Cross-Validation on the balanced training set
cvModel_nb = cv_nb.fit(df_train_balanced)

# Best model and parameters
best_nb_model = cvModel_nb.bestModel
best_params_nb = best_nb_model.extractParamMap()

# Print the best parameters for Naive Bayes
print("Best Parameters for Naive Bayes:")
for param, value in best_params_nb.items():
    print(f"{param.name}: {value}")


In [None]:

# Initialize LinearSVC model
svc = LinearSVC(featuresCol="final_features", labelCol="labels")

# Create ParamGridBuilder for hyperparameter tuning for SVM
paramGrid_svc = (ParamGridBuilder()
                 .addGrid(svc.regParam, [0.01, 0.1, 1.0])  # Regularization parameter
                 .addGrid(svc.maxIter, [10, 50, 100])  # Number of iterations
                 .build())

# Create CrossValidator for SVM using the existing evaluator
cv_svc = CrossValidator(estimator=svc, 
                        estimatorParamMaps=paramGrid_svc, 
                        evaluator=evaluator_accuracy,  # Use the existing evaluator
                        numFolds=10,  # 10-fold Stratified Cross Validation
                        seed=42)

# Train the model using Cross-Validation
cvModel_svc = cv_svc.fit(df_train_balanced)

# Best model and parameters
best_svc_model = cvModel_svc.bestModel
best_params_svc = best_svc_model.extractParamMap()

# Print the best parameters for SVM
print("Best Parameters for SVM:")
for param, value in best_params_svc.items():
    print(f"{param.name}: {value}")


In [None]:

# Initialize Logistic Regression model with manually entered parameters
final_lr_model = LogisticRegression(
    featuresCol="final_features", 
    labelCol="labels",
    elasticNetParam=0.0,  # L2 regularization (no ElasticNet mixing)
    regParam=1.0,  # Regularization strength
    maxIter=10,  # Number of iterations
    fitIntercept=True,  # Include intercept term
    standardization=True  # Standardize features
)

# Define a parameter grid for cross-validation
param_grid = (ParamGridBuilder()
              .addGrid(final_lr_model.regParam, [0.1, 0.5, 1.0])  # Test different regularization strengths
              .addGrid(final_lr_model.maxIter, [10, 20])  # Test different numbers of iterations
              .build())

# Create a 10-fold cross-validation using the manually set model and evaluator
cv_lr = CrossValidator(
    estimator=final_lr_model,  # Model with manually set parameters
    evaluator=evaluator_accuracy,  # Use the evaluator you defined earlier
    estimatorParamMaps=param_grid,  # Set the parameter grid for cross-validation
    numFolds=10,  # 10-fold cross-validation
    seed=42  # For reproducibility
)

# Perform the 10-fold cross-validation on the training data (df_train_balanced)
cv_model_lr = cv_lr.fit(df_train_balanced)

# The best model found from cross-validation
best_model_lr = cv_model_lr.bestModel

# Make predictions on the training data using the best model
train_predictions = best_model_lr.transform(df_train_balanced)

# Make predictions on the test data (original split)
test_predictions = best_model_lr.transform(df_test)

# Evaluate the model on the balanced training data
train_accuracy = evaluator.evaluate(train_predictions, {evaluator.metricName: "accuracy"})
train_precision = evaluator.evaluate(train_predictions, {evaluator.metricName: "weightedPrecision"})
train_recall = evaluator.evaluate(train_predictions, {evaluator.metricName: "weightedRecall"})
train_f1 = evaluator.evaluate(train_predictions, {evaluator.metricName: "f1"})
train_auc = binary_evaluator.evaluate(train_predictions)  # AUC for binary classification

# Evaluate the model on the test data (original split)
test_accuracy = evaluator.evaluate(test_predictions, {evaluator.metricName: "accuracy"})
test_precision = evaluator.evaluate(test_predictions, {evaluator.metricName: "weightedPrecision"})
test_recall = evaluator.evaluate(test_predictions, {evaluator.metricName: "weightedRecall"})
test_f1 = evaluator.evaluate(test_predictions, {evaluator.metricName: "f1"})
test_auc = binary_evaluator.evaluate(test_predictions)  # AUC for binary classification

# Print evaluation metrics for training data
print("Final Logistic Regression Training Data Metrics:")
print(f"Training Accuracy: {train_accuracy:.4f}")
print(f"Training Precision: {train_precision:.4f}")
print(f"Training Recall: {train_recall:.4f}")
print(f"Training F1 Score: {train_f1:.4f}")
print(f"Training AUC: {train_auc:.4f}")

# Print evaluation metrics for test data
print("\nFinal Logistic Regression Test Data Metrics:")
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Precision: {test_precision:.4f}")
print(f"Test Recall: {test_recall:.4f}")
print(f"Test F1 Score: {test_f1:.4f}")
print(f"Test AUC: {test_auc:.4f}")


                                                                                

In [None]:
# 1. Initialize Naive Bayes model with manually set parameters
final_nb_model = NaiveBayes(
    featuresCol="final_features",
    labelCol="labels",
    modelType="multinomial",
    smoothing=1.0,  # Smoothing factor
    predictionCol="prediction",
    probabilityCol="probability",
    rawPredictionCol="rawPrediction"
)

# 2. Define a parameter grid for cross-validation (smoothing parameter)
param_grid = (ParamGridBuilder()
              .addGrid(final_nb_model.smoothing, [0.1, 0.5, 1.0, 2.0])  # Test different smoothing values
              .build())

# 3. Set up cross-validation
cv_nb = CrossValidator(
    estimator=final_nb_model,
    evaluator=evaluator_accuracy,  # Reuse previously defined evaluator
    estimatorParamMaps=param_grid,
    numFolds=10,  # 10-fold cross-validation
    seed=42  # For reproducibility
)

# 4. Perform 10-fold cross-validation on the balanced training data
cv_model_nb = cv_nb.fit(df_train_balanced)

# The best model from cross-validation (even though we're manually setting parameters)
best_model_nb = cv_model_nb.bestModel

# 5. Predict on the balanced training data and test data
nb_train_pred = best_model_nb.transform(df_train_balanced)
nb_test_pred = best_model_nb.transform(df_test)

# 6. Evaluate on the training data (oversampled)
nb_train_acc = evaluator.evaluate(nb_train_pred, {evaluator.metricName: "accuracy"})
nb_train_prec = evaluator.evaluate(nb_train_pred, {evaluator.metricName: "weightedPrecision"})
nb_train_recall = evaluator.evaluate(nb_train_pred, {evaluator.metricName: "weightedRecall"})
nb_train_f1 = evaluator.evaluate(nb_train_pred, {evaluator.metricName: "f1"})
nb_train_auc = binary_evaluator.evaluate(nb_train_pred)  # AUC for binary classification

# 7. Evaluate on the test data (untouched test set)
nb_test_acc = evaluator.evaluate(nb_test_pred, {evaluator.metricName: "accuracy"})
nb_test_prec = evaluator.evaluate(nb_test_pred, {evaluator.metricName: "weightedPrecision"})
nb_test_recall = evaluator.evaluate(nb_test_pred, {evaluator.metricName: "weightedRecall"})
nb_test_f1 = evaluator.evaluate(nb_test_pred, {evaluator.metricName: "f1"})
nb_test_auc = binary_evaluator.evaluate(nb_test_pred)  

# 8. Output Naive Bayes results
print("\nFinal Naive Bayes Model Results:")
print(f"Train Accuracy: {nb_train_acc:.4f} | Test Accuracy: {nb_test_acc:.4f}")
print(f"Train Precision: {nb_train_prec:.4f} | Test Precision: {nb_test_prec:.4f}")
print(f"Train Recall: {nb_train_recall:.4f} | Test Recall: {nb_test_recall:.4f}")
print(f"Train F1 Score: {nb_train_f1:.4f} | Test F1 Score: {nb_test_f1:.4f}")
print(f"Train AUC: {nb_train_auc:.4f} | Test AUC: {nb_test_auc:.4f}")


In [None]:

# Define final LinearSVC model with best parameters
final_svm_model = LinearSVC(
    featuresCol="final_features",
    labelCol="labels",
    maxIter=100,
    regParam=0.01,
    fitIntercept=True,
    standardization=True,
    tol=1e-6,
    predictionCol="prediction",
    rawPredictionCol="rawPrediction"
)



# 1. Initialize BinaryClassificationEvaluator for AUC
binary_evaluator = BinaryClassificationEvaluator(labelCol="labels", rawPredictionCol="rawPrediction")

# 2. Train SVM model on the balanced training data (df_train_balanced)
svm_model_trained = final_svm_model.fit(df_train_balanced)

# 3. Predict on both balanced training and original test data
svm_train_pred = svm_model_trained.transform(df_train_balanced)
svm_test_pred = svm_model_trained.transform(df_test)

# 4. Training metrics
svm_train_acc = evaluator.evaluate(svm_train_pred, {evaluator.metricName: "accuracy"})
svm_train_prec = evaluator.evaluate(svm_train_pred, {evaluator.metricName: "weightedPrecision"})
svm_train_recall = evaluator.evaluate(svm_train_pred, {evaluator.metricName: "weightedRecall"})
svm_train_f1 = evaluator.evaluate(svm_train_pred, {evaluator.metricName: "f1"})

# 5. Testing metrics
svm_test_acc = evaluator.evaluate(svm_test_pred, {evaluator.metricName: "accuracy"})
svm_test_prec = evaluator.evaluate(svm_test_pred, {evaluator.metricName: "weightedPrecision"})
svm_test_recall = evaluator.evaluate(svm_test_pred, {evaluator.metricName: "weightedRecall"})
svm_test_f1 = evaluator.evaluate(svm_test_pred, {evaluator.metricName: "f1"})

# 6. Compute AUC for both train and test data
svm_train_auc = binary_evaluator.evaluate(svm_train_pred)
svm_test_auc = binary_evaluator.evaluate(svm_test_pred)

# 7. Output SVM results
print("\nFinal SVM Model Results:")
print(f"Train Accuracy: {svm_train_acc:.4f} | Test Accuracy: {svm_test_acc:.4f}")
print(f"Train Precision: {svm_train_prec:.4f} | Test Precision: {svm_test_prec:.4f}")
print(f"Train Recall: {svm_train_recall:.4f} | Test Recall: {svm_test_recall:.4f}")
print(f"Train F1 Score: {svm_train_f1:.4f} | Test F1 Score: {svm_test_f1:.4f}")
print(f"Train AUC: {svm_train_auc:.4f} | Test AUC: {svm_test_auc:.4f}")



#MODEL evaluation
#accuracy
#precision
#f1score
