In [2]:
#Imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover

#Create a spark session
spark = SparkSession.builder.appName("SpookyAuthorIdentification").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/26 16:26:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Stage 0: Import Data

In [3]:
#Load training data into a data frame
train_df = spark.read.csv('Datasets/train.csv', header=True, inferSchema=True)

#Verify
train_df.printSchema()
train_df.show(5)


root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- author: string (nullable = true)

+-------+--------------------+------+
|     id|                text|author|
+-------+--------------------+------+
|id26305|This process, how...|   EAP|
|id17569|It never once occ...|   HPL|
|id11008|In his left hand ...|   EAP|
|id27763|How lovely is spr...|   MWS|
|id12958|Finding nothing e...|   HPL|
+-------+--------------------+------+
only showing top 5 rows



In [4]:
#Load test data into a data frame
test_df = spark.read.csv('Datasets/test.csv', header=True, inferSchema=True)

#Verify
test_df.printSchema()
test_df.show(5)

root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)

+-------+--------------------+
|     id|                text|
+-------+--------------------+
|id02310|Still, as I urged...|
|id24541|If a fire wanted ...|
|id00134|And when they had...|
|id27757|While I was think...|
|id04081|I am not sure to ...|
+-------+--------------------+
only showing top 5 rows



In [5]:
#Print num rows and cols in the training set
train_row_count = train_df.count()
train_column_count = len(train_df.columns)
print(f"Training Data - Rows: {train_row_count}, Columns: {train_column_count}")

Training Data - Rows: 19579, Columns: 3


In [6]:
#Print num rows and cols in the test set
test_row_count = test_df.count()
test_column_count = len(test_df.columns)
print(f"Test Data - Rows: {test_row_count}, Columns: {test_column_count}")

Test Data - Rows: 8392, Columns: 2


In [7]:
#Check for missing values (There are none)
train_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in train_df.columns]).show()
test_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in test_df.columns]).show()

+---+----+------+
| id|text|author|
+---+----+------+
|  0|   0|     0|
+---+----+------+

+---+----+
| id|text|
+---+----+
|  0|   0|
+---+----+



# Stage 1: Data Preparation - Exploratory data anlysis and text mining pre-processing

In [8]:
#Create sentence length column
train_df = train_df.withColumn("sentence_length", F.length(F.col("text")))
train_df.select("sentence_length").describe().show()

+-------+------------------+
|summary|   sentence_length|
+-------+------------------+
|  count|             19579|
|   mean|139.99765054395016|
| stddev|101.25452331007808|
|    min|                 5|
|    max|              3682|
+-------+------------------+



In [9]:
#Use tokenizer to tokenize the text
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenized_train_df = tokenizer.transform(train_df)

#Print tokenized output
tokenized_train_df.select("text", "words").show(5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                                                                                                                   |words                                                                                                                                                                                                                                                              

In [11]:
#Create an instance of StopWordsRemover and use it to filter the tokens and remove stop words
#The new filtered tokens are placed in a new column "filtered_words"
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
cleaned_train_df = remover.transform(tokenized_train_df)

#Print the cleaned output
cleaned_train_df.select("words", "filtered_words").show(5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                                                                                                                                                                            |filtered_words                                                                                                                                                                              |
+---------------------------------------------------------------------

# Stage 2: Feature Extraction

In [42]:
## Stage 2 from Richa's File
import setuptools
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, Normalizer
from pyspark.ml import Pipeline


# Step 6: Tokenization
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
tokens_df = tokenizer.transform(train_df)

# Step 7: Stop words removal
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
filtered_df = remover.transform(tokens_df)

# Step 8: TF-IDF calculation using CountVectorizer
vectorizer = CountVectorizer(inputCol="filtered_tokens", outputCol="vectorized_tokens")
idf = IDF(inputCol="vectorized_tokens", outputCol="tfidf")

# Step 9: Normalization
normalizer = Normalizer(inputCol="tfidf", outputCol="normalized_features")

# Step 10: Create and apply pipeline
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, idf, normalizer])
processed_data = pipeline.fit(train_df).transform(train_df)

# Step 11: Show the final processed DataFrame with normalized features
processed_data.select("normalized_features").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|normalized_features   

24/10/26 17:50:40 WARN DAGScheduler: Broadcasting large task binary with size 1180.3 KiB


In [43]:
processed_data.show(5)

