# Prepare spark 
This notebook uses **HiveContext**, which needs an embedded database *stored in the current working directory*. Because of that, you have to **shut down any other notebooks** that use HiveContext.

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

# Configure a local Spark application that uses every available core.
conf = SparkConf()
conf.setAppName("Prova Streaming")
conf.setMaster("local[*]")

# One SparkContext for the notebook, wrapped by the HiveContext used for SQL.
sc = SparkContext(conf=conf)
sqlCtx = HiveContext(sc)
print("Started spark version %s" % sc.version)

# Prepare chart environment

In [None]:
%matplotlib inline

# Install tweet library tweepy

In [None]:
! pip install tweepy

# Prepare Python code for Twitter streaming

## Buffered Tweet Receiver
This class receives messages from Twitter and stores them in a buffer.
Using the **retrieve_messages** method, a client class can get a DataFrame with the message information. This operation also *flushes the buffered messages*.

This class also has some display capabilities; it shows:
 - The number of messages in the buffer
 - Full dump of the last message

In [None]:
from tweepy.streaming import StreamListener
from ipywidgets import IntProgress
from ipywidgets import Textarea
from IPython import display
from random import randint

# See also http://adilmoujahid.com/posts/2014/07/twitter-analytics/
class BufferedTweeterMessageReceiver(StreamListener):
    """Tweepy listener that buffers raw Twitter messages for a polling client.

    Every message delivered by tweepy is appended to an in-memory buffer.
    A client calls retrieve_messages() to get the buffered messages as a
    DataFrame; that call also empties the buffer.

    Two widgets are displayed on construction: a progress bar with the number
    of buffered messages and a text area with the last message received.
    """

    def __init__(self):
        StreamListener.__init__(self) # - Call superclass constructor
        self._messages = [] # - Initially empty buffer
        self._init_display()

    # - Create some widget
    def _init_display(self):
        # - show how many messages we have in the buffer
        self._progressbarr = IntProgress(description='Arrived Messages', min=0, max=1000)
        display.display(self._progressbarr)
        # - show last message
        self._text = Textarea(description='msg:')
        display.display(self._text)

    # - Called by tweepy library: here we receive messages
    def on_data(self, data):
        """Buffer one raw message and refresh the widgets.

        Returns True, as the original listener did, after every message.
        """
        # - Append first, so the bar shows the buffer size *including* this message
        self._messages.append(data)
        self._progressbarr.value = len(self._messages)
        self._text.value = data
        return True

    # - Called by tweepy library: here we receive error status codes
    def on_error(self, status):
        print(status)

    # - Called from the streaming application to receive messages
    def retrieve_messages(self):
        """Return the buffered messages as a DataFrame and empty the buffer.

        The messages are written to the file 'tmp.json' in the current working
        directory and loaded with the notebook-level ``sqlCtx``.
        """
        # - Take the current buffer and start a new, empty one
        messages = self._messages
        self._messages = []
        # - The buffer is empty again: keep the displayed count in sync
        self._progressbarr.value = 0

        # messages_DF = sqlCtx.read.json( sc.parallelize(messages))
        # - I can't use the line above for an issue with python to java string conversion
        #   the temporary file is only one. So I can't run more applications concurrently
        fileName = 'tmp.json'
        with open(fileName, 'w') as data_file:
            for msg in messages:
                # - read.json expects one JSON document per line: normalize the
                #   line ending of every message and skip blank ones
                line = msg.rstrip('\r\n')
                if line:
                    data_file.write(line + '\n')
        messages_DF = sqlCtx.read.json(fileName)
        return messages_DF
    

## Main loop for streaming analysis
This class:
 - initializes the receiver and the credentials
 - starts the Twitter connection *stream*
 - enters a loop that:
     * receives a chunk of data
     * analyzes the received chunk of data
     * displays something
 - eventually stops the *connection stream* **and** the loop

In [None]:
from threading import Timer
from tweepy import OAuthHandler, Stream
from ipywidgets import Textarea, Image

import matplotlib.pyplot as plt
import json
import io

