In [1]:
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import length, avg
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
import pandas as pd
from pyspark.sql import SparkSession
# Start Spark session
#spark = SparkSession.builder.appName("SpookyAuthor").getOrCreate()

spark = SparkSession.builder \
    .appName("SpookyAuthor") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()


# Load CSV into Spark DataFrame
df = spark.read.csv("train.csv", header=True, inferSchema=True)

# View schema
df.printSchema()

# Show sample data
df.show(5)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/22 22:16:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/22 22:16:21 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- author: string (nullable = true)

+-------+--------------------+------+
|     id|                text|author|
+-------+--------------------+------+
|id26305|This process, how...|   EAP|
|id17569|It never once occ...|   HPL|
|id11008|In his left hand ...|   EAP|
|id27763|How lovely is spr...|   MWS|
|id12958|Finding nothing e...|   HPL|
+-------+--------------------+------+
only showing top 5 rows


In [2]:
df_train = spark.read.csv("train.csv", header=True, inferSchema=True)
print("\nTrain")
df_train.show()

df_test = spark.read.csv("test.csv", header=True, inferSchema=True)
print("\nTest")
df_test.show()


Train
+-------+--------------------+------+
|     id|                text|author|
+-------+--------------------+------+
|id26305|This process, how...|   EAP|
|id17569|It never once occ...|   HPL|
|id11008|In his left hand ...|   EAP|
|id27763|How lovely is spr...|   MWS|
|id12958|Finding nothing e...|   HPL|
|id22965|A youth passed in...|   MWS|
|id09674|The astronomer, p...|   EAP|
|id13515|The surcingle hun...|   EAP|
|id19322|I knew that you c...|   EAP|
|id00912|I confess that ne...|   MWS|
|id16737|"He shall find th...|   MWS|
|id16607|Here we barricade...|   EAP|
|id19764|Herbert West need...|   HPL|
|id18886|The farm like gro...|   HPL|
|id17189|But a glance will...|   EAP|
|id12799|He had escaped me...|   MWS|
|id08441|To these speeches...|   EAP|
|id13117|Her native sprigh...|   MWS|
|id14862|I even went so fa...|   EAP|
|id20836|His facial aspect...|   HPL|
+-------+--------------------+------+
only showing top 20 rows

Test
+-------+--------------------+
|     id|          

In [3]:
# check for null
from pyspark.sql.functions import col, isnan, when, count

# Get list of numeric columns
numeric_types = ["DoubleType", "IntegerType", "FloatType", "LongType"]
numeric_columns = [f.name for f in df_train.schema.fields if f.dataType.simpleString() in [t.lower().replace("type", "") for t in numeric_types]]

# Apply isnan() only to numeric columns, isNull() to all
df_train.select([
    count(
        when(
            col(c).isNull() | (isnan(col(c)) if c in numeric_columns else False),
            c
        )
    ).alias(c)
    for c in df_train.columns
]).show()



+---+----+------+
| id|text|author|
+---+----+------+
|  0|   0|     0|
+---+----+------+



In [4]:
from pyspark.ml.feature import StopWordsRemover, Tokenizer
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.feature import Normalizer
from pyspark.ml import Pipeline

# Step 1: Tokenization
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
df_train_tokenized = tokenizer.transform(df_train)

# Step 2: Stop word removal
stopwords = StopWordsRemover.loadDefaultStopWords("english") + ['i', 'you', 'he', 'she', 'it', 'we', 'they', 'me', 'him', 'her']
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens", stopWords=stopwords)
df_train_filtered = remover.transform(df_train_tokenized)


df_train_filtered.show()

