In [3]:
import os

from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("SpookyAuthorIdentification") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.maxResultSize", "12g") \
    .getOrCreate()


# Load the CSV data into a Spark DataFrame
file_path = "train.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
df = df.sample(fraction=0.5, seed=42)
# Show the schema and the first few rows of the Spark DataFrame
df.printSchema()
df.show(5, truncate=False)


root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- author: string (nullable = true)

+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|id     |text                                                                                                                                                                                                                                                                                                                                                |author|
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------

In [4]:
from pyspark.ml.feature import Tokenizer

# Tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
df_tokens = tokenizer.transform(df)

# Show the tokenized text
df_tokens.select("tokens").show(5, truncate=False)


+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tokens                                                                                                                                                                                                                                                                                                                                                                                                            |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [5]:
from pyspark.ml.feature import StopWordsRemover

# Customize stopwords list to include pronouns
stopwords = StopWordsRemover.loadDefaultStopWords("english") + ['I', 'you', 'he', 'she', 'it', 'we', 'they', 'me', 'him', 'her']

# Remove stopwords from the tokenized text
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens", stopWords=stopwords)
df_cleaned = remover.transform(df_tokens)

# Show the cleaned tokens
df_cleaned.select("filtered_tokens").show(5, truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|filtered_tokens                                                                                                                                                                                                                                    |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[lovely, spring, looked, windsor, terrace, sixteen, fertile, counties, spread, beneath,, speckled, happy, cottages, wealthier, towns,, looked, former, years,, heart, cheering, fair.]                                                             |
|[surcingle, hun

In [6]:
#Stage 2 Feature Extraction

#import required functions
from pyspark.ml.feature import StopWordsRemover, Tokenizer
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.feature import Normalizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

#TF-IDF calculation
vectorizer = CountVectorizer(inputCol="filtered_tokens", outputCol="vectorized_tokens")
idf = IDF(inputCol="vectorized_tokens", outputCol="tfidf")
normalizer = Normalizer(inputCol="tfidf", outputCol="normalized_features")
indexer = StringIndexer(inputCol="author", outputCol="label")

#The dataframe has already gone through the tokenizer and remover steps at this point, so they don't need to
#be included in the pipeline
pipeline = Pipeline(stages=[vectorizer, idf, normalizer, indexer])

processed_data = pipeline.fit(df_cleaned).transform(df_cleaned)



In [7]:
# Stage 3 Machine Learning

# Random Forest
from pyspark.ml.classification import RandomForestClassifier

# Split the data
train_data, test_data = processed_data.randomSplit([0.8, 0.2], seed=42)

# Random Forest
rf = RandomForestClassifier(labelCol="label", featuresCol="normalized_features", numTrees=20)
rf_model = rf.fit(train_data)

predictions = rf_model.transform(test_data)



In [8]:
# Logistic Regression with PCA
from pyspark.ml.classification import LogisticRegression


from pyspark.ml.feature import PCA

#pca = PCA(k=100, inputCol="normalized_features", outputCol="pca_features")
#processed_data = pca.fit(processed_data).transform(processed_data)

logistic_regression = LogisticRegression(featuresCol="normalized_features", labelCol="label", maxIter=50)

train_data, test_data = processed_data.randomSplit([0.8, 0.2], seed=42)


lr_model = logistic_regression.fit(train_data)

lr_predictions = lr_model.transform(test_data)

In [9]:
# Stage 4 Evaluation

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Random Forest Accuracy
accuracy = evaluator.evaluate(predictions)

print(f"Random Forest Accuracy = {accuracy:.2f}")

# Logistic Regression AccuracyLogistic Regression Accuracy =  0.60


lr_accuracy = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression Accuracy =  {lr_accuracy:.2f}")

Random Forest Accuracy = 0.37
Logistic Regression Accuracy =  0.60