# - Skeleton for streaming
#   Keywords and period are configurable only from the constructor. 
#   We could do better.
class TweeterDataReceiver(object):
    """Skeleton of a periodic analysis over a Twitter stream.

    start() connects a tweepy stream that feeds a
    BufferedTweeterMessageReceiver and then, every ``period`` seconds,
    drains the buffer, analyzes the chunk (analyze_one_chunk), merges the
    result into the history and refreshes the widgets (display_history).
    Subclasses customize the analysis by overriding analyze_one_chunk.

    Parameters:
        keywords: list of keywords passed to the stream filter as ``track``.
        period: seconds between two analysis steps (default 10).
    """

    def __init__(self, keywords, period=10):
        # load credential
        self._init_credential()
        # init message receiver before my widgets so the widgets of
        # message receiver are inited before mine.
        self._tweeter_msg_rcv = BufferedTweeterMessageReceiver()
        self._init_widgets()

        self._keywords = keywords
        self._period = period
        self._stream = None
        self._timer = None  # pending Timer of the periodic loop, if any
        self._stop = False
        # History state; start() resets both for every run
        self._data_history = {}
        self._chunk_counter = 0

    def _init_widgets(self):
        # - Show analysis results history ...
        self._output = Textarea(description='Histories')
        display.display(self._output)
        # ... and charts
        self._img = Image(description='chat', width=480)
        display.display(self._img)

    # - see http://adilmoujahid.com/posts/2014/07/twitter-analytics/ Step 1
    def _init_credential(self):
        """Build the OAuth handler from the keys stored in 'credentials.json'."""
        with open('credentials.json') as data_file:
            credentials = json.load(data_file)
        self._auth = OAuthHandler( credentials['consumer_key'], credentials['consumer_secret'] )
        self._auth.set_access_token( credentials['access_token'], credentials['access_token_secret'] )

    # - Initialize the tweepy stream and start the loop
    def start(self):
        """Connect the stream and start the periodic loop (restarting if needed)."""
        if self._stream:
            self.stop()
        # - stop() leaves the flag set: clear it, otherwise the loop below
        #   would end immediately on every (re)start
        self._stop = False
        self._data_history = {}
        self._chunk_counter = 0
        self._stream = Stream(self._auth, self._tweeter_msg_rcv)
        # - `async` is a reserved word since Python 3.7 and newer tweepy
        #   releases call the flag `is_async`; older releases only know `async`
        try:
            self._stream.filter(track=self._keywords, is_async=True)
        except TypeError:
            self._stream.filter(track=self._keywords, **{"async": True})
        self._loop()

    # - Disconnect the tweepy stream and tell the loop to end
    def stop(self):
        """Disconnect the stream, cancel the pending step and end the loop."""
        if self._stream:
            self._stream.disconnect()
            self._stream = None
        # - Cancel the scheduled step so that a later start() cannot end up
        #   with two loops running side by side
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        self._stop = True

    # - Main loop
    def _loop(self):
        """Run one step and schedule the next one after ``period`` seconds."""
        if self._stop:
            print("Stop")
            return
        self._oneStep()
        # - stop() may have been called while the step was running
        if self._stop:
            print("Stop")
            return
        self._timer = Timer(self._period, self._loop)
        self._timer.start()

    # - This method describe the flow of each iteration
    def _oneStep(self):
        # Read the messages ...
        chunk_messages_dataframe = self._tweeter_msg_rcv.retrieve_messages()
        # ... compute some statistic ...
        chunk_analysis_result = self.analyze_one_chunk(chunk_messages_dataframe)
        # ... memorize the result together with previous ones ...
        self._merge_analysis_history(chunk_analysis_result)
        # ... update widgets
        self.display_history(self._data_history)

    # - For each key in the dictionary chunk_analysis_result read the value and
    #   append it in the same key of self._data_history
    def _merge_analysis_history(self, chunk_analysis_result):
        """Append one chunk result to the history, keeping all series aligned.

        A key seen for the first time is back-filled with 0 for the previous
        chunks; a known key missing from this chunk gets a 0 for this chunk.
        """
        # - for each result information ...
        for key in chunk_analysis_result:
            if key not in self._data_history:
                self._data_history[key] = [0] * self._chunk_counter
            # ... add it in the right place in history
            value = chunk_analysis_result[key]
            self._data_history[key].append( value )

        # - Add necessary 0 to keep all history aligned
        for key in self._data_history:
            if len(self._data_history[key]) <= self._chunk_counter:
                self._data_history[key].append( 0 )

        # - Update history length
        self._chunk_counter += 1

    # - Actually only count the number of messages
    def analyze_one_chunk(self, chunk_DF):
        """Return the statistics of one chunk as a dict (series name -> value)."""
        return {"n" : chunk_DF.count()}

    # - Write data into a text area and draw chart
    def display_history(self, data_history):
        """Log the history as JSON in the text area and plot it in the image."""

        # write log
        if len(self._output.value) > 0:
            self._output.value += "\n"
        self._output.value += json.dumps(data_history)
        # NOTE(review): scroll_to_bottom does not seem to exist on every
        # ipywidgets Textarea; scroll only when the widget offers it
        scroll_to_bottom = getattr(self._output, "scroll_to_bottom", None)
        if scroll_to_bottom is not None:
            scroll_to_bottom()

        # draw chart
        for key in data_history:
            plt.plot(data_history[key], label=key)
        # - a legend needs at least one labelled series
        if data_history:
            plt.legend()

        # put the chart into an Image widget
        buf = io.BytesIO()
        plt.savefig(buf, format='png')
        plt.clf()
        plt.close()
        # - hand over the complete PNG: a size-capped read would truncate
        #   (and so corrupt) any image bigger than the cap
        self._img.value = buf.getvalue()
        