+-------+--------------------+------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     id|                text|author|sentence_length|              tokens|     filtered_tokens|   vectorized_tokens|               tfidf| normalized_features|
+-------+--------------------+------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|id26305|This process, how...|   EAP|            231|[this, process,, ...|[process,, howeve...|(42876,[4,9,33,47...|(42876,[4,9,33,47...|(42876,[4,9,33,47...|
|id17569|It never once occ...|   HPL|             71|[it, never, once,...|[never, occurred,...|(42876,[4,10,228,...|(42876,[4,10,228,...|(42876,[4,10,228,...|
|id11008|In his left hand ...|   EAP|            200|[in, his, left, h...|[left, hand, gold...|(42876,[48,87,136...|(42876,[48,87,136...|(42876,[48,87,136...|
|id27763|How lovely is spr...|   MWS|         

24/10/26 17:50:42 WARN DAGScheduler: Broadcasting large task binary with size 1194.9 KiB


# Stage 3 Machine Learning

In [44]:
## encoding authors 
processed_data = processed_data.withColumn(
    "author",
    F.when(F.col("author") == "EAP", 0)
    .when(F.col("author") == "HPL", 1)
    .when(F.col("author") == "MWS", 2)
    .otherwise(None)
)

ml_df = processed_data.filter(F.col("author").isNotNull())

train_data, test_data = ml_df.randomSplit([0.8, 0.2], seed=42)

## grabbing normalized_features and author only for test data
test_data = test_data.select("normalized_features", "author")

train_data.show(5)


24/10/26 17:50:44 WARN DAGScheduler: Broadcasting large task binary with size 1233.2 KiB


+-------+--------------------+------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     id|                text|author|sentence_length|              tokens|     filtered_tokens|   vectorized_tokens|               tfidf| normalized_features|
+-------+--------------------+------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|id00001|Idris was well co...|     2|             49|[idris, was, well...|[idris, well, con...|(42876,[30,565,15...|(42876,[30,565,15...|(42876,[30,565,15...|
|id00002|I was faint, even...|     1|             87|[i, was, faint,, ...|[faint,, even, fa...|(42876,[2,12,49,1...|(42876,[2,12,49,1...|(42876,[2,12,49,1...|
|id00004|He might see, per...|     0|            134|[he, might, see,,...|[might, see,, per...|(42876,[0,4,26,19...|(42876,[0,4,26,19...|(42876,[0,4,26,19...|
|id00005|All obeyed the Lo...|     2|         

In [41]:
from pyspark.ml.classification import LogisticRegression

# Initialize LogisticRegression model
lr = LogisticRegression(featuresCol="normalized_features", labelCol="author")

# Train the model
lr_model = lr.fit(train_data)

24/10/26 17:43:48 WARN DAGScheduler: Broadcasting large task binary with size 1258.3 KiB
24/10/26 17:43:49 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB
24/10/26 17:43:49 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB
24/10/26 17:43:49 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB
24/10/26 17:43:49 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB
24/10/26 17:43:49 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB
24/10/26 17:43:49 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB
24/10/26 17:43:49 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB
24/10/26 17:43:50 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB
24/10/26 17:43:50 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB
24/10/26 17:43:50 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB
24/10/26 17:43:50 WAR

In [45]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions = lr_model.transform(test_data)

# Evaluate accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="author", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

24/10/26 17:54:58 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


Accuracy: 0.7367680180180181


                                                                                

In [48]:
## Confusion Matrix
## Shows prediction distribution
## For example, our model prediction of correctly 0 - 0 or EAP - EAP makes up 28% of all predictions
## Our True positives add up to the above accuracy score of ~74% which is well above the 50% threshold


confusion_matrix = predictions.groupBy("author", "prediction").count().orderBy("author", "prediction")
total_predictions = predictions.count()

confusion_matrix = confusion_matrix.withColumn(
    "percentage",
    (F.col("count") / total_predictions) * 100
)

confusion_matrix = confusion_matrix.orderBy("author", "prediction")
confusion_matrix.show()

24/10/26 18:00:02 WARN DAGScheduler: Broadcasting large task binary with size 1225.5 KiB
24/10/26 18:00:03 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


+------+----------+-----+------------------+
|author|prediction|count|        percentage|
+------+----------+-----+------------------+
|     0|       0.0| 1025| 28.85698198198198|
|     0|       1.0|  150| 4.222972972972973|
|     0|       2.0|  207| 5.827702702702703|
|     1|       0.0|  177| 4.983108108108108|
|     1|       1.0|  786| 22.12837837837838|
|     1|       2.0|  121|3.4065315315315314|
|     2|       0.0|  187| 5.264639639639639|
|     2|       1.0|   93|2.6182432432432434|
|     2|       2.0|  806|22.691441441441444|
+------+----------+-----+------------------+



24/10/26 18:00:03 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
