# Assignment 3: streaming analytics on text data

## Read all files into a Spark DataFrame

The first step is to load the saved stories. For performance, the data is loaded into a Spark DataFrame rather than an RDD or a pandas DataFrame.

In [165]:
sc

In [166]:
spark

In [340]:
# Read the JSON files in every saved subdirectory (the trailing * matches each subdirectory)
df = spark.read.json("C:/Users/Admin/Advanced Analytics for Bid Data World/Assignment 3/saved_stories/*")

# Show the first 20 rows to check that the data is parsed correctly, then count the rows
df.show()
df.count()

+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------+-----+
|     aid|comments|              domain|frontpage|          posted_at|         source_text|        source_title|               title|                 url|          user|votes|
+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------+-----+
|39988880|       0|themeasureofaplan...|    false|2024-04-10 09:57:10|Moonshine Money: ...|Moonshine Money: ...|Moonshine Money: ...|https://themeasur...|getToTheChopin|    3|
|39988889|       0|    scitechdaily.com|    false|2024-04-10 09:58:28|New Jurassic Foss...|New Jurassic Foss...|New Jurassic Foss...|https://scitechda...|    isaacfrond|    1|
|39988912|       0|           proton.me|    false|2024-04-10 10:01:14|Proton and Standa...|Proton and Standa...|Proton a

1346
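By default `spark.read.json` expects JSON Lines (one JSON object per line; pretty-printed files need `multiLine=True`), and the trailing `*` makes Spark read the files inside each `saved_stories` subdirectory. The plain-Python sketch below mirrors that reading logic so the row count can be sanity-checked on a small sample without Spark. It assumes the saved files are JSON Lines laid out as `saved_stories/<subdir>/<file>`; the function name `read_json_lines` is hypothetical.

```python
import glob
import json
import os


def read_json_lines(root):
    """Parse every JSON Lines file one level below `root`.

    Roughly what spark.read.json(root + "/*") does when `*` matches
    subdirectories: each non-empty line is one JSON record.
    """
    records = []
    for path in sorted(glob.glob(os.path.join(root, "*", "*"))):
        name = os.path.basename(path)
        # Spark skips hidden/metadata files such as _SUCCESS or .crc files.
        if not os.path.isfile(path) or name.startswith(("_", ".")):
            continue
        with open(path, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if line:  # ignore blank lines
                    records.append(json.loads(line))
    return records


# Usage (hypothetical local path):
# stories = read_json_lines("saved_stories")
# print(len(stories))
```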

The following transformations prepare the Spark DataFrame for model training.

In [341]:
# Remove duplicate rows and count
df = df.dropDuplicates()
df.count()

792

In [342]:
# Keep only the necessary columns ('source_text', 'frontpage') and show the transformed DataFrame as a check
from pyspark.sql.functions import col

df = df.select(col('source_text'), col('frontpage'))
df.show()

+--------------------+---------+
|         source_text|frontpage|
+--------------------+---------+
|Chess therapy - W...|     true|
|Will the debate a...|    false|
|How to remember t...|    false|
|GitHub - Miscella...|    false|
|PFAS: EPA's new r...|     true|
|[2404.05961] LLM2...|    false|
|Quando Aggiorname...|    false|
|Brazil's Twitter ...|     true|
|No Substitute for...|    false|
|Moore's Law for E...|    false|
|Why workplaces sh...|    false|
|SEOperate: Notion...|    false|
|From bug detectio...|    false|
|Why Can't My Mom ...|     true|
|Gentoo Linux beco...|    false|
|AI Song Cover Gen...|    false|
|ClassroomIO\n\nLo...|    false|
|Clojure's slow st...|    false|
|Frontiers | Seism...|    false|
|Run-time Polymorp...|    false|
+--------------------+---------+
only showing top 20 rows



In [343]:
# Missing-value check: two kinds of values count as missing; remove both, then count
# Type 1: the text 'Page not found'
# Type 2: NULL
df = df.where(df.source_text != 'Page not found')
df = df.dropna()  # dropna() returns a new DataFrame, so the result must be assigned
df.count()

791
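In Spark SQL, comparing NULL with `!=` yields NULL rather than `True`, and `where` keeps only rows whose condition is `True`, so the `'Page not found'` filter already discards rows whose `source_text` is NULL; `dropna()` additionally removes rows with a NULL `frontpage`. The plain-Python sketch below emulates that three-valued logic with `None` standing in for NULL; the helper names and sample rows are hypothetical.

```python
def sql_not_equal(a, b):
    """Emulate SQL `a != b`: any comparison involving NULL (None) yields NULL."""
    if a is None or b is None:
        return None
    return a != b


def where_not_equal(rows, column, value):
    """Keep only rows whose condition evaluates to True, as DataFrame.where does."""
    return [row for row in rows if sql_not_equal(row[column], value) is True]


def drop_na(rows):
    """Drop rows with any NULL field, like DataFrame.dropna() with its defaults (how='any')."""
    return [row for row in rows if all(v is not None for v in row.values())]


rows = [
    {"source_text": "Chess therapy", "frontpage": True},
    {"source_text": "Page not found", "frontpage": False},
    {"source_text": None, "frontpage": False},
    {"source_text": "Gentoo Linux", "frontpage": None},
]

filtered = where_not_equal(rows, "source_text", "Page not found")
print(len(filtered))           # 2: the NULL source_text row is already gone
print(len(drop_na(filtered)))  # 1: dropna() only removes the NULL frontpage row
```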

In [344]:
# Encode the label column 'frontpage' and show it to verify
from pyspark.sql.functions import when

df = df.withColumn('frontpage', when(df.frontpage==True, 1).otherwise(0))
df.show()

+--------------------+---------+
|         source_text|frontpage|
+--------------------+---------+
|Chess therapy - W...|        1|
|Will the debate a...|        0|
|How to remember t...|        0|
|GitHub - Miscella...|        0|
|PFAS: EPA's new r...|        1|
|[2404.05961] LLM2...|        0|
|Quando Aggiorname...|        0|
|Brazil's Twitter ...|        1|
|No Substitute for...|        0|
|Moore's Law for E...|        0|
|Why workplaces sh...|        0|
|SEOperate: Notion...|        0|
|From bug detectio...|        0|
|Why Can't My Mom ...|        1|
|Gentoo Linux beco...|        0|
|AI Song Cover Gen...|        0|
|ClassroomIO\n\nLo...|        0|
|Clojure's slow st...|        0|
|Frontiers | Seism...|        0|
|Run-time Polymorp...|        0|
+--------------------+---------+
only showing top 20 rows



In [369]:
# For text, replace every character that is not an ASCII letter or digit (punctuation such as ' " , . : - ? ! | [ ]) with a space
from pyspark.sql.functions import regexp_replace

df_punc_drop = df.withColumn('source_text', regexp_replace(df.source_text, '[^a-zA-Z0-9]', ' '))
df_punc_drop.show(truncate=False)

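The pattern `[^a-zA-Z0-9]` replaces each character outside ASCII letters and digits with its own space, so besides punctuation it also strips accented letters, and a run like `" - "` becomes three spaces. The character class behaves the same in Python's `re`, so its effect can be checked quickly without Spark; the function name `strip_punctuation` is hypothetical.

```python
import re

# Same character class as the regexp_replace call above: anything that is not
# an ASCII letter or digit is replaced by a single space (one per character).
PATTERN = r"[^a-zA-Z0-9]"


def strip_punctuation(text):
    return re.sub(PATTERN, " ", text)


print(repr(strip_punctuation("Brazil's Twitter")))   # 'Brazil s Twitter'
print(repr(strip_punctuation("GitHub - Miscella")))  # 'GitHub   Miscella'
print(repr(strip_punctuation("café")))               # 'caf '
```

If collapsing runs into one space is preferred, `'[^a-zA-Z0-9]+'` does that; a Unicode-aware class such as `'[^\\p{L}\\p{N}]+'` (supported by Java regex, which Spark uses) would keep accented letters.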

In [370]:
# For text, tokenize: Tokenizer lowercases the text and splits it on whitespace
from pyspark.ml.feature import Tokenizer

training = Tokenizer(inputCol="source_text", outputCol="tokens").transform(df_punc_drop)
training.show()

+--------------------+---------+--------------------+
|         source_text|frontpage|              tokens|
+--------------------+---------+--------------------+
|Chess therapy   W...|        1|[chess, therapy, ...|
|Will the debate a...|        0|[will, the, debat...|
|How to remember t...|        0|[how, to, remembe...|
|GitHub   Miscella...|        0|[github, , , misc...|
|PFAS  EPA s new r...|        1|[pfas, , epa, s, ...|
| 2404 05961  LLM2...|        0|[, 2404, 05961, ,...|
|Quando Aggiorname...|        0|[quando, aggiorna...|
|Brazil s Twitter ...|        1|[brazil, s, twitt...|
|No Substitute for...|        0|[no, substitute, ...|
|Moore s Law for E...|        0|[moore, s, law, f...|
|Why workplaces sh...|        0|[why, workplaces,...|
|SEOperate  Notion...|        0|[seoperate, , not...|
|From bug detectio...|        0|[from, bug, detec...|
|Why Can t My Mom ...|        1|[why, can, t, my,...|
|Gentoo Linux beco...|        0|[gentoo, linux, b...|
|AI Song Cover Gen...|      
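`Tokenizer` splits on single whitespace characters, so the runs of spaces left by the previous step turn into empty tokens (visible above as `[github, , , misc...`). The plain-Python sketch below approximates that behaviour next to a split on runs of whitespace. In Spark, `RegexTokenizer(inputCol='source_text', outputCol='tokens')` from `pyspark.ml.feature` splits on runs of whitespace by default (`pattern='\\s+'`) and drops tokens shorter than `minTokenLength=1`, which avoids the empty tokens. The function names are hypothetical.

```python
import re


def spark_tokenizer(text):
    """Approximate pyspark.ml.feature.Tokenizer: lowercase, then split on ONE whitespace char.

    Java's String.split keeps a leading empty string but drops trailing ones,
    and returns the whole string when nothing matches; the same is done here.
    """
    tokens = re.split(r"\s", text.lower())
    if len(tokens) == 1:  # no whitespace found
        return tokens
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens


def regex_tokenizer(text, min_token_length=1):
    """Approximate RegexTokenizer defaults: split on runs of whitespace, drop short tokens."""
    return [t for t in re.split(r"\s+", text.lower()) if len(t) >= min_token_length]


print(spark_tokenizer("GitHub   Miscella"))  # ['github', '', '', 'miscella']
print(regex_tokenizer("GitHub   Miscella"))  # ['github', 'miscella']
print(spark_tokenizer(" 2404 05961  LLM2"))  # ['', '2404', '05961', '', 'llm2']
```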

In [365]:
# For text, remove stop words (a/an/the/then/and...)
from pyspark.ml.feature import StopWordsRemover

stopwords = StopWordsRemover()
stopwords.getStopWords()

['i',
 'me',
 'my',
 'myself',
 'we',
 'our',
 'ours',
 'ourselves',
 'you',
 'your',
 'yours',
 'yourself',
 'yourselves',
 'he',
 'him',
 'his',
 'himself',
 'she',
 'her',
 'hers',
 'herself',
 'it',
 'its',
 'itself',
 'they',
 'them',
 'their',
 'theirs',
 'themselves',
 'what',
 'which',
 'who',
 'whom',
 'this',
 'that',
 'these',
 'those',
 'am',
 'is',
 'are',
 'was',
 'were',
 'be',
 'been',
 'being',
 'have',
 'has',
 'had',
 'having',
 'do',
 'does',
 'did',
 'doing',
 'a',
 'an',
 'the',
 'and',
 'but',
 'if',
 'or',
 'because',
 'as',
 'until',
 'while',
 'of',
 'at',
 'by',
 'for',
 'with',
 'about',
 'against',
 'between',
 'into',
 'through',
 'during',
 'before',
 'after',
 'above',
 'below',
 'to',
 'from',
 'up',
 'down',
 'in',
 'out',
 'on',
 'off',
 'over',
 'under',
 'again',
 'further',
 'then',
 'once',
 'here',
 'there',
 'when',
 'where',
 'why',
 'how',
 'all',
 'any',
 'both',
 'each',
 'few',
 'more',
 'most',
 'other',
 'some',
 'such',
 'no',
 'nor',
 '

In [371]:
stopwords = stopwords.setInputCol('tokens').setOutputCol('words')
training_new = stopwords.transform(training)
training_new.show()

+--------------------+---------+--------------------+--------------------+
|         source_text|frontpage|              tokens|               words|
+--------------------+---------+--------------------+--------------------+
|Chess therapy   W...|        1|[chess, therapy, ...|[chess, therapy, ...|
|Will the debate a...|        0|[will, the, debat...|[debate, , psi, ,...|
|How to remember t...|        0|[how, to, remembe...|[remember, differ...|
|GitHub   Miscella...|        0|[github, , , misc...|[github, , , misc...|
|PFAS  EPA s new r...|        1|[pfas, , epa, s, ...|[pfas, , epa, new...|
| 2404 05961  LLM2...|        0|[, 2404, 05961, ,...|[, 2404, 05961, ,...|
|Quando Aggiorname...|        0|[quando, aggiorna...|[quando, aggiorna...|
|Brazil s Twitter ...|        1|[brazil, s, twitt...|[brazil, twitter,...|
|No Substitute for...|        0|[no, substitute, ...|[substitute, vict...|
|Moore s Law for E...|        0|[moore, s, law, f...|[moore, law, ever...|
|Why workplaces sh...|   
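`StopWordsRemover` compares tokens case-insensitively by default (`caseSensitive=False`) and removes entries of its English list, which includes the single letter `s` (hence `brazil, s, twitter` becoming `brazil, twitter` above). The empty tokens produced by `Tokenizer` are not in that list, so they survive into `words`. The sketch below reproduces the filtering in plain Python with a small hand-picked subset of the stop-word list (an assumption for the demo). In Spark, a comparable fix would be passing `stopWords=StopWordsRemover.loadDefaultStopWords('english') + ['']`.

```python
# Small subset of Spark's default English stop-word list (assumption: enough for the demo).
STOP_WORDS = {"the", "a", "an", "and", "to", "how", "will", "s", "for", "no"}


def remove_stop_words(tokens, stop_words, case_sensitive=False):
    """Drop tokens found in stop_words, like StopWordsRemover (case-insensitive by default)."""
    if case_sensitive:
        return [t for t in tokens if t not in stop_words]
    lowered = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in lowered]


tokens = ["pfas", "", "epa", "s", "new", "rule"]
print(remove_stop_words(tokens, STOP_WORDS))         # ['pfas', '', 'epa', 'new', 'rule']
print(remove_stop_words(tokens, STOP_WORDS | {""}))  # ['pfas', 'epa', 'new', 'rule']
```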

In [None]:
# For text, represent each token (word) as a feature to obtain a tabular training set
from pyspark.ml.feature import HashingTF
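`HashingTF` turns each list of words into a fixed-length term-frequency vector. Each word is hashed to an index in `[0, numFeatures)`, and the vector counts how often each index was hit, so no vocabulary has to be built (at the cost of occasional collisions). The sketch shows the mechanism in plain Python. It uses `zlib.crc32` as a stand-in hash, so its indices will not match Spark's, which uses MurmurHash3. The tiny `num_features=16` is only for illustration; Spark's default is 2^18. In the notebook, the stage would typically be `HashingTF(inputCol='words', outputCol='features')` applied to `training_new`.

```python
import zlib
from collections import Counter


def hashing_tf(words, num_features=16):
    """Map a list of words to a sparse term-frequency vector {index: count}.

    zlib.crc32 is a deterministic stand-in for Spark's MurmurHash3, so the
    indices differ from Spark's, but the hashing-trick mechanism is the same.
    """
    counts = Counter()
    for word in words:
        index = zlib.crc32(word.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)


vec = hashing_tf(["chess", "therapy", "chess", "board"])
print(vec)
print(sum(vec.values()))  # 4: every word lands in exactly one bucket
```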




## Feature extraction

In [None]:
# Use word2vec
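Spark's `Word2Vec` (in `pyspark.ml.feature`) learns a dense vector per word and represents a document by averaging the vectors of its words. Those averages can be negative, which matters for the next step: `NaiveBayes` with its default `modelType='multinomial'` requires non-negative features, so Word2Vec output would need `modelType='gaussian'` (Spark 3.0+) or a different classifier, whereas HashingTF counts can be used directly. Below is a plain-Python sketch of the averaging step with made-up 3-dimensional word vectors, not trained values. It skips out-of-vocabulary words, which may differ in detail from Spark's handling.

```python
# Made-up word vectors (assumption); a trained Word2VecModel would supply these.
WORD_VECTORS = {
    "chess": [0.2, -0.5, 0.1],
    "therapy": [0.4, 0.1, -0.3],
    "board": [0.0, -0.2, 0.5],
}


def document_vector(words, word_vectors, dim=3):
    """Average the vectors of in-vocabulary words; all-zero vector if none are known."""
    known = [word_vectors[w] for w in words if w in word_vectors]
    if not known:
        return [0.0] * dim
    return [sum(component) / len(known) for component in zip(*known)]


vec = document_vector(["chess", "therapy", "unknownword"], WORD_VECTORS)
print(vec)                      # approximately [0.3, -0.2, -0.1]
print(any(x < 0 for x in vec))  # True: not valid input for multinomial Naive Bayes
```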



## Model training and evaluation

In [None]:
# Train a Naive Bayes classification model
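Spark's `NaiveBayes` (in `pyspark.ml.classification`) defaults to a multinomial model with additive (Laplace) smoothing `smoothing=1.0`, trained on non-negative count vectors such as the HashingTF output. The plain-Python multinomial Naive Bayes below makes that computation concrete over sparse count dictionaries and reports accuracy on a held-out set. The toy documents and labels are invented. In the notebook, one would instead fit `NaiveBayes(featuresCol='features', labelCol='frontpage')` and score it with an evaluator from `pyspark.ml.evaluation`. If front-page stories are a minority of the data, accuracy alone can mislead, so area under ROC or per-class recall may be worth checking as well.

```python
import math
from collections import defaultdict


def train_multinomial_nb(docs, labels, num_features, smoothing=1.0):
    """Fit multinomial Naive Bayes on sparse count vectors {feature_index: count}.

    Returns log-priors and per-class log-likelihoods with additive (Laplace) smoothing.
    """
    classes = sorted(set(labels))
    doc_counts = defaultdict(int)
    feature_counts = {c: defaultdict(float) for c in classes}
    totals = defaultdict(float)
    for doc, label in zip(docs, labels):
        doc_counts[label] += 1
        for index, count in doc.items():
            if count < 0:
                raise ValueError("multinomial Naive Bayes needs non-negative features")
            feature_counts[label][index] += count
            totals[label] += count

    n_docs = len(docs)
    log_prior = {
        c: math.log(doc_counts[c] + smoothing) - math.log(n_docs + len(classes) * smoothing)
        for c in classes
    }
    log_likelihood = {}
    for c in classes:
        denom = math.log(totals[c] + num_features * smoothing)
        log_likelihood[c] = [
            math.log(feature_counts[c][i] + smoothing) - denom for i in range(num_features)
        ]
    return log_prior, log_likelihood


def predict_nb(doc, log_prior, log_likelihood):
    """Return the class with the highest log-posterior score."""
    scores = {
        c: log_prior[c] + sum(count * log_likelihood[c][i] for i, count in doc.items())
        for c in log_prior
    }
    return max(scores, key=scores.get)


# Toy data (invented): features 0 and 1 lean towards front page (1), 2 and 3 towards 0.
train_docs = [{0: 2, 1: 1}, {0: 1, 1: 2}, {2: 2, 3: 1}, {2: 1, 3: 3}, {3: 2}]
train_labels = [1, 1, 0, 0, 0]
test_docs = [{0: 3}, {3: 1, 2: 1}]
test_labels = [1, 0]

prior, likelihood = train_multinomial_nb(train_docs, train_labels, num_features=4)
predictions = [predict_nb(d, prior, likelihood) for d in test_docs]
accuracy = sum(p == y for p, y in zip(predictions, test_labels)) / len(test_labels)
print(predictions, accuracy)  # [1, 0] 1.0
```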



## Deploy the model

In [2]:
import threading

# Helper thread that keeps the Spark StreamingContext from blocking Jupyter

class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [5]:
import random  # used by the toy predict function below

from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

In [6]:
globals()['models_loaded'] = False
globals()['my_model'] = None

# Toy predict function that returns a random probability. Normally you'd use your loaded globals()['my_model'] here
def predict(df):
    return random.random()

predict_udf = udf(predict, StringType())

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    
    # Utilize our predict function
    df_withpreds = df.withColumn("pred", predict_udf(
        struct([df[x] for x in df.columns])
    ))
    df_withpreds.show()
    
    # Normally, you would not predict with a plain Python UDF (User Defined Function) as done here (although you can),
    # but with an MLlib model you have built and saved with Spark.
    # In that case, avoid reloading the model on every call to "process", as follows:
    
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = '***' # Replace '***' with e.g.:    [...].load('my_logistic_regression')
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model (uncomment below):
    
    # df_result = globals()['my_model'].transform(df)
    # df_result.show()
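The `globals()` flags above ensure the model is loaded only once across micro-batches. The same load-once behaviour can be written with `functools.lru_cache` on a loader function. In this minimal sketch, a stand-in body replaces the real load; in the notebook it would be something like `PipelineModel.load(path)` from `pyspark.ml`, with the path chosen when saving the fitted pipeline. Saving the fitted tokenizer, stop-word remover, HashingTF and classifier as one `PipelineModel` lets the streaming job apply the same feature steps as training. The `regexp_replace` step is not an ML stage, so it would need to be re-applied to incoming data, or expressed with `SQLTransformer`. The names `load_model`, `process_batch` and `MODEL_PATH` are hypothetical.

```python
import functools

MODEL_PATH = "models/frontpage_nb"  # hypothetical location of a saved PipelineModel
load_calls = []                     # records each real load, to show it happens once


@functools.lru_cache(maxsize=None)
def load_model(path=MODEL_PATH):
    """Load the model on first use and reuse the cached object afterwards.

    Stand-in body: replace with e.g. PipelineModel.load(path) in the notebook.
    """
    load_calls.append(path)
    return {"path": path}  # placeholder object standing in for a fitted model


def process_batch(batch):
    """Pretend micro-batch handler: fetch the (cached) model, then use it."""
    model = load_model()
    return [(item, model["path"]) for item in batch]


for batch in (["story 1"], ["story 2", "story 3"], ["story 4"]):
    process_batch(batch)

print(len(load_calls))  # 1: the loader ran only for the first batch
```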

In [None]:
ssc = StreamingContext(sc, 10)

In [None]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)
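`socketTextStream` connects as a TCP client to the given host and port and treats every newline-terminated line as one record, which `process` then parses with `spark.read.json`. To try the pipeline without the course server, a small local server that sends JSON lines can stand in for it. The sketch below is a test harness under stated assumptions: the port (chosen by the OS), the sample record and the `serve_lines` helper are all hypothetical. A plain client reads from it line by line; `ssc.socketTextStream("localhost", port)` could point at such a server in the same way.

```python
import json
import socket
import threading


def serve_lines(records, host="127.0.0.1", port=0):
    """Start a one-shot TCP server that sends each record as one JSON line.

    Returns the bound port (port=0 lets the OS pick a free one) and the server thread.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def handle():
        conn, _ = server.accept()
        with conn:
            for record in records:
                conn.sendall((json.dumps(record) + "\n").encode("utf-8"))
        server.close()  # closing the connection signals end of stream

    thread = threading.Thread(target=handle, daemon=True)
    thread.start()
    return bound_port, thread


# Hypothetical sample record shaped like the saved stories.
port, thread = serve_lines([{"aid": 1, "title": "Example story", "source_text": "Example text"}])

# Read the stream line by line, the way socketTextStream splits it into records.
with socket.create_connection(("127.0.0.1", port)) as client, \
        client.makefile("r", encoding="utf-8") as stream:
    received = stream.read().splitlines()
thread.join()
print(received)
```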

In [None]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

In [None]:
ssc_t.stop()