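## 1. Model Selection Configuration
Set each entry in `RUN_MODELS` to `True` to train that model or `False` to skip it. The configuration cell in this section also sets the cross-validation options (`RANDOM_SEARCH_CONFIG`) and the input file paths (`DATA_PATHS`).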

In [0]:
%pip install spark-nlp
# Installs the Spark NLP Python package into this notebook's environment; the
# Python interpreter is restarted afterwards (see the cell output), so run this
# cell before anything else.
# NOTE(review): the version is unpinned. The Python package alone may not be
# sufficient: the later "'JavaPackage' object is not callable" error is
# consistent with the matching Spark NLP JAR being absent from the cluster —
# confirm the cluster library setup and pin a compatible version.


Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
# Which model cells to execute: True trains the model, False skips its cell.
# NOTE(review): no cell for 'roberta_features' is visible in this notebook section.
RUN_MODELS = dict(
    transformer_classifier=True,
    bert_features=True,
    transformer_mlp=True,
    distilbert_classifier=True,
    roberta_features=False,
)

# Settings forwarded to perform_random_search (k-fold CrossValidator).
RANDOM_SEARCH_CONFIG = dict(num_folds=3, parallelism=1)

# Input CSV locations on DBFS, one per data split.
DATA_PATHS = dict(
    train="dbfs:/FileStore/tables/train_df.csv",
    val="dbfs:/FileStore/tables/val_df.csv",
    test="dbfs:/FileStore/tables/test_df.csv",
)

## 2. Importing Libraries

In [0]:
# Importing libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.lines import Line2D
from math import ceil
import time
import os

# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, when, lit, count, lag, expr, concat_ws, collect_list
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, ArrayType, FloatType, StringType

# ML imports for classification
from pyspark.ml.classification import LogisticRegression, MultilayerPerceptronClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, IndexToString
from pyspark.ml import Pipeline

# Scikit-learn imports for metrics (only for visualization)
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc, precision_recall_curve

# Spark NLP imports for transformer models
try:
    import sparknlp
    from sparknlp.base import *
    from sparknlp.annotator import *
    from sparknlp.pretrained import PretrainedPipeline
    SPARK_NLP_AVAILABLE = True
    print("Spark NLP available - using transformer models")
except ImportError:
    SPARK_NLP_AVAILABLE = False
    print("Spark NLP not available - using alternative transformer approaches")



Spark NLP available - using transformer models


## 3. Loading and Preprocessing Data

In [0]:
def load_and_preprocess_data(file_paths):
    """Read the train/val/test CSVs and attach the model input columns.

    Every column except ``label``, ``time`` and ``file`` (as found in the
    training split) is treated as a feature. Each split gains:

    * ``text_features`` - the feature values cast to strings and joined with
      single spaces (input for the Spark NLP transformer pipelines);
    * ``features`` - a ``VectorAssembler`` vector of the same columns.

    Args:
        file_paths: mapping with ``'train'``, ``'val'`` and ``'test'`` CSV paths.

    Returns:
        ``(train_data, val_data, test_data)`` Spark DataFrames.
    """
    split_names = ("train", "val", "test")
    raw_splits = {
        name: spark.read.csv(file_paths[name], header=True, inferSchema=True)
        for name in split_names
    }

    # Feature columns come from the training split and are reused for all splits.
    excluded_cols = ['label', 'time', 'file']
    feature_cols = [c for c in raw_splits["train"].columns if c not in excluded_cols]

    # NOTE(review): concat_ws skips NULL values, so a missing feature shortens the
    # text instead of leaving a placeholder - confirm the data has no NULLs.
    text_column = concat_ws(" ", *(col(c).cast("string") for c in feature_cols))
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    train_data, val_data, test_data = (
        assembler.transform(raw_splits[name].withColumn("text_features", text_column))
        for name in split_names
    )

    print("Data loaded and preprocessed:")
    for split_label, split_df in (("Training", train_data), ("Validation", val_data), ("Test", test_data)):
        print(f" - {split_label} samples: {split_df.count()}")

    return train_data, val_data, test_data

## 4. Helper Functions

In [0]:
# Helper function to convert PySpark predictions to numpy arrays for plotting
def get_prediction_labels(predictions_df):
    """Collect the ``prediction`` and ``label`` columns to the driver.

    Args:
        predictions_df: Spark DataFrame produced by a fitted model's ``transform``.

    Returns:
        ``(y_pred, y_true)`` as array-likes taken from the collected pandas frame.
    """
    collected = predictions_df.select("prediction", "label").toPandas()
    return collected["prediction"].values, collected["label"].values

# Helper function to get prediction probabilities
def get_prediction_probabilities(predictions_df):
    """Collect the ``probability`` vector column to the driver as a NumPy array.

    Each cell is converted with its ``toArray()`` method; when all vectors have
    the same length the result is 2-D with one row per prediction.
    """
    probability_series = predictions_df.select("probability").toPandas()["probability"]
    dense_rows = list(map(lambda vec: vec.toArray(), probability_series))
    return np.array(dense_rows)

# Helper function to plot confusion matrix
def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"):
    """Plot a confusion matrix as an annotated seaborn heatmap.

    Rows are true labels, columns are predicted labels. The axis ticks show the
    actual class values (sorted union of ``y_true`` and ``y_pred``) instead of
    positional indices, so non-contiguous or float labels are labelled correctly.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, aligned with ``y_true``.
        title: Figure title.
    """
    # Same label set/order sklearn would infer, made explicit so it can label the ticks.
    class_labels = np.unique(np.concatenate([np.asarray(y_true), np.asarray(y_pred)]))
    cm = confusion_matrix(y_true, y_pred, labels=class_labels)

    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(
        cm, annot=True, fmt="d", cmap="Blues", cbar=False,
        xticklabels=class_labels, yticklabels=class_labels, ax=ax,
    )
    ax.set_title(title)
    ax.set_ylabel('True Label')
    ax.set_xlabel('Predicted Label')
    plt.show()

# Helper function to print classification report
def print_classification_report(y_true, y_pred):
    """Print scikit-learn's per-class precision / recall / F1 summary.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, aligned with ``y_true``.
    """
    summary_text = classification_report(y_true, y_pred)
    print("\n".join(["Classification Report:", summary_text]))

def evaluate_model(model_name, val_predictions, num_classes, has_probability=True):
    """Show a confusion matrix and classification report for a model's predictions.

    Args:
        model_name: Display name used in headings and the plot title.
        val_predictions: Spark DataFrame with ``prediction`` and ``label`` columns
            (and ``probability`` when ``has_probability`` is True).
        num_classes: Number of classes. Currently unused; kept for API compatibility.
        has_probability: Whether to try collecting the ``probability`` column.

    Returns:
        ``(y_pred, y_true, y_pred_proba)``. ``y_pred_proba`` is None when
        probabilities were not requested or could not be extracted.
    """
    print(f"\n--- {model_name} Evaluation ---")

    # Extract predictions and true labels
    y_pred, y_true = get_prediction_labels(val_predictions)

    print("\nConfusion Matrix:")
    plot_confusion_matrix(y_true, y_pred, f"{model_name} Confusion Matrix")

    print(f"\n{model_name} Classification Report:")
    print_classification_report(y_true, y_pred)

    # Probabilities are only collected and returned; no ROC / PR curves are drawn here.
    y_pred_proba = None
    if has_probability:
        try:
            y_pred_proba = get_prediction_probabilities(val_predictions)
            print(f"{model_name} has probability outputs available")
        except Exception as exc:  # e.g. the output has no usable `probability` column
            print(f"{model_name} probability outputs not available: {exc}")

    return y_pred, y_true, y_pred_proba


In [0]:
def perform_random_search(model, param_grid, train_data, val_data, num_folds=3, parallelism=2, seed=42):
    """Tune ``model`` with k-fold cross-validation and score the best fit.

    Despite the name, this is an exhaustive grid search: ``CrossValidator``
    evaluates every ParamMap in ``param_grid`` on ``num_folds`` folds of
    ``train_data`` and refits the best one on all of ``train_data``.
    ``val_data`` is only used to report the chosen model's held-out score.
    Candidates are ranked with ``MulticlassClassificationEvaluator``'s ``f1``
    metric (weighted F1).

    Args:
        model: Spark ML estimator reading ``label`` and writing ``prediction``.
        param_grid: list of ParamMaps, e.g. from ``ParamGridBuilder().build()``.
        train_data: Spark DataFrame used for cross-validation and the final refit.
        val_data: Spark DataFrame used only for the reported validation F1.
        num_folds: number of cross-validation folds.
        parallelism: number of candidate models fitted concurrently.
        seed: seed for the random fold assignment. PySpark's default seed for
            ``CrossValidator`` is derived from Python's ``hash()`` of the class
            name, which may differ between interpreter sessions; fixing it makes
            fold assignment reproducible. Pass None to keep PySpark's default.

    Returns:
        ``(best_model, best_params, train_predictions, val_predictions,
        train_f1, val_f1)`` where ``best_params`` maps every param name of the
        best model to its value.
    """
    evaluator = MulticlassClassificationEvaluator(
        labelCol='label',
        predictionCol='prediction',
        metricName='f1'
    )

    cv_kwargs = dict(
        estimator=model,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=num_folds,
        parallelism=parallelism,
    )
    # Only set the seed when given: passing seed=None would override the default with None.
    if seed is not None:
        cv_kwargs["seed"] = seed
    cv = CrossValidator(**cv_kwargs)

    start_time = time.time()
    print("Training model with random search...")
    cv_model = cv.fit(train_data)
    training_time = time.time() - start_time
    print(f"Training completed in {training_time:.2f} seconds")

    best_model = cv_model.bestModel

    # Every param (explicitly set or default) of the winning model, keyed by name.
    best_params = {
        param.name: best_model.getOrDefault(param)
        for param in best_model.extractParamMap()
    }

    train_predictions = best_model.transform(train_data)
    val_predictions = best_model.transform(val_data)

    train_f1 = evaluator.evaluate(train_predictions)
    val_f1 = evaluator.evaluate(val_predictions)

    return best_model, best_params, train_predictions, val_predictions, train_f1, val_f1

## 5. Modelling

In [0]:
class TransformerFeatureExtractor:
    """Builds a Spark NLP pipeline that turns a text column into pooled BERT embeddings.

    Stages: DocumentAssembler -> Tokenizer -> BertEmbeddings (pretrained) ->
    SentenceEmbeddings with AVERAGE pooling. The pooled result is written to the
    ``sentence_embeddings`` annotation column.
    """

    def __init__(self, model_name="bert_base_uncased", lang="en",
                 input_col="text_features", max_sentence_length=512):
        """
        Args:
            model_name: name of the pretrained Spark NLP ``BertEmbeddings`` model.
            lang: language code passed to ``BertEmbeddings.pretrained``.
            input_col: text column fed to the DocumentAssembler.
            max_sentence_length: value passed to ``setMaxSentenceLength``.
        """
        self.model_name = model_name
        self.lang = lang
        self.input_col = input_col
        self.max_sentence_length = max_sentence_length

    def create_pipeline(self):
        """Return an unfitted Spark ML ``Pipeline`` of the four stages above.

        Raises:
            ImportError: if Spark NLP could not be imported (``SPARK_NLP_AVAILABLE`` is False).
        """
        if not SPARK_NLP_AVAILABLE:
            raise ImportError("Spark NLP not available")

        # NOTE(review): "'JavaPackage' object is not callable" raised from these
        # constructors usually means the Spark NLP JAR is not on the JVM classpath,
        # even though the Python package imported fine - confirm cluster setup.
        document_assembler = DocumentAssembler() \
            .setInputCol(self.input_col) \
            .setOutputCol("document")

        tokenizer = Tokenizer() \
            .setInputCols(["document"]) \
            .setOutputCol("token")

        # Honour the configured model name/language rather than a hard-coded model.
        bert_embeddings = BertEmbeddings.pretrained(self.model_name, self.lang) \
            .setInputCols(["document", "token"]) \
            .setOutputCol("bert_embeddings") \
            .setCaseSensitive(False) \
            .setMaxSentenceLength(self.max_sentence_length)

        # Mean-pool token embeddings into one vector per document.
        sentence_embeddings = SentenceEmbeddings() \
            .setInputCols(["document", "bert_embeddings"]) \
            .setOutputCol("sentence_embeddings") \
            .setPoolingStrategy("AVERAGE")

        return Pipeline(stages=[
            document_assembler,
            tokenizer,
            bert_embeddings,
            sentence_embeddings
        ])

def create_transformer_mlp_model(input_dim, hidden_layers, num_classes,
                                 features_col="transformer_features", seed=42):
    """Build an unfitted MultilayerPerceptronClassifier for embedding features.

    The network layout is ``[input_dim, *hidden_layers, num_classes]``.

    Args:
        input_dim: length of each vector in ``features_col``.
        hidden_layers: iterable of hidden-layer sizes (list, tuple, ...).
        num_classes: number of output units.
        features_col: vector column the classifier trains on.
        seed: random seed passed to the classifier.
    """
    # Unpacking accepts any iterable; `[input_dim] + hidden_layers` failed for tuples.
    layers = [int(input_dim), *(int(size) for size in hidden_layers), int(num_classes)]

    return MultilayerPerceptronClassifier(
        labelCol="label",
        featuresCol=features_col,
        layers=layers,
        seed=seed
    )

# Load and preprocess the data
train_data, val_data, test_data = load_and_preprocess_data(DATA_PATHS)

# Display a few samples
print("\nSample of training data:")
train_data.show(3)

# Count classes. The MLP output layer is sized by num_classes and Spark's MLP
# expects labels in [0, num_classes), so size by max(label) + 1 as well as the
# distinct count; both agree when labels are exactly 0..k-1.
label_stats = train_data.selectExpr(
    "count(DISTINCT label) AS n_distinct",
    "max(label) AS max_label",
).first()
num_classes = label_stats["n_distinct"]
max_label = label_stats["max_label"]
if isinstance(max_label, (int, float)):
    num_classes = max(num_classes, int(max_label) + 1)
print(f"\nNumber of classes: {num_classes}")

# Store model results (parallel lists: val_scores[i] belongs to model_names[i])
model_names = []
val_scores = []

Data loaded and preprocessed:
 - Training samples: 3142910
 - Validation samples: 681381
 - Test samples: 681395

Sample of training data:
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------+-----+--------------------+--------------------+
|              FP1-F7|               F7-T7|               T7-P7|               P7-O1|              FP1-F3|               F3-C3|               C3-P3|               P3-O1|              FP2-F4|               F4-C4|               C4-P4|               P4-O2|              FP2-F8|               F8-T8|             

In [0]:
# ==================== TRANSFORMER CLASSIFIER ====================
# Embed `text_features` with pretrained BERT (Spark NLP), average the token vectors
# into one sentence embedding per row, then tune a LogisticRegression on it.
if RUN_MODELS['transformer_classifier'] and SPARK_NLP_AVAILABLE:
    print("\n==== Transformer Classifier Model ====")

    try:
        from pyspark import StorageLevel
        from pyspark.ml.functions import array_to_vector

        # Create transformer feature extractor
        transformer_extractor = TransformerFeatureExtractor()
        feature_pipeline = transformer_extractor.create_pipeline()

        # Extract features
        print("Extracting transformer features...")
        feature_model = feature_pipeline.fit(train_data)
        train_transformed = feature_model.transform(train_data)
        val_transformed = feature_model.transform(val_data)

        def extract_embeddings_udf(embeddings):
            """Return the first sentence embedding as a list of floats, or 768 zeros if none."""
            if embeddings:
                # Array columns reach Python UDFs as plain lists, which have no
                # `.tolist()`; converting element-wise works for lists and arrays.
                return [float(v) for v in embeddings[0].embeddings]
            return [0.0] * 768  # BERT base dimension

        extract_udf = udf(extract_embeddings_udf, ArrayType(FloatType()))

        # Keep only what the classifiers use and persist it: Spark is lazy, so
        # otherwise BERT is recomputed on every pass over the data (each CV fold,
        # the final refit, and the train/val predictions).
        train_final = (
            train_transformed
            .withColumn("embedding_array", extract_udf("sentence_embeddings"))
            .withColumn("transformer_features", array_to_vector("embedding_array"))
            .select("label", "transformer_features")
            .persist(StorageLevel.MEMORY_AND_DISK)
        )
        val_final = (
            val_transformed
            .withColumn("embedding_array", extract_udf("sentence_embeddings"))
            .withColumn("transformer_features", array_to_vector("embedding_array"))
            .select("label", "transformer_features")
            .persist(StorageLevel.MEMORY_AND_DISK)
        )

        # Create classifier on top of transformer features
        transformer_classifier = LogisticRegression(
            labelCol="label",
            featuresCol="transformer_features"
        )

        # Parameter grid for transformer classifier
        tc_param_grid = ParamGridBuilder() \
            .addGrid(transformer_classifier.regParam, [0.01, 0.1]) \
            .addGrid(transformer_classifier.elasticNetParam, [0.0, 0.5]) \
            .build()

        # Perform random search
        tc_best_model, tc_best_params, tc_train_preds, tc_val_preds, tc_train_f1, tc_val_f1 = perform_random_search(
            transformer_classifier, tc_param_grid, train_final, val_final,
            num_folds=RANDOM_SEARCH_CONFIG['num_folds'],
            parallelism=RANDOM_SEARCH_CONFIG['parallelism']
        )

        # Print best parameters and performance
        print("\nBest Transformer Classifier Parameters:")
        for param, value in tc_best_params.items():
            print(f" {param}: {value}")

        print(f"\nTransformer Classifier - Training F1 Score: {tc_train_f1:.4f}")
        print(f"Transformer Classifier - Validation F1 Score: {tc_val_f1:.4f}")

        # Run comprehensive evaluation
        tc_y_pred, tc_y_true, tc_y_pred_proba = evaluate_model("Transformer Classifier", tc_val_preds, num_classes)

        model_names.append("Transformer Classifier")
        val_scores.append(tc_val_f1)

    except Exception as e:
        print(f"Error with Transformer Classifier: {e}")
        print("Skipping Transformer Classifier")


==== Transformer Classifier Model ====
Error with Transformer Classifier: 'JavaPackage' object is not callable
Skipping Transformer Classifier


In [0]:
# ==================== BERT FEATURES + MLP ====================
# Reuses the BERT sentence-embedding frames (`train_final` / `val_final`) built by the
# Transformer Classifier cell and tunes a deep MLP on them instead of LogisticRegression.
if RUN_MODELS['bert_features']:
    print("\n==== BERT Features + MLP Model ====")

    if not SPARK_NLP_AVAILABLE:
        print("Spark NLP not available, skipping BERT + MLP")
    elif 'train_final' not in locals():
        # Cross-cell dependency: the frames exist only if the Transformer Classifier
        # cell ran far enough to build them.
        print("Transformer features not available, skipping BERT + MLP")
    else:
        try:
            bert_mlp = create_transformer_mlp_model(
                input_dim=768,  # BERT base dimension
                hidden_layers=[256, 128, 64],
                num_classes=num_classes,
            )

            bert_param_grid = (
                ParamGridBuilder()
                .addGrid(bert_mlp.blockSize, [32, 64])
                .addGrid(bert_mlp.maxIter, [50, 100])
                .addGrid(bert_mlp.stepSize, [0.01, 0.03])
                .build()
            )

            (bert_best_model, bert_best_params, bert_train_preds,
             bert_val_preds, bert_train_f1, bert_val_f1) = perform_random_search(
                bert_mlp, bert_param_grid, train_final, val_final,
                num_folds=RANDOM_SEARCH_CONFIG['num_folds'],
                parallelism=RANDOM_SEARCH_CONFIG['parallelism'],
            )

            print("\nBest BERT + MLP Parameters:")
            for param, value in bert_best_params.items():
                print(f" {param}: {value}")

            print(f"\nBERT + MLP - Training F1 Score: {bert_train_f1:.4f}")
            print(f"BERT + MLP - Validation F1 Score: {bert_val_f1:.4f}")

            bert_y_pred, bert_y_true, _ = evaluate_model(
                "BERT + MLP", bert_val_preds, num_classes, has_probability=False
            )

            model_names.append("BERT + MLP")
            val_scores.append(bert_val_f1)
        except Exception as e:
            print(f"Error with BERT + MLP: {e}")


==== BERT Features + MLP Model ====
Transformer features not available, skipping BERT + MLP


In [0]:
# ==================== TRANSFORMER-INSPIRED MLP ====================
# A deep MLP on the raw numeric features plus pairwise products of the first few
# features (a cheap stand-in for attention-style interactions). Needs no Spark NLP.
if RUN_MODELS['transformer_mlp']:
    print("\n==== Transformer-Inspired MLP Model ====")

    def create_attention_features(df, feature_cols, top_k=10):
        """Append pairwise-product columns for the first ``top_k`` feature columns.

        For every pair i < j of ``feature_cols[:top_k]`` a column ``attn_{i}_{j}``
        equal to ``feature_cols[i] * feature_cols[j]`` is added (45 columns for
        the default ``top_k=10``).

        Returns:
            ``(df_with_interactions, attention_cols)``: the new DataFrame and the
            added column names in (i, j) lexicographic order.
        """
        from itertools import combinations

        # Cap the number of features: the number of pairs grows quadratically.
        top_features = feature_cols[:top_k]
        attention_cols = []
        interaction_exprs = []
        for i, j in combinations(range(len(top_features)), 2):
            interaction_col = f"attn_{i}_{j}"
            attention_cols.append(interaction_col)
            interaction_exprs.append((col(top_features[i]) * col(top_features[j])).alias(interaction_col))

        # One select instead of a withColumn per pair keeps the query plan small.
        return df.select("*", *interaction_exprs), attention_cols

    # Original numeric feature columns (drop label/metadata and the derived columns)
    feature_cols = [c for c in train_data.columns if c not in ['label', 'time', 'file', 'text_features', 'features']]

    # Best-effort like the other model cells, so one failure doesn't abort Run-All.
    try:
        train_attention, attention_cols = create_attention_features(train_data, feature_cols)
        val_attention, _ = create_attention_features(val_data, feature_cols)

        # Combine original and attention features
        all_features = feature_cols + attention_cols

        transformer_assembler = VectorAssembler(inputCols=all_features, outputCol="transformer_inspired_features")
        train_attention_final = transformer_assembler.transform(train_attention)
        val_attention_final = transformer_assembler.transform(val_attention)

        # NOTE(review): features are not standardised before the MLP; the pairwise
        # products may have a much larger scale than the raw features - confirm.
        transformer_inspired_mlp = MultilayerPerceptronClassifier(
            labelCol="label",
            featuresCol="transformer_inspired_features",
            layers=[len(all_features), 512, 256, 128, 64, num_classes],  # Deep architecture
            seed=42
        )

        tmlp_param_grid = ParamGridBuilder() \
            .addGrid(transformer_inspired_mlp.blockSize, [32, 64]) \
            .addGrid(transformer_inspired_mlp.maxIter, [100, 200]) \
            .addGrid(transformer_inspired_mlp.stepSize, [0.001, 0.01]) \
            .build()

        tmlp_best_model, tmlp_best_params, tmlp_train_preds, tmlp_val_preds, tmlp_train_f1, tmlp_val_f1 = perform_random_search(
            transformer_inspired_mlp, tmlp_param_grid, train_attention_final, val_attention_final,
            num_folds=RANDOM_SEARCH_CONFIG['num_folds'],
            parallelism=RANDOM_SEARCH_CONFIG['parallelism']
        )

        print("\nBest Transformer-Inspired MLP Parameters:")
        for param, value in tmlp_best_params.items():
            print(f" {param}: {value}")

        print(f"\nTransformer-Inspired MLP - Training F1 Score: {tmlp_train_f1:.4f}")
        print(f"Transformer-Inspired MLP - Validation F1 Score: {tmlp_val_f1:.4f}")

        tmlp_y_pred, tmlp_y_true, _ = evaluate_model("Transformer-Inspired MLP", tmlp_val_preds, num_classes, has_probability=False)

        model_names.append("Transformer-Inspired MLP")
        val_scores.append(tmlp_val_f1)

    except Exception as e:
        print(f"Error with Transformer-Inspired MLP: {e}")
        print("Skipping Transformer-Inspired MLP")


==== Transformer-Inspired MLP Model ====
Training model with random search...


In [0]:
# ==================== DISTILBERT CLASSIFIER ====================
# Same recipe as the Transformer Classifier, but with DistilBERT embeddings. This cell
# defines its own imports and embedding UDF so it does not depend on that cell having run.
if RUN_MODELS['distilbert_classifier'] and SPARK_NLP_AVAILABLE:
    print("\n==== DistilBERT Classifier Model ====")

    try:
        from pyspark import StorageLevel
        from pyspark.ml.functions import array_to_vector

        # Create DistilBERT pipeline
        document_assembler = DocumentAssembler() \
            .setInputCol("text_features") \
            .setOutputCol("document")

        tokenizer = Tokenizer() \
            .setInputCols(["document"]) \
            .setOutputCol("token")

        distilbert_embeddings = DistilBertEmbeddings.pretrained("distilbert_base_uncased", "en") \
            .setInputCols(["document", "token"]) \
            .setOutputCol("distilbert_embeddings") \
            .setCaseSensitive(False)

        sentence_embeddings = SentenceEmbeddings() \
            .setInputCols(["document", "distilbert_embeddings"]) \
            .setOutputCol("sentence_embeddings") \
            .setPoolingStrategy("AVERAGE")

        distilbert_pipeline = Pipeline(stages=[
            document_assembler,
            tokenizer,
            distilbert_embeddings,
            sentence_embeddings
        ])

        print("Extracting DistilBERT features...")
        distilbert_model = distilbert_pipeline.fit(train_data)
        train_distilbert = distilbert_model.transform(train_data)
        val_distilbert = distilbert_model.transform(val_data)

        def extract_embeddings_udf(embeddings):
            """Return the first sentence embedding as a list of floats, or 768 zeros if none."""
            if embeddings:
                # Array columns reach Python UDFs as plain lists (no `.tolist()`).
                return [float(v) for v in embeddings[0].embeddings]
            return [0.0] * 768  # DistilBERT base dimension

        # Defined here rather than borrowed from the Transformer Classifier cell, so this
        # cell and the test-set evaluation work even if that cell is disabled or failed early.
        extract_udf = udf(extract_embeddings_udf, ArrayType(FloatType()))

        # Keep only the needed columns and persist, so DistilBERT is not recomputed on
        # every pass over the data during cross-validation and evaluation.
        train_distilbert_final = (
            train_distilbert
            .withColumn("embedding_array", extract_udf("sentence_embeddings"))
            .withColumn("distilbert_features", array_to_vector("embedding_array"))
            .select("label", "distilbert_features")
            .persist(StorageLevel.MEMORY_AND_DISK)
        )
        val_distilbert_final = (
            val_distilbert
            .withColumn("embedding_array", extract_udf("sentence_embeddings"))
            .withColumn("distilbert_features", array_to_vector("embedding_array"))
            .select("label", "distilbert_features")
            .persist(StorageLevel.MEMORY_AND_DISK)
        )

        distilbert_classifier = LogisticRegression(
            labelCol="label",
            featuresCol="distilbert_features"
        )

        db_param_grid = ParamGridBuilder() \
            .addGrid(distilbert_classifier.regParam, [0.01, 0.1]) \
            .addGrid(distilbert_classifier.elasticNetParam, [0.0, 0.5]) \
            .build()

        db_best_model, db_best_params, db_train_preds, db_val_preds, db_train_f1, db_val_f1 = perform_random_search(
            distilbert_classifier, db_param_grid, train_distilbert_final, val_distilbert_final,
            num_folds=RANDOM_SEARCH_CONFIG['num_folds'],
            parallelism=RANDOM_SEARCH_CONFIG['parallelism']
        )

        print("\nBest DistilBERT Classifier Parameters:")
        for param, value in db_best_params.items():
            print(f" {param}: {value}")

        print(f"\nDistilBERT Classifier - Training F1 Score: {db_train_f1:.4f}")
        print(f"DistilBERT Classifier - Validation F1 Score: {db_val_f1:.4f}")

        db_y_pred, db_y_true, db_y_pred_proba = evaluate_model("DistilBERT Classifier", db_val_preds, num_classes)

        model_names.append("DistilBERT Classifier")
        val_scores.append(db_val_f1)

    except Exception as e:
        print(f"Error with DistilBERT Classifier: {e}")
        print("Skipping DistilBERT Classifier")


In [0]:
# ==================== MODEL COMPARISON AND BEST MODEL SELECTION ====================
# Pick the model with the highest validation F1, rebuild its feature columns for the
# test split, and report test-set performance.
if model_names:
    # Highest validation F1 wins; on ties the first-trained model is kept.
    best_model_index = max(range(len(val_scores)), key=val_scores.__getitem__)
    best_model_name = model_names[best_model_index]
    best_val_f1 = val_scores[best_model_index]
    print(f"\nBest Model: {best_model_name} with Validation F1: {best_val_f1:.4f}")

    # Reset explicitly so a re-run of this cell never evaluates a model or test set
    # left in the kernel by an earlier execution.
    best_model = None
    test_data_final = None

    # Get the corresponding best model and rebuild the matching test features
    if best_model_name == "Transformer Classifier" and 'tc_best_model' in locals():
        best_model = tc_best_model
        test_data_processed = feature_model.transform(test_data)
        test_data_final = test_data_processed.withColumn("embedding_array", extract_udf("sentence_embeddings")) \
                                           .withColumn("transformer_features", array_to_vector("embedding_array"))
    elif best_model_name == "BERT + MLP" and 'bert_best_model' in locals():
        best_model = bert_best_model
        test_data_processed = feature_model.transform(test_data)
        test_data_final = test_data_processed.withColumn("embedding_array", extract_udf("sentence_embeddings")) \
                                           .withColumn("transformer_features", array_to_vector("embedding_array"))
    elif best_model_name == "Transformer-Inspired MLP" and 'tmlp_best_model' in locals():
        best_model = tmlp_best_model
        test_attention, _ = create_attention_features(test_data, feature_cols)
        test_data_final = transformer_assembler.transform(test_attention)
    elif best_model_name == "DistilBERT Classifier" and 'db_best_model' in locals():
        best_model = db_best_model
        test_data_processed = distilbert_model.transform(test_data)
        test_data_final = test_data_processed.withColumn("embedding_array", extract_udf("sentence_embeddings")) \
                                           .withColumn("distilbert_features", array_to_vector("embedding_array"))

    if best_model is not None and test_data_final is not None:
        test_predictions = best_model.transform(test_data_final)

        evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')

        test_f1 = evaluator.evaluate(test_predictions)
        print(f"Test F1 Score with {best_model_name}: {test_f1:.4f}")

        print(f"\n--- {best_model_name} Test Set Evaluation ---")
        test_y_pred, test_y_true, test_y_pred_proba = evaluate_model(
            f"{best_model_name} (Test)", test_predictions, num_classes,
            has_probability=('Classifier' in best_model_name)
        )
    else:
        print("Best model not available for test evaluation")

else:
    print("No models were run, skipping test set evaluation.")

print("\n=== Transformer Models Pipeline Complete ===")

No models were run, skipping test set evaluation.

=== Transformer Models Pipeline Complete ===
