# Assignment 3: streaming analytics on text data

## Spark setting and data import

The task the about streaming analysis, including text analysis, building machine learning models specifically with PySpark and how to obtain data from streaming. The data collected are from a Spark setting. After duplicate removal there is about 5 thoudsand entries in the data set, which is large enough for a mahchine learning model on text analysis. The data is split for training and testing and preprocessed as what should be done normally in machine learning. Alongside, some text-analysis-specific methods are used like punctuation/stopwords removal as well as TF-IDF.

The first step is to introduce saved stories collected via Spark. For sake of model performance, dataframe used is Saprk Dataframe instead of RDD or Pandas Dataframe. First of all, a PySpark environment has been set up. The saved stories is read through.

In [1]:
from pyspark.sql.functions import col, when
from pyspark.sql.functions import *
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from nltk.stem import PorterStemmer

In [2]:
spark = SparkSession.builder \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [3]:
sc

In [4]:
spark

In [5]:
# Read through all the subdirectories saved
df = spark.read.json("C:/Users/Admin/Desktop/spark_data")

## Preprocession

Transaformations on the Spark Dataframe is executed for model training. Before transformation, duplicate rows are dropped baecause Spark cannot tell whether a data has been collected so there are overlapped entries. Then, some useless columns are dropped. For columns like user, aid, url, posted_at and domain, they are of no use because they are too unique. There are two 'titles': 'title' and 'source_title'. We only keep 'title' considering the fact that people would browse Hector News initially instead of browsing the source titles. Missing values are detected and dropped as well. As inspected, there are two kinds of missing values. One type is 'NULL' contents. Another one is '404 error' (shown as 'Page not found').

### Removing duplicates

In [6]:
# Remove duplicate rows and count
df = df.dropDuplicates()
df.count()

4396

In [7]:
schema = StructType([
    StructField("aid", StringType(), True),
    StructField("title", StringType(), True),
    StructField("url", StringType(), True),
    StructField("frontpage", StringType(), True),
    StructField("source_text", StringType(), True),
        StructField("votes", IntegerType(), True),
    StructField("comments", IntegerType(), True),

        StructField("domain", StringType(), True),
        StructField("user", StringType(), True),
    StructField("posted_at", StringType(), True),
        StructField("source_title", StringType(), True)
])


# Parse the JSON strings using the defined schema
df = df.withColumn("parsed_value", from_json(col("value"), schema)).select("parsed_value.*")

# Show the DataFrame with all columns
df.show(5)

+--------+--------------------+--------------------+---------+--------------------+-----+--------+-----------------+---------+-------------------+--------------------+
|     aid|               title|                 url|frontpage|         source_text|votes|comments|           domain|     user|          posted_at|        source_title|
+--------+--------------------+--------------------+---------+--------------------+-----+--------+-----------------+---------+-------------------+--------------------+
|40364744|A design for micr...|https://www.nextb...|    false|Millimeter accura...|    1|       0|nextbigfuture.com|    fanf2|2024-05-15 09:24:02|Millimeter accura...|
|40133032|Unreal Engine 5.4...|https://dev.epicg...|    false|Unreal Engine 5.4...|    1|       0|    epicgames.com|makepanic|2024-04-23 15:27:19|Unreal Engine 5.4...|
|40206221|Sunlight and Vita...|https://www.ncbi....|     true|Sunlight and Vita...|   43|      42|          nih.gov|      luu|2024-04-30 01:10:50|Sunlight and V

In [8]:
# drop some columns which wwould never be used
df = df.drop('aid','source_title','posted_at','user','url','domain')
df.show(5)

+--------------------+---------+--------------------+-----+--------+
|               title|frontpage|         source_text|votes|comments|
+--------------------+---------+--------------------+-----+--------+
|A design for micr...|    false|Millimeter accura...|    1|       0|
|Unreal Engine 5.4...|    false|Unreal Engine 5.4...|    1|       0|
|Sunlight and Vita...|     true|Sunlight and Vita...|   43|      42|
|Bryan Caplan on A...|    false|Q&A: Bryan Caplan...|    1|       0|
|Probing single el...|    false|Probing single el...|    1|       0|
+--------------------+---------+--------------------+-----+--------+
only showing top 5 rows



