# Streaming Movie Reviews

In this hands-on exercise we will look at unbounded data, that is, data that keeps arriving instead of sitting in a fixed file. In many applications data is continuously updated. The data that we will be working with comes from the Internet Movie Database (IMDb) app. Users who have connected the app to Twitter automatically produce a tweet every time they rate a movie in the app. It is possible to subscribe to these tweets as they are produced, but for simplicity we will simulate this process by streaming historic data.

To work with streaming data in Spark we need to create a StreamingContext, which plays a role similar to that of the SparkContext in a batch application. We also need to set the batch interval, that is, how often we want to process the data. Here we will set it to 10 seconds.

In [17]:
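from pyspark.streaming import StreamingContext

batch_interval = 10  # seconds between two processing runs
# sc is assumed to be the SparkContext that already exists in the notebook
stream_context = StreamingContext(sc, batch_interval)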

We can now manipulate the streaming data much as we would batch data. The difference is that the processing is repeated every 10 seconds on the data that has arrived since the last run.
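To make the idea of repeated processing concrete, here is a minimal pure-Python sketch (no Spark involved) that groups events into 10-second batches by their arrival time. The helper `assign_batches` and the arrival times are made up for illustration; Spark's own scheduling is more involved than this.

```python
from collections import defaultdict

def assign_batches(arrivals, batch_interval=10):
    """Group (arrival_second, payload) pairs into batches.

    Hypothetical helper for illustration only: an event that arrives at
    second t is placed in batch number t // batch_interval.
    """
    batches = defaultdict(list)
    for arrival_second, payload in arrivals:
        batches[arrival_second // batch_interval].append(payload)
    return dict(batches)

# Made-up arrival times, in seconds since the stream was started.
arrivals = [(1, "a"), (4, "b"), (12, "c"), (19, "d"), (27, "e")]
batches = assign_batches(arrivals)

for batch_no in sorted(batches):
    print(batch_no, batches[batch_no])
```

Each batch is then processed on its own, which is why any running total has to be kept outside the per-batch computation.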

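We are faking the stream of reviews by hooking up to a Google Cloud Storage bucket that holds the historic reviews. Spark reads the files that appear in the bucket as the incoming stream.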

In [18]:
stream_of_reviews=stream_context.textFileStream("gs://big-data-streaming-examples")
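A file-based stream only picks up files that show up in the monitored location after the stream has started, so one possible way to fake a stream is to write historic data out as a series of small files. The sketch below does this with a local temporary directory instead of the bucket. The helper `write_chunks`, the directory layout, the file names, and the sample lines are all assumptions for illustration, not the setup used for this exercise.

```python
import json
import os
import tempfile
import time

def write_chunks(lines, staging_dir, target_dir, chunk_size=2, delay_seconds=0.0):
    """Write `lines` as a series of small files that appear in target_dir.

    Hypothetical helper: each file is written completely in staging_dir and
    then moved into target_dir, so a reader never sees a half-written file.
    """
    written = []
    for start in range(0, len(lines), chunk_size):
        name = "reviews-{:05d}.json".format(start // chunk_size)
        staged = os.path.join(staging_dir, name)
        with open(staged, "w") as handle:
            handle.write("\n".join(lines[start:start + chunk_size]) + "\n")
        final = os.path.join(target_dir, name)
        os.replace(staged, final)  # move the finished file into place
        written.append(final)
        time.sleep(delay_seconds)  # pause to spread the files out over time
    return written

# Both directories live under one temporary base directory.
base = tempfile.mkdtemp()
staging_dir = os.path.join(base, "staging")
incoming_dir = os.path.join(base, "incoming")
os.makedirs(staging_dir)
os.makedirs(incoming_dir)

# Made-up review lines in the same JSON-per-line style as the real data.
sample_lines = [json.dumps({"movie": "Movie {}".format(i), "rating": i % 11})
                for i in range(5)]
files = write_chunks(sample_lines, staging_dir, incoming_dir)
print(len(files), "files written to", incoming_dir)
```

With a positive `delay_seconds`, a stream that monitors `incoming_dir` would see the files arrive spread over several batches.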

In [19]:
# State kept on the driver and updated once per batch.
local_data = {
    "total_count": 0,
    "one_line": "",
    "latest_processing_time": "",
}

def count_and_keep_one(time, rdd):
    # collect() brings the whole batch to the driver, fine for small batches.
    data = rdd.collect()
    local_data["latest_processing_time"] = time
    local_data["total_count"] += len(data)
    if data:
        local_data["one_line"] = data[0]

# Run the function on every batch of the stream.
stream_of_reviews.foreachRDD(count_and_keep_one)

In [28]:
print("Number of lines processed: "+str(local_data["total_count"]))
print("Latest processing time: "+str(local_data["latest_processing_time"]))
print("Example of a line from latest batch: "+local_data["one_line"])

Number of lines processed: 2520
Latest processing time: 2019-04-06 16:35:10
Example of a line from latest batch: {"movie": "Killer Elite (2011)", "time": "2013-05-16 19:47:04", "user": "134511613", "timestamp": 1368733624, "rating": 6}
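Each line is a JSON document, so it can be turned into a Python dictionary with the standard `json` module. The following small sketch parses the example line shown above; the variable names are chosen for illustration only.

```python
import json
from datetime import datetime, timezone

# The example line printed above, copied verbatim.
line = ('{"movie": "Killer Elite (2011)", "time": "2013-05-16 19:47:04", '
        '"user": "134511613", "timestamp": 1368733624, "rating": 6}')

review = json.loads(line)
print(review["movie"], review["rating"])

# Interpret the numeric timestamp as seconds since the Unix epoch, in UTC.
rated_at = datetime.fromtimestamp(review["timestamp"], tz=timezone.utc)
print(rated_at.strftime("%Y-%m-%d %H:%M:%S"))
```

For this sample line the converted timestamp agrees with the "time" field, which suggests that the "time" field is given in UTC.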


In [21]:
stream_context.start()

In [15]:
# Stop the streaming, but keep the SparkContext alive for later cells.
stream_context.stop(stopSparkContext=False)

In [None]:
import json
from pyspark.streaming import StreamingContext

batchIntervalSeconds = 10
ssc = StreamingContext(sc, batchIntervalSeconds)
lines = ssc.textFileStream("gs://big-data-streaming-examples")

# Per batch: extract the rating of each review and count each rating value.
ratings = (lines.map(lambda x: json.loads(x)["rating"])
                .map(lambda x: (x, 1))
                .reduceByKey(lambda x, y: x + y))

# Running totals for the ratings 0 to 10, kept on the driver.
rating_dist = {i: 0 for i in range(11)}

def process(time, rdd):
    # Add the counts of this batch to the running totals.
    for rating, count in rdd.collect():
        rating_dist[rating] += count

ratings.foreachRDD(process)
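The counting logic can be tried out without a running stream. The sketch below is a plain-Python stand-in, not Spark code: `count_ratings` plays the role of the map and reduceByKey steps on a single batch, and the loop plays the role of the function passed to foreachRDD. The two batches are made-up data, and the name `simulated_dist` is used so as not to overwrite `rating_dist`.

```python
import json
from collections import Counter

def count_ratings(batch_lines):
    """Count how often each rating occurs in one batch of JSON lines."""
    return Counter(json.loads(line)["rating"] for line in batch_lines)

# Running totals for the ratings 0 to 10.
simulated_dist = {i: 0 for i in range(11)}

# Two made-up batches of review lines.
batches = [
    ['{"rating": 6}', '{"rating": 8}', '{"rating": 6}'],
    ['{"rating": 8}', '{"rating": 10}'],
]

for batch in batches:
    # Merge the per-batch counts into the running totals.
    for rating, count in count_ratings(batch).items():
        simulated_dist[rating] += count

print(simulated_dist)
```

The per-batch counts are small (at most one entry per rating value), which is why collecting them on the driver is cheap even when the batches themselves are large.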


In [None]:
ssc.start()

In [None]:
ssc.stop()

In [11]:
rating_dist

NameError: name 'rating_dist' is not defined

In [None]:
latest_review

In [None]:
import json

In [None]:
import matplotlib.pyplot as plt

plt.rcdefaults()

# Ratings run from 0 to 10.
r = list(range(11))
frequency = [rating_dist[x] for x in r]

plt.bar(r, frequency, align='center', alpha=0.5)
plt.xticks(r)
plt.xlabel('Rating')
plt.ylabel('Number of ratings')
plt.title('Rating frequency')

plt.show()
