# Processing
For a test app with a single producer (twitter-stream) and a single consumer (front-end), and no need to keep the data, we go with **Redis** pub-sub as the message broker (Redis is in-memory). For production with multiple producers and consumers, high throughput, data persistence, and low tolerance for data loss, we go with **Kafka**.

#### Redis:
* Fire and forget is ok
* Speed is a main concern
* Some data loss is tolerable
* Data size is not that big

#### Kafka:
* Reliability is a big concern
* Data persistence required
* Speed is not a big concern
* Data size is huge

In [16]:
!cat stream-redis.py

import os
import sys
import redis
import client


if __name__ == '__main__':
    '''
    QUERY: keyword1,keyword2,...
    run: python stream-redis.py QUERY GEO LIM
    '''
    # Publisher connection: password, host and port are read from the
    # environment; database 0 is used.
    redis_settings = dict(password=os.environ['REDIS_PASS'],
                          host=os.environ['REDIS_HOST'],
                          port=os.environ['REDIS_PORT'],
                          db=0)
    pub = redis.StrictRedis(**redis_settings)

    argc = len(sys.argv)

    # First positional argument: the query string (empty when omitted).
    query = sys.argv[1] if argc > 1 else ''

    # Geo mode is switched on by the mere presence of a second argument,
    # whatever its value.
    geo = argc > 2

    # Third argument: a positive integer limit; otherwise the limit stays
    # False.
    lim = False
    if argc > 3:
        requested = int(sys.argv[3])
        if requested > 0:
            lim = requested

    def _publish(data):
        # Forward each item handed over by the client to the 'twitter'
        # channel.
        return pub.publish('twitter', data)

    twitter = client.TwitterClient()
    twitter.stream(query, geo=geo, broadcast=_publish, limit=lim)


### Distributed Computation
[kafka-python documentation](https://kafka-python.readthedocs.io/en/master/index.html)

In [None]:
#!pip install pyspark
#!pip install kafka-python

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from collections import namedtuple

In [None]:
# Create the SparkContext with default settings; the streaming and SQL
# contexts in the following cells are built on top of it.
sc = SparkContext()

In [None]:
# Streaming context on top of `sc` with a batch duration of 10
# (seconds, per the PySpark StreamingContext API).
ssc = StreamingContext(sc, batchDuration=10)

# SQL entry point sharing the same SparkContext; used to query temp tables.
sqlContext = SQLContext(sc)

# Directory where the streaming context writes its checkpoint data.
ssc.checkpoint('/tmp/checkpoint')

In [None]:
# TCP port the text stream is read from.
PORT = 9999

# Text stream read from the host named by the NODEIP environment variable.
# NOTE(review): `os` is not imported in any cell shown here -- confirm that
# an earlier cell imports it.
stream = ssc.socketTextStream(hostname=os.environ['NODEIP'], port=PORT)

In [None]:
# Windowed view of the incoming stream with a window duration of 10.
lines = stream.window(windowDuration=10)

# Record type for one aggregated row: a hashtag and how often it occurred.
fields = ('tag', 'count')
Tweet = namedtuple('Tweet', fields)

In [None]:
import json


def _hashtags(msg):
    """Return the lower-cased hashtags found in one raw stream message.

    `msg` is expected to be a JSON document with a 'text' entry. Messages
    that are not valid JSON, are not JSON objects, or carry no text yield an
    empty list instead of raising, so that a single odd record cannot fail
    the whole batch.
    """
    try:
        payload = json.loads(msg)
    except ValueError:
        return []
    text = payload.get('text') if isinstance(payload, dict) else None
    if not text:
        return []
    return [word.lower() for word in text.split(' ') if word.startswith('#')]


def _register_top_tags(rdd):
    """Register the 10 most frequent tags of `rdd` as temp table 'tweets'.

    An empty RDD (a window without any hashtag) is skipped: `toDF()` cannot
    infer a schema from it and would raise. In that case a previously
    registered 'tweets' table, if any, is left untouched.
    """
    if rdd.isEmpty():
        return
    (rdd.toDF()
        .sort(desc('count'))
        .limit(10)
        .registerTempTable('tweets'))


# Count hashtag occurrences per window and publish the top 10 for SQL access.
(lines.flatMap(_hashtags)
    .map(lambda tag: (tag, 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda rec: Tweet(rec[0], rec[1]))
    .foreachRDD(_register_top_tags))

In [None]:
# python stream-socket.py QUERY GEO PORT
!cat stream-socket.py

In [None]:
# Start the streaming context so the operations defined on its streams
# begin to run.
ssc.start()

In [None]:
import time
from IPython import display
import matplotlib.pyplot as plt
%matplotlib inline

# Wait before querying -- presumably so the streaming job has had time to
# register the 'tweets' temp table.
time.sleep(30)
top10 = sqlContext.sql('select tag, count from tweets')

# Wide figure for the horizontal bar chart.
plt.rcParams['figure.figsize'] = (16, 6)
fig, ax = plt.subplots()

# Pull the query result into pandas and split it into labels and values.
top10 = top10.toPandas()
tags = top10['tag']
X = top10['count']
Y = list(range(len(tags)))  # one bar position per tag

ax.barh(Y, X, align='center', color='teal')

# Label each bar with its tag; invert the axis so the first row is on top.
ax.set_yticks(Y)
ax.set_yticklabels(tags)
ax.invert_yaxis()
ax.set(xlabel='Count', title='Top Hashtags')

plt.show()