### Removing missing values

In [9]:
# Missing values check: 2 types could be viewed as missing values, then count
# Type 1: Page not found
# Type 2: NULL
df = df.where(df.source_text != 'Page not found')
df.dropna()
df.count()

4340

### Encoding the label column

The label variable 'frontpage' is in string type. To make predictions, it is converted as a dummy variable.

In [10]:
# Encode the label column 'frontpage' and show it to verify
df = df.withColumn('frontpage', when(df.frontpage==True, 1).otherwise(0))
df.show(5)

+--------------------+---------+--------------------+-----+--------+
|               title|frontpage|         source_text|votes|comments|
+--------------------+---------+--------------------+-----+--------+
|A design for micr...|        0|Millimeter accura...|    1|       0|
|Unreal Engine 5.4...|        0|Unreal Engine 5.4...|    1|       0|
|Sunlight and Vita...|        1|Sunlight and Vita...|   43|      42|
|Bryan Caplan on A...|        0|Q&A: Bryan Caplan...|    1|       0|
|Probing single el...|        0|Probing single el...|    1|       0|
+--------------------+---------+--------------------+-----+--------+
only showing top 5 rows



### Removing punctuations, stopwords, stemming and tokenizing the text

In preparation of featurization, the 'source_text' variable is tokenized so that sentences could be sliced into single words. After tokenization, punctuations and stop words have been removed. Then all the tokens are stemmed in terms of syntax.

In [11]:
# For text, remove the punctuations ('/"/,/./:/-/?/!/:/|/[/])
df_punc_drop = df.withColumn('source_text', regexp_replace(df.source_text, '[^a-zA-Z0-9]', ' '))

In [12]:
# For text, make every word in lowercase
df_token = Tokenizer(inputCol="source_text", outputCol="tokens").transform(df_punc_drop)
df_token.show(5)

