In [2]:
# -*- coding: utf-8 -*-
import ujson as json
import numpy as np
import pickle
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

# Load Raw Text
enc = lambda text: text.encode('utf-8')

def get_file_paths():
    file_list = '../Data/hw2-files.txt'
    with open(file_list, 'r') as f:
        for line in f:
            yield line.strip()            
    
text = sc.textFile(','.join(get_file_paths())).map(enc).cache()


def print_count(rdd):
    print 'Number of elements:', rdd.count()
    
print_count(text)

# Parse Json
INVALID = "INVALID"
def safe_parse(raw_json):
    try:
        ret = json.loads(raw_json)
        return (ret['user']['id_str'], ret['text'])
    except (ValueError, KeyError):
        return INVALID
    

def print_users_count(count):
    print 'The number of unique users is:', count
    
tweets = text.map(safe_parse).filter(lambda x: x != INVALID).cache()
# users = tweets.map(lambda x: x[0]).distinct()
users = tweets.map(lambda (user, text): user).distinct()
print_users_count(users.count())

Number of elements: 2193
The number of unique users is: 2083


Prefer named rather than index, **explicit** is better than implicit 
```py
# users = tweets.map(lambda x: x[0]).distinct()
users = tweets.map(lambda (user, text): user).distinct() 
```

In [3]:
from operator import add
# User Partition
def get_users_partition():
    with open('../Data/users-partition.pickle', 'r') as f:
        return pickle.load(f)
    
users_partition = get_users_partition()

def print_post_count(counts):
    for group_id, count in counts:
        print 'Group %d posted %d tweets' % (group_id, count)

        
tweets_par = (tweets
        .map(lambda (user, text): (
                users_partition.get(user, 7), 
                (user, text)
            )
        )
        .partitionBy(8)
)

# post_count = tweets_par.map(lambda x: (x[0][0], len(x))) # tuple bracket is mandatory 
post_count = tweets_par.map(lambda (grp, (user, text)): (grp, 1)).reduceByKey(add) # tuple bracket is mandatory 
print_post_count(post_count.sortByKey().collect())

Group 0 posted 81 tweets
Group 1 posted 199 tweets
Group 2 posted 45 tweets
Group 3 posted 313 tweets
Group 4 posted 86 tweets
Group 5 posted 221 tweets
Group 6 posted 400 tweets
Group 7 posted 798 tweets


# Lambda param pattern matching 
Python is able to do pattern matching for $\lambda$ parameters

In [4]:
print tweets_par.map(lambda (grp, (user, text)): text).take(1)

[u'RT @thehill: Poll: Kasich overtakes Trump in Ohio https://t.co/hW1nnWS7Kh https://t.co/KFHpRHqIuf']


In [5]:
# %load happyfuntokenizing.py
#!/usr/bin/env python

"""
This code implements a basic, Twitter-aware tokenizer.

A tokenizer is a function that splits a string of text into words. In
Python terms, we map string and unicode objects into lists of unicode
objects.

There is not a single right way to do tokenizing. The best method
depends on the application.  This tokenizer is designed to be flexible
and this easy to adapt to new domains and tasks.  The basic logic is
this:

1. The tuple regex_strings defines a list of regular expression
   strings.

2. The regex_strings strings are put, in order, into a compiled
   regular expression object called word_re.

3. The tokenization is done by word_re.findall(s), where s is the
   user-supplied string, inside the tokenize() method of the class
   Tokenizer.

4. When instantiating Tokenizer objects, there is a single option:
   preserve_case.  By default, it is set to True. If it is set to
   False, then the tokenizer will downcase everything except for
   emoticons.

The __main__ method illustrates by tokenizing a few examples.

I've also included a Tokenizer method tokenize_random_tweet(). If the
twitter library is installed (http://code.google.com/p/python-twitter/)
and Twitter is cooperating, then it should tokenize a random
English-language tweet.


Julaiti Alafate:
  I modified the regex strings to extract URLs in tweets.
"""

__author__ = "Christopher Potts"
__copyright__ = "Copyright 2011, Christopher Potts"
__credits__ = []
__license__ = "Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported License: http://creativecommons.org/licenses/by-nc-sa/3.0/"
__version__ = "1.0"
__maintainer__ = "Christopher Potts"
__email__ = "See the author's website"

######################################################################

import re
import htmlentitydefs

######################################################################
# The following strings are components in the regular expression
# that is used for tokenizing. It's important that phone_number
# appears first in the final regex (since it can contain whitespace).
# It also could matter that tags comes after emoticons, due to the
# possibility of having text like
#
#     <:| and some text >:)
#
# Most imporatantly, the final element should always be last, since it
# does a last ditch whitespace-based tokenization of whatever is left.

# This particular element is used in a couple ways, so we define it
# with a name:
emoticon_string = r"""
    (?:
      [<>]?
      [:;=8]                     # eyes
      [\-o\*\']?                 # optional nose
      [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth      
      |
      [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth
      [\-o\*\']?                 # optional nose
      [:;=8]                     # eyes
      [<>]?
    )"""

