In [29]:
%pylab inline

Populating the interactive namespace from numpy and matplotlib


In [18]:
from pyspark import SparkContext, SparkConf
from datetime import datetime
from timeit import timeit
from pyspark.sql import Row


In [32]:
# Load the extended tweet dump and show the schema Spark inferred for it.
tweetDataFrame = (
    sqlContext
    .read
    .json('group_assignment/tweets/tweets_extended.json')
)
tweetDataFrame.printSchema()

root
 |-- created_at: long (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- media_url: string (nullable = true)
 |    |    |    |-- media_url_https: string (nullable = true)
 |    |    |    |-- sizes: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = tr

In [52]:
"""
PART: Select subset of data and parse dates
"""

"""
HELPER FUNCTIONS FOR cleaning and parsing date
"""
def clean_up_row(row):
    """
    Build a trimmed-down Row from one record of the selected tweet columns.

    row: an object exposing the attributes id, hashtags, text and created_at.

    Returns a new Row carrying id, hashtags and text unchanged, created_at
    passed through convert_long_date, and an extra hashtags_count field.

    A missing (None) hashtags value is counted as zero hashtags instead of
    raising a TypeError from len(None); the hashtags field itself is still
    passed through as-is.
    """
    hashtags = row.hashtags
    return Row(
        id=row.id,
        hashtags=hashtags,
        hashtags_count=len(hashtags) if hashtags is not None else 0,
        text=row.text,
        created_at=convert_long_date(row.created_at),
    )

def convert_long_date(long_date):
    """
    Convert an epoch timestamp expressed in milliseconds to a datetime.

    long_date: number of milliseconds since the Unix epoch, or None.

    Returns a naive datetime as produced by datetime.fromtimestamp (i.e. in
    the local timezone of the machine doing the conversion), or None when
    long_date is None -- the same way convert_date_string treats a missing
    value -- instead of failing with a TypeError on `None / 1e3`.
    """
    if long_date is None:
        return None
    # 1e3 is a float, so this is true division even under Python 2.
    return datetime.fromtimestamp(long_date / 1e3)

def convert_date_string(date_string):
    """
    Parse a timestamp string such as '2015-01-31 12:30:30 +0000'.

    Returns a naive datetime for a non-empty string and None for an empty
    or None input. The '+0000' suffix is matched literally.
    NOTE: currently unused, kept around because it might come in handy later.
    """
    parsed = None
    if date_string:
        parsed = datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S +0000')
    return parsed


"""
Execute query
"""
# Read the tweets, keep only the columns clean_up_row needs, then map every
# record through clean_up_row.
tweet_data_frame = sqlContext.read.json(
    'group_assignment/tweets/tweets_extended.json'
)
tweets_subset = tweet_data_frame.selectExpr(
    'id',
    'created_at',
    'text',
    'entities.hashtags.text as hashtags'
).rdd
parsed_tweets = tweets_subset.map(clean_up_row)

"""
PART: EXTRACTING THE MMA TWEETS
"""
import re

# Whitelist of upper-cased words that mark a tweet as MMA-related. It is the
# result of cleaning out look-alike expressions such as TOMMARRS, DILEMMA,
# SUMMARY, MUHAMMAD_WAKAS, SUMMARIZING, SUMMARIZED, ...
_MMA_ACCEPTED = frozenset([
    'MMA_BLOG', 'SMMMAGAZINE', 'MMA_CRM', 'MMA', 'BIG',
    'BIGDATA', 'SASANALYTICS', 'ANALYTICS', 'ANALYSIS',
    'RSTATS', 'STATFACT', 'AMSTATNEWS', 'STATISTICIANS',
    'STATS', 'STAT', 'STATISTICAL', 'MASTAT', 'DATASCIENCE', 'SCIENTIST',
])

# A "word" is a maximal run of \w characters, the same unit the previous
# r'(\w*KEY\w*)' patterns extracted.
_MMA_WORD = re.compile(r'\w+')


def filter_mma(row):
    """
    Tell whether a tweet is MMA-related.

    row: an object with a `text` attribute (string or None).

    Returns True when at least one word of the upper-cased text is in the
    whitelist of accepted MMA expressions, False otherwise. A missing or
    empty text yields False.

    Compared with the previous implementation this
    - restores the comma between 'AMSTATNEWS' and 'STATISTICIANS' (they were
      silently concatenated into a single, never-matching entry),
    - actually looks for 'SCIENTIST' and 'DATASCIENCE' (their patterns were
      built but never searched), and
    - no longer crashes on a row without text.
    """
    text = row.text
    if not text:
        return False
    return any(word in _MMA_ACCEPTED for word in _MMA_WORD.findall(text.upper()))


"""
Execute query
"""
# Keep only the parsed tweets that filter_mma accepts as MMA-related.
mma_tweets = parsed_tweets.filter(filter_mma)


In [55]:
"""
DOC VOOR STIJN
"""
# Always make sure the data is sorted. Everything here runs distributed, so it
# is unclear whether the original order is still preserved -- TODO: look this
# up in the Spark docs. Sorting explicitly is the safe option.
mma_tweets = mma_tweets.sortBy(lambda x: x.created_at)  # ascending by created_at
# With the RDD sorted, the first tweet can simply be taken.
# print(...) with a single argument prints the same under Python 2 and also
# parses under Python 3.
print(mma_tweets.take(1))

# To get the last tweet while staying inside the RDD API, sort descending and
# take the first element.
print(mma_tweets.sortBy(lambda x: x.created_at, ascending=False).take(1))

# collect()[-1] would work too; the original author notes there are no real
# performance issues here.

# How to build a new DataFrame from an RDD.
new_df = sqlContext.createDataFrame(mma_tweets)
# As the output shows, select expressions work again on the new DataFrame.
print(new_df.selectExpr('created_at as bitchin_mother_fucker').take(1))



felix
[Row(created_at=datetime.datetime(2009, 10, 27, 0, 0), hashtags=[u'M2009'], hashtags_count=1, id=5203805716L, text=u'Will Neafsey (Ford Motor Company) at SAS #M2009 is talking about using analytics in difficult economic times... great opportunities')]
hello
[Row(created_at=datetime.datetime(2015, 11, 15, 20, 31, 40), hashtags=[u'Deep', u'SC15', u'MLHPC2015', u'HPC', u'BigData', u'neuralnetworks'], hashtags_count=6, id=665990572323115008L, text=u'RT @HPC_Guru: Why #Deep Learning works? @ctnzr at #SC15 in #MLHPC2015 workshop \n#HPC #BigData #neuralnetworks  https://t.co/cka7MPs9Jt via \u2026')]
[Row(bitchin_mother_fucker=datetime.datetime(2009, 10, 27, 0, 0))]
