# In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import operator
import numpy as np
import matplotlib.pyplot as plt


def main():
    """Run the word-count stream for 100 seconds, then plot per-batch counts.

    A local Spark context with two threads is created, wrapped in a
    streaming context with 10-second batches, and checkpointing is enabled
    in the "checkpoint" directory before the job is started.
    """
    spark_conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    spark_context = SparkContext(conf=spark_conf)

    batch_seconds = 10
    streaming_context = StreamingContext(spark_context, batch_seconds)
    streaming_context.checkpoint("checkpoint")

    positive_words = load_wordlist("positive.txt")
    negative_words = load_wordlist("negative.txt")

    per_batch_counts = stream(streaming_context, positive_words, negative_words, 100)
    make_plot(per_batch_counts)


def make_plot(counts):
    """
    Plot the counts for the positive and negative words for each timestep.
    Use plt.show() so that the plot will popup.

    counts: one entry per time step, each a list of (label, count) pairs,
        e.g. [[("positive", 100), ("negative", 50)], ...].  The pairs may
        appear in any order, and a label is absent for a step in which no
        word of that kind was seen; such steps are plotted as 0.

    The figure is also written to 'plot.png' before being shown.
    """
    positive = []
    negative = []
    for step in counts:
        # Look labels up by name: collect() after reduceByKey does not
        # guarantee the "positive" pair comes first, and a batch may lack
        # one or both pairs, so positional indexing was wrong or raised.
        by_label = dict(step)
        positive.append(by_label.get('positive', 0))
        negative.append(by_label.get('negative', 0))
    steps = list(range(len(positive)))

    plt.plot(steps, positive, 'bo-', label='positive')
    plt.plot(steps, negative, 'go-', label='negative')
    if steps:
        # One tick per time step.
        plt.xticks(np.arange(0, len(steps), 1.0))
        # Fit the axes to the data instead of fixed [0, 11, 0, 300] limits,
        # which cut off longer runs and counts above 300.
        y_top = max(max(positive), max(negative), 1)
        plt.axis([0, len(steps), 0, y_top * 1.1])
    plt.legend(loc='upper left')
    plt.xlabel('Time step')
    plt.ylabel('Word count')
    plt.savefig('plot.png')
    plt.show()


def load_wordlist(filename):
    """
    Return the set of words listed in ``filename``, one word per line.

    The file is read on the driver with ``open``. The previous version called
    ``sc.textFile``, but ``sc`` is a local variable of ``main`` and is not
    visible here, so it raised NameError. A set is returned so the
    per-word membership tests in the streaming job are O(1).

    Surrounding whitespace is stripped, blank lines are skipped, and words
    are lower-cased to match the lower-casing applied to tweet words.
    """
    words = set()
    with open(filename) as handle:
        for line in handle:
            word = line.strip().lower()
            if word:
                words.add(word)
    return words


def updateFunction(newValues, runningCount):
    """Fold one batch's values for a key into that key's running total.

    ``runningCount`` is None the first time a key is seen, in which case the
    total starts from 0.
    """
    total = 0 if runningCount is None else runningCount
    for value in newValues:
        total += value
    return total

def stream(ssc, pwords, nwords, duration):
    """
    Count positive and negative words in the Kafka topic 'twitterstream'.

    pwords, nwords: iterables of (lower-case) positive / negative words.
    duration: seconds to run before stopping the streaming context.

    Running totals are printed every batch via pprint. Returns a list with
    one entry per batch, each the collected (label, count) pairs for that
    batch alone, e.g. [("positive", 100), ("negative", 50)]. A label is
    absent from a batch in which no such word occurred.
    """
    # Sets give O(1) membership tests per word.
    positive = set(pwords)
    negative = set(nwords)

    kstream = KafkaUtils.createDirectStream(
        ssc, topics=['twitterstream'], kafkaParams={"metadata.broker.list": 'localhost:9092'})
    # Drop non-ASCII characters, then decode back to text. On Python 3,
    # encode() alone yields bytes, which cannot be split on a str separator
    # and would never equal the str entries of the word lists.
    tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore").decode("ascii"))

    # split() with no argument splits on any whitespace run and never yields
    # empty tokens, unlike split(' ').
    words = tweets.flatMap(lambda text: text.split())

    def judge(word):
        """Label a word 'positive', 'negative' or 'none' (positive wins ties)."""
        word = word.lower()
        if word in positive:
            return 'positive'
        if word in negative:
            return 'negative'
        return 'none'

    per_batch = (words.map(judge)
                 .filter(lambda label: label != 'none')
                 .map(lambda label: (label, 1))
                 .reduceByKey(lambda a, b: a + b))

    # Running totals across batches (main() enables the checkpointing that
    # updateStateByKey requires).
    running = per_batch.updateStateByKey(updateFunction)
    running.pprint()

    # foreachRDD runs on the driver, so appending to a local list is safe.
    counts = []
    per_batch.foreachRDD(lambda t, rdd: counts.append(rdd.collect()))

    ssc.start()                         # Start the computation
    ssc.awaitTerminationOrTimeout(duration)
    ssc.stop(stopGraceFully=True)

    return counts


if __name__ == "__main__":
    # Launch the Spark job only when executed as a script, not on import.
    main()

