In [4]:
import time, json, boto3, re
from dateutil import parser, tz
from datetime import datetime, timedelta
from sentiment import *
from pyspark.sql import SQLContext, Row
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
from tempfile import NamedTemporaryFile


# Module-level list that pool_search_terms() extends with every term it finds.
search_terms = list()
# Module-level partition count; process() declares the same value as its own default.
n_parts = 10

def get_search_json(fname):
    ''' Load the search-term JSON document stored in the file `fname`.

        The whole file is parsed first, so pretty-printed (multi-line) JSON works.
        If that fails, only the first line is parsed, which keeps the earlier
        behaviour for files that carry extra lines after a one-line document.

        Parameters:
            fname: path of the JSON file to read.
        Returns:
            The decoded JSON value.
        Raises:
            IOError/OSError if the file cannot be opened.
            ValueError if neither the whole file nor its first line is valid JSON
            (this includes an empty file).
    '''
    # `with` closes the handle even when reading raises
    with open(fname, 'r') as f:
        raw = f.read()
    try:
        return json.loads(raw)
    except ValueError:
        # Fall back to the first line only (one JSON document followed by other lines)
        first_line = raw.split('\n', 1)[0]
        return json.loads(first_line)

def pool_search_terms(j):
    ''' Walk the nested search-term structure `j` and gather every term it holds.

        Dicts are descended into value by value; anything that is not a dict is
        treated as a collection of terms and appended to the module-level
        `search_terms` list.  That same list is returned, so terms accumulate
        across calls unless the caller resets `search_terms` first.
    '''
    # Leaf: not a dict, so its elements are the terms themselves
    if not isinstance(j,dict):
        search_terms.extend( j )
        return search_terms
    # Branch: recurse into every value of the dict
    for child in j.values():
        pool_search_terms(child)
    return search_terms

def is_cluster_running():
    ''' Report whether an EMR cluster is currently active.

        Returns:
            (clusters_exist, cluster_id): a bool saying whether list_clusters()
            returned at least one cluster in state RUNNING, WAITING or
            BOOTSTRAPPING, and the Id of the first such cluster (None if none).

        WARNING: this is a little shaky, as there may be >1 clusters running in
                 production; better to search by cluster name as well as state.
    '''
    import boto3
    emr = boto3.client('emr')

    # list_clusters() is used here to find the current cluster ID
    active = emr.list_clusters(ClusterStates=['RUNNING','WAITING','BOOTSTRAPPING'])['Clusters']

    if len(active) > 0:
        return True, active[0]['Id']
    return False, None



def make_json_nostream(tweet):
    ''' Convert one raw stringified-JSON record into a Python object.

        Parameters:
            tweet: the raw record, either UTF-8 encoded bytes or already-decoded text.
        Returns:
            The decoded JSON value on success.  On any failure a string starting
            with "error" followed by the (best-effort decoded) input, so that
            downstream filters can drop it instead of the job crashing.
    '''
    if isinstance(tweet, bytes):
        try:
            text = tweet.decode('utf-8')
        except UnicodeDecodeError:
            # Undecodable bytes: report them with replacement characters rather than
            # decoding strictly a second time, which would raise out of the handler
            return "error" + tweet.decode('utf-8', 'replace')
    else:
        # Already text (or not a string at all); calling .decode here would fail
        text = tweet
    try:
        return json.loads(text)
    except Exception:
        # Deliberate best-effort: any parse failure becomes an "error..." marker
        if not isinstance(text, type(u'')):
            text = repr(text)
        return "error" + text


