### Install requirements

In [None]:
# !pip install textblob 
# !pip install pyspark

### Import libraries

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.linalg import Vectors
from textblob import TextBlob
from sklearn.preprocessing import LabelEncoder
import json
import requests
import warnings
warnings.filterwarnings("ignore")


### Set up the environment

In [None]:
# Endpoint handed to `ssc.socketTextStream` to receive the submission stream.
STREAM_PORT = 9999
HOST = "localhost"


### Spark session

In [None]:
# Reuse the active SparkContext if one exists, otherwise create it.
sc = SparkContext.getOrCreate()
# Directory Spark writes checkpoint data to.
sc.setCheckpointDir("spark_checkpoint")
# Streaming context with a batch interval of 10 seconds.
ssc = StreamingContext(sc, batchDuration=10)
# DStream of text lines read from the socket at HOST:STREAM_PORT.
submissions = ssc.socketTextStream(hostname=HOST, port=STREAM_PORT)
# Keep Spark's console output limited to errors.
sc.setLogLevel("ERROR")


### Process data

In [None]:
def process_submission(message):
    """Parse one JSON-encoded submission and attach sentiment scores.

    Args:
        message: JSON string holding a ``message`` field (used as the title)
            and a ``metadata`` object with ``author_name``, ``date``,
            ``score``, ``num_comments``, ``upvote_ratio``, ``text`` and
            ``subreddit_name``.

    Returns:
        dict with the submission fields (``title``, ``text``, ``author``,
        ``date``, ``score``, ``num_comments``, ``upvote_ratio``,
        ``subreddit_name``), ``subreddit_hash``, and the TextBlob polarity /
        subjectivity of both the title and the text.

    Raises:
        ValueError: if ``message`` is not valid JSON.
        KeyError: if one of the fields listed above is missing.
    """
    submission = json.loads(message)

    title = submission['message']
    metadata = submission['metadata']
    text = metadata['text']
    subreddit_name = metadata['subreddit_name']

    # TextBlob only accepts strings; score a null title/text as empty text
    # instead of letting a single record raise TypeError.
    title_polarity, title_subjectivity = TextBlob(title or "").sentiment
    text_polarity, text_subjectivity = TextBlob(text or "").sentiment

    return {
        'title': title,
        'text': text,
        'author': metadata['author_name'],
        'date': metadata['date'],
        'score': metadata['score'],
        'num_comments': metadata['num_comments'],
        'upvote_ratio': metadata['upvote_ratio'],
        'subreddit_name': subreddit_name,
        # NOTE(review): built-in hash() of a str depends on PYTHONHASHSEED and
        # yields a very large integer -- confirm this is a suitable feature.
        'subreddit_hash': hash(subreddit_name),
        'title_polarity': title_polarity,
        'title_subjectivity': title_subjectivity,
        'text_polarity': text_polarity,
        'text_subjectivity': text_subjectivity,
    }


### Add processing to the pipeline

In [None]:
# Turn every raw JSON line into a dict of submission fields plus sentiment scores.
submissions = submissions.map(process_submission)
# submissions.pprint()
# Five-element feature vector per submission: four sentiment scores and the subreddit hash.
training_data = submissions.map(
    lambda record: Vectors.dense([
        record[feature]
        for feature in (
            'title_polarity',
            'title_subjectivity',
            'text_polarity',
            'text_subjectivity',
            "subreddit_hash",
        )
    ])
)


### Cluster the data

In [None]:
# Number of clusters.
k = 4
model = StreamingKMeans(k=k, decayFactor=1.0)
# Random initial centers; dim=5 matches the five features in each training vector.
model.setRandomCenters(dim=5, weight=1.0, seed=0)
# Update the model with each batch and also assign each vector to a cluster.
model.trainOn(training_data)
result = model.predictOn(training_data)


### Window operation

In [None]:
# One (cluster_id, 1) pair per prediction so occurrences can be summed per cluster.
pairs = result.map(lambda cluster_id: (cluster_id, 1))
# Per-cluster counts over a 30-second window that slides every 10 seconds
# (both durations were picked arbitrarily). The inverse function subtracts
# the counts of batches that leave the window.
cluseter_counts = pairs.reduceByKeyAndWindow(
    func=lambda a, b: a + b,
    invFunc=lambda a, b: a - b,
    windowDuration=30,
    slideDuration=10,
)
cluseter_counts.pprint()


### Visualization

In [None]:
# (Re)write viz.json with a zero count for every cluster id; mode "w"
# overwrites the file if it already exists.
total_data = {str(cluster_id): 0 for cluster_id in range(k)}
with open("viz.json", "w") as f:
    json.dump(total_data, f)


In [None]:
def update_cluster_sizes(rdd):
    """Add the per-cluster counts in *rdd* to the running totals in viz.json.

    Args:
        rdd: RDD whose elements are ``(cluster_id, count)`` pairs.

    The file is left untouched when the RDD has no elements. Otherwise it is
    read once, every pair is added to the entry keyed by ``str(cluster_id)``
    (starting from 0 if the key is not in the file yet), and the result is
    written back once.

    Raises:
        FileNotFoundError: if viz.json does not exist when there are counts
            to record.
    """
    # A single collect() replaces the isEmpty() + collect() pair.
    counts = rdd.collect()
    if not counts:
        return

    with open("viz.json", "r") as f:
        total_data = json.load(f)

    for cluster_id, count in counts:
        key = str(cluster_id)
        total_data[key] = total_data.get(key, 0) + count

    with open("viz.json", "w") as f:
        json.dump(total_data, f)


In [None]:
# Fold the counts of every sliding window into viz.json.
cluseter_counts.foreachRDD(update_cluster_sizes)


### Start the processing

In [None]:
# Start the streaming computation.
ssc.start()
# Block until the streaming computation terminates.
ssc.awaitTermination()


In [None]:
# ssc.stop(stopSparkContext=True, stopGraceFully=True)