## Initialize Spark and import necessary packages

In [1]:
# Create a local SparkContext object named "MyApp" that you can use to interact with Spark // for rdds
from pyspark import SparkContext
sc = SparkContext("local", "MyApp")

23/05/19 00:58:18 WARN Utils: Your hostname, thinkpad-t470 resolves to a loopback address: 127.0.1.1; using 192.168.0.164 instead (on interface wlp4s0)
23/05/19 00:58:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/19 00:58:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Create a SparkSession object named spark with an app name of "MyApp". You can then use it to interact with Spark. // for dataframes
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

In [3]:
import threading
from pyspark.streaming import StreamingContext
from pyspark.sql import DataFrame
from pyspark.sql.types import * 
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark import StorageLevel
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from functools import reduce
import pandas 
import numpy as np
import os

## Constructing data set using the provided stream

In [None]:
# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)
        

# Start streaming and saving
ssc = StreamingContext(sc, 10)
lines = ssc.socketTextStream("seppe.net", 7778)
lines.saveAsTextFiles("/home/linas/Desktop/kul/$em 2/advanced analytics in business/assignment 3/streaming_test/")
ssc_t = StreamingThread(ssc)
ssc_t.start()

In [None]:
# Stop streaming
ssc_t.stop()

In [4]:
# Define empty RDD
empty_rdd = spark.sparkContext.emptyRDD()

# Define schema for the dataframe
columns = StructType([StructField('review_id', StringType(), False),
                        StructField('app_id', StringType(), False), 
                        StructField('review_text', StringType(), False), 
                        StructField('label', StringType(), False)])

# Create empty dataframe
df = spark.createDataFrame(data=empty_rdd, schema=columns)
df.show()

+---------+------+-----------+-----+
|review_id|app_id|review_text|label|
+---------+------+-----------+-----+
+---------+------+-----------+-----+



In [5]:
# Populate empty dataframe by reading the files saved after streaming
def scan_folder(parent, schema):
    for file_name in os.listdir(parent):
        if file_name.startswith("-168"):
            for text_name in os.listdir(parent+"/"+file_name):
                if text_name.startswith("part"):
                    sub_df=spark.read.format("json").schema(schema).load(parent+"/"+file_name+"/"+text_name)
                    globals()["df"]=globals()["df"].union(sub_df)

scan_folder("/home/linas/Desktop/kul/$em 2/advanced analytics in business/assignment 3/data/streaming_data_subsample/", columns)

In [6]:
# Remove duplicate rows
df = df.dropDuplicates()

In [7]:
# Remove empty reviews
df = df.filter(df.review_text != "")

In [8]:
# Set label column to integer type
df = df.withColumn("label", df["label"].cast(IntegerType()))

In [9]:
# Check if the dataframe is populated correctly
df.select(df.columns[:20]).show()
print((df.count(), len(df.columns)))
# NOTE: shape() is not valid for pyspark dataframes

23/05/18 22:15:34 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
23/05/18 22:18:50 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
                                                                                

+---------+---------------+--------------------+-----+
|review_id|         app_id|         review_text|label|
+---------+---------------+--------------------+-----+
|136616773|         955200|       Ebin :DDDDDDD|    1|
|136616125|        2328760|Downgrade compare...|    0|
|136616752|        2328760|Step 1: Abandon y...|    0|
|136619845|        1913870|Incredible party ...|    1|
|136633177|        1498040|If you like theor...|    1|
|136635517|        1811990|Adventure Time an...|    1|
|136633228|        1772830|Played a couple h...|    1|
|136635511|        1811990|first steam revie...|    0|
|136755504|         307950|Played 51 minutes...|    0|
|136758662|        1527950|WhiteTales... I m...|    0|
|136755179|         307950|          cool game.|    1|
|136757637|        1527950|I'm addicted. The...|    1|
|136667892|1798010,1798020|I want to convinc...|    1|
|136667372|1798010,1798020|battle network 1 ...|    1|
|136667726|1798010,1798020|My childhood! \nA...|    1|
|136667270

23/05/18 22:19:19 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
23/05/18 22:22:32 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB


(793, 4)


                                                                                

In [10]:
# Count class distribution
count_df = df.groupBy('label').count().orderBy('label')
count_df.show()

23/05/18 22:22:58 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
23/05/18 22:26:18 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
23/05/18 22:26:20 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
                                                                                