def filter_tweets(item,terms):
    ''' Filters out the tweets we do not want.  Filters include:
            * No non-tweets (eg. delete commands, or any message lacking tweet fields)
            * No retweets
            * English language only
            * No tweets with links
                - Both entities['urls'] and entities['media'] are checked
                  (whether both are needed is an open question)
            * Matches at least one of the provided search terms

        Parameters:
            item:  a decoded stream record; anything that is not a dict is rejected.
            terms: iterable of search terms.  Each term is inserted into a regular
                   expression as-is and matched case-insensitively when it appears
                   at the start of the text or right after whitespace, '#' or '@'.
        Returns:
            True if the record passes every filter, False otherwise.
    '''
    if not isinstance(item,dict):
        return False
    if 'delete' in item or 'retweeted_status' in item:
        return False
    # .get(): other non-tweet messages carry no 'lang'/'entities'/'text' and must be
    # rejected rather than raise KeyError
    if item.get('lang') != 'en':
        return False
    entities = item.get('entities') or {}
    if entities.get('urls') or 'media' in entities:
        return False
    # One shared prefix group for all terms.  Joining the terms with the prefix
    # (as separator) left the first term unanchored and stopped the others from
    # matching at the very start of the text.
    pattern = r'(?:^|\s|#|@)(?:' + '|'.join(terms) + ')'
    return re.search(pattern,item.get('text') or '',re.I) is not None


def get_relevant_fields(item,json_terms,debate_party):
    ''' Reduce the full tweet metadata down to the fields we care about.

        Parameters:
            item:         decoded tweet dict; reads 'id', 'created_at', 'text',
                          'user'['screen_name'] and 'entities'['hashtags'].
            json_terms:   nested search-term dict; json_terms['candidates'][debate_party]
                          maps candidate name -> list of search terms.
            debate_party: key into json_terms['candidates'].
        Returns:
            (tweet_id, fields) where fields holds:
                * timestamp       created_at reformatted as 'YYYY-MM-DD HH:MM:SS'
                * username        screen name of the author
                * text            tweet text with non-ASCII characters dropped
                * hashtags        hashtag texts with non-ASCII characters dropped
                * first_term      first entry of search_terms
                * search_terms    candidates mentioned, or ["general"] if none
                * multiple_terms  True when more than one candidate is mentioned
    '''
    cands = json_terms['candidates'][debate_party]
    text = item['text']
    mentioned = []
    # loop over candidates, check if tweet mentions each one
    for name, terms in cands.items():
        # A term counts when it starts the text or follows whitespace, '#' or '@'.
        # One shared prefix group is used: joining the terms with the prefix left
        # the first term unanchored and the others unmatched at the start of the text.
        p = r'(?:^|\s|#|@)(?:' + '|'.join(terms) + ')'
        if re.search(p,text,re.I): # if candidate-specific search term is matched
            mentioned.append( name ) # add candidate surname to mentioned list
    if not mentioned: # if no candidates were mentioned specifically
        mentioned.append( "general" ) # then tweet must be a general reference to the debate
    created = time.strptime(item['created_at'],'%a %b %d %H:%M:%S +0000 %Y')
    return (item['id'],
            {"timestamp":      time.strftime('%Y-%m-%d %H:%M:%S', created),
             "username":       item['user']['screen_name'],
             "text":           text.encode('utf8').decode('ascii','ignore'),
             "hashtags":       [el['text'].encode('utf8').decode('ascii','ignore') for el in item['entities']['hashtags']],
             "first_term":     mentioned[0],
             "search_terms":   mentioned,
             "multiple_terms": len(mentioned) > 1
            }
           )


def make_row(d,doPrint=False):
    ''' Build a Spark SQL Row from one (tweet_id, field_dict) pair.

        Parameters:
            d:       pair whose first element is the tweet id and whose second is a
                     dict with the keys 'username', 'timestamp', 'hashtags', 'text',
                     'search_terms', 'multiple_terms' and 'first_term'.
            doPrint: accepted for compatibility; not used.
        Returns:
            Row with the fields id, username, timestamp, hashtags, text,
            search_terms, multiple_terms and first_term.
    '''
    tid, tdata = d[0], d[1]

    # Missing hashtags become an empty list, not '': the schema used by process()
    # declares this column as an array of strings
    hashtags = tdata['hashtags']
    if hashtags is None:
        hashtags = []

    return Row(id             =tid,
               username       =tdata['username'],
               timestamp      =tdata['timestamp'],
               hashtags       =hashtags,
               text           =tdata['text'],
               search_terms   =tdata['search_terms'],
               multiple_terms =tdata['multiple_terms'],
               first_term     =tdata['first_term']
              )

