In [93]:
from __future__ import division
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

from pyspark.sql import functions as F

-------------------------------------------
Time: 2016-04-29 20:44:15
-------------------------------------------



In [94]:
# Entry point for DataFrame/SQL operations, built on the SparkContext `sc`.
# NOTE(review): neither `sc` nor `SQLContext` is defined or imported in the cells
# shown here -- presumably both are provided by the PySpark notebook kernel; confirm.
sqlContext = SQLContext(sc)

In [95]:

# Load the batch tweet dataset from a local JSON file into a DataFrame.
# NOTE(review): the path is absolute and machine-specific; it must exist on the
# host running this notebook.
dataset = sqlContext.read.json("file:///home/bdm/twitter.small")

In [96]:
def user_data(tweet):
    """Turn one tweet record into a ``(username, (count, engagement))`` pair.

    The key is the author's screen name. The value pairs a constant 1.0
    (one tweet) with the tweet's favorite count plus retweet count, as a float.
    """
    author = tweet.user.screen_name
    engagement = float(tweet.favorite_count + tweet.retweet_count)
    return (author, (1.0, engagement))

def movie_data(tweet):
    """Turn one tweet record into a ``(title, (count, engagement))`` pair.

    The key is the ``display_url`` of the tweet's first URL entity, which this
    notebook uses as the movie title. The value pairs a constant 1.0 (one
    tweet) with the tweet's favorite count plus retweet count, as a float.

    NOTE(review): ``urls[0]`` presumably fails for a tweet without any URL
    entity -- confirm every record in the dataset carries at least one.
    """
    first_url = tweet.entities.urls[0]
    title = first_url.display_url
    engagement = float(tweet.favorite_count + tweet.retweet_count)
    return (title, (1.0, engagement))

# Key the batch dataset two ways -- by username and by movie title -- so the
# streaming side can left-outer-join against each of them later on.
userset = dataset.map(user_data)
movieset = dataset.map(movie_data)

#print type(movieset)
#print userset.collect()[:5]
#print movieset.collect()[:5]

In [97]:
# Streaming timing, in seconds: each micro-batch is 15 s long, the window spans
# two batches, and the window advances by one batch at a time.
batch_interval = 15
window_interval = batch_interval * 2
sliding_interval = batch_interval
# Streaming context on the existing SparkContext `sc`, using the batch length above.
ssc = StreamingContext(sc, batch_interval)

In [98]:
def saveUser(time, rdd):
    """Append one batch of per-user counts to Cassandra table ``assignment.usercounts``.

    Used as the callback passed to ``foreachRDD``.

    Args:
        time: batch time handed over by ``foreachRDD``; written to the ``time`` column.
        rdd: RDD of ``(username, (ignored, engaged))`` pairs.

    Returns:
        None. The write is best-effort: a failure is reported on stderr with its
        traceback and the stream keeps running, instead of being silently dropped.
    """
    try:
        # An empty batch has no rows to infer a DataFrame schema from, so skip it
        # explicitly rather than letting it fail and hiding the error.
        if rdd.isEmpty():
            return

        from pyspark.sql import SQLContext, Row
        sqlContext = SQLContext(sc)

        rows = rdd.map(
            lambda row: Row(time=time, username=row[0], engaged=row[1][1], ignored=row[1][0]))
        df = sqlContext.createDataFrame(rows)

        # Echo the batch in the notebook before it is written.
        df.show()

        (df.write.format("org.apache.spark.sql.cassandra")
           .options(table="usercounts", keyspace="assignment")
           .save(mode="append"))
    except Exception:
        # Keep the stream alive, but make the failure visible.
        import traceback
        traceback.print_exc()
    
def saveMovie(time, rdd):
    """Append one batch of per-movie counts to Cassandra table ``assignment.moviecounts``.

    Used as the callback passed to ``foreachRDD``.

    Args:
        time: batch time handed over by ``foreachRDD``; written to the ``time`` column.
        rdd: RDD of ``(title, (popularity, engagement))`` pairs.

    Returns:
        None. The write is best-effort: a failure is reported on stderr with its
        traceback and the stream keeps running, instead of being silently dropped.
    """
    try:
        # An empty batch has no rows to infer a DataFrame schema from, so skip it
        # explicitly rather than letting it fail and hiding the error.
        if rdd.isEmpty():
            return

        from pyspark.sql import SQLContext, Row
        sqlContext = SQLContext(sc)

        rows = rdd.map(
            lambda row: Row(time=time, title=row[0], engagement=row[1][1], popularity=row[1][0]))
        df = sqlContext.createDataFrame(rows)

        # Echo the batch in the notebook before it is written.
        df.show()

        (df.write.format("org.apache.spark.sql.cassandra")
           .options(table="moviecounts", keyspace="assignment")
           .save(mode="append"))
    except Exception:
        # Keep the stream alive, but make the failure visible.
        import traceback
        traceback.print_exc()

In [99]:
def add_movies(l):
    """
        Parse one comma-separated stream line into (title, (popularity, engagement)).

        title is field 3 of the line
        popularity is always 1.0 (one tweet)
        engagement is field 4 plus field 5, both read as floats
        (the retweet and favorite counts)
    """
    fields = l.split(",")
    title = fields[3]
    engagement = float(fields[4]) + float(fields[5])
    return (title, (1.0, engagement))

def add_users(l):
    """
        Parse one comma-separated stream line into (username, (ignored, engaged)).

        username is field 6 of the line
        ignored is always 1.0: every tweet is counted, engaged with or not
        engaged is field 4 plus field 5, both read as floats
        (the retweet and favorite counts), not a 0/1 flag
    """
    fields = l.split(",")
    username = fields[6]
    engagement = float(fields[4]) + float(fields[5])
    return (username, (1.0, engagement))

def add_tuple(t,u):
    """
        Combine two (count, engagement) tuples into their element-wise mean.

        Despite the name, the result is the average of the two inputs, not
        their sum. A missing or empty input is replaced by (0.0, 0.0) so a
        key present on only one side never produces null values; floats are
        used throughout so no integer type ends up in the inferred schema.

        NOTE(review): this function is passed to reduceByKey, and pairwise
        averaging is not associative, so with more than two values per key
        the result depends on how the values are grouped. Confirm whether a
        sum or a true mean was intended.
    """
    zero = (0.0, 0.0)
    left = t or zero
    right = u or zero

    first = (left[0] + right[0]) / 2
    second = (left[1] + right[1]) / 2
    return (first, second)

def add_kv_tuple(t):
    """
        Collapse one joined record (key, (left, right)) into (key, left + right),
        adding the two value tuples element by element.

        Either side may be None when the join found no match, e.g.
        (u'isaac_singer', ((1.0, 0.0), None)); a missing side counts as (0.0, 0.0).
    """
    key = t[0]
    joined = t[1]
    left = joined[0] or (0.0, 0.0)
    right = joined[1] or (0.0, 0.0)
    return (key, (left[0] + right[0], left[1] + right[1]))

In [100]:
# Stream source: text lines read from files appearing in this HDFS directory.
lines = ssc.textFileStream("hdfs:///user/bdm/speed")

# Movie branch: parse each line, window it, reduce per title, then left-outer-join
# with the batch-derived `movieset` so titles missing from the batch data are kept.
movies = lines.map(lambda line: add_movies(line)).window(window_interval, sliding_interval)
#movies.pprint()
m = movies.reduceByKey( add_tuple  ).transform(lambda rdd: rdd.leftOuterJoin(movieset))
# Merge the stream-side and batch-side values of each joined record into one tuple.
mm = m.map(add_kv_tuple)

#mm.pprint()


# User branch: same steps as the movie branch, keyed by username and joined with `userset`.
users = lines.map(lambda line: add_users(line)).window(window_interval, sliding_interval)
#users.pprint()
u = users.reduceByKey( add_tuple ).transform(lambda rdd: rdd.leftOuterJoin(userset))
# Print the joined records, then the merged records, for every batch.
u.pprint()
uu = u.map(add_kv_tuple)
uu.pprint()

#m.pprint()
#u.pprint()


# Persist every batch of both branches through the save callbacks.
mm.foreachRDD(saveMovie)
uu.foreachRDD(saveUser)

# Output
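The output below shows the user stream after the left outer join, where keys with no match in the batch data get `None` (null) on the right-hand side. The movie stream behaves exactly the same way.  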
  
The data going into both Cassandra tables is also shown.

In [101]:
# Start the streaming computation defined above; output from each batch follows.
ssc.start()

-------------------------------------------
Time: 2016-04-29 20:46:15
-------------------------------------------

-------------------------------------------
Time: 2016-04-29 20:46:15
-------------------------------------------

-------------------------------------------
Time: 2016-04-29 20:46:30
-------------------------------------------

-------------------------------------------
Time: 2016-04-29 20:46:30
-------------------------------------------

-------------------------------------------
Time: 2016-04-29 20:46:45
-------------------------------------------
(u'hsyn_inan', ((1.0, 0.0), None))
(u'mxwout', ((1.0, 0.0), None))
(u'roxannepena', ((1.0, 0.0), None))
(u'_Afinso_', ((1.0, 0.0), None))
(u'mortenbramsen', ((1.0, 0.0), None))
(u'TheOmegaTapes', ((1.0, 0.0), None))
(u'sebutiban', ((1.0, 0.0), None))
(u'LaitjeL', ((1.0, 0.0), None))
(u'StefanHulman', ((1.0, 0.0), None))
(u'PeppermintShore', ((1.0, 0.0), None))
...

-------------------------------------------
Time: 2016-04-

In [102]:
# Stop streaming. False is the first positional argument (`stopSparkContext` in
# PySpark), so the underlying SparkContext is left running.
ssc.stop(False)

# Cassandra 

-- Keyspace used by the notebook's save callbacks; SimpleStrategy with a single replica per row.
CREATE KEYSPACE assignment WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

USE assignment;

-- Target of saveUser. Unquoted CQL identifiers are case-insensitive, so this
-- table is addressed as "usercounts". Partition key: time; clustering column: username.
CREATE TABLE userCounts (
time text,
username text,
ignored double,
engaged double,
primary key(time, username)
);

-- Target of saveMovie. Unquoted CQL identifiers are case-insensitive, so this
-- table is addressed as "moviecounts". Partition key: time; clustering column: title.
CREATE TABLE movieCounts (
time text,
title text,
popularity double,
engagement double,
primary key(time, title)
);


Below are the movie table, the user table, and the number of rows in each.

<img src="Cassandra Movies Table.png">

<img src="Cassandra Users Table.png">

<img src="Cassandra Rows.png">

In real life, the feed would come from a live source that writes data into the HDFS folder watched by `textFileStream`.