This code sets up a Spark Streaming context, processes the streamed data, and then trains a logistic regression model on the processed data.

In [None]:
import threading

# Helper thread that keeps the Spark StreamingContext from blocking Jupyter
class StreamingThread(threading.Thread):
    """Run a StreamingContext on a background thread.

    ``awaitTermination`` blocks its caller, so the context is started and
    awaited inside ``run`` (executed on this thread once ``start()`` is
    called), leaving the calling thread free.

    Attributes:
        ssc: the streaming context driven by this thread.
    """

    def __init__(self, ssc):
        threading.Thread.__init__(self)
        self.ssc = ssc

    def run(self):
        """Start the streaming context and block this thread until it ends."""
        context = self.ssc
        context.start()
        context.awaitTermination()

    def stop(self):
        """Stop the streaming context gracefully; the SparkContext is kept."""
        print('----- Stopping... this may take a few seconds -----')
        # NOTE: "stopGraceFully" (capital F) is the keyword as spelled by
        # PySpark's StreamingContext.stop -- do not "correct" it.
        self.ssc.stop(stopGraceFully=True, stopSparkContext=False)


from pyspark.streaming import StreamingContext

# Now we make a streaming thread and save the RDD files to a specified PATH variable.
# The input stream and its output operation are declared BEFORE the context is
# started: Spark Streaming will not start a context that has no output operation
# registered, and no new streaming computations can be added once it is running.
ssc = StreamingContext(sc, 30)  # batch interval: 30 seconds
lines = ssc.socketTextStream("localhost", 8080)
PATH = ...  # placeholder -- set this to the output location before running
lines.saveAsTextFiles(PATH)

ssc_t = StreamingThread(ssc)
ssc_t.start()

# Let the stream run until enough batches have been written, then stop it.
# (In the notebook this is a separate cell that is run later.)
ssc_t.stop()

# 3.3 Reading in data and training model

import pandas as pd
import pyspark.sql.functions as F
import string

# Constructing the data frame
import os  # os.walk / os.path.join are used below; the file never imports os explicitly

# Read every Spark output shard ("part*") found below PATH. The frames are
# gathered in a list and concatenated once, instead of re-copying the growing
# frame on each iteration (and instead of concatenating onto an empty frame).
# NOTE(review): saveAsTextFiles treats its argument as a *prefix* and writes
# "<prefix>-<batch time in ms>" directories -- confirm that os.walk(PATH)
# actually reaches those directories for the PATH value in use.
frames = []
for path, _dirs, files in os.walk(PATH):
    for file in files:
        if file.startswith('part'):
            subsubfiles = os.path.join(path, file)
            read = spark.read.json(subsubfiles)
            pandasDF = read.toPandas()
            frames.append(pandasDF)

if not frames:
    # Fail here with a clear message rather than with a KeyError on "message" below.
    raise FileNotFoundError("no 'part*' files found under %r" % (PATH,))
df = pd.concat(frames, ignore_index=True)

# Lower-case each message and split it on single spaces into a token list.
df["messagesplit"] = df["message"].str.lower().str.split(' ')
# Binary target: 1.0 for messages from the "#jinnytty" channel, 0.0 otherwise.
df["label"] = (df["channel"] == "#jinnytty").astype(float)

# Removing punctuation
table = str.maketrans('', '', string.punctuation)
df_messagesplit2 = [[w.translate(table) for w in comment] for comment in df["messagesplit"]]
df["messagesplit"] = df_messagesplit2

from pyspark.sql import SQLContext, SparkSession
# Despite its name, sc2 is a SparkSession (not a SparkContext); getOrCreate()
# hands back the already-running session when there is one.
sc2 = SparkSession.builder.getOrCreate()
# Move the cleaned pandas frame into Spark so the ML pipeline can consume it.
spark_df = sc2.createDataFrame(df)

# Splitting the dataset: 80% training / 20% test, with a fixed seed.
training, test = spark_df.randomSplit([0.8, 0.2], seed=4)

