# Predicting in real-time

In this notebook we will build predictive models and run them in real-time.

## Set-up Spark

The helper class below is copied from the example scripts.

In [1]:
import threading

# Helper thread that keeps the blocking Spark StreamingContext calls off the
# Jupyter kernel's main thread.
class StreamingThread(threading.Thread):
    """Drive a Spark StreamingContext from a background thread.

    Attributes:
        ssc: the streaming context that this thread starts and stops.
    """

    def __init__(self, ssc):
        """Remember the streaming context; nothing is started yet."""
        threading.Thread.__init__(self)
        self.ssc = ssc

    def run(self):
        """Thread body: start the context and wait until it terminates."""
        context = self.ssc
        context.start()
        context.awaitTermination()

    def stop(self):
        """Announce the shutdown and stop the streaming context.

        The context is asked to stop gracefully while leaving the underlying
        SparkContext alive (``stopGraceFully`` is the keyword's spelling in
        the PySpark API).
        """
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopGraceFully=True, stopSparkContext=False)

## Define the models

Some notes:
* We will define two models
* For each model, we define its own `process` function, since the way the predictions are obtained and appended to the data frame differs between the two models.

In [2]:
import random
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
import pyspark.sql.types as tp

### VADER sentiment analyzer
This is a model that we did not even have to train, as it uses the VADER sentiment scores to score each text on how positive/negative it is.

In [3]:
import pandas as pd
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Initialize the VADER sentiment analyzer once at module level; get_sentiment
# reads this shared instance each time it scores a text.
analyzer = SentimentIntensityAnalyzer()

# Turn VADER's compound polarity score for a text into a binary label
def get_sentiment(text):
    """Return 1 when the VADER compound score of ``text`` is positive, else 0.

    ``text`` is passed through ``str()`` before scoring, so non-string inputs
    are scored on their string representation. A compound score of exactly 0
    maps to 0.
    """
    scores = analyzer.polarity_scores(str(text))
    return 1 if scores['compound'] > 0 else 0

# get_sentiment returns Python ints (0 or 1), so declare an integer return
# type; this keeps the prediction column numeric, like the integer `label`
# column of the training data, instead of producing the strings "0"/"1".
udf_VADER = udf(get_sentiment, tp.IntegerType())

def process_VADER(time, rdd):
    """Score one streaming micro-batch with the VADER UDF and display it.

    Args:
        time: batch timestamp passed by ``foreachRDD``; only printed.
        rdd: RDD of JSON strings, each expected to carry a ``review_text``
            field.

    Returns:
        None. The incoming batch and the batch with an added ``pred`` column
        are printed via ``show()``. Empty batches are reported and skipped.
    """
    if rdd.isEmpty():
        print("rdd was empty...")
        return

    print("========= %s =========" % str(time))

    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()

    # Hand the UDF the text column itself. Wrapping it in struct() would give
    # the UDF a Row, whose string form "Row(review_text='...')" glues the
    # first word of the review to that prefix, so VADER never scores it.
    df_withpreds = df.withColumn("pred", udf_VADER(df["review_text"]))

    df_withpreds.show()

### Logistic regression model

General structure (data pipeline):
* Tokenize review
* 'Translate' words into vector representation
* Apply a logistic regression model with the vector representations as regressors

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
    
# Location of the training data. The original local path stays the default,
# but it can be overridden through the REVIEW_DATA_PATH environment variable
# so the notebook also runs on machines with a different directory layout.
import os

REVIEW_DATA_PATH = os.environ.get(
    "REVIEW_DATA_PATH",
    "C:/Users/wille/spark/MyData/review_data.csv",
)

# Schema of the review data set: three integer columns and the review text.
my_schema = tp.StructType([
  tp.StructField(name= 'review_id', dataType= tp.IntegerType(), nullable= True),
  tp.StructField(name= 'app_id', dataType= tp.IntegerType(), nullable= True),
  tp.StructField(name= 'review_text', dataType= tp.StringType(), nullable= True),
  tp.StructField(name= 'label', dataType= tp.IntegerType(), nullable= True)
])