+-----+-----+
|label|count|
+-----+-----+
|    0|  146|
|    1|  647|
+-----+-----+



## Constructing predictive model

In [11]:
# Split into train and test set
df, test= df.randomSplit(weights = [0.70, 0.30])

In [12]:
# # Preprocess review data: tokenize
# step1 = RegexTokenizer(inputCol="review_text", outputCol="tokens", pattern="\\W")

# # Preprocess review data: remove stopwords and add slang words generated by ChatGPT
# swr = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')
# w_slang = swr.getStopWords() + ['af', 'aight', 'amirite', 'anyways', 'awesomesauce', 'bae', 'bruh', 'btw', 'dope', 'fam', 'fr', 'gg', 'gj', 'glhf', 'grats', 'imho', 'imo', 'lmao', 'lmfao', 'lol', 'nvm', 'omg', 'pls', 'rip', 'salty', 'savage', 'smh', 'tbh', 'thx', 'ttyl', 'ty', 'wp', 'wtf', 'yeet', 'yolo']
# step2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words', stopWords=w_slang)

# # Preprocess review data: vectorize
# step3 = CountVectorizer(inputCol="filtered_words", outputCol="features")

# # Initiate logistic regression model
# step4 = LogisticRegression(featuresCol='features', labelCol='label')

# # Create pipeline
# pipeline = Pipeline(stages=[step1, step2, step3, step4])

In [13]:
# Preprocess review data: tokenize
step1 = RegexTokenizer(inputCol="review_text", outputCol="tokens", pattern="\\W")

# Preprocess review data: remove stopwords and add slang words generated by ChatGPT
swr = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')
w_slang = swr.getStopWords() + ['af', 'aight', 'amirite', 'anyways', 'awesomesauce', 'bae', 'bruh', 'btw', 'dope', 'fam', 'fr', 'gg', 'gj', 'glhf', 'grats', 'imho', 'imo', 'lmao', 'lmfao', 'lol', 'nvm', 'omg', 'pls', 'rip', 'salty', 'savage', 'smh', 'tbh', 'thx', 'ttyl', 'ty', 'wp', 'wtf', 'yeet', 'yolo']
step2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words', stopWords=w_slang)

# Preprocess review data: compute term frequency
step3 = HashingTF(inputCol="filtered_words", outputCol="term_freq", numFeatures=2**16)

# Preprocess review data: compute TF-IDF features
step4 = IDF(inputCol="term_freq", outputCol="features")

# Initiate logistic regression model
step5 = LogisticRegression(featuresCol='features', labelCol='label')

# Create pipeline
pipeline = Pipeline(stages=[step1, step2, step3, step4, step5])

In [14]:
# Fit pipeline to training data
pipelineFit = pipeline.fit(df)

23/05/18 22:26:47 WARN DAGScheduler: Broadcasting large task binary with size 5.6 MiB
23/05/18 22:29:47 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
23/05/18 22:30:15 WARN DAGScheduler: Broadcasting large task binary with size 5.6 MiB
23/05/18 22:33:39 WARN DAGScheduler: Broadcasting large task binary with size 5.6 MiB
23/05/18 22:37:26 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/05/18 22:37:27 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/05/18 22:37:30 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/05/18 22:37:31 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/05/18 22:37:31 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/05/18 22:37:32 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/05/18 22:37:32 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/05/18 22:37:33 WARN DAGScheduler: Broadcasting larg

In [15]:
# Validate model on test data
predictions = pipelineFit.transform(test)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(test.count())
accuracy

23/05/18 23:29:46 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
23/05/18 23:57:20 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB
23/05/18 23:57:58 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
23/05/19 00:39:58 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
                                                                                

0.8130081300813008

In [16]:
# Save trained model
pipelineFit.save("/home/linas/Desktop/kul/$em 2/advanced analytics in business/assignment 3/model_tf_idf/")

23/05/19 00:40:12 WARN TaskSetManager: Stage 111 contains a task of very large size (1052 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

# Use trained model to make predictions as the stream comes in

In [4]:
# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)
        

# Add capability to predict as the stream comes in
globals()['models_loaded'] = False
globals()['my_model'] = None

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = PipelineModel.load('/home/linas/Desktop/kul/$em 2/advanced analytics in business/assignment 3/model_tf_idf/')
        globals()['models_loaded'] = True
    # Make predictions with loaded model
    df_result = globals()['my_model'].transform(df)
    df_result.select("app_id", "label", "review_id", "review_text", "prediction").show()

In [5]:
# Start streaming and predicting
ssc = StreamingContext(sc, 10)
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)
ssc_t = StreamingThread(ssc)
ssc_t.start()