def process(rdd,json_terms,debate_party,domain_name='sentiment',n_parts=10):
    ''' Summarise one batch of reduced tweets per candidate and store the summary in SimpleDB.

        For every candidate of `debate_party` (plus the catch-all 'general') the
        summary holds, all as strings: party, num_tweets, sentiment_avg,
        sentiment_std, highest_sentiment_tweet and lowest_sentiment_tweet.  The
        summaries are written as one SimpleDB item named after the current time,
        with one attribute per candidate whose value is the JSON-encoded summary.

        Parameters:
            rdd:          RDD of (tweet_id, field_dict) pairs, the shape make_row() consumes.
            json_terms:   nested search-term dict; json_terms['candidates'][debate_party]
                          maps candidate name -> search terms.
            debate_party: key into json_terms['candidates'].
            domain_name:  SimpleDB domain to write to.
            n_parts:      partition count for the candidate RDD and the count join.
        Returns:
            None.  Every exception is caught, reported on stdout and swallowed so
            that one bad batch does not stop the caller.

        NOTE(review): emotionFileReader() and emotion() presumably come from the
        `sentiment` module; emotion(text, labMT) is used here as returning an
        (average, std) pair whose first element may be None - confirm.
    '''

    rdd.cache()

    try:

        candidate_dict = {}
        # list(...) so that append works whether keys() returns a list or a view
        candidate_names = list(json_terms['candidates'][debate_party].keys())
        candidate_names.append( 'general' )

        for candidate in candidate_names:
            # compare by value; identity of equal strings is an implementation detail
            candidate_dict[candidate] =   {'party':debate_party if candidate != 'general' else 'general',
                                          'num_tweets':'0',
                                          'sentiment_avg':'',
                                          'sentiment_std':'',
                                          'highest_sentiment_tweet':'',
                                          'lowest_sentiment_tweet':''
                                         }

        # default settings remove words scored 4-6 on the scale (too neutral).
        # adjust with kwarg stopval, determines 'ignore spread' out from 5. eg. default stopval = 1.0 (4-6)
        labMT = emotionFileReader()

        # Get the singleton instance of SQLContext
        sqlContext = getSqlContextInstance(rdd.context)

        # Tweet ids do not fit in 32 bits, so `id` is LongType rather than IntegerType
        schema = StructType([StructField("first_term",      StringType()           ),
                             StructField("hashtags",        ArrayType(StringType())),
                             StructField("id",              LongType()             ),
                             StructField("multiple_terms",  BooleanType()          ),
                             StructField("search_terms",    ArrayType(StringType())),
                             StructField("text",            StringType()           ),
                             StructField("timestamp",       StringType()           ),
                             StructField("username",        StringType()           )
                            ]
                           )
        # Convert RDD[(id, dict)] to RDD[Row] to DataFrame
        row_rdd = rdd.map(lambda data: make_row(data))
        df = sqlContext.createDataFrame(row_rdd, schema)

        # how many tweets per candidate per batch?
        df2 = (df.groupBy("first_term")
                 .count()
                 .alias('df2')
              )

        counts = (df2.map(lambda row: row.asDict() )
                     .map(lambda row: (row['first_term'],row['count']))
                  )

        cRdd = rdd.context.parallelize( candidate_names, n_parts )

        def update_dict(d):
            # d is (summary_dict, count or None) as produced by the left outer join
            data = d[0]
            data['num_tweets'] = str(d[1]) if d[1] is not None else data['num_tweets']
            return data

        # Left outer join keeps candidates with no tweets in this batch (count stays '0')
        tmp = (cRdd.map( lambda c: (c, candidate_dict[c]), preservesPartitioning=True )
                               .leftOuterJoin( counts, numPartitions=n_parts )
                               .map( lambda data: (data[0], update_dict(data[1])) )
                               .collect()
                    )
        candidate_dict = { k:v for k,v in tmp }
        # Register as table
        df.registerTempTable("tweets")
        # loop over candidates, score the tweets attributed to each one
        for candidate in candidate_names:

            accum = rdd.context.accumulator(0)

            # Column comparison instead of a string-built SQL query, so a candidate
            # name containing a quote character cannot break the query
            result = df.filter(df['first_term'] == candidate).select('text')
            try:
                scored = result.map( lambda x: (emotion(x.text,labMT), x.text) ).cache()
            except Exception as e:
                print('nothing in scored')
                print(str(e))
                # Without `scored` the steps below would fail or silently reuse the
                # previous candidate's data, so skip this candidate
                continue

            # Count the scored tweets through an accumulator
            scored.foreach(lambda x: accum.add(1))

            if accum.value > 0:
                accum2 = rdd.context.accumulator(0)
                try:
                    scored = scored.filter(lambda score: score[0][0] is not None).cache()
                except Exception as e:
                    print('nothing left after filtering out no-scores')

                scored.foreach(lambda x: accum2.add(1))
                if accum2.value > 1: # we want at least 2 tweets for highest and lowest scoring
                    high_parts = scored.takeOrdered(1, key = lambda x: -x[0][0])[0]
                    high_scores, high_tweet = high_parts

                    high_avg = str(high_scores[0])
                    high_tweet = high_tweet.encode('utf8').decode('ascii','ignore')

                    low_parts  = scored.takeOrdered(1, key = lambda x:  x[0][0])[0]

                    low_scores, low_tweet = low_parts
                    low_avg = str(low_scores[0])
                    low_tweet = low_tweet.encode('utf8').decode('ascii','ignore')

                else:
                    high_avg = low_avg = high_tweet = low_tweet = ''


                candidate_dict[candidate]['highest_sentiment_tweet'] = '_'.join([high_avg,high_tweet])
                candidate_dict[candidate]['lowest_sentiment_tweet']  = '_'.join([low_avg,low_tweet])

                # Score all of the candidate's tweets joined into one text
                sentiment = (result.map(lambda x: (1,x.text))
                                        .reduceByKey(lambda x,y: ' '.join([str(x),str(y)]))
                                        .map( lambda text: emotion(text[1],labMT) )
                                        .collect()
                                 )
                try:
                    sentiment_avg, sentiment_std = sentiment[0]
                except Exception as e:
                    print(str(e))
                    print('sentiment is empty')
                else:
                    # Only stored on success, so a failure leaves the '' defaults
                    # instead of an earlier candidate's values
                    candidate_dict[candidate]['sentiment_avg'] = str(sentiment_avg)
                    candidate_dict[candidate]['sentiment_std'] = str(sentiment_std)

        attrs = []
        import boto3,json
        client = boto3.client('sdb')

        for cname,cdata in candidate_dict.items():
            attrs.append( {'Name':cname,'Value':json.dumps(cdata),'Replace':True} )

        try:
            # write row of data to SDB
            client.put_attributes(
                DomainName= domain_name,
                ItemName  = str(time.time()),
                Attributes= attrs
            )
        except Exception as e:
            print('sdb write error: {}'.format(str(e)))

        #rdd.foreachPartition(lambda p: write_to_db(p,level='group'))
    except Exception as e:
        print('')
        print('THERE IS AN ERROR!!!!')
        print(str(e))
        print('')


