## MSBX 5420 Assignment 3
This assignment is about Spark Machine Learning and Spark Streaming. First two tasks focus on machine learning, and the third one combines machine learning and streaming analysis. We will use IMDB reviews data for the whole assignment.

### Task 1 - Topic Modeling on Moive Reviews with Spark ML
First of all, let's load the data. The data structure is very simple.One column is review text, and another column is the label of review sentiment (positive or negative). Same as exercise, we can load .csv.gz file directly from spark.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[4]').config("spark.driver.memory", "2g").appName('spark_ml_imdb').getOrCreate()

In [2]:
reviews = spark.read.options(inferSchema = True, multiLine = True, escape = '\"').csv('IMDB_Reviews.csv.gz', header=True)
reviews.show()

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|One of the other ...| positive|
|A wonderful littl...| positive|
|I thought this wa...| positive|
|Basically there's...| negative|
|Petter Mattei's "...| positive|
|Probably my all-t...| positive|
|I sure would like...| positive|
|This show was an ...| negative|
|Encouraged by the...| negative|
|If you like origi...| positive|
|Phil the Alien is...| negative|
|I saw this movie ...| negative|
|So im not a big f...| negative|
|The cast played S...| negative|
|This a fantastic ...| positive|
|Kind of drawn in ...| negative|
|Some films just s...| positive|
|This movie made i...| negative|
|I remember this f...| positive|
|An awful film! It...| negative|
+--------------------+---------+
only showing top 20 rows



First, we should clean up the review texts. Besides those special characters we have tried to remove in exercise, here we also need to remove the html tags in the text.

In [3]:
import pyspark.sql.functions as fn
import pyspark.ml.feature as ft

#remove html tags in the text with regular expression
reviews = reviews.withColumn('review', fn.regexp_replace(fn.col("review"), '<[^>]+>', ' '))
#remove special characters and line breaks in the text with regular expression
reviews = reviews.withColumn('review', fn.regexp_replace(fn.col("review"), '([^\s\w_]|_)+', ' ')).withColumn('review', fn.regexp_replace(fn.col("review"), '[\n\r]', ' '))
reviews.take(1)