23/05/19 00:58:39 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.


23/05/19 00:58:42 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/05/19 00:58:42 WARN BlockManager: Block input-0-1684450722200 replicated to only 0 peer(s) instead of 1 peers
23/05/19 00:58:44 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/05/19 00:58:44 WARN BlockManager: Block input-0-1684450724200 replicated to only 0 peer(s) instead of 1 peers
23/05/19 00:58:48 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/05/19 00:58:48 WARN BlockManager: Block input-0-1684450728400 replicated to only 0 peer(s) instead of 1 peers
23/05/19 00:58:53 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/05/19 00:58:53 WARN BlockManager: Block input-0-1684450733200 replicated to only 0 peer(s) instead of 1 peers
23/05/19 00:58:57 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/05/19 00:58:57 WARN BlockManager: Block input-0-1684450737400 replicated to

In [7]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----


23/05/19 00:59:23 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:121)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:119)
	at org.apache.spar



                                                                                

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|2363140|    1|138507859|yep. don't worry ...|
|2311190|    0|138508617|it sucks my scree...|
|1742020|    1|138509366|    👏👏 Next Meme~!|
+-------+-----+---------+--------------------+



23/05/19 00:59:43 WARN DAGScheduler: Broadcasting large task binary with size 1145.2 KiB
23/05/19 00:59:44 WARN DAGScheduler: Broadcasting large task binary with size 1145.2 KiB
                                                                                

+-------+-----+---------+--------------------+----------+
| app_id|label|review_id|         review_text|prediction|
+-------+-----+---------+--------------------+----------+
|2363140|    1|138507859|yep. don't worry ...|       1.0|
|2311190|    0|138508617|it sucks my scree...|       1.0|
|1742020|    1|138509366|    👏👏 Next Meme~!|       1.0|
+-------+-----+---------+--------------------+----------+

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|1742020|    1|138507426|No one reads the ...|
|1742020|    1|138506738|Never in my life ...|
+-------+-----+---------+--------------------+



23/05/19 00:59:46 WARN DAGScheduler: Broadcasting large task binary with size 1145.2 KiB
23/05/19 00:59:47 WARN DAGScheduler: Broadcasting large task binary with size 1145.2 KiB


+-------+-----+---------+--------------------+----------+
| app_id|label|review_id|         review_text|prediction|
+-------+-----+---------+--------------------+----------+
|1742020|    1|138507426|No one reads the ...|       1.0|
|1742020|    1|138506738|Never in my life ...|       1.0|
+-------+-----+---------+--------------------+----------+

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|1940340|    0|138509031|Unfinished game w...|
|1940340|    1|138508821|You will suffer. ...|
|1940340|    1|138508636|He may look like ...|
+-------+-----+---------+--------------------+



23/05/19 00:59:49 WARN DAGScheduler: Broadcasting large task binary with size 1145.2 KiB
23/05/19 00:59:49 WARN DAGScheduler: Broadcasting large task binary with size 1145.2 KiB


+-------+-----+---------+--------------------+----------+
| app_id|label|review_id|         review_text|prediction|
+-------+-----+---------+--------------------+----------+
|1940340|    0|138509031|Unfinished game w...|       0.0|
|1940340|    1|138508821|You will suffer. ...|       1.0|
|1940340|    1|138508636|He may look like ...|       1.0|
+-------+-----+---------+--------------------+----------+

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|1940340|    1|138507971|I like how devs e...|
|1940340|    1|138507795|            gud game|
+-------+-----+---------+--------------------+



23/05/19 00:59:51 WARN DAGScheduler: Broadcasting large task binary with size 1145.2 KiB
23/05/19 00:59:51 WARN DAGScheduler: Broadcasting large task binary with size 1145.2 KiB


+-------+-----+---------+--------------------+----------+
| app_id|label|review_id|         review_text|prediction|
+-------+-----+---------+--------------------+----------+
|1940340|    1|138507971|I like how devs e...|       1.0|
|1940340|    1|138507795|            gud game|       1.0|
+-------+-----+---------+--------------------+----------+

