In [None]:
from pyspark import SparkContext
# Spark context running locally with two worker threads ("local[2]");
# the application is registered under the name "twitter_stream".
sc = SparkContext("local[2]", "twitter_stream")

In [None]:
# Executes shell_init.py inside this notebook's namespace (execfile is a
# Python 2 builtin, so this notebook requires a Python 2 kernel).
# NOTE(review): the script's contents are not visible here -- confirm it does
# not try to create a second SparkContext on top of the `sc` built above.
execfile("shell_init.py")

In [None]:
# Number of matching stored tweets that must be exceeded before
# check_for_cluster opens a new cluster; joining an already existing cluster
# only requires exceeding 80% of this value.
CLUSTER_TWEET_NUM = 10
# A stored tweet counts as a match for an incoming tweet when the number of
# hashed tokens they share is greater than int(n_tokens * CLUSTER_MATCH_RATIO),
# where n_tokens is the incoming tweet's non-stopword token count.
CLUSTER_MATCH_RATIO = 0.6

In [None]:
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

import hashlib
import json
import re
import psycopg2

# Sentiment lexicon: each line is "<term>\t<integer score>".
with open('AFINN-111.txt') as f:
    sent_dict = {}
    for line in f:
        fields = line.rstrip('\r\n').split('\t')
        if len(fields) < 2:
            # Tolerate blank or malformed lines instead of raising IndexError.
            continue
        sent_dict[fields[0]] = int(fields[1])
broadcast_sent_dict = sc.broadcast(sent_dict)

# Taxonomy: maps a category name to an iterable of keywords.
with open('taxonomy_dict.json') as f:
    s = f.read()
    # NOTE(review): eval() executes whatever expression the file contains.
    # If the file is strict JSON, json.loads(s) is the safe replacement; if it
    # is a Python literal, use ast.literal_eval(s). Confirm the file format
    # before switching.
    taxonomy_dict = eval(s)
broadcast_taxonomy_dict = sc.broadcast(taxonomy_dict)

# Stopwords: one word per line. strip() removes the line terminator without
# truncating the last word when the file lacks a trailing newline (the former
# l[:-1] slice dropped that word's final character); blank lines are skipped.
with open('english_stopwords.txt') as f:
    stopwords = [line.strip() for line in f if line.strip()]
broadcast_stopwords = sc.broadcast(stopwords)

In [None]:
# Streaming context that groups incoming records into 5-second batches.
ssc = StreamingContext(sc, 5)
zookeeper = "localhost:2181"
# Consume topic "twitter_api_stream" with one receiver thread as consumer
# group "twitter".
kafka_stream = KafkaUtils.createStream(
    ssc, zookeeper, "twitter", {"twitter_api_stream":1})

# Every Kafka record arrives as a (key, message) pair; only the message,
# a JSON document, is kept and decoded.
tweets = kafka_stream.map(lambda record: json.loads(record[1]))

# A token is a run of two or more ASCII lowercase letters delimited by word
# boundaries; single letters and letter runs glued to digits are not matched.
tokenize = re.compile('(?u)\\b[a-z][a-z]+\\b')
def tokenize_foo(tweet):
    """Add a 'tokens' list to the tweet and return the same dict.

    The tweet's 'text' is lower-cased before matching, so tokens are always
    lowercase. The input dict is modified in place.
    """
    lowered = tweet['text'].lower()
    tweet['tokens'] = tokenize.findall(lowered)
    return tweet
# Rebind `tweets` to the stream whose elements carry the extra 'tokens' key.
tweets = tweets.map(tokenize_foo)

def include_sent(tweet):
    """Add a 'sentiment' score to the tweet and return the same dict.

    The score is the sum of the lexicon values of the tweet's tokens that
    appear in the broadcast sentiment dictionary, divided by
    (number of such tokens + 2).
    """
    lexicon = broadcast_sent_dict.value
    scores = []
    for token in tweet['tokens']:
        if token in lexicon:
            scores.append(lexicon[token])
    # The +2 in the denominator pulls scores based on few words toward zero
    # and keeps the division defined when no token is in the lexicon.
    tweet['sentiment'] = sum(scores) / float(len(scores) + 2)
    return tweet
tweets = tweets.map(include_sent)

In [None]:
def get_taxonomy(t):
    """Add a 'categories' string to the tweet and return the same dict.

    A category matches when at least one of its keywords occurs as a
    substring of the lower-cased tweet text with the characters ', # and @
    removed. Each matching category contributes '|<category>|' to the
    result; the string is empty when nothing matches.
    """
    normalized = t['text'].lower()
    for stripped_char in ("'", "#", "@"):
        normalized = normalized.replace(stripped_char, "")

    matched = []
    for cat, kws in broadcast_taxonomy_dict.value.items():
        # One keyword hit is enough; a category is never listed twice.
        if any(kw in normalized for kw in kws):
            matched.append('|' + cat + '|')
    t['categories'] = ''.join(matched)
    return t
tweets = tweets.map(get_taxonomy)

In [None]:
def _word_id(token):
    """Map a token to the integer stored in ``word_ids.word_id``.

    The id is the last 7 hex digits of the token's MD5 digest, so distinct
    tokens can share an id.
    """
    return int(hashlib.md5(token.encode('utf-8')).hexdigest()[-7:], 16)