# Data preprocessing and training with logistic regression
from pyspark.ml.feature import Word2Vec, VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Feature estimators: a Word2Vec embedding of the tokenised message plus a
# one-hot encoding of the author's username.
word2Vec = Word2Vec(
    inputCol="messagesplit",
    outputCol="result",
    vectorSize=300,
    minCount=5,
)
# handleInvalid="skip": rows whose username was not seen while fitting are
# dropped when the fitted model transforms new data.
stringIndexer = StringIndexer(
    inputCol="username",
    outputCol="username_StringIndexer",
    handleInvalid="skip",
)
ohe = OneHotEncoder(
    inputCols=["username_StringIndexer"],
    outputCols=["username_OneHotEncoder"],
)
assemblerInput = ["result", "username_OneHotEncoder"]
vector_assembler = VectorAssembler(
    outputCol="VectorAssembler_Features",
    inputCols=assemblerInput,
)

# Fit the whole preprocessing chain on the training split, then apply it.
stages = [word2Vec, stringIndexer, ohe, vector_assembler]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(training)
training_df = model.transform(training)

# LogisticRegression reads "features" / "label" by default, hence the alias.
data = training_df.select(
    F.col("VectorAssembler_Features").alias("features"),
    F.col("label"),
)
lrModel = LogisticRegression().fit(data)

# Training a model purely on the word2vec column
# (same Word2Vec estimator as above, but no username features are assembled).
assemblerInput2 = ["result"]
vector_assembler2 = VectorAssembler(
    outputCol="VectorAssembler_Features",
    inputCols=assemblerInput2,
)

stages2 = [word2Vec, vector_assembler2]
pipeline2 = Pipeline(stages=stages2)
model2 = pipeline2.fit(training)
training_df2 = model2.transform(training)

# LogisticRegression reads "features" / "label" by default, hence the alias.
data2 = training_df2.select(
    F.col("VectorAssembler_Features").alias("features"),
    F.col("label"),
)
lrModel2 = LogisticRegression().fit(data2)

# 3.4 Deploying the model on stream

from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType

# The operations we applied to the training data, we also apply on the streamed data.
# Note that the process function also includes code to print a confusion matrix.

def process(time, rdd):
    if rdd.isEmpty():
        return
    print("========= %s =========" % str(time))  # Convert to data frame
    df = spark.read.json(rdd)
    df = df.toPandas()
    df["messagesplit"] = df["message"].str.lower()
    df["messagesplit"] = df["messagesplit"].str.split(' ')
    df["label"] = (df["channel"] == "#jinnytty").astype(float)
    table = str.maketrans('', '', string.punctuation)
    df_messagesplit2 = list()
    for comment in list(df["messagesplit"]):
        stripped = [w.translate(table) for w in comment]
        df_messagesplit2.append(stripped)
    df["messagesplit"] = df_messagesplit2
    sc_pro = SparkSession.builder.getOrCreate()
    spark_df_pro = sc_pro.createDataFrame(df)
    # trained model is applied to the streamed data
    pp_df = model.transform(spark_df_pro)
    testData = pp_df.select(F.col("VectorAssembler_Features").alias("features"), F.col("label"))
    predictions = lrModel.transform(testData)
    preds_and_labels = predictions.select(F.col("label").cast(FloatType()), F.col("prediction"))
    preds_and_labels.show()
    preds_and_labels = preds_and_labels.select(['prediction', 'label'])
    metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
    print(metrics.confusionMatrix().toArray())

# Because we want to see the model work without usernames, 
# we create a process based on the model trained purely on the word2vec column:

