# Twitter Analysis

## Introduction

In this project, we are going to play with Twitter data.

The data is represented as rows of of [JSON](https://en.wikipedia.org/wiki/JSON#Example) strings.
It consists of tweets, messages, and a small amount of broken data (cannot be parsed as JSON).

For this project, we will only focus on tweets and ignore all other messages.

## Data overview 

### Tweets

A tweet consists of many data fields. You can learn all about them in the Twitter API doc. We are going to briefly introduce only the data fields that will be used in this homework.

* `created_at`: Posted time of this tweet (time zone is included)
* `id_str`: Tweet ID 
* `text`: Tweet content
* `user`: A JSON object for information about the author of the tweet
    * `id_str`: User ID
    * `name`: User name (may contain spaces)
    * `screen_name`: User screen name (no spaces)
* `retweeted_status`: A JSON object for information about the retweeted tweet (i.e. this tweet is not original but retweeteed some other tweet)
    * All data fields of a tweet except `retweeted_status`
* `entities`: A JSON object for all entities in this tweet
    * `hashtags`: An array for all the hashtags that are mentioned in this tweet
    * `urls`: An array for all the URLs that are mentioned in this tweet


### Users partition

the pickle file contains a partition over 452,743 Twitter users. It contains a Python dictionary `{user_id: partition_id}`. The users are partitioned into 7 groups.

In [1]:
import sys

ON_EMR = not hasattr(sys, 'ps1')
print('this program will run {}'.format(['locally', 'on clusters'][ON_EMR]))

this program will run locally


### OutputLogger

`OutputLogger` object `my_output` is defined to store the results of our program. A function calls to `my_output.append()` method for storing the results in all necessary places. we then write the content of `my_output` to a pickle file.

In [2]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

import pickle

class OutputLogger:
    def __init__(self):
        self.ans = {}

    def append(self, key, value):
        self.ans[key] = value

    def write_to_disk(self):
        filepath = os.path.expanduser("result.pickle")
        with open(filepath, 'wb') as f:
            pickle.dump(self.ans, f)
        
        # If ON_EMR, copy answer.pickle from the driver to HDFS
        if ON_EMR:
            pass

my_output = OutputLogger()

### Measuring runtime


In [4]:
from time import time
import datetime

timer = {}
def save_time(key):
    '''
    Calling save_time with 'key' the first time will record the current time for 'key'.
    Calling save_time with 'key' the second time will record the time (hours:minutes:seconds) 
    it takes from the first calling to the second calling. All results will be saved
    in timer dict.
    Calling save_time with 'key' the third time will overwrite existing data.
    
    Args:
        key: an identifier for this time.
    '''
    
    if key in timer: 
        if type(timer[key]) == str:
            timer[key] = time()
        else:
            timer[key] = str(datetime.timedelta(seconds=round(time() - timer[key])))
    else:
        timer[key] = time()

def print_count(rdd):
    print('Number of elements:', rdd.count())
                
save_time('total time')

## Part 0: Load data to an RDD

In [5]:
save_time("set up sc")

from pyspark import SparkContext

sc = SparkContext()

save_time("set up sc")

23/11/25 21:02:29 WARN Utils: Your hostname, abdou-Nitro-AN515-55 resolves to a loopback address: 127.0.1.1; using 10.188.49.204 instead (on interface wlp0s20f3)
23/11/25 21:02:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/25 21:02:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Read text file


**Tasks:**

1. Make RDD from the data in the file given by `file_path`.
2. Mark the RDD to be cached (so in the next operation data will be loaded in memory)
3. Count the number of elements in the RDD and store the result in `num_tweets`.


***


In [6]:
save_time("read data")

if ON_EMR:
    file_path = ''
else:
    file_path = './data/twitter.txt'

rdd = sc.textFile(file_path).cache()

num_tweets = rdd.count()

my_output.append("num-tweets", num_tweets)
print('Number of elements:', num_tweets)
save_time("read data")

[Stage 0:>                                                          (0 + 2) / 2]

Number of elements: 2150


                                                                                

## Part 1: Parse JSON strings to JSON objects

Python has built-in support for JSON.

In [7]:
import json

json_example = '''
{
    "id": 1,
    "name": "A green door",
    "price": 12.50,
    "tags": ["home", "green"]
}
'''

json_obj = json.loads(json_example)
json_obj

{'id': 1, 'name': 'A green door', 'price': 12.5, 'tags': ['home', 'green']}

### Broken tweets and irrelevant messages

The data may contain broken tweets (invalid JSON strings). So we'll make sure that our code is robust for such cases.

We can filter out such broken tweet by checking if:
* the line is not in json format

In addition, some lines in the input file might not be tweets, but messages that the Twitter server sent to the developer (such as limit notices). Our program should also ignore these messages.

These messages would not contain the `created_at` field and can be filtered out accordingly.
* Check if json object of the broken tweet has a `created_at` field


**Tasks:**

1. Parse raw JSON tweets stored in the RDD you created above to obtain **valid** JSON objects. 
1. From all valid tweets, construct a pair RDD of `(user_id, text)`, where `user_id` is the `id_str` data field of the `user` dictionary ( [Tweets](#Tweets) section above), `text` is the `text` data field.

In [8]:
import json

def safe_parse(raw_json):
    """
    Input is a String
    Output is a JSON object if the tweet is valid and None if not valid
    """
    
    try:
        json_object = json.loads(raw_json)
    except ValueError as e:
        return False
    if "created_at" in json_object and "user" in json_object:
        return json_object
    return False


In [9]:
"""
# construct an RDD of (user_id, text) here.
"""

rdd_pair = rdd.filter(safe_parse).map(lambda tweet: (safe_parse(tweet)["user"]["id_str"], safe_parse(tweet)["text"].encode("utf-8"))).cache()


### Number of unique users

**Tasks:**

1. Count the number of different users in all valid tweets and store the result in `num_unique_users`.



In [10]:
save_time("count unique users")

num_unique_users = rdd_pair.map(lambda u: u[0]).distinct().count()

my_output.append("num-unique-users", num_unique_users)
print('The number of unique users is:', num_unique_users)
save_time("count unique users")

The number of unique users is: 1748


## Part 2: Number of posts from each user partition

### Load the pickle file

Load the Pickle file `users-partition.pickle`, we will get a dictionary which represents a partition over 452,743 Twitter users, `{user_id: partition_id}`. The users are partitioned into 7 groups. For example, if the dictionary is loaded into a variable named `partition`, the partition ID of the user `59458445` is `partition["59458445"]`. These users are partitioned into 7 groups. The partition ID is an integer between 0-6.

Note that the user partition we provide doesn't cover all users appear in the input data.

In [11]:
import subprocess
import pickle

if ON_EMR:
    command = 0
else:
    command = ["cat", "./data/users-partition.pickle"]
    
proc = subprocess.Popen(command, stdout=subprocess.PIPE)
pickle_content = proc.communicate()[0]
partition = pickle.loads(pickle_content)
len(partition)

452743

### Tweets per user partition

**Tasks:**

1. Count the number of posts from group 0, 1, ..., 6, plus the number of posts from users who are not in any partition. Assign users who are not in any partition to the group 7.
1. Put the results of this step into a pair RDD `(group_id, count)` that is sorted by key.
1. Collect the RDD to get `counts_per_part` list.

***


In [12]:
def print_post_count(counts):
    for group_id, count in counts:
        print('Group %d posted %d tweets' % (group_id, count))

In [13]:

def map_user(u, t):
    if u in partition:
        return partition[u], u
    else:
        return 7, u
    
rdd_part = rdd_pair.map(lambda x: map_user(x[0], x[1])).groupByKey()
counts_per_part = rdd_part.map(lambda x: (x[0], len(list(x[1])))).sortByKey().collect()

type(counts_per_part)

list

In [14]:
save_time("count tweets per user partition")


# Following code adds your solution to `my_output`
assert(type(counts_per_part) is list and
       len(counts_per_part) == 8 and
       len(counts_per_part[0]) == 2)
print_post_count(counts_per_part)
my_output.append("counts_per_part", counts_per_part)
save_time("count tweets per user partition")

Group 0 posted 87 tweets
Group 1 posted 242 tweets
Group 2 posted 41 tweets
Group 3 posted 349 tweets
Group 4 posted 101 tweets
Group 5 posted 358 tweets
Group 6 posted 434 tweets
Group 7 posted 521 tweets


## Part 3:  Tokens that are relatively popular in each user partition

In this step, we are going to find tokens that are relatively popular in each user partition.

We define the number of mentions of a token $t$ in a specific user partition $k$ as the number of users from the user partition $k$ that ever mentioned the token $t$ in their tweets. Note that even if some users might mention a token $t$ multiple times or in multiple tweets, a user will contribute at most 1 to the counter of the token $t$.

The number of mentions of a token is equal to the number of users who mentioned this token but NOT the number of tweets that mentioned this token.

Let $N_t^k$ be the number of mentions of the token $t$ in the user partition $k$. Let $N_t^{all} = \sum_{i=0}^7 N_t^{i}$ be the number of total mentions of the token $t$.

We define the relative popularity of a token $t$ in a user partition $k$ as the log ratio between $N_t^k$ and $N_t^{all}$, i.e. 

\begin{equation}
p_t^k = \log \frac{N_t^k}{N_t^{all}}.
\end{equation}



In [15]:
from six import unichr

#!/usr/bin/env python

"""
This code implements a basic, Twitter-aware tokenizer.

A tokenizer is a function that splits a string of text into words. In
Python terms, we map string and unicode objects into lists of unicode
objects.

There is not a single right way to do tokenizing. The best method
depends on the application.  This tokenizer is designed to be flexible
and this easy to adapt to new domains and tasks.  The basic logic is
this:

1. The tuple regex_strings defines a list of regular expression
   strings.

2. The regex_strings strings are put, in order, into a compiled
   regular expression object called word_re.

3. The tokenization is done by word_re.findall(s), where s is the
   user-supplied string, inside the tokenize() method of the class
   Tokenizer.

4. When instantiating Tokenizer objects, there is a single option:
   preserve_case.  By default, it is set to True. If it is set to
   False, then the tokenizer will downcase everything except for
   emoticons.

The __main__ method illustrates by tokenizing a few examples.

I've also included a Tokenizer method tokenize_random_tweet(). If the
twitter library is installed (http://code.google.com/p/python-twitter/)
and Twitter is cooperating, then it should tokenize a random
English-language tweet.


Julaiti Alafate:
  I modified the regex strings to extract URLs in tweets.
"""

__author__ = "Christopher Potts"
__copyright__ = "Copyright 2011, Christopher Potts"
__credits__ = []
__license__ = "Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported License: http://creativecommons.org/licenses/by-nc-sa/3.0/"
__version__ = "1.0"
__maintainer__ = "Christopher Potts"
__email__ = "See the author's website"

######################################################################

import re
from html import entities 

######################################################################
# The following strings are components in the regular expression
# that is used for tokenizing. It's important that phone_number
# appears first in the final regex (since it can contain whitespace).
# It also could matter that tags comes after emoticons, due to the
# possibility of having text like
#
#     <:| and some text >:)
#
# Most imporatantly, the final element should always be last, since it
# does a last ditch whitespace-based tokenization of whatever is left.

# This particular element is used in a couple ways, so we define it
# with a name:
emoticon_string = r"""
    (?:
      [<>]?
      [:;=8]                     # eyes
      [\-o\*\']?                 # optional nose
      [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth      
      |
      [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth
      [\-o\*\']?                 # optional nose
      [:;=8]                     # eyes
      [<>]?
    )"""

# The components of the tokenizer:
regex_strings = (
    # Phone numbers:
    r"""
    (?:
      (?:            # (international)
        \+?[01]
        [\-\s.]*
      )?            
      (?:            # (area code)
        [\(]?
        \d{3}
        [\-\s.\)]*
      )?    
      \d{3}          # exchange
      [\-\s.]*   
      \d{4}          # base
    )"""
    ,
    # URLs:
    r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"""
    ,
    # Emoticons:
    emoticon_string
    ,    
    # HTML tags:
     r"""<[^>]+>"""
    ,
    # Twitter username:
    r"""(?:@[\w_]+)"""
    ,
    # Twitter hashtags:
    r"""(?:\#+[\w_]+[\w\'_\-]*[\w_]+)"""
    ,
    # Remaining word types:
    r"""
    (?:[a-z][a-z'\-_]+[a-z])       # Words with apostrophes or dashes.
    |
    (?:[+\-]?\d+[,/.:-]\d+[+\-]?)  # Numbers, including fractions, decimals.
    |
    (?:[\w_]+)                     # Words without apostrophes or dashes.
    |
    (?:\.(?:\s*\.){1,})            # Ellipsis dots. 
    |
    (?:\S)                         # Everything else that isn't whitespace.
    """
    )

######################################################################
# This is the core tokenizing regex:
    
word_re = re.compile(r"""(%s)""" % "|".join(regex_strings), re.VERBOSE | re.I | re.UNICODE)

# The emoticon string gets its own regex so that we can preserve case for them as needed:
emoticon_re = re.compile(regex_strings[1], re.VERBOSE | re.I | re.UNICODE)

# These are for regularizing HTML entities to Unicode:
html_entity_digit_re = re.compile(r"&#\d+;")
html_entity_alpha_re = re.compile(r"&\w+;")
amp = "&amp;"

######################################################################

class Tokenizer:
    def __init__(self, preserve_case=False):
        self.preserve_case = preserve_case

    def tokenize(self, s):
        """
        Argument: s -- any string or unicode object
        Value: a tokenize list of strings; conatenating this list returns the original string if preserve_case=False
        """        
        # Try to ensure unicode:
        try:
            s = str(s)
        except UnicodeDecodeError:
            s = s.encode('string_escape')
            s = str(s)
        # Fix HTML character entitites:
        s = self.__html2unicode(s)
        # Tokenize:
        words = word_re.findall(s)
        # Possible alter the case, but avoid changing emoticons like :D into :d:
        if not self.preserve_case:            
            words = list(map((lambda x : x if emoticon_re.search(x) else x.lower()), words))
        return words

    def tokenize_random_tweet(self):
        """
        If the twitter library is installed and a twitter connection
        can be established, then tokenize a random tweet.
        """
        try:
            import twitter
        except ImportError:
            print("Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/")
        from random import shuffle
        api = twitter.Api()
        tweets = api.GetPublicTimeline()
        if tweets:
            for tweet in tweets:
                if tweet.user.lang == 'en':            
                    return self.tokenize(tweet.text)
        else:
            raise Exception("Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")

    def __html2unicode(self, s):
        """
        Internal metod that seeks to replace all the HTML entities in
        s with their corresponding unicode characters.
        """
        # First the digits:
        ents = set(html_entity_digit_re.findall(s))
        if len(ents) > 0:
            for ent in ents:
                entnum = ent[2:-1]
                try:
                    entnum = int(entnum)
                    s = s.replace(ent, unichr(entnum))	
                except:
                    pass
        # Now the alpha versions:
        ents = set(html_entity_alpha_re.findall(s))
        ents = filter((lambda x : x != amp), ents)
        for ent in ents:
            entname = ent[1:-1]
            try:            
                s = s.replace(ent, unichr(entities.name2codepoint[entname]))
            except:
                pass                    
            s = s.replace(amp, " and ")
        return s


In [16]:
from math import log

tok = Tokenizer(preserve_case=False)

def get_rel_popularity(c_k, c_all):
    '''
    Compute the relative popularity of a token.
    
    Args:
        c_k: the number of mentions in the user partition k.
        c_all: the number of all mentions.
        
    Return:
        The relative popularity of the token. It should be a negative number due to the log function. 
    '''
    
    return log(1.0 * c_k / c_all) / log(2)

def print_tokens(tokens, gid=None):
    group_name = "overall"
    if gid is not None:
        group_name = "group %d" % gid
        
    print('=' * 5 + ' ' + group_name + ' ' + '=' * 5)
    for t, n in tokens:
        print("%s\t%.4f" % (t, n))
    print

### Tokenize tweets

**Tasks:**

1. Tokenize the tweets using the `tokenize` function that is a method of the `Tokenizer` class that we have instantiated as `tok`. 
1. Count the number of mentions for each tokens regardless of specific user group and store them in a RDD, which will be used later.
1. Get `num_of_tokens`, which is how many different tokens we have.

---



In [35]:
save_time("count all unique tokens")
def map_user2(u):
    if u in partition:
        return partition[u]
    return 7
v1=rdd_pair.mapValues(lambda t : set(tok.tokenize(t)))\
    .reduceByKey(lambda t, t1: t.union(t1))\
    .map(lambda u: (map_user2(u[0]),list(u[1])))\
    .flatMapValues(lambda t : t).cache()
num_of_tokens = v1.map(lambda u: u[1]).distinct().count()

In [36]:

my_output.append("num-tokens", num_of_tokens)
print("Number of tokens:", num_of_tokens)
save_time("count all unique tokens")

Number of tokens: 8038


### Token popularity

Tokens that are mentioned by too few users are usually not very interesting. So we want to only keep tokens that are mentioned by at least 100 users. Filter out tokens that don't meet this requirement.

**Tasks:**

Compute the two varaibles below:
1. `num_freq_tokens`: an int of how many different tokens we have after the filtering. 
2. `top20`: a list that contains the top 20 most frequent tokens after the filtering.

---

