## Twitter Data Analysis with Apache Spark

#### Tweets

A tweet consists of many data fields. [Here is an example](https://gist.github.com/arapat/03d02c9b327e6ff3f6c3c5c602eeaf8b). You can learn all about them in the Twitter API doc.

* `created_at`: Posted time of this tweet (time zone is included)
* `id_str`: Tweet ID. We recommend using `id_str` rather than `id`, because `id` is an integer and may cause overflow problems.
* `text`: Tweet content
* `user`: A JSON object for information about the author of the tweet
    * `id_str`: User ID
    * `name`: User name (may contain spaces)
    * `screen_name`: User screen name (no spaces)
* `retweeted_status`: A JSON object describing the retweeted tweet (present when this tweet is not original but a retweet of another tweet)
    * All data fields of a tweet except `retweeted_status`
* `entities`: A JSON object for all entities in this tweet
    * `hashtags`: An array for all the hashtags that are mentioned in this tweet
    * `urls`: An array for all the URLs that are mentioned in this tweet
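
As a minimal sketch of how these fields are accessed, the snippet below parses a made-up tweet with Python's `json` module. The values are illustrative and not taken from the dataset; real tweets carry many more fields.

```python
import json

# A hypothetical, heavily trimmed tweet used only for illustration.
raw_tweet = json.dumps({
    "created_at": "Mon Feb 29 12:00:00 +0000 2016",
    "id_str": "704000000000000000",
    "text": "Hello #spark",
    "user": {"id_str": "12345", "name": "Jane Doe", "screen_name": "janedoe"},
    "entities": {"hashtags": [{"text": "spark"}], "urls": []},
})

tweet = json.loads(raw_tweet)

user_id = tweet["user"]["id_str"]                              # nested object
hashtags = [h["text"] for h in tweet["entities"]["hashtags"]]  # array of objects
is_retweet = "retweeted_status" in tweet                       # only present on retweets

print(user_id, hashtags, is_retweet)
```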

Tweets are collected using the [Twitter Streaming API](https://dev.twitter.com/streaming/overview).


In [11]:
import os

class OutputLogger:
    def __init__(self):
        self.ans = {}

    def append(self, key, value):
        self.ans[key] = value

my_output = OutputLogger()

##### Spark configuration:

In [14]:
import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext()

In [15]:
with open('./hw2-files.txt') as f:
    file_path = [w.strip() for w in f.readlines() if w.strip()]

rdd = sc.textFile(file_path[0])
#rdd.cache()
count = rdd.count()

my_output.append("num-tweets", count)
print('Number of elements:', count)

Number of elements: 2150


To parse the tweets we define `safe_parse`:

* Input is a string (one line of the input file).
* Output is a JSON object if the tweet is valid and `None` otherwise. A tweet is considered valid if its JSON string starts with the `created_at` field.

We then construct a pair RDD of `(user_id, text)`, named `rdd2`.

In [16]:
import json

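def safe_parse(raw_json):
    # A valid tweet line starts with '{"created_at', so characters 2..11 spell the field name.
    if raw_json[2:12] != "created_at":
        return None
    try:
        return json.loads(raw_json)
    except ValueError:
        # Truncated or otherwise malformed JSON
        return None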
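
The slice `[2:12]` works because a tweet line begins with `{"created_at"`, so the ten characters after the opening `{"` are exactly the field name. The sketch below illustrates the check on made-up lines; `looks_like_tweet` is a hypothetical helper name, and the second line imitates a non-tweet message such as a delete notice.

```python
import json

def looks_like_tweet(raw_json):
    # '{"created_at": ...' -> characters 2..11 are exactly 'created_at'
    return raw_json[2:12] == "created_at"

# Hypothetical input lines, not taken from the dataset
valid_line = '{"created_at": "Mon Feb 29 12:00:00 +0000 2016", "text": "hi"}'
delete_line = '{"delete": {"status": {"id_str": "1"}}}'

parsed = [json.loads(s) if looks_like_tweet(s) else None
          for s in (valid_line, delete_line, "")]
print(parsed)
```

Slicing a string that is too short returns a shorter (possibly empty) string rather than raising, so empty lines are rejected without a special case.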

In [17]:
# Parse on the workers instead of collecting every line to the driver
rdd2 = (rdd.map(safe_parse)
           .filter(lambda obj: obj is not None)
           .map(lambda obj: (obj['user']['id_str'], obj['text'])))

#### Number of unique users

In [18]:
users_count = rdd2.map(lambda p: p[0]).distinct().count()
print('The number of unique users is:', users_count)

The number of unique users is: 1748


## Number of posts from each user partition

The pickle file `users-partition.pickle` contains a dictionary `{user_id: partition_id}` that represents a partition of 452,743 Twitter users into 7 groups. The partition ID is an integer from 0 to 6. For example, if the dictionary is loaded into a variable named `partition`, the partition ID of the user `59458445` is `partition["59458445"]`.


##### Load the pickle file

In [19]:
import pickle

objects = []

with open("users-partition2.pickle", "rb") as f:
    while True:
        try:
            objects.append(pickle.load(f))
        except EOFError:
            break
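
The `while True` / `EOFError` loop reads every object in the file, since a pickle file may hold several objects written back to back. A minimal sketch using an in-memory buffer instead of a file (the dictionary contents are made up):

```python
import io
import pickle

# Two objects dumped one after the other into the same stream
buffer = io.BytesIO()
pickle.dump({"59458445": 3, "111": 0}, buffer)
pickle.dump(["a second object"], buffer)
buffer.seek(0)

loaded = []
while True:
    try:
        loaded.append(pickle.load(buffer))
    except EOFError:   # raised once the stream is exhausted
        break

partition = loaded[0]
print(len(loaded), partition["59458445"])
```

If the file is known to contain a single dictionary, one `pickle.load` call would be enough; the loop simply does not assume that.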

#### Tweets per user partition

Next we count the number of posts from each user partition, i.e. from groups 0, 1, ..., 6. Users who are not in any partition are assigned to group 7.

The result of this step is a pair RDD `(group_id, count)` sorted by key.
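
To make the grouping rule concrete, here is a plain-Python sketch of the same computation on made-up data. It is not the Spark implementation used in this notebook, only an illustration of the logic.

```python
from collections import Counter

# Hypothetical inputs: a tiny partition dictionary and a few (user_id, text) pairs
partition = {"u1": 0, "u2": 6}
tweets = [("u1", "a"), ("u2", "b"), ("u2", "c"), ("u9", "d")]

# dict.get supplies group 7 for users missing from the partition
counts = Counter(partition.get(user_id, 7) for user_id, _ in tweets)
counts_sorted = sorted(counts.items())   # [(group_id, count), ...] sorted by key

print(counts_sorted)
```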


In [20]:
def print_post_count(counts):
    for group_id, count in counts:
        print('Group %d posted %d tweets' % (group_id, count))

In [21]:
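# Only needed for the commented-out DataFrame variant below.
# Explicit imports avoid the wildcard import of pyspark.sql.functions,
# which shadows Python built-ins such as sum, min and max.
from pyspark.sql.types import StructType, StructField, StringType

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)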

# The pickle stream holds a single dictionary {user_id: partition_id, ...}
json_data = objects[0]

# Turn it into a list of pairs [(user_id, partition_id), ...] and then into a pair RDD
arr_id_partitions = list(json_data.items())
rdd3 = sc.parallelize(arr_id_partitions)


#Construct DataFrames:

# schema1 = StructType([StructField("Id", StringType(), False), StructField("partition", StringType(), False)])
# schema2 = StructType([StructField("Id", StringType(), False), StructField("text", StringType(), False)])
# df1 = sqlContext.createDataFrame(rdd3, schema1)
# df2 = sqlContext.createDataFrame(rdd2, schema2)
# df_joined = df1.join(df2, on = "Id", how = 'right')


# Join RDDs: keep every tweet, even when its author is in no partition
joined_rdd = rdd3.rightOuterJoin(rdd2)

# Users without a partition (partition_id is None) are assigned to group 7
interm_rdd = joined_rdd.map(lambda p: (7, p[1][1]) if p[1][0] is None else (p[1][0], p[1][1]))

# Compute the counts, sorted by group ID
result_rdd = interm_rdd.map(lambda p: (p[0], 1)).reduceByKey(lambda p, q: p + q).sortByKey()
counts_per_partition = result_rdd.collect()

assert(type(counts_per_partition) is list and len(counts_per_partition) == 8 and len(counts_per_partition[0]) == 2)
print_post_count(counts_per_partition)

Group 0 posted 87 tweets
Group 1 posted 242 tweets
Group 2 posted 41 tweets
Group 3 posted 349 tweets
Group 4 posted 101 tweets
Group 5 posted 358 tweets
Group 6 posted 434 tweets
Group 7 posted 521 tweets


##  Tokens that are relatively popular in each user partition

In this step, we are going to find tokens that are relatively popular in each user partition.

* Let $N_t^k$ be the number of mentions of the token $t$ in the user partition $k$ (the number of users who belong to the $k$-th partition and mention $t$). Let $N_t^{all} = \sum_{i=0}^7 N_t^{i}$ be the total number of mentions of the token $t$.

* We define the relative popularity of a token $t$ in a user partition $k$ as the base-2 log ratio between $N_t^k$ and $N_t^{all}$, i.e.

\begin{equation}
p_t^k = \log_2 \frac{N_t^k}{N_t^{all}}.
\end{equation}
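
A small worked example of this definition, taking the logarithm in base 2 as the helper function `get_rel_popularity` does. The counts are made up, and `rel_popularity` is a hypothetical name used only here.

```python
from math import log2

def rel_popularity(n_k, n_all):
    # log2 of the share of the token's mentions that fall in group k
    return log2(n_k / n_all)

# Hypothetical counts: 4 of 16 mentions come from the group
print(rel_popularity(4, 16))    # -2.0
# Every mention is in the group, which gives the maximum value
print(rel_popularity(16, 16))   # 0.0
```

Since $N_t^k \le N_t^{all}$, the value is never positive; values closer to 0 mean the token's mentions are concentrated in that group.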

###  Helper Functions

In [27]:
from math import log

from TokenizerKit import Tokenizer

tok = Tokenizer(preserve_case=False)

def get_rel_popularity(c_k, c_all):
    return log(1.0 * c_k / c_all) / log(2)


def print_tokens(tokens, gid = None):
    group_name = "overall"
    if gid is not None:
        group_name = "group %d" % gid
    print('=' * 5 + ' ' + group_name + ' ' + '=' * 5)
    for t, n in tokens:
        print("%s\t%.4f" % (t, n))
    print()

def print_tokens2(tokens, gid = None):
    group_name = "overall"
    if gid is not None:
        group_name = "group %d" % gid
    print('=' * 5 + ' ' + group_name + ' ' + '=' * 5)
    for t, n in tokens:
        print("%s\t%d" % (t, n))
    print()

### Tokenize tweets

* We count the number of distinct tokens in the dataset
* and the number of mentions of each token, regardless of user group.

In [23]:
from TokenizerKit import Tokenizer

In [24]:
# Distinct tokens across all tweets (deduplicated on the cluster, not on the driver)
num_of_tokens_rdd = interm_rdd.flatMap(lambda p: set(tok.tokenize(p[1])))
num_of_tokens = num_of_tokens_rdd.distinct().count()

print("Number of tokens:", num_of_tokens)

Number of tokens: 7677


### Token popularity

In this section we filter out tokens that are mentioned by fewer than 100 users and show the 20 most frequent of the remaining tokens.
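
The counting and filtering logic can be sketched in plain Python as follows. The tweets are made up, `str.split` stands in for the notebook's tokenizer, and the threshold is lowered from 100 to 2 so that the toy data produces a result.

```python
from collections import Counter

# Hypothetical tweets; str.split is only a stand-in for the real tokenizer
tweets = ["trump wins wins", "rt trump", "rt vote"]
min_mentions = 2   # the notebook uses 100

mention_counts = Counter()
for text in tweets:
    # set() makes a token count once per tweet, however often it repeats
    mention_counts.update(set(text.lower().split()))

frequent = {t: n for t, n in mention_counts.items() if n >= min_mentions}
# Most frequent first, ties broken alphabetically
top = sorted(frequent.items(), key=lambda p: (-p[1], p[0]))[:20]

print(top)
```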

In [28]:
from collections import Counter

#num_of_tokens_rdd = interm_rdd.flatMap(lambda p: [(it[0],it[1]) for it in Counter(list(tok.tokenize(p[1]))).items()])

num_of_tokens_rdd = interm_rdd.flatMap(lambda p: [(t,1) for t in set(tok.tokenize(p[1]))])
num_freq_tokens_rdd1 = num_of_tokens_rdd.reduceByKey(lambda x, y: x + y).filter(lambda p: p[1] > 99)
num_freq_tokens = num_freq_tokens_rdd1.count()

#TOP 20:
top20 = sorted(num_freq_tokens_rdd1.collect(), key=lambda t: t[1], reverse=True)
top20 = top20[:20]

print("Number of tokens:", num_freq_tokens)
print_tokens2(top20)

Number of tokens: 51
===== overall =====
:	1294
rt	1147
.	833
the	652
trump	623
…	602
to	543
,	530
in	440
a	414
is	399
!	324
of	308
for	297
and	278
i	229
on	227
he	198
that	197
"	194


### Relative popularity
* The code below computes the relative popularity in each user group of every token kept in the previous step.
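
Before the Spark version, here is a plain-Python sketch of the same idea on made-up, already tokenized data. It omits the frequency filter and is meant only to show how the per-group and overall counts combine.

```python
from collections import Counter, defaultdict
from math import log2

# Hypothetical (group_id, tokens) pairs, one per tweet
tweets = [(0, {"vote", "trump"}), (0, {"vote"}), (1, {"vote", "bernie"}), (1, {"bernie"})]

per_group = Counter()   # (token, group_id) -> mentions
overall = Counter()     # token -> mentions across all groups
for gid, tokens in tweets:
    for t in tokens:
        per_group[(t, gid)] += 1
        overall[t] += 1

popularity = defaultdict(list)   # group_id -> [(token, relative popularity)]
for (t, gid), n in per_group.items():
    popularity[gid].append((t, log2(n / overall[t])))

# Per group: highest popularity first, ties broken alphabetically
top = {gid: sorted(v, key=lambda p: (-p[1], p[0]))[:10]
       for gid, v in sorted(popularity.items())}
print(top)
```

In the Spark code the two counters correspond to the two `reduceByKey` results, and the lookup `overall[t]` corresponds to the `join` on the token.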

In [31]:
pre_join1 = interm_rdd.flatMap(lambda p: [((t,p[0]),1) for t in set(tok.tokenize(p[1]))])
pre_join2 = pre_join1.reduceByKey(lambda x, y: x + y).map(lambda it: (it[0][0],(it[0][1],it[1])))

jointed2 = pre_join2.join(num_freq_tokens_rdd1)

answerRdd = jointed2.map(lambda p: (p[1][0][0],(p[0], get_rel_popularity(p[1][0][1],p[1][1])))).groupByKey()

# Keep the group ID with each list and sort by it, so the printed labels match the data
groups = answerRdd.mapValues(list).sortByKey().collect()


popular_words_in_each_group = []  # per group: sorted by popularity (desc), then alphabetically (asc)

for gid, li in groups:
    tmp = sorted(li, key=lambda t: (-t[1], t[0]))[:10]
    popular_words_in_each_group.append(tmp)

for (gid, _), tokens in zip(groups, popular_words_in_each_group):
    # Only the first 10 tokens of each group are printed
    print_tokens(tokens, gid)

===== group 0 =====
with	-3.6668
his	-3.7279
cruz	-3.8074
amp	-3.9341
on	-4.0192
this	-4.0980
to	-4.1306
&	-4.1813
https	-4.1964
what	-4.2352
===== group 1 =====
sanders	-2.2538
hillary	-2.2730
’	-2.3099
gop	-2.4195
bernie	-2.4964
this	-2.6386
are	-2.6439
clinton	-2.6439
that	-2.7152
&	-2.7408
===== group 2 =====
...	-4.1964
donald	-4.2310
with	-4.4037
gop	-4.7415
on	-5.0192
i	-5.0318
he	-5.0444
@berniesanders	-5.1430
https	-5.1964
what	-5.2352
===== group 3 =====
@berniesanders	-0.7737
bernie	-1.4964
sanders	-1.5619
in	-2.2578
hillary	-2.2730
clinton	-2.3219
and	-2.5644
"	-2.5999
will	-2.6761
...	-2.6939
===== group 4 =====
@berniesanders	-3.1430
what	-3.3607
have	-3.4983
this	-3.5131
"	-3.5999
bernie	-3.6033
?	-3.6483
vote	-3.7004
that	-3.7152
it	-3.7249
===== group 5 =====
what	-1.7758
not	-1.8461
his	-1.9730
https	-2.0265
cruz	-2.0849
it	-2.0875
if	-2.1346
on	-2.1541
i	-2.1668
&	-2.1813
===== group 6 =====
@realdonaldtrump	-0.7758
vote	-1.3081
will	-1.4183
have	-1.4279
!	-1.4450
tr