# M6 Team Assignment: Spooky Authorship With Spark Part 2

#### Group 13
- Aidan Lonergan
- Daniel Lillard
- Radhika Garg
- Claudine Uwiragiye

## Objective
- In this assignment, your team will improve your scores from the first Spooky Authorship assignment. Your goal should be to get at least a 80% accuracy. If you already have over 80% accuracy, aim to get 85% accuracy. 

<hr>

### Stage 0 - Import Data

In [1]:
# Stage 0 Solution
from pyspark.sql import SparkSession
import pandas as pd

# Start spark session and load train and test data sets
spark = SparkSession.builder \
    .appName("Module_5_Project") \
    .master("local[*]") \
    .config("spark.driver.memory", "20g") \
    .config("spark.executor.memory", "20g") \
    .config("spark.python.worker.memory", "1g") \
    .config("spark.executor.pyspark.memory", "2g") \
    .config("spark.rpc.io.connectionTimeout", "30s") \
    .config("spark.default.parallelism", "16") \
    .config("spark.executor.cores", "8") \
    .config("spark.task.cpus", "1") \
    .getOrCreate()

df_train = spark.read.csv('./train.csv', header=True, inferSchema=True, quote='"', escape='"')

# Optional: Subsample dataset for faster testing (comment out for full dataset)
#df_train = df_train.limit(10000)


<hr>

### Stage 1 - Data Preparation

In [2]:
# Step 1 - Preprocessing
from pyspark.ml.feature import NGram
from pyspark.sql.functions import col, split, explode, lower, regexp_replace, regexp_count, collect_list, first, length, array_union
import nltk
from nltk.corpus import stopwords
from pyspark.sql.types import StringType, ArrayType

# Get stop words
nltk.download('stopwords')
stop_words = set(stopwords.words('english'))

# Add punctuation count before removing it (Cannot use regexp_count here even when passing in a raw string...)
df_train = df_train.withColumn("comma_count", length(col("text")) - length(regexp_replace(col("text"), ",", "")))

# Clean and lowercase text, remove punctuation
df_train_cleaned = df_train.withColumn("clean_text", lower(regexp_replace(col("text"), r"[^a-zA-Z0-9\s]", "")))

# Tokenize into words then filter out empty strings after tokenization
df_train_words = df_train_cleaned.withColumn("word", explode(split(col("clean_text"), r"\s+"))).filter(col('word') != "")

# Remove stop words
df_train_filtered = df_train_words.filter(~col("word").isin(stop_words))

# Aggregate words into list per id, author (currently they are a single field per row) and retain text column
df_grouped = df_train_filtered.groupBy("id", "author").agg(
    collect_list("word").alias("words"),
    first("clean_text").alias("clean_text"),
    first("comma_count").alias("comma_count")
)
df_grouped = df_grouped.repartition(16)

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\aflon\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


<hr>

### Stage 2 - Feature Extraction

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, StandardScaler, VectorAssembler, StringIndexer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train-test split on grouped data (before feature extraction)
train_data_raw, test_data_raw = df_grouped.randomSplit([0.8, 0.2], seed=42)
train_data_raw = train_data_raw.repartition(16)
test_data_raw = test_data_raw.repartition(16)
train_data_raw.cache()
test_data_raw.cache()

# Define stages for pipeline
hashingTF = HashingTF(inputCol='words', outputCol='tf', numFeatures=4096)
idf = IDF(inputCol='tf', outputCol='tfidf', minDocFreq=5)
scaler = StandardScaler(inputCol='tfidf', outputCol='features', withMean=True, withStd=True)
#assembler = VectorAssembler(inputCols=['tfidf_norm', 'comma_count', 'sentance_length'], outputCol='features')
indexer = StringIndexer(inputCol='author', outputCol='label')

# Define MLP trainer
nn_trainer = MultilayerPerceptronClassifier(
    featuresCol='features',
    labelCol='label',
    solver="l-bfgs",
    tol=1e-6,
    maxIter=200,
    seed=42
)

# Define pipeline with all stages
nn_pipeline = Pipeline(stages=[hashingTF, idf, scaler, indexer, nn_trainer])

# Define evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy'
)

# Determine number of classes dynamically
indexer_model = indexer.fit(train_data_raw)
num_classes = len(indexer_model.labels)

<hr>

### Stage 3 - Machine Learning
1) Perform train/test split
2) Train neural network to achieve >85% accuracy

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

# Define parameter grid for cross-validation
paramGrid = (ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, [8192])
    .addGrid(nn_trainer.layers, [
        [8192, 300, 150, 75, 3]
    ])
    .addGrid(nn_trainer.stepSize, [0.01])
    .addGrid(nn_trainer.maxIter, [100])
    .addGrid(nn_trainer.blockSize, [128])
    .build())

tvs = TrainValidationSplit(
    estimator=nn_pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8,
    parallelism=8
)

# 4. Fit & evaluate
tvs_model = tvs.fit(train_data_raw)
best_model = tvs_model.bestModel

print("Train Acc:", evaluator.evaluate(best_model.transform(train_data_raw)))
print("Test  Acc:", evaluator.evaluate(best_model.transform(test_data_raw)))
print("Best Params:", {
    "numFeatures": best_model.stages[0].getNumFeatures(),
    "layers":       best_model.stages[-1].getLayers(),
    "stepSize":     best_model.stages[-1].getStepSize(),
    "maxIter":      best_model.stages[-1].getMaxIter(),
    "blockSize":    best_model.stages[-1].getBlockSize()
})

Train Acc: 0.9998092816274634
Test  Acc: 0.718148725949038
Best Params: {'numFeatures': 8192, 'layers': [8192, 300, 150, 75, 3], 'stepSize': 0.01, 'maxIter': 100, 'blockSize': 128}


<hr>

### Stage 4 - Evaluation and Visualization

In [None]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

label_mapping = indexer_model.labels  # Get mapping of indices to author names for analysis (e.g., ['EAP', 'HPL', 'MWS'])
test_data_nn = test_data.drop('label')
nn_test_predictions = nn_model.transform(test_data_nn)

nn_evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')
acc_score = nn_evaluator.evaluate(nn_test_predictions)
print(f"Neural Network Accuracy Score: {acc_score}")

# Get confusion matrix
nn_test_predictions = nn_test_predictions.select('label', 'prediction').toPandas()
nn_conf_mat = confusion_matrix(nn_test_predictions['label'], nn_test_predictions['prediction'])

# Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(nn_conf_mat, annot=True, fmt='d', cmap='Blues', xticklabels=label_mapping, yticklabels=label_mapping)
plt.title("Neural Network Confusion Matrix")
plt.xlabel("Predicted Author")
plt.ylabel("True Author")
plt.show()