[Row(review='One of the other reviewers has mentioned that after watching just 1 Oz episode you ll be hooked  They are right  as this is exactly what happened with me   The first thing that struck me about Oz was its brutality and unflinching scenes of violence  which set in right from the word GO  Trust me  this is not a show for the faint hearted or timid  This show pulls no punches with regards to drugs  sex or violence  Its is hardcore  in the classic use of the word   It is called OZ as that is the nickname given to the Oswald Maximum Security State Penitentary  It focuses mainly on Emerald City  an experimental section of the prison where all the cells have glass fronts and face inwards  so privacy is not high on the agenda  Em City is home to many Aryans  Muslims  gangstas  Latinos  Christians  Italians  Irish and more so scuffles  death stares  dodgy dealings and shady agreements are never far away   I would say the main appeal of the show is due to the fact that it goes where 

Now let's create tokenizer to start the data processing.

In [4]:
tokenizer = ft.RegexTokenizer(inputCol='review', outputCol='review_tok', pattern='\s+|[,.\"/!]')
tokenizer.transform(reviews).select('review_tok').take(1)

[Row(review_tok=['one', 'of', 'the', 'other', 'reviewers', 'has', 'mentioned', 'that', 'after', 'watching', 'just', '1', 'oz', 'episode', 'you', 'll', 'be', 'hooked', 'they', 'are', 'right', 'as', 'this', 'is', 'exactly', 'what', 'happened', 'with', 'me', 'the', 'first', 'thing', 'that', 'struck', 'me', 'about', 'oz', 'was', 'its', 'brutality', 'and', 'unflinching', 'scenes', 'of', 'violence', 'which', 'set', 'in', 'right', 'from', 'the', 'word', 'go', 'trust', 'me', 'this', 'is', 'not', 'a', 'show', 'for', 'the', 'faint', 'hearted', 'or', 'timid', 'this', 'show', 'pulls', 'no', 'punches', 'with', 'regards', 'to', 'drugs', 'sex', 'or', 'violence', 'its', 'is', 'hardcore', 'in', 'the', 'classic', 'use', 'of', 'the', 'word', 'it', 'is', 'called', 'oz', 'as', 'that', 'is', 'the', 'nickname', 'given', 'to', 'the', 'oswald', 'maximum', 'security', 'state', 'penitentary', 'it', 'focuses', 'mainly', 'on', 'emerald', 'city', 'an', 'experimental', 'section', 'of', 'the', 'prison', 'where', 'all

Then remove stopwords in the text.

In [5]:
stopwords = ft.StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='review_stop')
stopwords.transform(tokenizer.transform(reviews)).select('review_stop').take(1)

[Row(review_stop=['one', 'reviewers', 'mentioned', 'watching', '1', 'oz', 'episode', 'll', 'hooked', 'right', 'exactly', 'happened', 'first', 'thing', 'struck', 'oz', 'brutality', 'unflinching', 'scenes', 'violence', 'set', 'right', 'word', 'go', 'trust', 'show', 'faint', 'hearted', 'timid', 'show', 'pulls', 'punches', 'regards', 'drugs', 'sex', 'violence', 'hardcore', 'classic', 'use', 'word', 'called', 'oz', 'nickname', 'given', 'oswald', 'maximum', 'security', 'state', 'penitentary', 'focuses', 'mainly', 'emerald', 'city', 'experimental', 'section', 'prison', 'cells', 'glass', 'fronts', 'face', 'inwards', 'privacy', 'high', 'agenda', 'em', 'city', 'home', 'many', 'aryans', 'muslims', 'gangstas', 'latinos', 'christians', 'italians', 'irish', 'scuffles', 'death', 'stares', 'dodgy', 'dealings', 'shady', 'agreements', 'never', 'far', 'away', 'say', 'main', 'appeal', 'show', 'due', 'fact', 'goes', 'shows', 'wouldn', 'dare', 'forget', 'pretty', 'pictures', 'painted', 'mainstream', 'audien

Now same as what we did in the exercise, let's create `CountVectorizer` to transform the text into term frequency vector. 

In [6]:
tf = ft.CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol='review_tf')
tokenized = stopwords.transform(tokenizer.transform(reviews))
tf.fit(tokenized).transform(tokenized).select('review_tf').take(1)

[Row(review_tf=SparseVector(101111, {2: 1.0, 10: 1.0, 13: 2.0, 17: 2.0, 28: 1.0, 32: 1.0, 35: 3.0, 39: 1.0, 45: 2.0, 46: 1.0, 50: 1.0, 53: 1.0, 54: 2.0, 57: 1.0, 83: 1.0, 85: 1.0, 91: 1.0, 93: 1.0, 97: 1.0, 101: 2.0, 108: 1.0, 121: 1.0, 128: 3.0, 138: 2.0, 160: 1.0, 161: 1.0, 169: 1.0, 174: 1.0, 184: 1.0, 191: 2.0, 195: 1.0, 217: 1.0, 235: 1.0, 249: 1.0, 250: 1.0, 251: 1.0, 264: 1.0, 278: 1.0, 286: 2.0, 302: 1.0, 316: 1.0, 324: 1.0, 370: 1.0, 386: 1.0, 409: 2.0, 438: 1.0, 448: 4.0, 453: 1.0, 457: 1.0, 480: 1.0, 501: 1.0, 514: 1.0, 526: 1.0, 534: 2.0, 535: 1.0, 567: 2.0, 582: 1.0, 685: 1.0, 707: 3.0, 754: 1.0, 779: 1.0, 826: 1.0, 921: 1.0, 939: 1.0, 1073: 3.0, 1090: 1.0, 1107: 1.0, 1146: 1.0, 1181: 1.0, 1210: 1.0, 1290: 1.0, 1293: 1.0, 1313: 1.0, 1347: 1.0, 1464: 1.0, 1473: 1.0, 1496: 1.0, 1619: 1.0, 1899: 1.0, 1916: 1.0, 1936: 1.0, 2015: 1.0, 2117: 1.0, 2211: 1.0, 2328: 1.0, 2340: 1.0, 2381: 1.0, 2421: 1.0, 2471: 1.0, 2588: 1.0, 2790: 1.0, 2792: 1.0, 2865: 1.0, 2903: 1.0, 2967: 6.0, 30

Then we use the `LDA` model to do topic modeling. We create the model here with 30 topics.

In [7]:
import pyspark.ml.clustering as clus
lda = clus.LDA(k=30, optimizer='online', maxIter=10, featuresCol=tf.getOutputCol())

Now let's build the pipeline to train the topic model from the raw data. It will take a while to run.

In [8]:
from pyspark.ml import Pipeline

# Define stages of the pipeline
pipeline_stages = [
    # Tokenize the review text
    tokenizer,
    # Remove stopwords
    stopwords,
    # Convert text into a sparse vector of token counts
    tf,
    # Fit LDA model
    lda
]

# Create the pipeline
lda_pipeline = Pipeline(stages=pipeline_stages)

# Fit the pipeline to the data
pipeline_model = lda_pipeline.fit(reviews)


In [9]:
topics = pipeline_model.transform(reviews)
topics.select('topicDistribution').take(5)

[Row(topicDistribution=DenseVector([0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.9946])),
 Row(topicDistribution=DenseVector([0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.9894])),
 Row(topicDistribution=DenseVector([0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.9895])),
 Row(topicDistribution=DenseVector([0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005,

Let's see if we have properly discovered the topics. This is just the same code we display topics in the exercise - we will reuse it several times here.

In [10]:
#Code to extract topics from models
vectorized_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[3]
vocab = vectorized_model.vocabulary
topic_words_list = topic_model.describeTopics(20)
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

topic: 0
*************************
film
movie
one
time
made
good
like
bad
films
get
two
always
first
plot
father
little
years
also
cast
life
*************************
topic: 1
*************************
movie
bad
part
film
one
like
us
still
time
character
story
good
much
two
even
horizon
end
make
lost
fuller
*************************
topic: 2
*************************
film
one
movie
good
like
well
great
never
show
first
really
story
also
see
made
much
get
watch
time
two
*************************
topic: 3
*************************
movie
one
film
series
first
story
movies
good
really
like
much
watch
little
puppet
get
also
even
time
book
made
*************************
topic: 4
*************************
movie
one
like
film
movies
great
bad
also
really
story
good
get
made
much
even
scene
well
see
think
characters
*************************
topic: 5
*************************
movie
film
one
like
really
good
even
many
movies
see
time
well
great
films
make
story
made
first
character
plot
********

How do you think about the topics? Do they make sense? If you think the topics we get from the movie reviews should be better, let's continue to see what we can do to make them better.

One possible reason is that we have many words that do not show up frequently. That is, they are very specific words to certain movies but don't occur across reviews. Such words are not very meaningful and they do not represent common themes in those reviews. So here we limit the frequency of words to at least 5 and run LDA with pipeline again.

In [11]:
#filter the countvectorizer
tf = ft.CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol='review_tf', minDF=5)

In [12]:
#[Your Code] to build a ML pipeline and fit LDA again


# Create the pipeline
lda_pipeline = Pipeline(stages=pipeline_stages)

# Fit the pipeline to the data
pipeline_model = lda_pipeline.fit(reviews)

topics = pipeline_model.transform(reviews)
topics.select('topicDistribution').take(5)

[Row(topicDistribution=DenseVector([0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.9946])),
 Row(topicDistribution=DenseVector([0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.9894])),
 Row(topicDistribution=DenseVector([0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.9895])),
 Row(topicDistribution=DenseVector([0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005,

In [13]:
#Code to extract topics from models
vectorized_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[3]
vocab = vectorized_model.vocabulary
topic_words_list = topic_model.describeTopics(20)
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

topic: 0
*************************
film
movie
one
time
made
good
like
bad
films
get
two
always
first
plot
father
little
years
also
cast
life
*************************
topic: 1
*************************
movie
bad
part
film
one
like
us
still
time
character
story
good
much
two
even
horizon
end
make
lost
fuller
*************************
topic: 2
*************************
film
one
movie
good
like
well
great
never
show
first
really
story
also
see
made
much
get
watch
time
two
*************************
topic: 3
*************************
movie
one
film
series
first
story
movies
good
really
like
much
watch
little
puppet
get
also
even
time
book
made
*************************
topic: 4
*************************
movie
one
like
film
movies
great
bad
also
really
story
good
get
made
much
even
scene
well
see
think
characters
*************************
topic: 5
*************************
movie
film
one
like
really
good
even
many
movies
see
time
well
great
films
make
story
made
first
character
plot
********

It is expected that the topics are getting better but still not very satisfying. Some words may be very specific to some reviews. Also, there are lots of words shown in different topics many times; possibly they are too common so they shouldn't be that important. Let's take one more step to use TF-IDF vector rather than TF vector. To build IF-IDF, we first create TF with CountVectorizer then create IDF from TF vector. Then we run LDA model with IF-IDF vector.

In [14]:
#use tf-idf vector
tf = ft.CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol="review_tf", vocabSize=10000)
idf = ft.IDF(inputCol=tf.getOutputCol(), outputCol="review_tfidf", minDocFreq=5)

In [15]:
#[Your Code] to create a LDA model (30 topics) and put everything together into a ML pipeline to fit LDA

from pyspark.ml import Pipeline

# Define stages of the pipeline
pipeline_stages = [
    # Tokenize the review text
    tokenizer,
    # Remove stopwords
    stopwords,
    # Convert text into a sparse vector of token counts
    tf,
    idf,
    # Fit LDA model
    lda
]

# Create the pipeline
lda_pipeline = Pipeline(stages=pipeline_stages)

# Fit the pipeline to the data
pipeline_model = lda_pipeline.fit(reviews)

topics = pipeline_model.transform(reviews)
topics.select('topicDistribution').take(5)

[Row(topicDistribution=DenseVector([0.0002, 0.0002, 0.2916, 0.0002, 0.5871, 0.0002, 0.0002, 0.1156, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002])),
 Row(topicDistribution=DenseVector([0.0004, 0.0004, 0.1968, 0.0004, 0.7915, 0.0004, 0.0004, 0.0005, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004])),
 Row(topicDistribution=DenseVector([0.0004, 0.0004, 0.1917, 0.0004, 0.7977, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004])),
 Row(topicDistribution=DenseVector([0.0005, 0.0005, 0.0005, 0.0005, 0.8901, 0.0005, 0.0005, 0.0964, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005,

In [16]:
#Code to extract topics from models
tf_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[4]
vocab = tf_model.vocabulary
topic_words_list = topic_model.describeTopics(20)
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

topic: 0
*************************
film
carmen
one
movie
freeman
made
vega
kar
wai
time
films
good
scarlet
paz
like
first
love
lau
bad
however
*************************
topic: 1
*************************
movie
streisand
one
barbra
film
like
bugs
prom
donna
bad
even
best
good
also
scenes
movies
see
action
much
know
*************************
topic: 2
*************************
film
one
like
movie
even
time
man
good
also
get
story
well
back
two
great
see
films
make
new
much
*************************
topic: 3
*************************
film
dreyfuss
dictator
movie
nevsky
time
one
eisenstein
jonathan
julia
get
see
really
also
know
characters
good
much
story
man
*************************
topic: 4
*************************
film
movie
one
like
good
time
even
really
see
story
much
well
bad
get
people
great
first
also
movies
characters
*************************
topic: 5
*************************
bam
jackass
one
movie
give
viva
make
two
much
war
town
good
film
even
thing
people
every
better
know
ll

The topics should be more reasonable now. You should believe they can still be further improved by cleaning up the text and tuning the hyperparameters but let's stop here for assignment. If you want to try yourself beyond the assignment, you can change the model configuration to see if you can get any further improvement.

### Task 2 - Movie Review Sentiment Analysis with Spark ML
The second task we are going to prediction. Let's continue with the reviews data and now we can do sentiment analysis with the TF-IDF. So with the TF-IDF vector, we can train and predict review sentiment.

In [17]:
#first let's confirm the potential labels
#(it is possible sentiment can be neutral so we should make sure if that's the case)
reviews.select('sentiment').distinct().show()

+---------+
|sentiment|
+---------+
| positive|
| negative|
+---------+



In [18]:
#let's create the binary numerical lable from postive/negative
reviews = reviews.withColumn('sentiment_label', fn.when(fn.col('sentiment')=='positive', 1.0).otherwise(0.0))
reviews.show()

+--------------------+---------+---------------+
|              review|sentiment|sentiment_label|
+--------------------+---------+---------------+
|One of the other ...| positive|            1.0|
|A wonderful littl...| positive|            1.0|
|I thought this wa...| positive|            1.0|
|Basically there s...| negative|            0.0|
|Petter Mattei s  ...| positive|            1.0|
|Probably my all t...| positive|            1.0|
|I sure would like...| positive|            1.0|
|This show was an ...| negative|            0.0|
|Encouraged by the...| negative|            0.0|
|If you like origi...| positive|            1.0|
|Phil the Alien is...| negative|            0.0|
|I saw this movie ...| negative|            0.0|
|So im not a big f...| negative|            0.0|
|The cast played S...| negative|            0.0|
|This a fantastic ...| positive|            1.0|
|Kind of drawn in ...| negative|            0.0|
|Some films just s...| positive|            1.0|
|This movie made i..

In [19]:
#split the training and testing set, with 80/20
reviews_train, reviews_test = reviews.randomSplit([0.8, 0.2], seed=200)

In [20]:
import pyspark.ml.classification as cl
from pyspark.ml import Pipeline

#create logistic gression model and then build the pipeline to train the model
lr = cl.LogisticRegression(maxIter=10, labelCol='sentiment_label', featuresCol=idf.getOutputCol())

In [21]:
#[Your Code] to build a ML pipeline and train logistic regression model


# Define stages of the pipeline
pipeline_stages = [
    tokenizer,
    stopwords,
    tf,
    idf,
    lr
]

# Create the pipeline
lr_pipeline = Pipeline(stages=pipeline_stages)

# Fit the pipeline to the training data
lr_model = lr_pipeline.fit(reviews_train)



In [22]:
#make predictions with pipeline model
predictions = lr_model.transform(reviews_test)

In [23]:
import pyspark.ml.evaluation as ev
#model evaluation for binary classification
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='sentiment_label')
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderPR'}))

0.9338446403526336
0.9273869863834877


The prediction performance looks acceptable. Here note that TF-IDF is a long vector (here we select top 10000 words, but still a large number), so let's try something different. As mentioned in the class, another way to model text is word embedding with the Word2Vec model. So next we create word vector and use it to predict sentiment.

In [24]:
#create word2vec model
word2vec = ft.Word2Vec(vectorSize=100, minCount=5, inputCol=stopwords.getOutputCol(), outputCol="review_word2vec")

In [25]:
#same logistic regression model, but take output from word2vec model
#[Your Code] to create a logistic regression model and build pipeline with word2vec to train logistic regession; then make predictions and evaluate model (areaUnderROC and areaUnderPR)

# Logistic Regression model
lr = cl.LogisticRegression(maxIter=10, labelCol='sentiment_label', featuresCol="review_word2vec")

# Define stages of the pipeline
pipeline_stages = [
    tokenizer,
    stopwords,
    word2vec,
    lr
]

# Create the pipeline
lr_word2vec_pipeline = Pipeline(stages=pipeline_stages)

# Fit the pipeline to the training data
lr_word2vec_model = lr_word2vec_pipeline.fit(reviews_train)

# Make predictions on the test data
predictions = lr_word2vec_model.transform(reviews_test)

In [26]:
#import pyspark.ml.evaluation as ev
#model evaluation for binary classification
#evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='sentiment_label')
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderPR'}))

0.934560428771788
0.9316361736134724


Check the prediction performance with only 100 features; word2vec model is a very useful representation of words and it reduces dimensionality significantly. 

In the end, let's try an alternative model. In classfication, Support Vector Machine (SVM) is commonly used and let's see how we can use it here. We will keep the orginal configuration of word2vec model (vector size is 100) here for SVM.

In [27]:
#same word2vec model configuration is adopted here
word2vec = ft.Word2Vec(vectorSize=100, minCount=5, inputCol=stopwords.getOutputCol(), outputCol="review_word2vec")
#create svm with LinearSVC, with features from word2vec model outputCol
svm = cl.LinearSVC(maxIter=10, labelCol='sentiment_label', featuresCol=word2vec.getOutputCol())

In [28]:
#build the ml pipeline and train the model; then make predictions 
#[Your Code] to build the ML pipeline to train SVM model; then make predictions (no need to evaluate, the evaluation is slightly different here so provided below)
 
# Define stages of the pipeline
pipeline_stages = [
    tokenizer,
    stopwords,
    word2vec,
    svm
]

# Create the pipeline
svm_pipeline = Pipeline(stages=pipeline_stages)

# Fit the pipeline to the training data
svm_model = svm_pipeline.fit(reviews_train)

# Make predictions on the test data
predictions = svm_model.transform(reviews_test)


In [29]:
#save the model for task 3
from pyspark.ml import PipelineModel
model_path = './svm_model'
svm_model.write().overwrite().save(model_path)

In [30]:
#model evaluation, here slightly different for LinearSVC
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='sentiment_label')
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderPR'}))

0.9340058906030851
0.9311235672362355


### Task 3 - Combine Spark ML and Streaming Analysis
Now we have our sentiment prediction model with acceptable predictive performance. The last task is to combine this machine learning mode with spark streaming. That is, with a data stream, we will use the trained model to make real-time predictions. We will still use IMDB reviews and here we will create a simulated data stream from files in a directory, then receive the review stream and predict sentiment using spark structured streaming.

We will first use `streaming_producer.ipynb` specifically for this assignment so that we can simulate a data stream to send files incrementally into a directory and we can read stream from the same directory. To do that, we use `streaming_producer.ipynb` - it will send random reviews in the form of csv files into `review_stream` directory. Then this spark application will read review stream data from `review_stream` directory.

In [31]:
reviews_test_stream = reviews_test.select('review')

In [32]:
import pandas as pd
import numpy as np
import os

#here we save the data as csv to allow streaming_producer.ipynb to take random reviews from it
df = reviews_test_stream.toPandas()
df.to_csv('review_test.csv', index=False)

In [33]:
#load the saved model
from pyspark.ml import PipelineModel
model_path = './svm_model'
svm_model = PipelineModel.load(model_path)

Now we can read from the stream. For convenience, we just take the existing spark dataframe to reuse the schema. Please note the first comment for streaming producer.

In [34]:
#run streaming_producer.ipynb notebook (but make sure you try to work on the code below first), then go back here to run subsequent code

In [35]:
#read from data stream in a folder
streaming_path = './review_stream'
if not os.path.exists(streaming_path):
    os.makedirs(streaming_path)
streaming_review = spark.readStream.schema(reviews_test_stream.schema).option("maxFilesPerTrigger", 1).csv(streaming_path)

Here from the data stream, we want to know three results. First, how many positive or negative reviews we have received in real time? Second, how many positive or negative reviews in each time window of 60 seconds? Third, we are interested in the positive and negative reviews in each time window of 60 seconds, but with sliding window for every 30 seconds. So we will do some calculations, and to capture time window, we will use the current timestamp to create 'processing_time' (the time we receive the data) and apply window on this timestamp.

In [36]:
streaming_review_time = streaming_review.withColumn('processing_time', fn.current_timestamp())

In [None]:
# Make predictions on the streaming data
streaming_sentiment = svm_model.transform(streaming_review_time)

# Define the query to write the streaming data with predictions to a sink
query = streaming_sentiment.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Await termination of the query
query.awaitTermination()

In [39]:
#create a predicted text label from 'predicted column' is the prediction
streaming_sentiment = streaming_sentiment.withColumn('predicted', fn.when(fn.col('prediction')==1.0, 'positive').otherwise('negative'))

In [40]:
#[Your Code] to do transformations to get the results we need
#first, get total number of positive and negative reviews we have received
#second, still number of positive and negative reviews we received, but by time window (60 seconds)
#third, still number of positive and negative reviews we received, but by sliding time window (60 seconds window for every 30 seconds)
#names of streaming dataframes are 'sentiment_count', 'sentiment_window_count', 'sentiment_sliding_window_count' (see the next cell)


from pyspark.sql.functions import window

# First transformation: Total number of positive and negative reviews
sentiment_count = streaming_sentiment.groupBy('predicted').count()

# Second transformation: Number of positive and negative reviews by time window (60 seconds)
sentiment_window_count = streaming_sentiment.groupBy(window("processing_time", "60 seconds"), "predicted").count()

# Third transformation: Number of positive and negative reviews by sliding time window (60 seconds window for every 30 seconds)
sentiment_sliding_window_count = streaming_sentiment.groupBy(window("processing_time", "60 seconds", "30 seconds"), "predicted").count()

# Naming the streaming dataframes
sentiment_count = sentiment_count.withColumnRenamed("count", "total_count")
sentiment_window_count = sentiment_window_count.withColumnRenamed("count", "window_count")
sentiment_sliding_window_count = sentiment_sliding_window_count.withColumnRenamed("count", "sliding_window_count")

# Printing the schema of the dataframes to ensure correctness
sentiment_count.printSchema()
sentiment_window_count.printSchema()
sentiment_sliding_window_count.printSchema()



root
 |-- predicted: string (nullable = false)
 |-- total_count: long (nullable = false)

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- predicted: string (nullable = false)
 |-- window_count: long (nullable = false)

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- predicted: string (nullable = false)
 |-- sliding_window_count: long (nullable = false)



In [41]:
#now we have two streaming dataframe results
print(sentiment_count.isStreaming)
print(sentiment_window_count.isStreaming)
print(sentiment_sliding_window_count.isStreaming)

True
True
True


In [43]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[4]') \
                    .config("spark.driver.memory", "2g") \
                    .config("spark.jars", "graphframes-0.8.3-spark3.5-s_2.12.jar") \
                    .config("spark.packages", "graphframes:graphframes:0.8.3-spark3.5-s_2.12") \
                    .appName('spark_graphframe').getOrCreate()

In [None]:
#[Your Code] to define the query variables and set the result tables
#name of quiery variables are 'query_sentiment', 'query_sentiment_window', 'query_sentiment_sliding_window' (see the last cell)
#name of result tables are 'sentiment', 'sentiment_window', 'sentiment_sliding_window' (see the next cell)

# Define query variables
query_sentiment = sentiment_count \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("sentiment") \
    .start()

query_sentiment_window = sentiment_window_count \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("sentiment_window") \
    .start()

query_sentiment_sliding_window = sentiment_sliding_window_count \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("sentiment_sliding_window") \
    .start()



In [47]:
#create a dashboard for 2 minutes to show the live results from the three result tbales
import time
start_time = time.time()
current_time = time.time()
while current_time - start_time <= 120:
    spark.sql('select * from sentiment').show()
    spark.sql('select * from sentiment_window order by window').show(truncate=False)
    spark.sql('select * from sentiment_sliding_window order by window').show(truncate=False)
    current_time = time.time()
    time.sleep(10)

+---------+-----------+
|predicted|total_count|
+---------+-----------+
| positive|         26|
| negative|         29|
+---------+-----------+

+------------------------------------------+---------+------------+
|window                                    |predicted|window_count|
+------------------------------------------+---------+------------+
|{2024-03-17 04:43:00, 2024-03-17 04:44:00}|positive |26          |
|{2024-03-17 04:43:00, 2024-03-17 04:44:00}|negative |29          |
+------------------------------------------+---------+------------+

+------------------------------------------+---------+--------------------+
|window                                    |predicted|sliding_window_count|
+------------------------------------------+---------+--------------------+
|{2024-03-17 04:43:00, 2024-03-17 04:44:00}|negative |29                  |
|{2024-03-17 04:43:00, 2024-03-17 04:44:00}|positive |26                  |
|{2024-03-17 04:43:30, 2024-03-17 04:44:30}|positive |26          

In [48]:
#stop query to finish streaming analysis
query_sentiment.stop()
query_sentiment_window.stop()
query_sentiment_sliding_window.stop()

In [None]:
###---------Code End-----------###