## Predicting with Spark

We followed the outline of the example notebook, with two additions. First, we added a preprocessing step and redefined the schema, because the incoming data has to be converted into the same form as the data the pipelines were trained on. Second, we redefined the stemmer: it is applied as a separate UDF between the two pipelines, so it is not stored in the saved pipelines and is not restored when they are loaded.

As in the example, the models are loaded inside `process` and kept in `globals()`, so they are loaded only once. The `process` function applies the following steps to each batch:
•	Preprocessing: lowercasing, trimming and `regexp_replace` cleanup of the text columns
•	Pipeline 1: RegexTokenizer + StopWordsRemover
•	Stemming with the NLTK Snowball stemmer
•	Pipeline 2: HashingTF + IDF
•	Prediction with the Random Forest or Logistic Regression model defined earlier
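The order of these stages matters: stop words are removed from the tokens, the remaining words are stemmed, and only then are term frequencies hashed and IDF-weighted. The following is a minimal pure-Python sketch of that chain, useful for checking the logic without a Spark cluster. It is not the Spark implementation: `STOP_WORDS` is a tiny hypothetical list, `toy_stem` is a hypothetical suffix stripper standing in for NLTK's `SnowballStemmer`, and `zlib.crc32` replaces the MurmurHash3 function that Spark's `HashingTF` uses, so bucket indices will differ from Spark's. The IDF weight follows the formula documented for Spark ML's `IDF`, log((m + 1) / (df(t) + 1)).

```python
import math
import re
import zlib
from collections import Counter

# Hypothetical, tiny stop-word list (Spark's StopWordsRemover ships a much longer English list)
STOP_WORDS = {"a", "an", "the", "for", "of", "and", "is"}


def tokenize(text):
    # Like RegexTokenizer's defaults: lowercase, then split on runs of whitespace
    return [t for t in re.split(r"\s+", text.lower()) if t]


def remove_stop_words(tokens):
    return [t for t in tokens if t not in STOP_WORDS]


def toy_stem(word):
    # Hypothetical stand-in for SnowballStemmer: strips a few common suffixes
    for suffix in ("ing", "es", "s"):
        if word.endswith(suffix) and len(word) > len(suffix) + 2:
            return word[: -len(suffix)]
    return word


def hashing_tf(tokens, num_features=32):
    # Map each term to a bucket and count occurrences (crc32 instead of Spark's MurmurHash3)
    return dict(Counter(zlib.crc32(t.encode("utf-8")) % num_features for t in tokens))


def fit_idf(tf_vectors):
    # idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents
    m = len(tf_vectors)
    doc_freq = Counter(idx for vec in tf_vectors for idx in vec)
    return {idx: math.log((m + 1) / (df + 1)) for idx, df in doc_freq.items()}


def tf_idf(tf_vector, idf):
    return {idx: count * idf.get(idx, 0.0) for idx, count in tf_vector.items()}


docs = ["A convnet for the 2020s", "Abort bars are a suggestion", "Building convnets is fun"]
# Pipeline 1 (tokenize + stop words), then stemming, then Pipeline 2 (TF + IDF)
stemmed = [[toy_stem(t) for t in remove_stop_words(tokenize(d))] for d in docs]
tfs = [hashing_tf(s) for s in stemmed]
idf = fit_idf(tfs)
features = [tf_idf(tf, idf) for tf in tfs]
print(stemmed)  # [['convnet', '2020'], ['abort', 'bar', 'are', 'suggestion'], ['build', 'convnet', 'fun']]
```

A term that occurs in every document gets weight log(1) = 0, which is why IDF down-weights words that do not help distinguish posts.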


In [1]:
import threading

# Helper thread that keeps the Spark StreamingContext from blocking Jupyter
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [None]:
sc

In [None]:
spark

In [2]:
import random
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import udf, struct, array, col, lit, lower, trim, regexp_replace, when
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, BooleanType, ArrayType
from pyspark.ml.classification import LinearSVCModel, RandomForestClassificationModel

Because the incoming records have a fixed structure, we define a schema for them. The new data must also be brought into the same form as the training data, so we define a preprocessing function similar to the one in Assignment3Process.ipynb: the text columns (`source_text`, `source_title` and `title`) are converted to lowercase, trimmed, and stripped with `regexp_replace` of every character other than letters, digits, `,.!?` and spaces. The `frontpage` feature is converted to binary 0/1, rows with any missing value are dropped, and the `aid` and `url` columns are removed.
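To check what `preprocess` does to an individual record, here is a plain-Python mirror of it for a single record stored as a dict. This is a hypothetical helper (`preprocess_record`, `_CLEAN_RE` and `TEXT_FIELDS` are illustrative names, not part of the notebook); it assumes every record contains all fields, whereas in Spark a field missing from the JSON simply becomes null. It reproduces the order of operations in the Spark version: lowercase, trim (Spark's `trim` strips spaces), remove disallowed characters, encode `frontpage`, drop the row on any missing value, then drop `aid` and `url`.

```python
import re

# Same character class as the regexp_replace call in preprocess()
_CLEAN_RE = re.compile(r"[^a-zA-Z0-9,.!? ]")
TEXT_FIELDS = ("source_text", "source_title", "title")


def preprocess_record(record):
    """Hypothetical single-record mirror of preprocess().

    Returns the cleaned record, or None when dropna(how='any') would drop it.
    """
    out = dict(record)
    for name in TEXT_FIELDS:
        if name in out and out[name] is not None:
            # lower -> trim spaces -> remove every character outside the allowed class
            out[name] = _CLEAN_RE.sub("", out[name].lower().strip(" "))
    if "frontpage" in out:
        # when(frontpage == True, 1).otherwise(0): False and missing values both become 0
        out["frontpage"] = 1 if out["frontpage"] is True else 0
    if any(value is None for value in out.values()):
        return None  # the whole row is dropped
    out.pop("aid", None)
    out.pop("url", None)
    return out


rec = {"aid": "1", "url": "http://x", "title": "  Hello, World! \u263a ",
       "source_title": "Hi", "source_text": "Some Text", "frontpage": True, "votes": 3}
print(preprocess_record(rec)["title"])  # 'hello, world! ' (note the trailing space)
```

Because trimming runs before the replacement, a removed character at the end of a string can leave a trailing space behind; applying `trim` after `regexp_replace` would avoid that, at the cost of differing from how the training data was cleaned.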

In [3]:
schema = StructType([
    StructField("aid", StringType()),
    StructField("title", StringType()),
    StructField("url", StringType()),
    StructField("domain", StringType()),
    StructField("votes", IntegerType()),
    StructField("user", StringType()),
    StructField("posted_at", TimestampType()),
    StructField("comments", IntegerType()),
    StructField("source_title", StringType()),
    StructField("source_text", StringType()),
    StructField("frontpage", BooleanType())
])


# Characters to keep: letters, digits, basic punctuation and spaces
CLEAN_PATTERN = "[^a-zA-Z0-9,.!? ]"
TEXT_COLUMNS = ["source_text", "source_title", "title"]


def preprocess(df):
    print("Checking for duplicates")

    # Lowercase, trim and strip unwanted characters from each text column that is present
    for c in TEXT_COLUMNS:
        if c in df.columns:
            df = df.withColumn(c, regexp_replace(trim(lower(col(c))), CLEAN_PATTERN, ""))
    # Encode frontpage as 1/0 (False and missing values both become 0)
    if "frontpage" in df.columns:
        df = df.withColumn("frontpage", when(col("frontpage") == True, 1).otherwise(0))
    # Drop every row that still has a missing value
    df = df.dropna(how="any")
    # Drop the identifier columns aid and url
    if "aid" in df.columns and "url" in df.columns:
        df = df.drop("aid", "url")
    return df

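# Stemmer: NLTK's Snowball stemmer wrapped in a UDF. It is applied between the two pipelines,
# so it is not part of either saved PipelineModel and has to be redefined here.
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))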

After defining the preprocessing and stemming functions, we can load our models. Building on the code provided in the Spark example notebooks, we define a `process` function that performs the following steps for each batch.

First, it reads the new data arriving from the stream and applies the first pipeline with `.transform`. Next, the stemmer is applied, followed by the second pipeline. Finally, the model trained and saved in process.ipynb predicts the incoming data.
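The control flow of `process` can be checked without Spark. The sketch below is a hypothetical, pure-Python stand-in: `StubModel`, `load_models`, `fake_preprocess`, `fake_stem`, `process_batch` and the `state` dictionary are illustrative names, not part of PySpark, and the dictionary takes the place of the notebook's `globals()` entries. It checks two properties of the design: the models are loaded only once, on the first non-empty batch, and every batch passes through the stages in the same order.

```python
# Hypothetical stand-in for a saved Spark model: transform() only records that it ran
class StubModel:
    def __init__(self, name, calls):
        self.name = name
        self.calls = calls

    def transform(self, data):
        self.calls.append(self.name)
        return data


calls = []             # order in which the stages ran
load_count = {"n": 0}  # how many times the models were loaded
# Plays the role of globals()['models_loaded'], ['pipeline1_'], ['pipeline_'], ['my_model']
state = {"models_loaded": False, "pipeline1_": None, "pipeline_": None, "my_model": None}


def load_models():
    # In the notebook: PipelineModel.load(...) and RandomForestClassificationModel.load(...)
    load_count["n"] += 1
    state["pipeline1_"] = StubModel("pipeline1", calls)
    state["pipeline_"] = StubModel("pipeline2", calls)
    state["my_model"] = StubModel("model", calls)
    state["models_loaded"] = True


def fake_preprocess(data):
    calls.append("preprocess")
    return data


def fake_stem(data):
    calls.append("stem")
    return data


def process_batch(batch):
    if not batch:
        return None  # mirrors the rdd.isEmpty() check: empty batches are skipped
    if not state["models_loaded"]:
        load_models()  # only on the first non-empty batch
    data = fake_preprocess(batch)
    data = state["pipeline1_"].transform(data)
    data = fake_stem(data)
    data = state["pipeline_"].transform(data)
    return state["my_model"].transform(data)


process_batch([])
process_batch(["first record"])
process_batch(["second record"])
print(load_count["n"])  # 1
```

With lazy loading, the first non-empty batch pays the loading cost and later batches reuse the same model objects.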

In [4]:
globals()['models_loaded'] = False
globals()['my_model'] = None
globals()['pipeline_'] = None
globals()['pipeline1_'] = None


def process(time, rdd):
    # Skip empty micro-batches
    if rdd.isEmpty():
        return

    print("========= %s =========" % str(time))

    # Load the saved pipelines and model once, on the first non-empty batch
    if not globals()['models_loaded']:
        globals()['my_model'] = RandomForestClassificationModel.load("/Users/umutkurt/Downloads/spark/pipe&model/randomforestmodel")
        globals()['pipeline1_'] = PipelineModel.load("/Users/umutkurt/Downloads/spark/pipe&model/pipeline1")
        globals()['pipeline_'] = PipelineModel.load("/Users/umutkurt/Downloads/spark/pipe&model/pipeline")
        globals()['models_loaded'] = True

    # Convert the RDD of JSON strings to a DataFrame (column types are inferred here;
    # pass schema=schema to spark.read.json to apply the schema defined above)
    df = spark.read.json(rdd)

    # Clean the raw columns the same way as during training
    new_df = preprocess(df)

    # Pipeline 1: tokenization and stop-word removal
    new_df_p1 = globals()['pipeline1_'].transform(new_df)

    # Stem the remaining words with the NLTK UDF
    new_df_stem = new_df_p1.withColumn("stem_words", stemmer_udf(col("words")))

    # Pipeline 2: HashingTF + IDF features
    df_ = globals()['pipeline_'].transform(new_df_stem)

    # Predict with the loaded classifier and show the class probabilities
    predictions = globals()['my_model'].transform(df_)

    columns_to_show = ["comments", "domain", "posted_at", "source_text", "source_title", "title", "user", "votes", "frontpage", "probability"]
    predictions.select(columns_to_show).show()

In [5]:
ssc = StreamingContext(sc, 10)



In [6]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [7]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

24/05/25 17:59:31 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
24/05/25 17:59:31 WARN BlockManager: Block input-0-1716652771200 replicated to only 0 peer(s) instead of 1 peers
24/05/25 17:59:49 WARN StopWordsRemover: Default locale set was [en_TR]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.

Checking for duplicates



+--------+---------------+-------------------+--------------------+--------------------+--------------------+------------+-----+---------+--------------------+
|comments|         domain|          posted_at|         source_text|        source_title|               title|        user|votes|frontpage|         probability|
+--------+---------------+-------------------+--------------------+--------------------+--------------------+------------+-----+---------+--------------------+
|       0|      arxiv.org|2024-05-25 03:59:51|2201.03545 a conv...|a convnet for the...|a convnet for the...|      laybak|    1|        0|[0.82969814340329...|
|       0|breckyunits.com|2024-05-25 04:01:32|abort bars  abort...|          abort bars|abort bars a sugg...| thunderbong|    6|        1|[0.70796434224812...|
|       0|        wsj.com|2024-05-25 04:04:00|             wsj.com|             wsj.com|behind the scenes...|   fortran77|    3|        0|[0.83630282909688...|
|       0|        jxnl.co|2024-05-25 04:

                                                                                

Checking for duplicates



+--------+------------+-------------------+--------------------+--------------------+----------------+--------------+-----+---------+--------------------+
|comments|      domain|          posted_at|         source_text|        source_title|           title|          user|votes|frontpage|         probability|
+--------+------------+-------------------+--------------------+--------------------+----------------+--------------+-----+---------+--------------------+
|       0|   tatem.com|2024-05-25 04:12:00|smart email built...|smart email built...|           tatem|handfuloflight|    3|        0|[0.83630282909688...|
|       0|     read.cv|2024-05-25 04:19:06|explore   front p...|             explore|         read.cv|handfuloflight|    2|        0|[0.87234476880854...|
|       0|    charm.sh|2024-05-25 04:20:24|charmcharmwe make...|               charm|           charm|handfuloflight|    2|        0|[0.87766086771230...|
|       0|michelin.com|2024-05-25 04:21:32|what is...wok hei...|  what


+--------+------------+-------------------+--------------------+--------------------+--------------------+---------+-----+---------+--------------------+
|comments|      domain|          posted_at|         source_text|        source_title|               title|     user|votes|frontpage|         probability|
+--------+------------+-------------------+--------------------+--------------------+--------------------+---------+-----+---------+--------------------+
|       0|deadline.com|2024-05-25 04:21:52|kabosu dead belov...|kabosu dies belov...|kabosu dies belov...|     qp11|    1|        0|[0.88180397100250...|
|       0|   webix.com|2024-05-25 04:22:45|webix javascript ...|webix javascript ...|               webix|banish-m4|    1|        0|[0.87234476880854...|
+--------+------------+-------------------+--------------------+--------------------+--------------------+---------+-----+---------+--------------------+



In [8]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----


24/05/25 18:00:05 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.base/java.net.SocketInputStream.socketRead0(Native Method)
	at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
	at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:121)
	at org.a

Checking for duplicates
+--------+-------------+-------------------+--------------------+--------------------+--------------------+--------------+-----+---------+--------------------+
|comments|       domain|          posted_at|         source_text|        source_title|               title|          user|votes|frontpage|         probability|
+--------+-------------+-------------------+--------------------+--------------------+--------------------+--------------+-----+---------+--------------------+
|       0|ajroach42.com|2024-05-25 04:24:55|community softwar...|community softwar...|community softwar...|    shadowgovt|    1|        0|[0.87234476880854...|
|       0|  hacknote.co|2024-05-25 04:25:37|hacknote a smart ...|            hacknote|            hacknote|handfuloflight|    6|        1|[0.70796434224812...|
+--------+-------------+-------------------+--------------------+--------------------+--------------------+--------------+-----+---------+--------------------+