# The components of the tokenizer:
regex_strings = (
    # Phone numbers:
    r"""
    (?:
      (?:            # (international)
        \+?[01]
        [\-\s.]*
      )?            
      (?:            # (area code)
        [\(]?
        \d{3}
        [\-\s.\)]*
      )?    
      \d{3}          # exchange
      [\-\s.]*   
      \d{4}          # base
    )"""
    ,
    # URLs:
    r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"""
    ,
    # Emoticons:
    emoticon_string
    ,    
    # HTML tags:
     r"""<[^>]+>"""
    ,
    # Twitter username:
    r"""(?:@[\w_]+)"""
    ,
    # Twitter hashtags:
    r"""(?:\#+[\w_]+[\w\'_\-]*[\w_]+)"""
    ,
    # Remaining word types:
    r"""
    (?:[a-z][a-z'\-_]+[a-z])       # Words with apostrophes or dashes.
    |
    (?:[+\-]?\d+[,/.:-]\d+[+\-]?)  # Numbers, including fractions, decimals.
    |
    (?:[\w_]+)                     # Words without apostrophes or dashes.
    |
    (?:\.(?:\s*\.){1,})            # Ellipsis dots. 
    |
    (?:\S)                         # Everything else that isn't whitespace.
    """
    )

######################################################################
# This is the core tokenizing regex:
    
word_re = re.compile(r"""(%s)""" % "|".join(regex_strings), re.VERBOSE | re.I | re.UNICODE)

# The emoticon string gets its own regex so that we can preserve case for them as needed:
emoticon_re = re.compile(regex_strings[1], re.VERBOSE | re.I | re.UNICODE)

# These are for regularizing HTML entities to Unicode:
html_entity_digit_re = re.compile(r"&#\d+;")
html_entity_alpha_re = re.compile(r"&\w+;")
amp = "&amp;"

######################################################################

class Tokenizer:
    def __init__(self, preserve_case=False):
        self.preserve_case = preserve_case

    def tokenize(self, s):
        """
        Argument: s -- any string or unicode object
        Value: a tokenize list of strings; conatenating this list returns the original string if preserve_case=False
        """        
        # Try to ensure unicode:
        try:
            s = unicode(s)
        except UnicodeDecodeError:
            s = str(s).encode('string_escape')
            s = unicode(s)
        # Fix HTML character entitites:
        s = self.__html2unicode(s)
        # Tokenize:
        words = word_re.findall(s)
        # Possible alter the case, but avoid changing emoticons like :D into :d:
        if not self.preserve_case:            
            words = map((lambda x : x if emoticon_re.search(x) else x.lower()), words)
        return words

    def tokenize_random_tweet(self):
        """
        If the twitter library is installed and a twitter connection
        can be established, then tokenize a random tweet.
        """
        try:
            import twitter
        except ImportError:
            print "Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/"
        from random import shuffle
        api = twitter.Api()
        tweets = api.GetPublicTimeline()
        if tweets:
            for tweet in tweets:
                if tweet.user.lang == 'en':            
                    return self.tokenize(tweet.text)
        else:
            raise Exception("Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")

    def __html2unicode(self, s):
        """
        Internal metod that seeks to replace all the HTML entities in
        s with their corresponding unicode characters.
        """
        # First the digits:
        ents = set(html_entity_digit_re.findall(s))
        if len(ents) > 0:
            for ent in ents:
                entnum = ent[2:-1]
                try:
                    entnum = int(entnum)
                    s = s.replace(ent, unichr(entnum))	
                except:
                    pass
        # Now the alpha versions:
        ents = set(html_entity_alpha_re.findall(s))
        ents = filter((lambda x : x != amp), ents)
        for ent in ents:
            entname = ent[1:-1]
            try:            
                s = s.replace(ent, unichr(htmlentitydefs.name2codepoint[entname]))
            except:
                pass                    
            s = s.replace(amp, " and ")
        return s

In [6]:
from math import log


tok = Tokenizer(preserve_case=False)

def get_rel_popularity(c_k, c_all):
    return log(1.0 * c_k / c_all) / log(2)


def print_tokens(tokens, gid = None):
    group_name = "overall"
    if gid is not None:
        group_name = "group %d" % gid
    print '=' * 5 + ' ' + group_name + ' ' + '=' * 5
    for t, n in tokens:
        print "%s\t%.4f" % (enc(t), n)
    print

In [7]:
# Token Vocabulary
V = tweets.flatMap(lambda (user, text): tok.tokenize(enc(text))).distinct() 
print_count(V)

Number of elements: 8979


In [8]:
# Token meet the cut-off Ï„
tau_users = 100

## User -> Token
user_tokens = (tweets
        .flatMap(lambda (user, text): ((user, tk) 
                            for tk in tok.tokenize(text.encode('utf-8'))))
        .groupByKey()
        .flatMapValues(lambda x: (set(x)))
        .cache()
)

# popular or frequent 
tokens_pop = (user_tokens
        .map(lambda (user, tk): (tk, 1))  # tk -> unit count 
        .reduceByKey(lambda a, b: a + b).filter(lambda x: x[1] >= tau_users)  # lambda a, b: a+b instead of sum
)  

# %time tokens_pop.collect()
print_count(tokens_pop)

tokens_top = (tokens_pop
        .map(lambda (user, tk): (tk, user))
        .sortByKey(False)
        .map(lambda (tk, user): (user, tk))
)

print_tokens(tokens_top.take(20))

Number of elements: 52
===== overall =====
:	1386.0000
rt	1237.0000
.	865.0000
\	745.0000
the	621.0000
trump	595.0000
x80	545.0000
xe2	543.0000
to	499.0000
,	489.0000
xa6	457.0000
a	403.0000
is	376.0000
in	296.0000
'	294.0000
of	292.0000
and	287.0000
for	280.0000
!	269.0000
?	210.0000



## Broadcasting
Without broadcasting: "Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. "
 
With broadcasting: "Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks."

In [18]:
tokens_pop_map = tokens_pop.collectAsMap()  # need to be materialized
sc.broadcast(tokens_pop_map) 

group_tokens = (user_tokens.filter(lambda (user, tk): tk in tokens_pop_map)
        .map(lambda (user, tk): (users_partition.get(user, 7), tk))
        .map(lambda x: (x, 1))  # (group, tk) -> unit count 
        .reduceByKey(add)
        .map(lambda ((grp, tk), cnt): (grp, (tk, cnt)))  # gorup -> (tk, count)
        .partitionBy(8)
        .glom()
        .collect()  # must collect to enable index access 
)

for gid in xrange(8):
    token_cnt = sc.parallelize(group_tokens[gid]).map(lambda (grp, (tk, cnt)): (tk, cnt))  # exclude gid 
    # print token_cnt.take(2)  # debug 
    token_score = (token_cnt
            .map(lambda (tk, cnt): (get_rel_popularity(cnt, tokens_pop_map[tk]), tk))
            .sortBy(lambda (score, tk): -score)
            .map(lambda (score, tk): (tk, score))
    )
                  
                                                                    
    print_tokens(token_score.take(10), gid)

===== group 0 =====
...	-3.5648
at	-3.5983
hillary	-4.0484
bernie	-4.1430
not	-4.2479
he	-4.2574
i	-4.2854
s	-4.3309
are	-4.3646
in	-4.4021

===== group 1 =====
#demdebate	-2.4391
-	-2.6202
clinton	-2.7174
amp	-2.7472
&	-2.7472
;	-2.7980
sanders	-2.8745
?	-2.9069
in	-2.9615
if	-2.9861

===== group 2 =====
are	-4.6865
and	-4.7055
bernie	-4.7279
at	-4.7682
sanders	-4.9449
in	-5.0395
donald	-5.0531
a	-5.0697
#demdebate	-5.1396
that	-5.1599

===== group 3 =====
#demdebate	-1.3847
bernie	-1.8535
sanders	-2.1793
of	-2.2356
t	-2.2675
clinton	-2.4179
hillary	-2.4203
the	-2.4330
xa6	-2.4962
that	-2.5160

===== group 4 =====
hillary	-3.8074
sanders	-3.9449
of	-4.0199
what	-4.0875
clinton	-4.0959
at	-4.1832
in	-4.2095
a	-4.2623
on	-4.2854
'	-4.2928

===== group 5 =====
cruz	-2.3344
he	-2.6724
will	-2.7705
are	-2.7796
the	-2.8522
is	-2.8822
that	-2.9119
this	-2.9542
for	-2.9594
of	-2.9804

===== group 6 =====
@realdonaldtrump	-1.1520
cruz	-1.4657
n	-1.4877
!	-1.5479
not	-1.8904
xa6	-1.9172
xe2	-1.

# Without maintianing the global count for each tk

* change key
* Local list aggregation per token
* flatMap to swap key after local aggregation. 

In [21]:
tokens_pop_map = tokens_pop.collectAsMap()  # need to be materialized
sc.broadcast(tokens_pop_map) 

group_tokens = (user_tokens.filter(lambda (user, tk): tk in tokens_pop_map)
        .map(lambda (user, tk): (users_partition.get(user, 7), tk))
        .map(lambda x: (x, 1))  # (group, tk) -> unit count 
        .reduceByKey(add)
        # tk as key
        .map(lambda ((grp, tk), cnt): (tk, (grp, cnt)))  
        # calculate c_all
        .groupByKey()
        .mapValues(lambda itr: (list(itr), sum(cnt for grp, cnt in itr)))
        # grp as key
        .flatMap(lambda (tk, (grp_lst, c_all)): [(grp, (tk, get_rel_popularity(cnt, c_all)))
                 for grp, cnt in grp_lst]
                )
)

group_tokens.take(1)

[(2, (u'and', -4.705475308038391))]