def update_tz(d,dtype):
    ''' Convert a record's UTC timestamp to the America/New_York time zone
        (the time zone of the debates).

        Parameters:
            d:     (tweet_id, field_dict) pair; field_dict['timestamp'] is parsed
                   and interpreted as UTC.
            dtype: "sql" to get a Row(id, time), "pandas" to get the bare datetime.
        Returns:
            The converted value in the requested form; None for any other dtype.
    '''
    def _to_eastern(record):
        utc_zone = tz.gettz('UTC')
        eastern_zone = tz.gettz('America/New_York')
        # The parsed timestamp is tagged as UTC, then shifted to Eastern
        stamped = parser.parse(record['timestamp']).replace(tzinfo=utc_zone)
        return stamped.astimezone(eastern_zone)

    if dtype == "pandas":
        return _to_eastern(d[1])
    if dtype == "sql":
        return Row(id=d[0], time=_to_eastern(d[1]))


# From Thouis 'Ray' Jones CS205
def quiet_logs(sc):
    ''' Raise the log4j level of the "org", "akka" and "amazonaws" loggers to WARN
        so that they stop flooding the output during execution.

        Parameters:
            sc: context object exposing the JVM gateway as `sc._jvm`.
    '''
    log4j = sc._jvm.org.apache.log4j
    for logger_name in ("org", "akka", "amazonaws"):
        log4j.LogManager.getLogger(logger_name).setLevel(log4j.Level.WARN)




