In [48]:
import string
import nltk

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from nltk.stem.snowball import SnowballStemmer
from sklearn.feature_extraction import text

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit, when
from pyspark.sql.types import StringType
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)


In [49]:
sc

In [50]:
spark

In [51]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [52]:
def clean_text (text) :
    words_List = nltk.word_tokenize(text)
    final_list = [elto for elto in words_List if elto not in STOP_WORDS]
    return " ".join(final_list)

In [53]:
def preprocess(df):
    # Notice that we want Sleep = SLEEP = SlEEp = sleeP ETC
    df.loc[:, 'message'] = df.loc[:, 'message'].str.lower()

    # Drop NaN values
    df.dropna(inplace=True, subset=['channel', 'message'])

    # Remove words like: can, could, will, been, would...
    df.loc[:, 'message'] = df.loc[:, 'message'].apply(clean_text)

    # stem separate words
    stemmer = SnowballStemmer("english")
    df.loc[:, 'message'] = df.loc[:, 'message'].astype(str).str.split()
    df.loc[:, 'message'] = df.loc[:, 'message'].apply(lambda x: [stemmer.stem(word) for word in x])

    # Remove rows with empty messages
    df = df[df['message'].astype(bool)]

    # Rejoin list of messages to single string message separated by <space>
    df.loc[:, 'message'] = df.loc[:, 'message'].apply(lambda x: ' '.join(x))

    df.rename(columns={'channel': 'label'}, inplace=True)

    final_df = df.loc[:, ['message', 'label']]

    return final_df

In [54]:
MODELS_PATH = 'models\\'
MODEL = 'multinomialNB'
STOP = text.ENGLISH_STOP_WORDS
STOP_WORDS = list(STOP) + list(string.punctuation)

mapping = {0:'#iitztimmy', 1:'#pgl'}

globals()['my_model'] = NaiveBayesModel.load(MODELS_PATH+MODEL)
globals()['models_loaded'] = True

In [55]:
# globals()['models_loaded'] = False
# globals()['my_model'] = None

# Toy predict function. Normally you'd use your loaded globals()['my_model'] here
# def predict(df):
#     predictions = globals()['my_model'].transform(df)
#     # predictions = predictions.withColumn('prediction', 
#     #                     when(col('prediction') == 0, lit(mapping[0])).otherwise(lit(mapping[1])))
    
#     predictions.show()
#     return predictions.prediction

# predict_udf = udf(predict, StringType())

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to PySpark DataFrame
    df = spark.read.json(rdd)
    
    # Convert to Pandas DataFrame for preprocessing
    df_pandas = df.toPandas()
    df_pandas = preprocess(df_pandas)
    # Reconvert to PySpark DataFrame (I'm sure there is a better way to do this)
    df = spark.createDataFrame(df_pandas)

    # break the sentence into a list of words
    tokenizer = Tokenizer(inputCol="message", outputCol="words")
    words_data = tokenizer.transform(df)

    # TF section
    hashing_TF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=2000)
    featurized_data = hashing_TF.transform(words_data)

    # IDF section
    idf = IDF(inputCol='rawFeatures', outputCol='features')
    idf_model = idf.fit(featurized_data)

    rescaled_data = idf_model.transform(featurized_data)
    
    # rescaled_data.show()

    # Utilize our predict function
    # df_withpreds = df.withColumn("pred", predict_udf(
    #     struct([df[x] for x in df.columns])
    # ))
    # df_withpreds.show()
    
    # Normally, you wouldn't use a UDF (User Defined Function) Python function to predict as we did here (you can)
    # but an MLlib model you've built and saved with Spark
    # In this case, you need to prevent loading your model in every call to "process" as follows:
    
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = NaiveBayesModel.load(MODELS_PATH+MODEL)
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model: 
    df_result = globals()['my_model'].transform(rescaled_data)
    df_result = df_result.withColumn('prediction', 
        when(col('prediction') == 0, lit(mapping[0])).otherwise(lit(mapping[1])))
    
    df_result.select(['message', 'label', 'probability', 'prediction']).show()

In [56]:
print(globals()['my_model'])

NaiveBayesModel: uid=NaiveBayes_af335a176991, modelType=multinomial, numClasses=2, numFeatures=2000


In [57]:
ssc = StreamingContext(sc, 10)

In [58]:
lines = ssc.socketTextStream("localhost", 8080)
lines.foreachRDD(process)

In [59]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

In [62]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----


In [61]:
# print('completed')

+-------+-----+--------------------+----------+
|message|label|         probability|prediction|
+-------+-----+--------------------+----------+
|comment| #pgl|[0.49664443605553...|      #pgl|
+-------+-----+--------------------+----------+