+--------------------+---------+--------------------+-----+--------+--------------------+
|               title|frontpage|         source_text|votes|comments|              tokens|
+--------------------+---------+--------------------+-----+--------+--------------------+
|A design for micr...|        0|Millimeter accura...|    1|       0|[millimeter, accu...|
|Unreal Engine 5.4...|        0|Unreal Engine 5 4...|    1|       0|[unreal, engine, ...|
|Sunlight and Vita...|        1|Sunlight and Vita...|   43|      42|[sunlight, and, v...|
|Bryan Caplan on A...|        0|Q A  Bryan Caplan...|    1|       0|[q, a, , bryan, c...|
|Probing single el...|        0|Probing single el...|    1|       0|[probing, single,...|
+--------------------+---------+--------------------+-----+--------+--------------------+
only showing top 5 rows



In [13]:
# For text, remove stop words (a/an/the/then/and...)
stopwords = StopWordsRemover()
stopwords.getStopWords()

['i',
 'me',
 'my',
 'myself',
 'we',
 'our',
 'ours',
 'ourselves',
 'you',
 'your',
 'yours',
 'yourself',
 'yourselves',
 'he',
 'him',
 'his',
 'himself',
 'she',
 'her',
 'hers',
 'herself',
 'it',
 'its',
 'itself',
 'they',
 'them',
 'their',
 'theirs',
 'themselves',
 'what',
 'which',
 'who',
 'whom',
 'this',
 'that',
 'these',
 'those',
 'am',
 'is',
 'are',
 'was',
 'were',
 'be',
 'been',
 'being',
 'have',
 'has',
 'had',
 'having',
 'do',
 'does',
 'did',
 'doing',
 'a',
 'an',
 'the',
 'and',
 'but',
 'if',
 'or',
 'because',
 'as',
 'until',
 'while',
 'of',
 'at',
 'by',
 'for',
 'with',
 'about',
 'against',
 'between',
 'into',
 'through',
 'during',
 'before',
 'after',
 'above',
 'below',
 'to',
 'from',
 'up',
 'down',
 'in',
 'out',
 'on',
 'off',
 'over',
 'under',
 'again',
 'further',
 'then',
 'once',
 'here',
 'there',
 'when',
 'where',
 'why',
 'how',
 'all',
 'any',
 'both',
 'each',
 'few',
 'more',
 'most',
 'other',
 'some',
 'such',
 'no',
 'nor',
 '

In [14]:
stopwords = stopwords.setInputCol('tokens').setOutputCol('words')
df_clean = stopwords.transform(df_token)
df_clean.show(5)

+--------------------+---------+--------------------+-----+--------+--------------------+--------------------+
|               title|frontpage|         source_text|votes|comments|              tokens|               words|
+--------------------+---------+--------------------+-----+--------+--------------------+--------------------+
|A design for micr...|        0|Millimeter accura...|    1|       0|[millimeter, accu...|[millimeter, accu...|
|Unreal Engine 5.4...|        0|Unreal Engine 5 4...|    1|       0|[unreal, engine, ...|[unreal, engine, ...|
|Sunlight and Vita...|        1|Sunlight and Vita...|   43|      42|[sunlight, and, v...|[sunlight, vitami...|
|Bryan Caplan on A...|        0|Q A  Bryan Caplan...|    1|       0|[q, a, , bryan, c...|[q, , bryan, capl...|
|Probing single el...|        0|Probing single el...|    1|       0|[probing, single,...|[probing, single,...|
+--------------------+---------+--------------------+-----+--------+--------------------+--------------------+
o

In [15]:
# Stemming with PorterStemmer
def stem_words(words):
    stemmer = PorterStemmer()
    return [stemmer.stem(word) for word in words]

# Register the UDF
stem_udf = udf(stem_words, ArrayType(StringType()))

# Apply the stemmer UDF
df_final = df_clean.withColumn("words", stem_udf(col("words")))

In [16]:
df_final.show(5)

+--------------------+---------+--------------------+-----+--------+--------------------+--------------------+
|               title|frontpage|         source_text|votes|comments|              tokens|               words|
+--------------------+---------+--------------------+-----+--------+--------------------+--------------------+
|A design for micr...|        0|Millimeter accura...|    1|       0|[millimeter, accu...|[millimet, accur,...|
|Unreal Engine 5.4...|        0|Unreal Engine 5 4...|    1|       0|[unreal, engine, ...|[unreal, engin, 5...|
|Sunlight and Vita...|        1|Sunlight and Vita...|   43|      42|[sunlight, and, v...|[sunlight, vitami...|
|Bryan Caplan on A...|        0|Q A  Bryan Caplan...|    1|       0|[q, a, , bryan, c...|[q, , bryan, capl...|
|Probing single el...|        0|Probing single el...|    1|       0|[probing, single,...|[probe, singl, el...|
+--------------------+---------+--------------------+-----+--------+--------------------+--------------------+
o

### Featurization: method selection

After tokenization and stemming, now we have two methods of featurization: TF-IDF and Topic Modeling. They are chosen because in this text prediction course, we think certain keywords and topics are more engaging. So other methods like Word2Vec are not considered for they might be more useful in sentiment analysis instead of text classification.

#### Topic modeling

In [None]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.clustering import LDA

vectorizer = CountVectorizer(inputCol="words", outputCol="counts")
vectorizer_model = vectorizer.fit(df_final)
df_topic = vectorizer_model.transform(df_final)

# Train the LDA model
num_topics = 5
lda = LDA(k=num_topics, maxIter=10, featuresCol="counts")
lda_model = lda.fit(df_topic)
training_new = lda_model.transform(df_topic)
#Add columns for each topic
for i in range(num_topics):
    extract_topic_for_index_udf = udf(lambda topic_distribution, i=i: float(topic_distribution[i]), DoubleType())
    training_new = training_new.withColumn(f"topic_{i}", extract_topic_for_index_udf(training_new["topicDistribution"]))

#### TF-IDF

In [23]:
# Using TF-IDF
# TF
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
df_tf = hashingTF.transform(df_final)

In [24]:
# IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(df_tf)
df_tf_idf = idfModel.transform(df_tf)

In [27]:
df_tf_idf.show(5)

+--------------------+---------+--------------------+-----+--------+--------------------+--------------------+--------------------+--------------------+
|               title|frontpage|         source_text|votes|comments|              tokens|               words|         rawFeatures|            features|
+--------------------+---------+--------------------+-----+--------+--------------------+--------------------+--------------------+--------------------+
|A design for micr...|        0|Millimeter accura...|    1|       0|[millimeter, accu...|[millimet, accur,...|(262144,[43,440,5...|(262144,[43,440,5...|
|Unreal Engine 5.4...|        0|Unreal Engine 5 4...|    1|       0|[unreal, engine, ...|[unreal, engin, 5...|(262144,[22,30,49...|(262144,[22,30,49...|
|Sunlight and Vita...|        1|Sunlight and Vita...|   43|      42|[sunlight, and, v...|[sunlight, vitami...|(262144,[43,156,1...|(262144,[43,156,1...|
|Bryan Caplan on A...|        0|Q A  Bryan Caplan...|    1|       0|[q, a, , bryan

In [28]:
from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
training_tfidf = normalizer.transform(df_tf_idf)

In [31]:
from pyspark.ml.feature import PCA as PCAml

pca = PCAml(k=10, inputCol="normFeatures", outputCol="pca_features")
pca_model = pca.fit(training_tfidf)
training = pca_model.transform(training_tfidf)

Py4JJavaError: An error occurred while calling o413.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 140.0 failed 1 times, most recent failure: Lost task 0.0 in stage 140.0 (TID 18083) (10.46.203.199 executor driver): java.lang.IllegalArgumentException: axpy only supports adding to a dense vector but got type class org.apache.spark.mllib.linalg.SparseVector.
	at org.apache.spark.mllib.linalg.BLAS$.axpy(BLAS.scala:76)
	at org.apache.spark.mllib.feature.PCA.$anonfun$fit$3(PCA.scala:52)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:273)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1492)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1492)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1465)
	at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1506)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1506)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.numCols(RowMatrix.scala:62)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponentsAndExplainedVariance(RowMatrix.scala:487)
	at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:65)
	at org.apache.spark.ml.feature.PCA.fit(PCA.scala:93)
	at org.apache.spark.ml.feature.PCA.fit(PCA.scala:64)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: axpy only supports adding to a dense vector but got type class org.apache.spark.mllib.linalg.SparseVector.
	at org.apache.spark.mllib.linalg.BLAS$.axpy(BLAS.scala:76)
	at org.apache.spark.mllib.feature.PCA.$anonfun$fit$3(PCA.scala:52)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:273)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1492)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [None]:
training.show(5)

### Reduction on the dimensions (UMAP)

In [26]:
import numpy as np
import umap
from pyspark.ml.feature import StringIndexer, VectorAssembler

ModuleNotFoundError: No module named 'umap'

In [None]:
# Index labels
indexer = StringIndexer(inputCol="frontpage", outputCol="indexedLabel")
final_data = indexer.fit(rescaledData).transform(rescaledData)

# Select the features and convert to a NumPy array
features_array = np.array(assembled_data.select("features").rdd.map(lambda row: row.features.toArray()).collect())

# Apply UMAP
reducer = umap.UMAP(n_components=2, random_state=42)
umap_results = reducer.fit_transform(features_array)

# Convert the UMAP results to a PySpark DataFrame
umap_df = spark.createDataFrame(umap_results.tolist(), ["UMAP1", "UMAP2"])

# Add the UMAP results to the original DataFrame
final_umap_df = assembled_data.withColumn("id", col("indexedLabel")).join(umap_df.withColumn("id", col("UMAP1")), "id").drop("id")

# Show the results
final_umap_df.show()

### Word2Vec

In [18]:
# Word2Vec
from pyspark.ml.feature import Word2Vec

word2Vec = Word2Vec(vectorSize=5, minCount=0, inputCol="words", outputCol="features")
model = word2Vec.fit(df_final)
df_wvec = model.transform(df_final)

In [19]:
df_wvec.show(5)

+--------------------+---------+--------------------+-----+--------+--------------------+--------------------+--------------------+
|               title|frontpage|         source_text|votes|comments|              tokens|               words|            features|
+--------------------+---------+--------------------+-----+--------+--------------------+--------------------+--------------------+
|A design for micr...|        0|Millimeter accura...|    1|       0|[millimeter, accu...|[millimet, accur,...|[-0.1366724245900...|
|Unreal Engine 5.4...|        0|Unreal Engine 5 4...|    1|       0|[unreal, engine, ...|[unreal, engin, 5...|[-0.0154509230286...|
|Sunlight and Vita...|        1|Sunlight and Vita...|   43|      42|[sunlight, and, v...|[sunlight, vitami...|[-0.2339610227476...|
|Bryan Caplan on A...|        0|Q A  Bryan Caplan...|    1|       0|[q, a, , bryan, c...|[q, , bryan, capl...|[-0.2994003929908...|
|Probing single el...|        0|Probing single el...|    1|       0|[probing

### Normalization

In [None]:
from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
training = normalizer.transform(df_wvec)

### Train/test data split

In [25]:
# Split the data into train and test
splits = df_wvec.randomSplit([0.8, 0.2], 1234)
train = splits[0]
test = splits[1]

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [None]:
train = train.drop('title','tokens','words','source_text','votes','comments')
test = test.drop('title','tokens','words','source_text','votes','comments')

In [22]:
train.show(5)

+---------+--------------------+
|frontpage|            features|
+---------+--------------------+
|        0|[-0.2649321264992...|
|        0|[-0.2285036979353...|
|        0|[-0.2318851084571...|
|        0|[-0.1040695888514...|
|        0|[-0.2843592267971...|
+---------+--------------------+
only showing top 5 rows



### Oversampling on train set

In [None]:
print('Count of negative cases:', train.select('frontpage').where(train.frontpage==0).count())
print('Count of positive cases:', train.select('frontpage').where(train.frontpage==1).count())

Positive cases are much fewer that models could be unable to fully learn the patterns. To solve class imbalance and to consider the storage of Spark, undersampling is used.

In [None]:
# Create undersampling function
def undersample_majority(df, ratio=1):
    '''
    ratio is the ratio of majority to minority
    Eg. ratio 1 is equivalent to majority:minority = 1:1
    ratio 5 is equivalent to majority:minority = 5:1
    '''
    minority_count = df.filter('frontpage')==1).count()
    whole_count = df.count()
    undersampled_majority = df.filter('frontpage'==0)\
                                .sample(withReplacement=False, fraction=(ratio*minority_count/whole_count),seed=42)
    undersampled_df = df.filter('frontpage'==1).union(undersampled_majority)
    
    return undersampled_df

In [None]:
train_final = undersampled_df_minority(train, ratio=1)

In [None]:
train_final.show(5)

In [None]:
print('Count of negative cases:', training_final.select('frontpage').where(training_final.frontpage==0).count())
print('Count of positive cases:', training_final.select('frontpage').where(training_final.frontpage==1).count())

## Model training and evaluation

### Model 1: Naive Bayes

In [24]:
# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", featuresCol='features', labelCol='frontpage')

# train the model
nbm = nb.fit(train)

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 50254)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Users\Admin\Desktop\spark\spark-3.5.1-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Admin\anaconda3\Lib\socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Admin\Desktop\spark\spark-3.5.1-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [None]:
# select example rows to display.
predictions = nbm.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="frontpage", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

### Model 2: Logistics Regression

In [52]:
# Save the model to the local file, please define own local directory here

model_path = 'C:/Users/Admin/Advanced Analytics for Bid Data World/Assignment 3/models/naive_bayes'
naive_bayes = nbm.save(model_path)

## Model deployment

There are two goals:
(1) save the model;
(2) preprocessing the incoming message.
In the model deployment the preprocessing and tokenization steps are basically repeated to featurize the incoming stream data so that the model could understand the data. However, for all the tools we did not refit them in combination of the new data and the original data set. The new data is decomposited using the tools fitted on the original data set only.

In [39]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [40]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType, FloatType

In [41]:
from pyspark.ml.classification import NaiveBayesModel

In [53]:
globals()['models_loaded'] = False
globals()['my_model'] = None

# Define the prediction function
#def predict(df):
    #return globals()['my_model'].transform(df)

#predict_udf = udf(predict, FloatType())

# The final function
def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df_stream = spark.read.json(rdd)
    #df_stream.show()
    
    # Remove punctuations
    df_punc_drop_stream = df_stream.withColumn('source_text', regexp_replace(df_stream.source_text, '[^a-zA-Z0-9]', ' '))
    
    # Transformed with tokens
    df_token_stream = Tokenizer(inputCol="source_text", outputCol="tokens").transform(df_punc_drop_stream)
    #df_token_stream.show()
    
    # Remove stopwords
    df_clean_stream = stopwords.transform(df_token_stream)
    #df_clean_stream.show()
    
    # Apply Word2Vec
    #result_stream = word_embed.transform(df_clean_stream)

    # Apply TF-IDF
    featurizedData_stream = hashingTF.transform(df_clean_stream)
    rescaledData_stream = idfModel.transform(featurizedData_stream)
    
    # Finalized the training data
    training_stream = rescaledData_stream.drop('source_text','tokens','words')
    #training_stream.show()

    # Make predictions with the selected model
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = NaiveBayesModel.load(model_path)
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model (uncomment below):
    
    df_result = globals()['my_model'].transform(training_stream)
    df_result_output = df_result.drop('rawFeatures','words','tokens','features','rawPrediction')
    df_result_output.show()

## Streaming prediction

After defining the function, now the socket is connected. With each batch of data comming in, there is a prediction accordingly. Both probabilities and predicted classification are shown at the end of each entry. As seen from the result, most entries without appearing on the frontpage are predicted correctly. For some entries with appearing on the frontpage, the model could give a correct prediction as well.

In [54]:
ssc = StreamingContext(sc, 10)

In [55]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [56]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

+--------+--------+--------------+---------+-------------------+--------------------+--------------------+--------------------+-----------+-----+--------------------+--------------------+----------+
|     aid|comments|        domain|frontpage|          posted_at|        source_title|               title|                 url|       user|votes|       rawPrediction|         probability|prediction|
+--------+--------+--------------+---------+-------------------+--------------------+--------------------+--------------------+-----------+-----+--------------------+--------------------+----------+
|40436229|       0|bisqwit.iki.fi|     true|2024-05-22 01:04:02|Rockman / Megaman...|Rockman / Mega Ma...|https://bisqwit.i...|JojoFatsani|    5|[-3751.5663977685...|[1.0,4.2246778939...|       0.0|
+--------+--------+--------------+---------+-------------------+--------------------+--------------------+--------------------+-----------+-----+--------------------+--------------------+----------+

+---

In [57]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------+-----+--------------------+--------------------+----------+
|     aid|comments|              domain|frontpage|          posted_at|        source_title|               title|                 url|    user|votes|       rawPrediction|         probability|prediction|
+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------+-----+--------------------+--------------------+----------+
|40436625|       0|twitter.com/apple...|     true|2024-05-22 02:06:29|                   X|Meta plans to not...|https://twitter.c...|    ad8e|    6|[-664.17737877334...|[0.08005604942408...|       1.0|
|40436631|       0|        politico.com|    false|2024-05-22 02:06:54|    Just a moment...|Hemp and Marijuan...|https://www.polit...|DocFein