# from docs: http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
def getSqlContextInstance(sparkContext):
    ''' Return the shared SQLContext, creating it on first use.

        The instance is kept in this module's globals under the name
        'sqlContextSingletonInstance'; later calls return that same object and
        ignore the `sparkContext` argument.
    '''
    registry = globals()
    if 'sqlContextSingletonInstance' in registry:
        return registry['sqlContextSingletonInstance']
    registry['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return registry['sqlContextSingletonInstance']



In [8]:
# ---- Configuration for the offline (non-streaming) reduction of gardenhose dumps ----

# Hadoop gzip codec class name.  NOTE(review): not referenced in the cells shown here.
codec = "org.apache.hadoop.io.compress.GzipCodec"

# Key into the 'candidates' section of the search-term JSON; also a path component
party_of_debate = "dem"

# NOTE(review): the reduction loop below iterates range(19,24) and does not read
# this list - confirm which set of hours is intended.
hours = [20,21,22,23]

# NOTE(review): absolute, machine-specific path; must be edited to run elsewhere.
head_path = "/Users/andrew/git-local/"
data_path = "data/gardenhose/"
reduced_path = "data/reduced/"
debate_date = "/oct13/"
debate_date_numeric = "2015-10-13-"

# Input file pattern is path + <hour> + tail,
# i.e. <head>/data/gardenhose/<party>/oct13/2015-10-13-<hour>*.gz
path = head_path + data_path + party_of_debate + debate_date + debate_date_numeric
tail = "*.gz"


# Reset first: pool_search_terms() appends to this module-level list, so re-running
# the cell without the reset would accumulate duplicates (or fail, since the name
# is rebound to a set below).
search_terms = []
search_json_fname = head_path+'search-terms.json'
# Load nested JSON of search terms
jdata = get_search_json(search_json_fname)
# Collect all search terms in JSON into search_terms, de-duplicated as a set
search_terms = set(pool_search_terms(jdata))


# Placeholder; overwritten by the reduction loop with each hour's collected rows
output = []

In [9]:
# Reduce each hourly gardenhose dump to the tweets of interest and save them as
# newline-delimited JSON (one reduced tweet per line) under the reduced-data folder.
for hour in range(19,24):
    full_path = path + str(hour) + tail
    rdd = sc.textFile(full_path,8)
    # No .cache(): the result is consumed by the single collect() below, so caching
    # would only hold memory across the loop iterations
    filtered = (rdd.map(make_json_nostream)
            .filter(lambda tweet: filter_tweets(tweet,search_terms))
            .map(lambda tweet: get_relevant_fields(tweet,jdata,party_of_debate))
            )
    output = filtered.collect()
    fname = head_path + reduced_path + party_of_debate + debate_date + str(hour) + ".txt"
    # `with` closes the file even if a write fails part-way through
    with open(fname, 'w') as f:
        for row in output:
            # fold the tweet id into the record so each line is self-contained
            row[1]['tweet_id'] = row[0]
            tdata = row[1]
            # One record per line: without a separator the objects run together
            # and the file cannot be read back line by line
            f.write(json.dumps(tdata) + '\n')
   