+--------------------+-----+--------------------+----------+
|             message|label|         probability|prediction|
+--------------------+-----+--------------------+----------+
|       caus 's rerun| #pgl|[0.93701013694104...|#iitztimmy|
|secretlab offici ...| #pgl|[8.52557473675622...|      #pgl|
+--------------------+-----+--------------------+----------+

+--------------------+-----+--------------------+----------+
|             message|label|         probability|prediction|
+--------------------+-----+--------------------+----------+
|hahah ’ s photo s...| #pgl|[0.49664443605553...|      #pgl|
+--------------------+-----+--------------------+----------+



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.loc[:, 'message'] = df.loc[:, 'message'].apply(lambda x: ' '.join(x))
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.rename(columns={'channel': 'label'}, inplace=True)


+---------------+-----+--------------------+----------+
|        message|label|         probability|prediction|
+---------------+-----+--------------------+----------+
|won navi heroic| #pgl|[0.49664443605553...|      #pgl|
+---------------+-----+--------------------+----------+

+--------------------+-----+--------------------+----------+
|             message|label|         probability|prediction|
+--------------------+-----+--------------------+----------+
|doritoschip dorit...| #pgl|[0.49664443605553...|      #pgl|
+--------------------+-----+--------------------+----------+

+--------------------+-----+--------------------+----------+
|             message|label|         probability|prediction|
+--------------------+-----+--------------------+----------+
|snoopidoopi1 navi...| #pgl|[0.87153351925149...|#iitztimmy|
|                navi| #pgl|[0.49664443605553...|      #pgl|
+--------------------+-----+--------------------+----------+

+--------------------+-----+------------------

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.loc[:, 'message'] = df.loc[:, 'message'].apply(lambda x: ' '.join(x))
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.rename(columns={'channel': 'label'}, inplace=True)


+--------------------+-----+--------------------+----------+
|             message|label|         probability|prediction|
+--------------------+-----+--------------------+----------+
|ramagari wonder e...| #pgl|[0.49664443605553...|      #pgl|
+--------------------+-----+--------------------+----------+

+--------+-----+--------------------+----------+
| message|label|         probability|prediction|
+--------+-----+--------------------+----------+
|wall lol| #pgl|[0.49664443605553...|      #pgl|
+--------+-----+--------------------+----------+

+-------+-----+--------------------+----------+
|message|label|         probability|prediction|
+-------+-----+--------------------+----------+
|     xd| #pgl|[0.49664443605553...|      #pgl|
+-------+-----+--------------------+----------+

+--------------------+----------+--------------------+----------+
|             message|     label|         probability|prediction|
+--------------------+----------+--------------------+----------+
|   did d

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.rename(columns={'channel': 'label'}, inplace=True)
Exception in thread Thread-15:
Traceback (most recent call last):
  File "C:\Users\Nikos\AppData\Local\Programs\Python\Python310\lib\threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "C:\Users\Nikos\AppData\Local\Temp\ipykernel_17268\265281298.py", line 11, in run
  File "C:\Users\Nikos\Desktop\analytics_project\analytics_assignment_3\spark-3.2.1-bin-hadoop2.7\python\pyspark\streaming\context.py", line 200, in awaitTermination
    self._jssc.awaitTermination()
  File "C:\Users\Nikos\Desktop\analytics_project\analytics_assignment_3\spark-3.2.1-bin-hadoop2.7\python\lib\py4j-0.10.9.3-src.zip\py4j\java_gateway.py", line 1321, in __call__
  File "C:\Users\Nikos\Desktop\analytics_project\analytics_assignment_3\spark-

+--------+-----+--------------------+----------+
| message|label|         probability|prediction|
+--------+-----+--------------------+----------+
|hard4enc| #pgl|[0.96408527998699...|#iitztimmy|
|   round| #pgl|[0.05513107247989...|      #pgl|
+--------+-----+--------------------+----------+

+-------+-----+--------------------+----------+
|message|label|         probability|prediction|
+-------+-----+--------------------+----------+
| f0rest| #pgl|[0.49664443605553...|      #pgl|
+-------+-----+--------------------+----------+

+---------------+-----+--------------------+----------+
|        message|label|         probability|prediction|
+---------------+-----+--------------------+----------+
|ramagari forest| #pgl|[0.49664443605553...|      #pgl|
+---------------+-----+--------------------+----------+

+-------+-----+--------------------+----------+
|message|label|         probability|prediction|
+-------+-----+--------------------+----------+
|   wood| #pgl|[0.49664443605553...|   