def check_for_cluster(tweet, con, cur):
    """Assign the tweet to a cluster and record its hashed tokens.

    Stored tweets sharing more than ``int(n_tokens * CLUSTER_MATCH_RATIO)``
    hashed tokens with this tweet are counted per cluster id (0 meaning "no
    cluster"). If the most frequent cluster id has enough matching tweets,
    the tweet joins that cluster, or a new cluster is created when the
    winning id is 0. One row per token is then inserted into ``word_ids``.

    Args:
        tweet: dict with at least 'tokens' (list of strings) and 'id_str'.
        con: open psycopg2 connection; committed by this function.
        cur: cursor belonging to ``con``.

    Returns:
        The cluster id written for this tweet, or 0 when it is unclustered.
    """
    # Build the stopword set once per call instead of scanning the broadcast
    # list for every token.
    stopwords = set(broadcast_stopwords.value)
    tokens_int = [_word_id(tok) for tok in tweet['tokens'] if tok not in stopwords]

    # match words: count, per stored tweet, how many of our tokens it shares
    tweet_cluster_id = {}
    tweet_ids_acc = {}
    for tok in tokens_int:
        # Columns are selected by name so the code no longer depends on the
        # physical column order of word_ids.
        cur.execute('select id_str, cluster_id from word_ids where word_id=%s', (tok,))
        for id_str, cluster_id in cur.fetchall():
            tweet_cluster_id[id_str] = cluster_id
            tweet_ids_acc[id_str] = tweet_ids_acc.get(id_str, 0) + 1

    # Check for cluster
    match_threshold = int(len(tokens_int)*CLUSTER_MATCH_RATIO)
    same_cluster_tweet_ids = [t_id for t_id, kw_n in tweet_ids_acc.items()
                              if kw_n > match_threshold]
    cluster_acc = {}
    bigest_cluster = {'id':0,'n':0}
    for tweet_id in same_cluster_tweet_ids:
        cluster_id = tweet_cluster_id[tweet_id]
        cluster_acc[cluster_id] = cluster_acc.get(cluster_id, 0) + 1
        if bigest_cluster['n'] < cluster_acc[cluster_id]:
            bigest_cluster['id'] = cluster_id
            bigest_cluster['n'] = cluster_acc[cluster_id]

    if bigest_cluster['id'] > 0:
        # for existed cluster we lower threshold
        required_matches = CLUSTER_TWEET_NUM*0.8
    else:
        required_matches = CLUSTER_TWEET_NUM

    if bigest_cluster['n'] > required_matches:
        if bigest_cluster['id'] == 0:
            ## NEW CLUSTER DETECTED
            cur.execute('select * from cluster_ids')
            all_cluster_ids = cur.fetchall()
            if len(all_cluster_ids) > 0:
                # fetchall() yields row tuples; the id is the first column.
                # (max() over the raw rows returned a tuple, and adding 1 to
                # it raised TypeError.)
                cluster = max(row[0] for row in all_cluster_ids)+1
            else:
                cluster = 1

            # Parameterized: the concatenated form produced
            # "cluster_id=<n>WHERE ..." with no separating space.
            cur.execute("UPDATE word_ids SET cluster_id=%s WHERE id_str=ANY(%s);",
                        (cluster, same_cluster_tweet_ids))
            cur.execute("INSERT INTO cluster_ids VALUES (%s);", (cluster,))
            con.commit()
        else:
            cluster = bigest_cluster['id']
    else:
        cluster = 0

    query = "INSERT INTO word_ids (id_str,word_id,cluster_id) VALUES (%s,%s,%s);"
    for tok in tokens_int:
        cur.execute(query, (tweet['id_str'],tok,cluster))
    con.commit()

    return cluster

In [None]:
def send_partition(part):
    """Cluster and persist every tweet of one RDD partition.

    Opens one PostgreSQL connection for the partition, runs
    check_for_cluster on each tweet (its returned cluster id is not stored
    in the ``stream`` row) and inserts the tweet into the ``stream`` table.
    The connection is closed even when a statement fails.

    Args:
        part: iterator over tweet dicts containing every key listed in
            ``field_names``.

    Raises:
        KeyError: if a tweet lacks one of the expected fields.
    """
    field_names = ['id_str', 'categories', 'in_reply_to_user_id', 'sentiment',
                   'text', 'created_at', 'user_location', 'in_reply_to_status_id',
                   'in_reply_to_screen_name', 'retweet_count', 'favorite_count',
                   'user_name', 'user_screen_name', 'user_id_str']
    # Column list and placeholders are both derived from field_names so they
    # cannot drift apart; the names are constants, not external input.
    insert_query = "INSERT INTO stream (%s) VALUES (%s);" % (
        ", ".join(field_names), ", ".join(["%s"] * len(field_names)))

    con = psycopg2.connect("dbname='twitter' user='postgres'")
    try:
        cur = con.cursor()
        for tweet in part:
            check_for_cluster(tweet, con, cur)
            cur.execute(insert_query, [tweet[k] for k in field_names])
        con.commit()
    finally:
        # Closing without a commit discards the pending transaction, so a
        # failed partition leaves no half-written tail and no leaked socket.
        con.close()

tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))

In [None]:
def printo(t):
    """Return the tweet when it has any category, otherwise an empty string.

    Only t['categories'] is inspected; the tweet itself is returned
    unchanged (same object).
    """
    has_category = len(t['categories']) > 0
    return t if has_category else ''
#tweets.map(printo).pprint()
# Print the leading elements of every batch to the driver's output.
tweets.pprint()

#ssc.awaitTermination(2)
# Start receiving and processing; every transformation and output operation
# on the stream must already be registered at this point.
ssc.start()

In [None]:
# Stop the streaming computation.
# NOTE(review): with default arguments stop() also shuts down the underlying
# SparkContext, so `sc` would have to be recreated before re-running the
# cells above -- confirm that is intended.
ssc.stop()