# Start Streaming

In [None]:
# Build the basic receiver tracking the keywords 'scala' and 'python'
# (default period) and start streaming; the constructor displays the widgets.
app1 = TweeterDataReceiver(['scala', 'python'])
app1.start()

## Stop Streaming

In [None]:
# Disconnect the stream of app1 and tell its loop to end.
app1.stop()

# Now we develop some analysis

## TODO: List languages

In [None]:
class LanguageAnalyzer(TweeterDataReceiver):
    """Receiver that counts, chunk by chunk, the messages of each language."""

    def __init__(self, keywords, period=5):
        TweeterDataReceiver.__init__(self, keywords, period)

    def analyze_one_chunk(self, messages_DF):
        """Return a dict mapping each language code to its number of messages.

        An empty dict is returned when the chunk has no messages or no
        language information at all.
        """
        # NOTE(review): assumes the language of a message is exposed as a
        # top-level `lang` field of the tweet JSON - confirm against the payload.
        # The column is missing from the inferred schema when no message of
        # the chunk carries it.
        if messages_DF.count() == 0 or "lang" not in messages_DF.columns:
            return {}
        rows = messages_DF.groupBy("lang").count().collect()
        # - messages without a language end up in a null group: skip it
        return {row["lang"]: row["count"] for row in rows if row["lang"] is not None}


In [None]:
# Run the language analysis on the keyword 'trump' with a period of 1.
app2 = LanguageAnalyzer(['trump'], period=1)
app2.start()

In [None]:
# Disconnect the stream of app2 and tell its loop to end.
app2.stop()

## TODO: Look for keyword inside message

### Functions used during processing

In [None]:
def flatten(l):
    """Flatten a list of lists into a single list.

    Returns an empty list when `l` is None; inner lists that are None are
    skipped, so missing values do not break the flattening.
    """
    if l is None:
        return []
    return [item for inner in l if inner is not None for item in inner]

def contains(l, w):
    """Return 1 if `w` is an element of `l`, 0 otherwise (also when `l` is None)."""
    if l is None:
        return 0
    return 1 if w in l else 0

# Make both helpers callable from SQL queries as MY_flatten and MY_contains.
sqlCtx.registerFunction(
    "MY_flatten",
    lambda nested: flatten(nested),
)
sqlCtx.registerFunction(
    "MY_contains",
    lambda values, word: contains(values, word),
)

In [None]:
class TweetAnalyzer(TweeterDataReceiver):
    """Receiver whose per-chunk analysis is still to be written (exercise)."""

    def __init__(self, keywords, period=10):
        TweeterDataReceiver.__init__(self, keywords, period)

    def analyze_one_chunk(self, messages_DF):
        """Return the statistics of one chunk as a dict (series name -> value).

        Currently a placeholder: it reports nothing for every chunk.
        """
        if messages_DF.count() > 0:
            # TODO: follow the esercitation
            # - Placeholder result: a branch holding only a comment is not
            #   valid Python. Replace it with the real analysis.
            return {}
        else :
            return {}


In [None]:
# Run the tweet analysis on the keywords 'trump' and 'curry' with a period of 10.
app3 = TweetAnalyzer(['trump', 'curry'], period=10)
app3.start()

In [None]:
# Disconnect the stream of app3 and tell its loop to end.
app3.stop()

# Useful for development

In [None]:
# Take the messages currently buffered by app1's receiver as a DataFrame
# (this also empties the buffer) and show them.
test_DF = app1._tweeter_msg_rcv.retrieve_messages()
test_DF.show()

In [None]:
# Scratch cell: inspect the schema of the sample messages and try a query on
# them under the same name (messages_DF) the analyzers use for their argument.
messages_DF = test_DF
messages_DF.printSchema()
result_DF = messages_DF.select("id")
result_DF.show()
            