# Read in the data set.
# Probably because of the way the data set was stored as a .csv file, extremely
# long reviews are stored improperly, leading to missing values. Rows with any
# missing value are therefore dropped. Reading and cleaning happen in a single
# expression so that re-running this cell always starts from the file.
my_data = spark.read.csv(REVIEW_DATA_PATH,
                         schema=my_schema,
                         header=True).dropna()

# view the data
my_data.show(5)

# print the schema of the file
print("Schema:")
my_data.printSchema()

+---------+-------+--------------------+-----+
|review_id| app_id|         review_text|label|
+---------+-------+--------------------+-----+
|136510302|2103530|I simply love it,...|    1|
|136509602|2349550|  Gifted word: Grace|    1|
|136510134|1685730|It's pretty good!...|    1|
|136510117|1685730|I recommend it, I...|    1|
|136509657|2364130|simple point and ...|    0|
+---------+-------+--------------------+-----+
only showing top 5 rows

Schema:
root
 |-- review_id: integer (nullable = true)
 |-- app_id: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- label: integer (nullable = true)



In [5]:
# The model is built from four stages, which are combined into a pipeline
# further below.

# Stage 1: tokenize the review text using the regex pattern \W
stage_1 = RegexTokenizer(
    pattern='\\W',
    inputCol='review_text',
    outputCol='tokens',
)

# Stage 2: remove the stop words from the tokens
stage_2 = StopWordsRemover(
    inputCol='tokens',
    outputCol='filtered_words',
)

# Stage 3: turn the remaining words into a word vector of size 50
stage_3 = Word2Vec(
    vectorSize=50,
    inputCol='filtered_words',
    outputCol='vector',
)

# Stage 4: logistic regression on the word vector, trained against `label`
model = LogisticRegression(
    labelCol='label',
    featuresCol='vector',
)

In [6]:
# Chain the stages, in order, into one data pipeline...
pipeline = Pipeline(
    stages=[
        stage_1,
        stage_2,
        stage_3,
        model,
    ]
)

# ...and fit the whole pipeline on the training data
pipelineFit = pipeline.fit(my_data)

In [12]:
from pyspark.sql.functions import monotonically_increasing_id, row_number, transform
from pyspark.sql.window import Window

def process_logistic_regression(time, rdd):
    """Score one streaming micro-batch with the fitted pipeline and display it.

    Args:
        time: batch timestamp passed by ``foreachRDD``; only printed.
        rdd: RDD of JSON strings holding the fields the pipeline reads
            (``review_text``).

    Returns:
        None. The incoming batch and the batch with an added ``prediction``
        column are printed via ``show()``. Empty batches are reported and
        skipped.
    """
    if rdd.isEmpty():
        print("rdd was empty")
        return

    print("========= %s =========" % str(time))

    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()

    # transform() returns the input columns with the columns produced by the
    # pipeline stages appended. Selecting the original columns plus
    # `prediction` therefore yields the combined frame directly. No index
    # column or join is needed, which also avoids pulling every row into a
    # single partition and relying on the row order of two separately
    # evaluated data frames matching up.
    df_withpreds = pipelineFit.transform(df).select(*df.columns, "prediction")

    # Show data frame again, alongside the predictions.
    df_withpreds.show()

## Make predictions

Now we are ready to make real-time predictions

In [13]:
# Create a streaming context on top of the existing SparkContext `sc`;
# the second argument (10) is the batch interval, in seconds per the PySpark API.
ssc = StreamingContext(sc, 10)

# Text stream read from a socket on host "seppe.net", port 7778.
lines = ssc.socketTextStream("seppe.net", 7778)

# Fill in either 'process_VADER' or 'process_logistic_regression'
# The chosen function is called for each batch RDD and must be registered
# before the context is started below.
lines.foreachRDD(process_logistic_regression)

# Start the context on a helper thread so this cell returns immediately
# instead of blocking the notebook.
ssc_t = StreamingThread(ssc)
ssc_t.start()

In [11]:
# Stop the streaming context through the helper thread started above; run this
# cell only after the streaming cell, once enough batches have been shown.
ssc_t.stop()

----- Stopping... this may take a few seconds -----
