# Twitter Analysis

## Introduction

In this homework, we are going to play with Twitter data.

The data is represented as rows of of [JSON](https://en.wikipedia.org/wiki/JSON#Example) strings.
It consists of tweets, messages, and a small amount of broken data (cannot be parsed as JSON).

For this homework, we will only focus on tweets and ignore all other messages.

**Search for `Tasks` to find out what you need to implement. You should only replace**

```
### 
### YOUR CODE HERE
###
```

**with your code.**

## Data overview 

### Tweets

A tweet consists of many data fields. You can learn all about them in the Twitter API doc. We are going to briefly introduce only the data fields that will be used in this homework.

* `created_at`: Posted time of this tweet (time zone is included)
* `id_str`: Tweet ID - we recommend using `id_str` over using `id` as Tweet IDs, becauase `id` is an integer and may bring some overflow problems.
* `text`: Tweet content
* `user`: A JSON object for information about the author of the tweet
    * `id_str`: User ID
    * `name`: User name (may contain spaces)
    * `screen_name`: User screen name (no spaces)
* `retweeted_status`: A JSON object for information about the retweeted tweet (i.e. this tweet is not original but retweeteed some other tweet)
    * All data fields of a tweet except `retweeted_status`
* `entities`: A JSON object for all entities in this tweet
    * `hashtags`: An array for all the hashtags that are mentioned in this tweet
    * `urls`: An array for all the URLs that are mentioned in this tweet


### Users partition

Besides the original tweets, we will provide you with a Pickle file, which contains a partition over 452,743 Twitter users. It contains a Python dictionary `{user_id: partition_id}`. The users are partitioned into 7 groups.

## Grading

### Scale on datasets with different sizes

Your implementation for this assignment will be graded on both correctness and runtimes. You will have a 0 score if your results do not match the expected values for each data set. If your results are correct, your score is dependent on the runtime.

On Vocareum, we provide three different input sizes to test your program: 10 MB, 1 GB, and 10 GB. For any run, we will only be using one of these 3 datasets. **You can switch between different parts on Vocareum to test on different data files.** When you submit, you will be graded only for the part you have selected.

Although there are 3 different parts, **we will only provide the standard answers for the 10 MB part**, which are given as

> ---
> **It should print**

in the write-ups below. If you follow the instructions below to develop your solution for the 10 MB part, it should also produce the correct results for other parts. 

**Don't change any print funcitons that we provided in the starter code.** In the grading process, the correctness of your solution is checked by comparing the content of the variables, NOT the printed outputs. 

### Running locally and on clusters

The Spark application will run locally with limited resources when you develop your program in the Jupyter notebook. However, when you submit your solution for grading, your implementation will be first converted to a python script and then submitted to a Spark cluster with much more resources for correctness and runtime evaluations.
 
Due to resource constraints, you cannot use the Spark cluster while developing your solution. 

In [1]:
# `ON_EMR` indicates whether your program is running locally or on the clusters.
# We assume it will run locally if this script is in the interactive mode (jupyter notebook) 
# and run on clusters if it is invoked by a shell command.
# DO NOT CHANGE THE VALUE OF `ON_EMR` in your program.

import sys

ON_EMR = not hasattr(sys, 'ps1')
print('Your program will run {}'.format(['locally', 'on clusters'][ON_EMR]))

Your program will run locally


### OutputLogger

`OutputLogger` object `my_output` is defined to store the results of your program. We have provided function calls to `my_output.append()` method for storing the results in all necessary places. In the last cell of this file, we write the content of `my_output` to a pickle file which the grader will read in and use for grading.

**We have already appended all necessary variables to `my_output` in the starter code. Thus, don't change anything related to `my_output` or add more results to `my_output`. **

In [2]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

import pickle

class OutputLogger:
    def __init__(self):
        self.ans = {}

    def append(self, key, value):
        self.ans[key] = value

    def write_to_disk(self):
        filepath = os.path.expanduser("answer.pickle")
        with open(filepath, 'wb') as f:
            pickle.dump(self.ans, f)
        
        # If ON_EMR, copy answer.pickle from the driver to HDFS
        if ON_EMR:
            proc = subprocess.Popen(["/usr/local/hadoop/hadoop-2.7.4/bin/hadoop", "fs", "-copyFromLocal", filepath, "/user/spark/answer.pickle"])
            proc.wait()
            os.remove(filepath)

my_output = OutputLogger()

### Measuring runtime

This is a useful cell for debugging. You can use `save_time(key)` at different parts of your code for checking the amount of time a segment takes. We have added it to the different parts of the starter code. You are free to use it where appropriate.

In [3]:
from time import time
import datetime

timer = {}
def save_time(key):
    '''
    Calling save_time with 'key' the first time will record the current time for 'key'.
    Calling save_time with 'key' the second time will record the time (hours:minutes:seconds) 
    it takes from the first calling to the second calling. All results will be saved
    in timer dict.
    Calling save_time with 'key' the third time will overwrite existing data.
    
    Args:
        key: an identifier for this time.
    '''
    
    if key in timer: 
        if type(timer[key]) == str:
            timer[key] = time()
        else:
            timer[key] = str(datetime.timedelta(seconds=round(time() - timer[key])))
    else:
        timer[key] = time()
                
save_time('total time')

## Part 0: Load data to an RDD

In [4]:
save_time("set up sc")

from pyspark import SparkContext

sc = SparkContext()

save_time("set up sc")

### Read text file

For local testing, the data files are in a public directory where you can read the file directly. The path to the data file is hard-coded in the `file_path` variable as part of the starter code.

For submission, we will create this file on our server for testing with the appropriate file path. You should not worry about which file system (i.e. local file system or HDFS) Spark will read data from since the `file_path` variable we provide below will always contain the correct path to the data file.

**Tasks:**

1. Make RDD from the data in the file given by `file_path`.
2. Mark the RDD to be cached (so in the next operation data will be loaded in memory)
3. Count the number of elements in the RDD and store the result in `num_tweets`.

**Hint:** use [`sc.textFile()`](https://spark.apache.org/docs/2.3.0/api/python/pyspark.html#pyspark.SparkContext.textFile) to read the text file into an RDD.

***

**It should print**
```
Number of elements: 2150
```

In [5]:
save_time("read data")

if ON_EMR:
    file_path = '/user/spark/twitter/twitter.txt'
else:
    file_path = './resource/asnlib/publicdata/twitter.txt'

###
### YOUR CODE HERE
###
rdd = sc.textFile(file_path)
rdd = rdd.cache()
num_tweets = rdd.count()

my_output.append("num-tweets", num_tweets)
print('Number of elements:', num_tweets)
save_time("read data")

Number of elements: 225000


In [6]:
# Test: visualize an element:
rdd.sample(False, 1/1250).collect()

['{"created_at":"Wed Feb 10 20:55:41 +0000 2016","id":697524359196348417,"id_str":"697524359196348417","text":"RT @WSHHFANS: My pick for President in 2016 https:\\/\\/t.co\\/OSLVjWAc1j","source":"\\u003ca href=\\"http:\\/\\/twitter.com\\/download\\/iphone\\" rel=\\"nofollow\\"\\u003eTwitter for iPhone\\u003c\\/a\\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":623048154,"id_str":"623048154","name":"pagani \\u26a1\\ufe0f","screen_name":"kevinjoel71","location":"Puerto Rico","url":null,"description":"#FreeOscarLopez","protected":false,"verified":false,"followers_count":447,"friends_count":202,"listed_count":2,"favourites_count":3295,"statuses_count":28933,"created_at":"Sat Jun 30 16:59:52 +0000 2012","utc_offset":null,"time_zone":null,"geo_enabled":false,"lang":"es","contributors_enabled":false,"is_translator":false,"profile_background_color":"13151

## Part 1: Parse JSON strings to JSON objects

Python has built-in support for JSON.

In [7]:
import json

json_example = '''
{
    "id": 1,
    "name": "A green door",
    "price": 12.50,
    "tags": ["home", "green"]
}
'''

json_obj = json.loads(json_example)
json_obj

{'id': 1, 'name': 'A green door', 'price': 12.5, 'tags': ['home', 'green']}

### Broken tweets and irrelevant messages

The data of this assignment may contain broken tweets (invalid JSON strings). So make sure that your code is robust for such cases.

You can filter out such broken tweet by checking if:
* the line is not in json format

In addition, some lines in the input file might not be tweets, but messages that the Twitter server sent to the developer (such as limit notices). Your program should also ignore these messages.

These messages would not contain the `created_at` field and can be filtered out accordingly.
* Check if json object of the broken tweet has a `created_at` field

**Hint:** [Catch the ValueError](http://stackoverflow.com/questions/11294535/verify-if-a-string-is-json-in-python)

**Tasks:**

1. Parse raw JSON tweets stored in the RDD you created above to obtain **valid** JSON objects. 
1. From all valid tweets, construct a pair RDD of `(user_id, text)`, where `user_id` is the `id_str` data field of the `user` dictionary (read [Tweets](#Tweets) section above), `text` is the `text` data field.

In [8]:
import json

def safe_parse(raw_json):
    """
    Input is a String
    Output is a JSON object if the tweet is valid and None if not valid
    """

    ###
    ### YOUR CODE HERE
    ###
    
    # Test if a raw_json is valid json. Returns None if not valid  
    try:
        json_object = json.loads(raw_json)
    except ValueError as e:
        #print('Invalid input string.')
        return None
    
    # Test if "created_at" key is in json_object. Returns None if not.
    if "created_at" in json_object:
        return json_object
    else:
        #print('"created_at" key not found.')
        return None


In [9]:
# Tests
'''safe_parse('{}')              # prints True
safe_parse('{asdf}')          # prints False
safe_parse('{"age":100}')     # prints True
safe_parse('{age:100 }')      # prints False
safe_parse('{"age":100 }')    # prints True

test_string = '{"created_at":"Tue Feb 23 17:42:36 +0000 2016","id":702186809468526592,"id_str":"702186809468526592","text":"RT @C0nservativeGal: Well , I\'m Conservative Southern Bapist and I\'m 4 #Trump-Southern Baptist leader: Donald Trump is not our friend https\\u2026","source":"\\u003ca href=\\"http:\\/\\/www.twitter.com\\" rel=\\"nofollow\\"\\u003eTwitter for Windows\\u003c\\/a\\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":229594092,"id_str":"229594092","name":"paul wagner","screen_name":"bearbear9876","location":null,"url":null,"description":null,"protected":false,"verified":false,"followers_count":47,"friends_count":24,"listed_count":6,"favourites_count":1846,"statuses_count":2849,"created_at":"Wed Dec 22 20:25:15 +0000 2010","utc_offset":null,"time_zone":null,"geo_enabled":false,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"C0DEED","profile_background_image_url":"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png","profile_background_image_url_https":"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png","profile_background_tile":false,"profile_link_color":"0084B4","profile_sidebar_border_color":"C0DEED","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":"http:\\/\\/abs.twimg.com\\/sticky\\/default_profile_images\\/default_profile_0_normal.png","profile_image_url_https":"https:\\/\\/abs.twimg.com\\/sticky\\/default_profile_images\\/default_profile_0_normal.png","default_profile":true,"default_profile_image":true,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweeted_status":{"created_at":"Tue Feb 23 17:14:11 +0000 2016","id":702179658717077504,"id_str":"702179658717077504","text":"Well , I\'m Conservative Southern Bapist and I\'m 4 #Trump-Southern Baptist leader: Donald Trump is not our friend https:\\/\\/t.co\\/nYUt4qn6S9","source":"\\u003ca href=\\"http:\\/\\/mobile.twitter.com\\" rel=\\"nofollow\\"\\u003eMobile Web\\u003c\\/a\\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":185906955,"id_str":"185906955","name":"Johanna","screen_name":"C0nservativeGal","location":"USA","url":"http:\\/\\/uspresidentialelectionnews.com","description":"I\'m a kick ass conservachick whose mission is to secure freedom, eliminate liberalism and look good doing it! No DMs #Christian #WhoDatNation #Trump2016","protected":false,"verified":false,"followers_count":34822,"friends_count":28947,"listed_count":306,"favourites_count":19910,"statuses_count":31801,"created_at":"Thu Sep 02 03:13:57 +0000 2010","utc_offset":-18000,"time_zone":"Eastern Time (US & Canada)","geo_enabled":false,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"C0DEED","profile_background_image_url":"http:\\/\\/pbs.twimg.com\\/profile_background_images\\/687403539\\/729718c7391e53acaf7d6ac43f3c959e.jpeg","profile_background_image_url_https":"https:\\/\\/pbs.twimg.com\\/profile_background_images\\/687403539\\/729718c7391e53acaf7d6ac43f3c959e.jpeg","profile_background_tile":true,"profile_link_color":"E854E8","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":"http:\\/\\/pbs.twimg.com\\/profile_images\\/688571288563003392\\/HXtUWGWa_normal.png","profile_image_url_https":"https:\\/\\/pbs.twimg.com\\/profile_images\\/688571288563003392\\/HXtUWGWa_normal.png","default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"is_quote_status":false,"retweet_count":32,"favorite_count":43,"entities":{"hashtags":[{"text":"Trump","indices":[50,56]}],"urls":[{"url":"https:\\/\\/t.co\\/nYUt4qn6S9","expanded_url":"http:\\/\\/www.al.com\\/news\\/index.ssf\\/2015\\/09\\/southern_baptist_leader_donald.html","display_url":"al.com\\/news\\/index.ssf\\u2026","indices":[113,136]}],"user_mentions":[],"symbols":[]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"en"},"is_quote_status":false,"retweet_count":0,"favorite_count":0,"entities":{"hashtags":[{"text":"Trump","indices":[71,77]}],"urls":[{"url":"https:\\/\\/t.co\\/nYUt4qn6S9","expanded_url":"http:\\/\\/www.al.com\\/news\\/index.ssf\\/2015\\/09\\/southern_baptist_leader_donald.html","display_url":"al.com\\/news\\/index.ssf\\u2026","indices":[139,140]}],"user_mentions":[{"screen_name":"C0nservativeGal","name":"Johanna","id":185906955,"id_str":"185906955","indices":[3,19]}],"symbols":[]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"en","timestamp_ms":"1456249356034"}'
test_json_object = safe_parse(test_string)
#print(test_json_object)
if test_json_object is None:
    print(test_json_object)
else:
    for k, v in test_json_object.items():
        print(k, ' => ', v)'''

'safe_parse(\'{}\')              # prints True\nsafe_parse(\'{asdf}\')          # prints False\nsafe_parse(\'{"age":100}\')     # prints True\nsafe_parse(\'{age:100 }\')      # prints False\nsafe_parse(\'{"age":100 }\')    # prints True\n\ntest_string = \'{"created_at":"Tue Feb 23 17:42:36 +0000 2016","id":702186809468526592,"id_str":"702186809468526592","text":"RT @C0nservativeGal: Well , I\'m Conservative Southern Bapist and I\'m 4 #Trump-Southern Baptist leader: Donald Trump is not our friend https\\u2026","source":"\\u003ca href=\\"http:\\/\\/www.twitter.com\\" rel=\\"nofollow\\"\\u003eTwitter for Windows\\u003c\\/a\\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":229594092,"id_str":"229594092","name":"paul wagner","screen_name":"bearbear9876","location":null,"url":null,"description":null,"protected":false,"verified":false,"followers_count":47

In [10]:
"""
# Remember to construct an RDD of (user_id, text) here.
"""

###
### YOUR CODE HERE
###
rdd_valid_tweets = rdd.map(safe_parse).filter(lambda x: x!=None).map(lambda x: (x['user']['id_str'], x['text']))

In [11]:
# Tests
'''n = rdd_valid_tweets.count()
print('Number of elements: ', n)

m = 5
rdd_sample = rdd_valid_tweets.sample(False, m/n).collect()
print(f'A sample of {len(rdd_sample)} elements:\n')
for item in rdd_sample:
    print(item)'''

"n = rdd_valid_tweets.count()\nprint('Number of elements: ', n)\n\nm = 5\nrdd_sample = rdd_valid_tweets.sample(False, m/n).collect()\nprint(f'A sample of {len(rdd_sample)} elements:\n')\nfor item in rdd_sample:\n    print(item)"

### Number of unique users

**Tasks:**

1. Count the number of different users in all valid tweets and store the result in `num_unique_users`.

**Hint:** use the [`distinct()`](https://spark.apache.org/docs/2.3.0/api/python/pyspark.html#pyspark.RDD.distinct) method.

***

**It should print**
```
The number of unique users is: 1748
```

In [12]:
save_time("count unique users")

###
### YOUR CODE HERE
###
num_unique_users = rdd_valid_tweets.map(lambda x: x[0]).distinct().count()

my_output.append("num-unique-users", num_unique_users)
print('The number of unique users is:', num_unique_users)
save_time("count unique users")

The number of unique users is: 110802


## Part 2: Number of posts from each user partition

### Load the pickle file

Load the Pickle file `users-partition.pickle`, you will get a dictionary which represents a partition over 452,743 Twitter users, `{user_id: partition_id}`. The users are partitioned into 7 groups. For example, if the dictionary is loaded into a variable named `partition`, the partition ID of the user `59458445` is `partition["59458445"]`. These users are partitioned into 7 groups. The partition ID is an integer between 0-6.

Note that the user partition we provide doesn't cover all users appear in the input data.

In [13]:
import subprocess
import pickle

if ON_EMR:
    command = ["/usr/local/hadoop/hadoop-2.7.4/bin/hadoop", "fs", "-cat", "/user/spark/twitter/users-partition.pickle"]
else:
    command = ["cat", "./resource/asnlib/publicdata/users-partition.pickle"]
    
proc = subprocess.Popen(command, stdout=subprocess.PIPE)
pickle_content = proc.communicate()[0]
partition = pickle.loads(pickle_content)
len(partition)

452743

### Tweets per user partition

**Tasks:**

1. Count the number of posts from group 0, 1, ..., 6, plus the number of posts from users who are not in any partition. Assign users who are not in any partition to the group 7.
1. Put the results of this step into a pair RDD `(group_id, count)` that is sorted by key.
1. Collect the RDD to get `counts_per_part` list.

***

**It should print**

```
Group 0 posted 87 tweets
Group 1 posted 242 tweets
Group 2 posted 41 tweets
Group 3 posted 349 tweets
Group 4 posted 101 tweets
Group 5 posted 358 tweets
Group 6 posted 434 tweets
Group 7 posted 521 tweets
```

In [14]:
def print_post_count(counts):
    for group_id, count in counts:
        print('Group %d posted %d tweets' % (group_id, count))

In [15]:
save_time("count tweets per user partition")

###
### YOUR CODE HERE
###

# Convert partition dict to (key, val) RDD 
rdd_partition = sc.parallelize(partition.items())

# Left Outer Join rdd_valid_tweets with rdd_partition based on user_id. If user_id not found in rdd_partition assign 7 as partition
rdd_lojoin_by_user_id = rdd_valid_tweets.leftOuterJoin(rdd_partition)
rdd_by_partition = rdd_lojoin_by_user_id.map(lambda x: (x[1][1] if x[1][1] in range(7) else 7, [x[0], x[1][0]]))
rdd_counts_per_part = sc.parallelize(rdd_by_partition.countByKey().items()).sortByKey()
counts_per_part = rdd_counts_per_part.collect()

# Following code adds your solution to `my_output`
assert(type(counts_per_part) is list and
       len(counts_per_part) == 8 and
       len(counts_per_part[0]) == 2)
print_post_count(counts_per_part)
my_output.append("counts_per_part", counts_per_part)
save_time("count tweets per user partition")

Group 0 posted 9631 tweets
Group 1 posted 27661 tweets
Group 2 posted 5037 tweets
Group 3 posted 29918 tweets
Group 4 posted 14182 tweets
Group 5 posted 25174 tweets
Group 6 posted 28891 tweets
Group 7 posted 81077 tweets


## Part 3:  Tokens that are relatively popular in each user partition

In this step, we are going to find tokens that are relatively popular in each user partition.

We define the number of mentions of a token $t$ in a specific user partition $k$ as the number of users from the user partition $k$ that ever mentioned the token $t$ in their tweets. Note that even if some users might mention a token $t$ multiple times or in multiple tweets, a user will contribute at most 1 to the counter of the token $t$.

Please make sure that the number of mentions of a token is equal to the number of users who mentioned this token but NOT the number of tweets that mentioned this token.

Let $N_t^k$ be the number of mentions of the token $t$ in the user partition $k$. Let $N_t^{all} = \sum_{i=0}^7 N_t^{i}$ be the number of total mentions of the token $t$.

We define the relative popularity of a token $t$ in a user partition $k$ as the log ratio between $N_t^k$ and $N_t^{all}$, i.e. 

\begin{equation}
p_t^k = \log \frac{N_t^k}{N_t^{all}}.
\end{equation}


You can compute the relative popularity by calling the function `get_rel_popularity`.

We load a tweet tokenizer for you in the following cells. This Tokenizer object is called `tok`. Don't forget to execute the two cells below.

You can expand the following cell if needed to see the minutae of the Tokenizer.

In [16]:
#!/usr/bin/env python

"""
This code implements a basic, Twitter-aware tokenizer.

A tokenizer is a function that splits a string of text into words. In
Python terms, we map string and unicode objects into lists of unicode
objects.

There is not a single right way to do tokenizing. The best method
depends on the application.  This tokenizer is designed to be flexible
and this easy to adapt to new domains and tasks.  The basic logic is
this:

1. The tuple regex_strings defines a list of regular expression
   strings.

2. The regex_strings strings are put, in order, into a compiled
   regular expression object called word_re.

3. The tokenization is done by word_re.findall(s), where s is the
   user-supplied string, inside the tokenize() method of the class
   Tokenizer.

4. When instantiating Tokenizer objects, there is a single option:
   preserve_case.  By default, it is set to True. If it is set to
   False, then the tokenizer will downcase everything except for
   emoticons.

The __main__ method illustrates by tokenizing a few examples.

I've also included a Tokenizer method tokenize_random_tweet(). If the
twitter library is installed (http://code.google.com/p/python-twitter/)
and Twitter is cooperating, then it should tokenize a random
English-language tweet.


Julaiti Alafate:
  I modified the regex strings to extract URLs in tweets.
"""

__author__ = "Christopher Potts"
__copyright__ = "Copyright 2011, Christopher Potts"
__credits__ = []
__license__ = "Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported License: http://creativecommons.org/licenses/by-nc-sa/3.0/"
__version__ = "1.0"
__maintainer__ = "Christopher Potts"
__email__ = "See the author's website"

######################################################################

import re
from html import entities 

######################################################################
# The following strings are components in the regular expression
# that is used for tokenizing. It's important that phone_number
# appears first in the final regex (since it can contain whitespace).
# It also could matter that tags comes after emoticons, due to the
# possibility of having text like
#
#     <:| and some text >:)
#
# Most imporatantly, the final element should always be last, since it
# does a last ditch whitespace-based tokenization of whatever is left.

# This particular element is used in a couple ways, so we define it
# with a name:
emoticon_string = r"""
    (?:
      [<>]?
      [:;=8]                     # eyes
      [\-o\*\']?                 # optional nose
      [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth      
      |
      [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth
      [\-o\*\']?                 # optional nose
      [:;=8]                     # eyes
      [<>]?
    )"""

# The components of the tokenizer:
regex_strings = (
    # Phone numbers:
    r"""
    (?:
      (?:            # (international)
        \+?[01]
        [\-\s.]*
      )?            
      (?:            # (area code)
        [\(]?
        \d{3}
        [\-\s.\)]*
      )?    
      \d{3}          # exchange
      [\-\s.]*   
      \d{4}          # base
    )"""
    ,
    # URLs:
    r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"""
    ,
    # Emoticons:
    emoticon_string
    ,    
    # HTML tags:
     r"""<[^>]+>"""
    ,
    # Twitter username:
    r"""(?:@[\w_]+)"""
    ,
    # Twitter hashtags:
    r"""(?:\#+[\w_]+[\w\'_\-]*[\w_]+)"""
    ,
    # Remaining word types:
    r"""
    (?:[a-z][a-z'\-_]+[a-z])       # Words with apostrophes or dashes.
    |
    (?:[+\-]?\d+[,/.:-]\d+[+\-]?)  # Numbers, including fractions, decimals.
    |
    (?:[\w_]+)                     # Words without apostrophes or dashes.
    |
    (?:\.(?:\s*\.){1,})            # Ellipsis dots. 
    |
    (?:\S)                         # Everything else that isn't whitespace.
    """
    )

######################################################################
# This is the core tokenizing regex:
    
word_re = re.compile(r"""(%s)""" % "|".join(regex_strings), re.VERBOSE | re.I | re.UNICODE)

# The emoticon string gets its own regex so that we can preserve case for them as needed:
emoticon_re = re.compile(regex_strings[1], re.VERBOSE | re.I | re.UNICODE)

# These are for regularizing HTML entities to Unicode:
html_entity_digit_re = re.compile(r"&#\d+;")
html_entity_alpha_re = re.compile(r"&\w+;")
amp = "&amp;"

######################################################################

class Tokenizer:
    def __init__(self, preserve_case=False):
        self.preserve_case = preserve_case

    def tokenize(self, s):
        """
        Argument: s -- any string or unicode object
        Value: a tokenize list of strings; conatenating this list returns the original string if preserve_case=False
        """        
        # Try to ensure unicode:
        try:
            s = str(s)
        except UnicodeDecodeError:
            s = s.encode('string_escape')
            s = str(s)
        # Fix HTML character entitites:
        s = self.__html2unicode(s)
        # Tokenize:
        words = word_re.findall(s)
        # Possible alter the case, but avoid changing emoticons like :D into :d:
        if not self.preserve_case:            
            words = list(map((lambda x : x if emoticon_re.search(x) else x.lower()), words))
        return words

    def tokenize_random_tweet(self):
        """
        If the twitter library is installed and a twitter connection
        can be established, then tokenize a random tweet.
        """
        try:
            import twitter
        except ImportError:
            print("Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/")
        from random import shuffle
        api = twitter.Api()
        tweets = api.GetPublicTimeline()
        if tweets:
            for tweet in tweets:
                if tweet.user.lang == 'en':            
                    return self.tokenize(tweet.text)
        else:
            raise Exception("Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")

    def __html2unicode(self, s):
        """
        Internal metod that seeks to replace all the HTML entities in
        s with their corresponding unicode characters.
        """
        # First the digits:
        ents = set(html_entity_digit_re.findall(s))
        if len(ents) > 0:
            for ent in ents:
                entnum = ent[2:-1]
                try:
                    entnum = int(entnum)
                    s = s.replace(ent, unichr(entnum))	
                except:
                    pass
        # Now the alpha versions:
        ents = set(html_entity_alpha_re.findall(s))
        ents = filter((lambda x : x != amp), ents)
        for ent in ents:
            entname = ent[1:-1]
            try:            
                s = s.replace(ent, unichr(entities.name2codepoint[entname]))
            except:
                pass                    
            s = s.replace(amp, " and ")
        return s


In [17]:
from math import log

tok = Tokenizer(preserve_case=False)

def get_rel_popularity(c_k, c_all):
    '''
    Compute the relative popularity of a token.
    
    Args:
        c_k: the number of mentions in the user partition k.
        c_all: the number of all mentions.
        
    Return:
        The relative popularity of the token. It should be a negative number due to the log function. 
    '''
    
    return log(1.0 * c_k / c_all) / log(2)

def print_tokens(tokens, gid=None):
    group_name = "overall"
    if gid is not None:
        group_name = "group %d" % gid
        
    print('=' * 5 + ' ' + group_name + ' ' + '=' * 5)
    for t, n in tokens:
        print("%s\t%.4f" % (t, n))
    print

### Tokenize tweets

**Tasks:**

1. Tokenize the tweets using the `tokenize` function that is a method of the `Tokenizer` class that we have instantiated as `tok`. 
1. Count the number of mentions for each tokens regardless of specific user group and store them in a RDD, which will be used later.
1. Get `num_of_tokens`, which is how many different tokens we have.

---

**It should print**
```
Number of tokens: 7677
```

In [18]:
save_time("count all unique tokens")

###
### YOUR CODE HERE
###

# Tokenize tweets
rdd_tok = rdd_by_partition.flatMap(lambda x: tok.tokenize(x[1][1]))

# Count tokens
rdd_tok_val = rdd_tok.map(lambda tok: (tok, 1))
rdd_counts_tok = rdd_tok_val.reduceByKey(lambda a, b: a + b)

# Get num_of_tokens
num_of_tokens = rdd_counts_tok.distinct().count()

my_output.append("num-tokens", num_of_tokens)
print("Number of tokens:", num_of_tokens)
save_time("count all unique tokens")

Number of tokens: 176481


### Token popularity

Tokens that are mentioned by too few users are usually not very interesting. So we want to only keep tokens that are mentioned by at least 100 users. Filter out tokens that don't meet this requirement.

**Tasks:**

1. Compute the two varaibles below:
    1. `num_freq_tokens`: an int of how many different tokens we have after the filtering. 
    1. `top20`: a list that contains the top 20 most frequent tokens after the filtering.

---

**It should print**
```
Number of tokens: 46
===== overall =====
:	1046.0000
rt	920.0000
.	767.0000
the	587.0000
trump	560.0000
…	520.0000
to	501.0000
,	497.0000
in	385.0000
a	383.0000
is	382.0000
of	300.0000
!	285.0000
for	275.0000
and	263.0000
on	218.0000
i	216.0000
he	191.0000
that	190.0000
"	181.0000
```

In [19]:
save_time("count overall most popular tokens")

###
### YOUR CODE HERE
###

# Create RDD mapping each token to each user_id. Eliminate duplicates i.e. keep unique pairs (token, user_id).
rdd_tok_by_users = rdd_by_partition.flatMap(lambda x: [(t, x[1][0]) for t in tok.tokenize(x[1][1])]).distinct()

# Create RDD counting the number of unique user_id which mention each token.
rdd_tok_by_user_count = rdd_tok_by_users.groupByKey().map(lambda x: (x[0], len(list(x[1]))))

# Filter tokens mentioned at least by 100 users. Sort by highest occurence.
rdd_tok_by_user_count_filt = rdd_tok_by_user_count.filter(lambda x: x[1] >= 100).sortBy(lambda x: x[1], ascending=False)

# Get number of tokens mentioned at least by 100 users
num_freq_tokens = rdd_tok_by_user_count_filt.count()

# Get a list of top 20 most frequent tokens
top20 = rdd_tok_by_user_count_filt.take(20)

my_output.append("num-freq-tokens", num_freq_tokens)
my_output.append("top-20-tokens", top20)
print("Number of tokens:", num_freq_tokens)
print_tokens(top20)
save_time("count overall most popular tokens")

Number of tokens: 2882
===== overall =====
:	72738.0000
rt	61725.0000
.	52166.0000
the	43342.0000
,	33627.0000
to	33090.0000
…	31795.0000
trump	30498.0000
for	28707.0000
in	28207.0000
a	28076.0000
is	26959.0000
of	24136.0000
and	19967.0000
sanders	18676.0000
bernie	17908.0000
i	16035.0000
?	14852.0000
on	14164.0000
!	14134.0000


### Relative Popularity

For all tokens that are mentioned by at least 100 users, compute their relative popularity in each user group. Then print the top 10 tokens with highest relative popularity in each user group. In case two tokens have same relative popularity, break the tie by printing the alphabetically smaller one first.

**Tasks:**

1. Calculate `popular_10_in_each_group` list, in which the `i`-th element is a list of 10 (token, relative popularity) pair that are top 10 tokens with highest relative popularity in the `i`-th user group.

**Hint:** Let the relative popularity of a token $t$ be $p$. The order of the items will be satisfied by sorting them using (-p, t) as the key.

---

**It should print**
```
===== group 0 =====
with	-3.6088
cruz	-3.6554
his	-3.6582
amp	-3.8651
on	-3.9608
to	-4.0145
&	-4.0875
https	-4.1699
i	-4.1699
what	-4.1699
===== group 1 =====
sanders	-2.2854
gop	-2.4060
hillary	-2.4330
’	-2.4463
bernie	-2.4835
"	-2.6925
are	-2.7249
this	-2.7633
for	-2.8179
about	-2.8346
===== group 2 =====
with	-4.3458
donald	-4.5146
...	-4.7004
gop	-4.7279
i	-4.9475
on	-4.9608
he	-4.9925
…	-5.1155
https	-5.1699
what	-5.1699
===== group 3 =====
bernie	-1.5945
sanders	-1.6609
hillary	-2.2188
and	-2.5154
"	-2.5930
in	-2.6114
will	-2.6160
https	-2.6674
...	-2.7004
you	-2.7004
===== group 4 =====
what	-3.4330
have	-3.4725
bernie	-3.5380
this	-3.5518
it	-3.6881
?	-3.6912
for	-3.7110
about	-3.7415
hillary	-3.7549
that	-3.7625
===== group 5 =====
what	-1.8007
not	-1.8745
https	-2.0000
his	-2.0144
cruz	-2.0704
it	-2.1031
on	-2.1243
&	-2.1399
amp	-2.1489
;	-2.1592
===== group 6 =====
will	-1.3847
have	-1.4725
!	-1.5850
cruz	-1.6919
trump	-1.7199
https	-1.7549
-	-1.7673
;	-1.7807
be	-1.7952
amp	-1.8144
===== group 7 =====
donald	-1.0740
trump	-1.6535
bernie	-1.7790
sanders	-1.7829
’	-1.8613
of	-1.9069
?	-1.9186
with	-1.9307
the	-1.9588
be	-1.9758
```

In [25]:
%%time

save_time("print popular tokens in each group")

###
### YOUR CODE HERE
###

# Create RDD mapping each (partition k, token t) to each user_id.
# Eliminate duplicates (a user may use several times the same token, has to be counted ONE use at most)
rdd_tok_by_users_and_group = rdd_by_partition.flatMap(lambda x: [(x[0], t, x[1][0]) for t in tok.tokenize(x[1][1])]).distinct().map(lambda x: ((x[1], x[0]), x[2]))

# Create RDD counting the number of unique user_id, within each group, who mention each token.
rdd_tok_by_user_and_group_count = rdd_tok_by_users_and_group.groupByKey().map(lambda x: (x[0], len(list([1])))).map(lambda x: (x[0][0], (x[0][1], x[1])))

# Join rdd_tok_by_user_and_group_count and rdd_tok_by_user_count_filt based on token t
rdd_join_by_tok = rdd_tok_by_user_and_group_count.join(rdd_tok_by_user_count_filt).map(lambda x: (x[0], (x[1][0][0], x[1][0][1], x[1][1])))

# Compute relative popularity (p)
rdd_rel_pop = rdd_join_by_tok.map(lambda x: ((x[1][0], (x[0], get_rel_popularity(x[1][1],x[1][2])))))#.sortBy(lambda x: (-x[1][1], x[1][0]))

# Filter top 10 relative popularity in each group
#rdd_rel_pop_top10 = rdd_rel_pop.groupByKey().map(lambda x : (list(x[1])[:10]))
#popular_10_in_each_group = rdd_rel_pop_top10.collect()

rdd_rel_pop_top10 = rdd_rel_pop.groupByKey().map(lambda x : (list(x[1])))
popular_10_in_each_group = rdd_rel_pop_top10.collect()

correction = []
for group in popular_10_in_each_group:
    correction.append(sorted(group, key=lambda x: (-x[1], x[0]))[:10])
    
popular_10_in_each_group = correction

my_output.append("popular_10_in_each_group", popular_10_in_each_group)
for k in range(8):
    print_tokens(popular_10_in_each_group[k], k)
save_time("print popular tokens in each group")

===== group 0 =====
000	-6.6439
163.5	-6.6439
9/11	-6.6439
@xlaurenstephens	-6.6439
beaten	-6.6439
below	-6.6439
c'est	-6.6439
certainly	-6.6439
corrida	-6.6439
generation	-6.6439
===== group 1 =====
000	-6.6439
163.5	-6.6439
9/11	-6.6439
@billboarddance	-6.6439
@xlaurenstephens	-6.6439
beaten	-6.6439
below	-6.6439
c'est	-6.6439
certainly	-6.6439
corrida	-6.6439
===== group 2 =====
000	-6.6439
163.5	-6.6439
9/11	-6.6439
@billboarddance	-6.6439
beaten	-6.6439
c'est	-6.6439
certainly	-6.6439
fiebre	-6.6439
headed	-6.6439
jealous	-6.6439
===== group 3 =====
000	-6.6439
163.5	-6.6439
9/11	-6.6439
@billboarddance	-6.6439
beaten	-6.6439
below	-6.6439
c'est	-6.6439
certainly	-6.6439
corrida	-6.6439
fiebre	-6.6439
===== group 4 =====
000	-6.6439
163.5	-6.6439
9/11	-6.6439
@billboarddance	-6.6439
@xlaurenstephens	-6.6439
beaten	-6.6439
below	-6.6439
c'est	-6.6439
certainly	-6.6439
corrida	-6.6439
===== group 5 =====
000	-6.6439
163.5	-6.6439
9/11	-6.6439
@xlaurenstephens	-6.6439
beaten	-6.6439


## Important: Write your solutions to disk

Following cell write your solutions to disk which would be read in by the grader for grading.

In [21]:
my_output.write_to_disk()

## Runtime statistics of your implementation

When you run locally, the runtime of your implementation printed here will **NOT** be an accurate reflection of how your implementation will run on clusters. The time it takes to run locally might be faster or slower than running on clusters depending on the resources of your device (if you develop on your own device) or the workload of the workbench (if you develop on Vocareum).

If you want a relatively accurate estimate, please submit and see the runtime printed in the grading report. Note that the time estimated here in the grading report only considers the time it takes to run your application. There will be an inevitable overhead in grading to submit the Spark application and collect the logs, which will be included in the total runtime that will be used for grading.

In [22]:
save_time('total time')

for key in timer:
    print(key, timer[key])

total time 0:01:00
set up sc 0:00:02
read data 0:00:05
count unique users 0:00:06
count tweets per user partition 0:00:07
count all unique tokens 0:00:05
count overall most popular tokens 0:00:13
print popular tokens in each group 0:00:20