+-------+--------------------+------+--------------------+--------------------+
|     id|                text|author|              tokens|     filtered_tokens|
+-------+--------------------+------+--------------------+--------------------+
|id26305|This process, how...|   EAP|[this, process,, ...|[process,, howeve...|
|id17569|It never once occ...|   HPL|[it, never, once,...|[never, occurred,...|
|id11008|In his left hand ...|   EAP|[in, his, left, h...|[left, hand, gold...|
|id27763|How lovely is spr...|   MWS|[how, lovely, is,...|[lovely, spring, ...|
|id12958|Finding nothing e...|   HPL|[finding, nothing...|[finding, nothing...|
|id22965|A youth passed in...|   MWS|[a, youth, passed...|[youth, passed, s...|
|id09674|The astronomer, p...|   EAP|[the, astronomer,...|[astronomer,, per...|
|id13515|The surcingle hun...|   EAP|[the, surcingle, ...|[surcingle, hung,...|
|id19322|I knew that you c...|   EAP|[i, knew, that, y...|[knew, say, 'ster...|
|id00912|I confess that ne...|   MWS|[i,

In [5]:
#Stage 2: Feature Extraction

#TF-IDF Feature Extraction
from pyspark.ml.feature import CountVectorizer, IDF

#convert filtered tokens to term frequency vectors
count_vectorizer = CountVectorizer(inputCol="filtered_tokens", outputCol="raw_features", vocabSize=10000, minDF=5)

#set IDF to scale term frequencies by inverse document frequency
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")

#normalize the TF-IDF features
from pyspark.ml.feature import Normalizer

#normalize the TF-IDF features using L2 norm (len)
normalizer = Normalizer(inputCol="tfidf_features", outputCol="normalized_features", p=2.0)

#build and apply pipeline
from pyspark.ml import Pipeline

#create pipeline with existing tokenizer, stop word remover, and new stages
pipeline = Pipeline(stages=[
    tokenizer,
    remover,
    count_vectorizer,
    idf,
    normalizer
])

#fit pipeline to training data
pipeline_model = pipeline.fit(df_train)

#transform training data
df_train_transformed = pipeline_model.transform(df_train)

#show transformed training data with added feature columns
df_train_transformed.select("id", "text", "author", "filtered_tokens", "raw_features", "tfidf_features", "normalized_features").show()

#apply pipeline to the test data
df_test_transformed = pipeline_model.transform(df_test)

#show transformed test data
df_test_transformed.select("id", "text", "filtered_tokens", "raw_features", "tfidf_features", "normalized_features").show()

                                                                                

+-------+--------------------+------+--------------------+--------------------+--------------------+--------------------+
|     id|                text|author|     filtered_tokens|        raw_features|      tfidf_features| normalized_features|
+-------+--------------------+------+--------------------+--------------------+--------------------+--------------------+
|id26305|This process, how...|   EAP|[process,, howeve...|(9079,[4,9,33,47,...|(9079,[4,9,33,47,...|(9079,[4,9,33,47,...|
|id17569|It never once occ...|   HPL|[never, occurred,...|(9079,[4,10,228,7...|(9079,[4,10,228,7...|(9079,[4,10,228,7...|
|id11008|In his left hand ...|   EAP|[left, hand, gold...|(9079,[48,87,136,...|(9079,[48,87,136,...|(9079,[48,87,136,...|
|id27763|How lovely is spr...|   MWS|[lovely, spring, ...|(9079,[85,88,422,...|(9079,[85,88,422,...|(9079,[85,88,422,...|
|id12958|Finding nothing e...|   HPL|[finding, nothing...|(9079,[2,67,145,3...|(9079,[2,67,145,3...|(9079,[2,67,145,3...|
|id22965|A youth passed 

In [6]:
# Split the training data (80% train, 20% validation)
train_data, val_data = df_train_transformed.randomSplit([0.8, 0.2], seed=42)

# Optional: cache if large
# train_data.cache()
# val_data.cache()


In [7]:
# Build pipeline using training data
label_indexer = StringIndexer(inputCol="author", outputCol="label")
lr = LogisticRegression(featuresCol="normalized_features", labelCol="label", maxIter=10)

pipeline = Pipeline(stages=[label_indexer, lr])

# 👇 Fit model on the TRAINING DATA only
model = pipeline.fit(df_train_transformed)  # or train_data if you split earlier

# 👇 Now make predictions on the TEST set (which does NOT have 'author')
predictions = model.transform(df_test_transformed)

# If you want to evaluate, you must evaluate on validation data that *has* labels:
val_predictions = model.transform(val_data)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(val_predictions)
print(f"Validation Accuracy: {accuracy:.4f}")


25/07/22 22:16:54 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/07/22 22:17:00 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.0.0.177:53889 is closed
25/07/22 22:17:00 ERROR ChunkFetchRequestHandler: Error sending result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=2030366165001,chunkIndex=0],buffer=org.apache.spark.storage.BlockManagerManagedBuffer@2766657c] to /10.0.0.177:53900; closing connection
java.io.IOException: No buffer space available
	at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62)
	at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:97)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:53)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532)
	at org.apache.spark.util.io.ChunkedByteBufferFileRegi

Validation Accuracy: 0.9780


                                                                                

In [10]:
# Evaluate F1 score
f1_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = f1_evaluator.evaluate(val_predictions)
print(f"Validation F1 Score: {f1_score:.4f}")



25/07/22 22:34:27 WARN DAGScheduler: Broadcasting large task binary with size 90.0 MiB
[Stage 325:>                                                        (0 + 1) / 1]

Validation F1 Score: 0.9742


                                                                                

In [13]:
import pandas as pd

# Step 1: Get the trained Logistic Regression model from pipeline
lr_model = model.stages[-1]  # last stage
vectorizer_model = pipeline_model.stages[2]  # CountVectorizer is stage 2 in your pipeline

# Step 2: Get vocabulary
vocab = vectorizer_model.vocabulary  # list of words

# Step 3: Get coefficients
coefs = lr_model.coefficientMatrix.toArray()  # shape: (numClasses, numFeatures)

# Step 4: Wrap into DataFrame
feature_weights = pd.DataFrame({
    "word": vocab,
    "EAP": coefs[0],  # assuming label 0 = EAP
    "HPL": coefs[1],
    "MWS": coefs[2],
})

# Step 5: View top words per class
print("Top EAP words:")
print(feature_weights.sort_values("EAP", ascending=False).head(10))

print("\nTop HPL words:")
print(feature_weights.sort_values("HPL", ascending=False).head(10))

print("\nTop MWS words:")
print(feature_weights.sort_values("MWS", ascending=False).head(10))



Top EAP words:
               word        EAP        HPL        MWS
7973            les  31.529796  -7.115751  -6.693555
7673         renown  28.767139  -7.372131  -5.048677
8531        garret,  28.705112  -1.941936 -24.790361
5967    mountainous  28.670752 -10.619615  -8.572036
5955  acquaintance,  27.572957  -6.513499  -5.107116
7677          valor  26.903874 -16.308050  -3.191897
8481       momently  26.850046  -3.723839 -21.637591
8033        slight,  24.678751 -12.905075  -5.766720
6071        driving  24.616322 -10.922387  -6.734704
6113        darkest  23.946692 -16.765882   2.087710

Top HPL words:
            word        EAP        HPL        MWS
7898     wedding  -6.713819  30.362181  -4.856881
5243      close, -20.364282  28.105938  -5.631452
7068      defeat -19.671866  28.047485  -1.312548
8237    ecstasy, -26.779940  25.973695   2.196979
7972       murky  -6.331920  25.301273 -16.558625
4699      given,   0.055081  24.961775  -7.473968
6679     nature;  -8.133241  24.1033

In [None]:
# We performed a logistic regression with an accuracy of 97.4%.  And the confusion matrix shows strong correlation (high positive values) for key words from an author in their works while the absence of those key words (negative values) in the other author's works.