def process2(time, rdd):
    if rdd.isEmpty():
        return
    print("========= %s =========" % str(time))  # Convert to data frame
    df = spark.read.json(rdd)
    df = df.toPandas()
    df["messagesplit"] = df["message"].str.lower()
    df["messagesplit"] = df["messagesplit"].str.split(' ')
    df["label"] = (df["channel"] == "#jinnytty").astype(float)
    table = str.maketrans('', '', string.punctuation)
    df_messagesplit2 = list()
    for comment in list(df["messagesplit"]):
        stripped = [w.translate(table) for w in comment]
        df_messagesplit2.append(stripped)
    df["messagesplit"] = df_messagesplit2
    sc_pro = SparkSession.builder.getOrCreate()
    spark_df_pro = sc_pro.createDataFrame(df)
    # trained model is applied to the streamed data
    pp_df = model2.transform(spark_df_pro)
    testData = pp_df.select(F.col("VectorAssembler_Features").alias("features"), F.col("label"))
    predictions = lrModel2.transform(testData)
    preds_and_labels = predictions.select(F.col("label").cast(FloatType()), F.col("prediction"))
    preds_and_labels.show()
    preds_and_labels = preds_and_labels.select(['prediction', 'label'])
    metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
    print(metrics.confusionMatrix().toArray())

# Starting the streaming thread:
# A fresh StreamingContext (batch interval 10) is built on the existing
# SparkContext `sc`; every batch read from the socket is passed to `process`.
ssc = StreamingContext(sc, 10)
ssc_t = StreamingThread(ssc)
lines = ssc.socketTextStream("localhost", 8080)
lines.foreachRDD(process)

# 3.4.1 Prediction with usernames
# start() launches the helper thread, which starts the context and waits on it.
ssc_t.start()

# (notebook: the per-batch output of `process` appears here while the stream runs)


# Stop this context before the next one is created below.
ssc_t.stop()

# 3.4.2 Prediction without usernames
# Same setup on a second context, but batches go to `process2`
# (pipeline/classifier trained on the word2vec column only).
ssc2 = StreamingContext(sc, 10)
ssc_t2 = StreamingThread(ssc2)
lines2 = ssc2.socketTextStream("localhost", 8080)
lines2.foreachRDD(process2)
ssc_t2.start()

# (notebook: the per-batch output of `process2` appears here while the stream runs)


ssc_t2.stop()


# Transforming the test data using model2
test_pred_df = model2.transform(test)

# Preparing the test data. "message" is carried along so that every prediction
# stays attached to the text it was computed from.
testdata = test_pred_df.select(
    F.col("VectorAssembler_Features").alias("features"),  # renaming the column to "features"
    F.col("label"),
    F.col("message"),
)

# Making predictions using the logistic regression model
predictions = lrModel2.transform(testdata)

# Importing necessary libraries
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

# Creating a window specification (used only to number the rows)
w = Window.orderBy(monotonically_increasing_id())

# Label, prediction and message are taken from the SAME rows of `predictions`.
# Previously the message was re-attached by joining two separately numbered
# frames on row_number() over monotonically_increasing_id(), which is not
# guaranteed to pair each prediction with its own message.
# Column layout is unchanged: label, prediction, columnindex, message.
preds_labels_message = predictions.select(
    F.col("label").cast(FloatType()),
    F.col("prediction"),
    row_number().over(w).alias("columnindex"),
    F.col("message"),
)

# Kept under their original names for any later cell that refers to them.
preds_labels = preds_labels_message.select("label", "prediction", "columnindex")
message = preds_labels_message.select("message", "columnindex")

preds_labels_message.show()

# Filtering messages where label and prediction both are 1
filtered_messages_1 = preds_labels_message.filter(
    (preds_labels_message.label == 1.0) & (preds_labels_message.prediction == 1.0)
).collect()

# Filtering messages where label and prediction both are 0
filtered_messages_0 = preds_labels_message.filter(
    (preds_labels_message.label == 0.0) & (preds_labels_message.prediction == 0.0)
).collect()

# Filtering messages where label is 1 but prediction is 0
wrong_predictions_1 = preds_labels_message.filter(
    (preds_labels_message.label == 1.0) & (preds_labels_message.prediction == 0.0)
).collect()

# Filtering messages where label is 0 but prediction is 1
wrong_predictions_0 = preds_labels_message.filter(
    (preds_labels_message.label == 0.0) & (preds_labels_message.prediction == 1.0)
).collect()
