*__[Twitter Dataset](https://archive.org/details/twitter_cikm_2010)__: This dataset is a collection of scraped public Twitter updates used to study geolocation data related to tweeting. We will use the first 500,000 lines of the tab-separated files "test_set_tweets.txt" and "training_set_tweets.txt", where each line is of the form: UserID, TweetID, Tweet, CreatedAt.*

*__[Friend Network Dataset](http://socialcomputing.asu.edu/datasets/Twitter)__: This dataset is a graph dataset that represents the follower relationships among 11 million Twitter users. We will use the first 500,000 lines of the comma-separated file "edges.csv", where each line is of the form: UserID, FollowingUserID.*

**We will use MapReduce and command line processing to compute aggregate statistics of the above data.**

## 1) Finding Trends

### 1.1)

**Parse the "test_set_tweets.txt" to extract hashtags (i.e., anything starting with “#”), convert to lower case and remove all non-alphanumeric characters, count the occurrences of each hashtag, and return the top 10 hashtags (i.e., with largest number of occurrences).**

### a) 

**Write a MapReduce approach to accomplish the above task. Include a mapper function, reducer function, and execute function.**

In [7]:
import pandas as pd # import statements
import numpy as np
import re
from collections import OrderedDict

In [40]:
lines = open('twitter_cikm_2010/test_set_tweets_partial.txt').read().splitlines()

**Mapper:**

In [49]:
def map_hashtag(line):
    hashtags = re.findall(r'^#(\S+)', line.strip().lower()) # hashtags at the start of the line: '#' followed by >= 1 non-whitespace chars
    hashtags.extend(re.findall(r'\s#(\S+)', line.strip().lower())) # hashtags preceded by whitespace
    hashtags = [re.sub(r'\W', '', hashtag) for hashtag in hashtags] # strip non-word chars from each hashtag (note: \W leaves underscores in place)
    return [(hashtag, 1) for hashtag in hashtags] # return tuples containing hashtag and frequency = 1


**Reducer:**

In [50]:
def reduce_find_freq(key, vals):
    return (key, sum(vals))


**Execution:**

In [51]:
def mapreduce_exec(data, mapper, reducer):
    hashtag_freq_tups = map(mapper, data) # call mapper for each line of data
    hashtag_freq_tups = np.concatenate([tup for tup in hashtag_freq_tups if len(tup) > 0]) # extend lists of tuple pairs/line and remove empty lists
    hashtag_freq_tups = [tuple(tup) for tup in hashtag_freq_tups]
    
    hashtag_freq_dict = {tup[0]:[] for tup in hashtag_freq_tups} # construct dict w/ keys as hashtags and values as lists
    for tup in hashtag_freq_tups:
        hashtag_freq_dict[tup[0]].append(int(tup[1])) # add freq to dict value list for each hashtag
    return sorted([reducer(tup[0], tup[1]) for tup in hashtag_freq_dict.items()], key=lambda x: x[1], reverse=True)[0:10] 
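
For comparison, the same map, shuffle, and reduce stages can be written with plain Python containers, which avoids the round trip through a NumPy string array and the `int(...)` cast it forces. This is a minimal sketch, not the implementation timed in this notebook: `map_hashtag_sketch`, `reduce_sum`, `run_mapreduce`, and the three sample lines are hypothetical names and data introduced for illustration.

```python
import re
from collections import defaultdict


def map_hashtag_sketch(line):
    """Emit (hashtag, 1) for each token that starts with '#'."""
    tags = re.findall(r'(?:^|\s)#(\S+)', line.strip().lower())
    # Keep only ASCII letters and digits, roughly mirroring `tr -dc '[:alnum:]'`.
    cleaned = (re.sub(r'[^a-z0-9]', '', tag) for tag in tags)
    return [(tag, 1) for tag in cleaned if tag]


def reduce_sum(key, vals):
    return (key, sum(vals))


def run_mapreduce(data, mapper, reducer, top_n=10):
    groups = defaultdict(list)           # shuffle: group values by key
    for record in data:
        for key, val in mapper(record):  # map: one record -> many pairs
            groups[key].append(val)
    reduced = [reducer(key, vals) for key, vals in groups.items()]
    return sorted(reduced, key=lambda kv: kv[1], reverse=True)[:top_n]


# Made-up sample lines in the UserID, TweetID, Tweet, CreatedAt layout.
sample = [
    "1\t10\t#FF to my friends #ff\t2010-01-01",
    "2\t11\tlistening #NowPlaying!\t2010-01-02",
    "3\t12\tno tags here\t2010-01-03",
]
top = run_mapreduce(sample, map_hashtag_sketch, reduce_sum)
print(top)
```

Grouping with a `defaultdict` keeps the counts as integers throughout, so the reducer can sum them directly.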
        

### b) 

**Report the wall clock runtime of your program when applied to the first 500,000 lines of "test_set_tweets.txt".**

In [52]:
import time # import statements

In [55]:
start_time = time.time()
mapreduce_exec(lines, map_hashtag, reduce_find_freq)
str(time.time() - start_time) + 's'

'5.500615119934082s'
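
`time.time()` reads the system clock, which can shift if the clock is adjusted mid-run. For short benchmarks, `time.perf_counter()` is the standard-library timer meant for measuring elapsed intervals. The sketch below times a callable a few times and keeps the best run; `time_call` and the toy workload are hypothetical and only illustrate the pattern.

```python
import time


def time_call(func, *args, repeats=3):
    """Return (best_seconds, result) over `repeats` runs of func(*args)."""
    best = float('inf')
    result = None
    for _ in range(repeats):
        start = time.perf_counter()
        result = func(*args)
        best = min(best, time.perf_counter() - start)
    return best, result


# Toy workload standing in for mapreduce_exec(lines, map_hashtag, reduce_find_freq).
elapsed, total = time_call(sum, range(100_000))
print('{:.4f}s'.format(elapsed))
```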

### c) 

**Report the top ten hashtags with their counts.**

In [54]:
mapreduce_exec(lines, map_hashtag, reduce_find_freq)

[('ff', 3522),
 ('nowplaying', 1799),
 ('fb', 1362),
 ('mm', 1017),
 ('fail', 628),
 ('random', 601),
 ('haiti', 586),
 ('shoutout', 516),
 ('musicmonday', 451),
 ('followfriday', 449)]

### d) 

**Write a command line program to accomplish the same task, using e.g., grep, sed, awk, sort.**

```grep -oE "\s#\S+|^#\S+" /Users/Aman/Documents/Jupyter\ Notebooks/CS\ 242/Project\ 3/twitter_cikm_2010/test_set_tweets_partial.txt | tr -dc '[:alnum:]\n' | tr '[:upper:]' '[:lower:]' | sort | uniq -c | sort -rn | head -10```

**Output:**

3522 ff
<br>
1799 nowplaying
<br>
1362 fb
<br>
1017 mm
<br>
628 fail
<br>
601 random
<br>
586 haiti
<br>
516 shoutout
<br>
451 musicmonday
<br>
449 followfriday

### e) 

**Run your command line program in a shell script to record the wall clock runtime.**

```time grep -oE "#\S+" /Users/Aman/Documents/Jupyter\ Notebooks/CS\ 242/Project\ 3/twitter_cikm_2010/test_set_tweets_partial.txt | tr -dc '[:alnum:]\n' | tr '[:upper:]' '[:lower:]' | sort | uniq -c | sort -r | head -10```

**Output:**

real	0m4.421s
<br>
user	0m4.547s
<br>
sys	0m0.082s

### f) 

**Discuss how the runtimes compare between the two approaches.**

On average, the UNIX runtime is faster by over a second (4.42 s versus 5.50 s wall clock in the runs reported above). This could be because the Python mapper scans each line twice, once per regular expression, while in UNIX `grep` matches both patterns in a single pass.
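
If the two `re.findall` passes in the mapper are a concern, they can be merged into one pattern with a non-capturing alternation. The sketch below checks, on a few made-up lines, that a single pass finds the same hashtags as the two-pass version. `two_pass` and `one_pass` are hypothetical helper names, and the timings above do not establish whether this change would affect the measured runtime.

```python
import re
from collections import Counter


def two_pass(line):
    text = line.strip().lower()
    tags = re.findall(r'^#(\S+)', text)
    tags.extend(re.findall(r'\s#(\S+)', text))
    return tags


def one_pass(line):
    # (?:^|\s) matches either the start of the line or one whitespace character.
    return re.findall(r'(?:^|\s)#(\S+)', line.strip().lower())


samples = ["#a b #c", "x#notatag #d\t#e", "plain text", "#only"]
same = all(Counter(two_pass(s)) == Counter(one_pass(s)) for s in samples)
print(same)
```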

### 1.2)

**Use Unix to join the first 500,000 lines from the file "test_set_tweets.txt" and the first 250,000 lines from the file "training_set_tweets.txt" into one file, say "tweets.txt". Parse this file to extract usernames (i.e., anything starting with “@”), count the occurrences of each username, and return the top 10 usernames (i.e., with largest number of occurrences).**

```head -250000 /Users/Aman/Documents/Jupyter\ Notebooks/CS\ 242/Project\ 3/twitter_cikm_2010/training_set_tweets.txt > /Users/Aman/Documents/Jupyter\ Notebooks/CS\ 242/Project\ 3/twitter_cikm_2010/training_set_tweets_partial.txt```

```cat /Users/Aman/Documents/Jupyter\ Notebooks/CS\ 242/Project\ 3/twitter_cikm_2010/test_set_tweets_partial.txt /Users/Aman/Documents/Jupyter\ Notebooks/CS\ 242/Project\ 3/twitter_cikm_2010/training_set_tweets_partial.txt > /Users/Aman/Documents/Jupyter\ Notebooks/CS\ 242/Project\ 3/twitter_cikm_2010/tweets.txt```
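
The same concatenation can also be done from Python with `itertools.islice`, which stops reading each file after the requested number of lines. This is a sketch on throwaway temporary files; `head_concat` and the file names are hypothetical and stand in for the real dataset paths.

```python
import itertools
import os
import tempfile


def head_concat(sources, dest):
    """Write the first n lines of each (path, n) pair in `sources` to `dest`."""
    with open(dest, 'w') as out:
        for path, n in sources:
            with open(path) as src:
                out.writelines(itertools.islice(src, n))


# Build two small stand-in files.
tmpdir = tempfile.mkdtemp()
a = os.path.join(tmpdir, 'a.txt')
b = os.path.join(tmpdir, 'b.txt')
with open(a, 'w') as f:
    f.write('a1\na2\na3\n')
with open(b, 'w') as f:
    f.write('b1\nb2\nb3\n')

joined = os.path.join(tmpdir, 'joined.txt')
head_concat([(a, 2), (b, 1)], joined)
with open(joined) as f:
    joined_lines = f.read().splitlines()
print(joined_lines)
```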

### a)

**Write a MapReduce approach to accomplish the above task. Include a mapper function, reducer function, and execute function.**

In [41]:
lines = open('twitter_cikm_2010/tweets.txt').read().splitlines()

**Mapper:**

In [66]:
def map_username(line):
    usernames = re.findall(r'^@(\S+)', line.strip()) # usernames at the start of the line: '@' followed by >= 1 non-whitespace chars
    usernames.extend(re.findall(r'\s@(\S+)', line.strip())) # usernames preceded by whitespace
    return [('@' + username, 1) for username in usernames] # return tuples containing username and frequency = 1


**Reducer:**

In [67]:
def reduce_find_freq(key, vals):
    return (key, sum(vals))

**Execution:**

In [68]:
def mapreduce_exec(data, mapper, reducer):
    username_freq_tups = map(mapper, data) # call mapper for each line of data
    username_freq_tups = np.concatenate([tup for tup in username_freq_tups if len(tup) > 0]) # extend lists of tuple pairs/line and remove empty lists
    username_freq_tups = [tuple(tup) for tup in username_freq_tups]
    
    username_freq_dict = {tup[0]:[] for tup in username_freq_tups} # construct dict w/ keys as usernames and values as lists
    for tup in username_freq_tups:
        username_freq_dict[tup[0]].append(int(tup[1])) # add freq to dict value list for each username
    return sorted([reducer(tup[0], tup[1]) for tup in username_freq_dict.items()], key=lambda x: x[1], reverse=True)[0:10] 
        

### b)

**Report the wall clock runtime of your program.**

In [60]:
start_time = time.time()
mapreduce_exec(lines, map_username, reduce_find_freq)
str(time.time() - start_time) + 's'

'7.473917007446289s'

### c)

**Report the top ten usernames with their counts.**

In [69]:
mapreduce_exec(lines, map_username, reduce_find_freq)

[('@RevRunWisdom:', 1229),
 ('@listensto', 939),
 ('@DonnieWahlberg', 523),
 ('@OGmuscles', 441),
 ('@addthis', 429),
 ('@breatheitin', 407),
 ('@justinbieber', 354),
 ('@MAV25', 347),
 ('@karlievoice', 304),
 ('@mtgcolorpie', 291)]

### d)

**Write a command line program to accomplish the same task, using e.g., grep, sed, awk, sort.**

```grep -oE "^@\S+|\s@\S+" /Users/Aman/Documents/Jupyter\ Notebooks/CS\ 242/Project\ 3/twitter_cikm_2010/tweets.txt | awk '{$1=$1};1' | sort | uniq -c | sort -rn | head -10```

**Output:**

1229 @RevRunWisdom:
<br>
939 @listensto
<br>
523 @DonnieWahlberg
<br>
441 @OGmuscles
<br>
429 @addthis
<br>
407 @breatheitin
<br>
354 @justinbieber
<br>
347 @MAV25
<br>
304 @karlievoice
<br>
291 @mtgcolorpie

### e)

**Run your command line program in a shell script to record the wall clock runtime.**

```time grep -oE "@\S+" /Users/Aman/Documents/Jupyter\ Notebooks/CS\ 242/Project\ 3/twitter_cikm_2010/tweets.txt | sort | uniq -c | sort -r | head -10```

**Output:**

real	0m18.373s
<br>
user	0m18.771s
<br>
sys	0m0.311s

### f)

**Discuss how the runtimes compare between the two approaches.**

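The UNIX runtime is roughly 11 seconds slower than the MapReduce approach in Python (18.37 s versus 7.47 s wall clock). This could be because in UNIX, sorting occurs twice: once over every extracted username before `uniq -c`, and again over the counts. The Python script groups the usernames in a dictionary in a single loop and only sorts the aggregated counts.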
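
Neither approach needs a full sort just to report ten entries. In Python, `collections.Counter` can count the keys and `most_common(n)` returns the `n` largest counts. A minimal sketch follows; the `mentions` list is made-up data, not taken from the dataset.

```python
from collections import Counter

# Made-up usernames standing in for the mapper's output keys.
mentions = ['@a', '@b', '@a', '@c', '@a', '@b']
counts = Counter(mentions)
top_two = counts.most_common(2)
print(top_two)
```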

### 1.3)

**Within the file "tweets.txt", find the number of tweets that have at least two hashtags.**

### a) 

**Write a MapReduce approach to accomplish the above task. Include a mapper function, reducer function, and execute function.**

**Mapper:**

In [43]:
def map_hashtag(line):
    hashtags = re.findall(r'^#(\S+)', line.strip()) # hashtags at the start of the line: '#' followed by >= 1 non-whitespace chars
    hashtags.extend(re.findall(r'\s#(\S+)', line.strip())) # hashtags preceded by whitespace
    return [(line, 1) for hashtag in hashtags] # emit one (line, 1) pair per hashtag found on the line

**Reducer:**

In [37]:
def reduce_find_freq(line, hashtag_freq):
    return (line, sum(hashtag_freq))

**Execution:**

In [45]:
def mapreduce_exec(data, mapper, reducer):
    line_freq_tups = map(mapper, data) # call mapper for each line of data
    line_freq_tups = np.concatenate([tup for tup in line_freq_tups if len(tup) > 1]) # keep only lines with >= 2 hashtags (this also drops empty lists), then flatten
    line_freq_tups = [tuple(tup) for tup in line_freq_tups]
    
    line_freq_dict = {tup[0]:[] for tup in line_freq_tups} # construct dict w/ keys as lines and values as lists
    for tup in line_freq_tups:
        line_freq_dict[tup[0]].append(int(tup[1])) # add freq to dict value list for each line
    return len([tup[0] for tup in [reducer(tup[0], tup[1]) for tup in line_freq_dict.items()] if tup[1] >= 2])


### b)

**Report the wall clock runtime of your program.**

In [206]:
start_time = time.time()
mapreduce_exec(lines, map_hashtag, reduce_find_freq)
str(time.time() - start_time) + 's'

'6.263382911682129s'

### c)

**Report the number of tweets with at least two hashtags.**

In [46]:
mapreduce_exec(lines, map_hashtag, reduce_find_freq)

15004
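
Because each line is treated as one tweet, the count can be cross-checked without grouping by line at all: count the hashtag matches on each line and tally the lines with two or more. This is a sketch under that one-tweet-per-line assumption; `count_multi_hashtag` and the sample lines are hypothetical.

```python
import re

# A hashtag is '#' plus non-whitespace, at line start or after whitespace.
HASHTAG = re.compile(r'(?:^|\s)#\S+')


def count_multi_hashtag(lines, minimum=2):
    """Count the lines that contain at least `minimum` hashtags."""
    return sum(1 for line in lines if len(HASHTAG.findall(line.strip())) >= minimum)


sample = [
    "1\t10\t#a and #b\t2010-01-01",
    "2\t11\tjust #one\t2010-01-02",
    "3\t12\tnone\t2010-01-03",
    "4\t13\t#x #y #z\t2010-01-04",
]
n_multi = count_multi_hashtag(sample)
print(n_multi)
```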

### d)

**Write a command line program to accomplish the same task, using e.g., grep, sed, awk, sort.**

```grep -o -n -E "^#\S+|\s#\S+" /Users/Aman/Documents/Jupyter\ Notebooks/CS\ 242/Project\ 3/twitter_cikm_2010/tweets.txt | cut -d : -f 1 | uniq -c | awk '{if ($1 >= 2) {print $1}}' | wc -l```

**Output:**

15004

### e)

**Run your command line program in a shell script to record the wall clock runtime.**

```time grep -o -n -E "#\S+" /Users/Aman/Documents/Jupyter\ Notebooks/CS\ 242/Project\ 3/twitter_cikm_2010/tweets.txt | cut -d : -f 1 | uniq -c | awk '{if ($1 >= 2) {print $1}}' | wc -l```

**Output:**

real 0m2.720s
<br>
user 0m1.606s
<br>
sys 0m0.244s

### f)

**Discuss how the runtimes compare between the two approaches.**

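UNIX is about 3.5 seconds faster than MapReduce in Python (2.72 s versus 6.26 s wall clock). This could be because the Python version builds several intermediate data structures (lists of tuples, a NumPy array, and a dictionary keyed by whole lines), which takes extra time, whereas UNIX simply streams the matching line numbers through `cut`, `uniq`, and `awk`.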

## 2) Finding Reciprocal Followers

**Process the follower network data to determine reciprocal following relationships, i.e., pairs of users that mutually follow each other.**

### a)

**Write a MapReduce approach to accomplish the above task. Include a mapper function, reducer function, and execute function.**

In [50]:
data = pd.read_csv('Twitter-dataset/data/edges_partial.csv', index_col = 0, header=None, names=['Following'])
users = sorted(set(data.index).intersection(data['Following']))
users_followers = {user:[] for user in users}

**Mapper:**

In [47]:
def map_users(user):
    u, v = user.name, user['Following'] # row index is the UserID; 'Following' holds the FollowingUserID
    try:
        mutual = v in users_followers[u] and u in users_followers[v]
    except KeyError: # one of the two users is not in users_followers
        mutual = False
    return ((u, v), mutual)

**Reducer:**

In [48]:
def reduce_users(pairs):
    mutual_followers = [pair[0] for pair in pairs if pair[1] == True]
    return mutual_followers
    #return list(set(tuple(sorted(pair)) for pair in mutual_followers))   

**Execution:**

In [49]:
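def mapreduce_exec(data, mapper, reducer):
    user_set = set(users) # set membership is O(1); `users` itself is a sorted list
    for user in users_followers:
        # data.loc[[user], ...] always returns a Series, even when the user has a single edge;
        # assigning (rather than extending) keeps repeated calls from accumulating duplicates
        users_followers[user] = [v for v in data.loc[[user], 'Following'].values if v in user_set]

    pairs = data.loc[users].apply(mapper, axis=1)
    return reducer(pairs.values)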

### b)

**Report the wall clock runtime of your program when applied to the first 500,000 lines of edges.csv.**

In [423]:
start_time = time.time()
mapreduce_exec(data, map_users, reduce_users)
str(time.time() - start_time) + 's'

'6.473400115966797s'
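
A set of directed edges makes the reciprocity test direct: an edge `(u, v)` is reciprocal exactly when `(v, u)` is also present. The sketch below uses a small made-up edge list; `reciprocal_edges` is a hypothetical helper, not the function timed above.

```python
def reciprocal_edges(edges):
    """Return the directed edges (u, v) whose reverse (v, u) also appears."""
    edge_set = set(edges)
    return sorted((u, v) for (u, v) in edge_set if u != v and (v, u) in edge_set)


# Made-up edges in (UserID, FollowingUserID) order.
sample_edges = [(1, 2), (2, 1), (1, 3), (3, 4), (4, 3), (5, 6)]
mutual = reciprocal_edges(sample_edges)
print(mutual)
```

Like the notebook's output file, this keeps both directions of every reciprocal pair.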

### c)

**Output the results in a text file to use in the next question. This will just be a subset of the original edges.csv file. Report the difference in size between the two versions of the graph with respect to number of unique nodes, and number of edges.**

```python
with open('Twitter-dataset/data/mutuals_friends.txt', 'w') as f:
    for pair in mapreduce_exec(data, map_users, reduce_users):
        f.write(str(pair) + '\n')
```

**File Contents:**

In [53]:
for line in open('Twitter-dataset/data/mutuals_friends.txt').read().splitlines():
    print(line)

(3682, 5276)
(5276, 3682)
(13232, 18205)
(13232, 63255)
(15574, 15926)
(15926, 15574)
(18205, 13232)
(19628, 19821)
(19628, 20033)
(19821, 19628)
(20033, 19628)
(22196, 76473)
(23503, 41422)
(31866, 32002)
(32002, 31866)
(32173, 32452)
(32452, 32173)
(33099, 62167)
(33884, 34046)
(33884, 34101)
(34046, 33884)
(34101, 33884)
(40704, 41039)
(40704, 40997)
(40997, 62623)
(40997, 40704)
(40997, 41039)
(40997, 201063)
(41039, 40704)
(41039, 40997)
(41422, 23503)
(58783, 58875)
(58875, 58783)
(60887, 70696)
(62167, 33099)
(62623, 40997)
(63255, 65435)
(63255, 13232)
(65411, 65435)
(65435, 65411)
(65435, 63255)
(65435, 93260)
(70696, 60887)
(70696, 70772)
(70772, 70696)
(76473, 22196)
(78182, 78464)
(78464, 78182)
(80092, 80096)
(80096, 80092)
(89222, 89350)
(89350, 89222)
(93260, 65435)
(93260, 93427)
(93427, 93260)
(100591, 100721)
(100721, 100591)
(102898, 122546)
(122546, 102898)
(134409, 134410)
(134410, 134409)
(135546, 135684)
(135684, 135546)
(192865, 192899)
(192899, 192865)
(201063,

In [104]:
mutual_friend_edges = mapreduce_exec(data, map_users, reduce_users)
print('# Nodes Original: {}'.format(len(set(data.index).union(set(data['Following'])))))
print('# Edges Original: {}'.format(data.shape[0]))
print('# Nodes New: {}'.format(len(set.union(*[set(pair) for pair in mutual_friend_edges]))))
print('# Edges New: {}'.format(len(mutual_friend_edges)))

# Nodes Original: 249402
# Edges Original: 500000
# Nodes New: 55
# Edges New: 68


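The new symmetric follower graph is far smaller than the original: 55 unique nodes instead of 249,402, and 68 edges instead of 500,000. Each reciprocal relationship is stored in both directions, so the 68 directed edges correspond to 34 mutual pairs.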

### d)

**Write a command line program to accomplish the same task, using e.g., awk, sort, join.**

```head -n 500000 edges.csv | awk -F, '{if ($1 > $2){var = $1; $1 = $2; $2 = var;} print $0}' | sed 's/\W/,/g' | sort | uniq -d```

**Output:**

100591,100721
<br>
102898,122546
<br>
13232,18205
<br>
13232,63255
<br>
134409,134410
<br>
135546,135684
<br>
15574,15926
<br>
192865,192899
<br>
19628,19821
<br>
19628,20033
<br>
201078,201607
<br>
22196,76473
<br>
23503,41422
<br>
31866,32002
<br>
32173,32452
<br>
33099,62167
<br>
33884,34046
<br>
33884,34101
<br>
3682,5276
<br>
40704,40997
<br>
40704,41039
<br>
40997,201063
<br>
40997,41039
<br>
40997,62623
<br>
58783,58875
<br>
60887,70696
<br>
63255,65435
<br>
65411,65435
<br>
65435,93260
<br>
70696,70772
<br>
78182,78464
<br>
80092,80096
<br>
89222,89350
<br>
93260,93427
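
The idea behind the pipeline is to put the smaller ID first in every edge, so that `u,v` and `v,u` become the same line, and then keep the lines that occur more than once. The same logic in Python, as a sketch on made-up edges (`duplicated_pairs` is a hypothetical name). Like `uniq -d`, it would also report an edge that is merely repeated in the input, so it assumes each directed edge is listed only once.

```python
from collections import Counter


def duplicated_pairs(edges):
    """Canonicalize each edge as (smaller, larger) and keep pairs seen more than once."""
    canonical = Counter((min(u, v), max(u, v)) for u, v in edges)
    return sorted(pair for pair, count in canonical.items() if count > 1)


sample_edges = [(1, 2), (2, 1), (1, 3), (4, 3), (3, 4), (5, 6)]
pairs = duplicated_pairs(sample_edges)
print(pairs)
```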

### e) 

**Run your command line program in a shell script to record the wall clock runtime.**

```time head -n 500000 edges.csv | awk -F, '{if ($1 > $2){var = $1; $1 = $2; $2 = var;} print $0}' | sed 's/\W/,/g' | sort | uniq -d```

**Output:**

real 0m4.661s
<br>
user 0m5.778s
<br>
sys 0m0.239s

### f)

**Discuss how the runtimes compare between the two approaches.**

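According to the timings above, UNIX is faster than Python by almost 2 seconds (4.66 s versus 6.47 s wall clock). The Python version does narrow the work to a subset of the data (intersecting the set of users with the set of users they follow leaves only the users who could be mutual friends), but building the per-user lists and applying the mapper row by row through pandas appears to cost more than that filtering saves.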

## 3) Finding Friends of Friends

**Use the symmetric follower graph from the question above. For each pair of friends (i.e., pair of linked users), find the number of friends they have in common.**

### a)

**Write a map reduce approach in python to accomplish this task. Note: you will probably need two map/reduce functions to accomplish this task: one to identify the friends of each user, and another to find the friends they have in common.**

In [114]:
import ast # literal_eval parses the "(a, b)" lines without executing arbitrary code

data = pd.read_csv('Twitter-dataset/data/edges_partial.csv', index_col = 0, header=None, names=['Following'])
mutual_friends = list(set(tuple(sorted(ast.literal_eval(pair))) for pair in open('Twitter-dataset/data/mutuals_friends.txt').read().splitlines()))

**Mapper:**

In [116]:
# def map_friends(mutual_friends):
#     return [mutual_friends, [list(data.loc[mutual_friends[0], 'Following'].values), list(data.loc[mutual_friends[1], 'Following'].values)]]

def map_friends(friends):
    friends_of_friends = {friends[0]: [], friends[1]:[]}
    for pair in mutual_friends:
        if friends[0] in pair:
            friends_of_friends[friends[0]].extend([friend for friend in pair if friend != friends[0]])
        if friends[1] in pair:
            friends_of_friends[friends[1]].extend([friend for friend in pair if friend != friends[1]])
    return friends_of_friends

**Reducer:**

In [117]:
# def reduce_common_friends(friends_u1, friends_u2):
#     return len(set(friends_u1).intersection(set(friends_u2)))
    
def reduce_common_friends(friends_of_friends):
    friends_a, friends_b = friends_of_friends.values() # the mapper emits exactly two friend lists
    return list(set(friends_a) & set(friends_b))

**Execution:**

In [118]:
# def mapreduce_exec(mutual_friends):
#     friends = list(map(map_friends, mutual_friends))
#     friends_of_friends = {mut_friend:0 for mut_friend in mutual_friends}
#     for mut_friends in friends:
#         friends_of_friends[mut_friends[0]] = reduce_common_friends(mut_friends[1][0], mut_friends[1][1])
#     return sorted(friends_of_friends.items(), key=lambda kv: kv[1], reverse=True)[0:10]

def mapreduce_exec(mutual_friends):
    friends_of_friends = list(map(map_friends, mutual_friends))
    common_friends = {tuple(pair.keys()):0 for pair in friends_of_friends}
    for friends in friends_of_friends:
        common_friends[tuple(friends.keys())] = len(reduce_common_friends(friends))        
    return sorted(common_friends.items(), key=lambda kv: kv[1], reverse=True)[0:10]

### b)

**Report the top ten pairs and how many friends they have in common.**

In [120]:
for entry in mapreduce_exec(mutual_friends):
    print('Mutual Friends: {}, # Common Friends: {}'.format(entry[0], entry[1]))

Mutual Friends: (40997, 41039), # Common Friends: 1
Mutual Friends: (40704, 40997), # Common Friends: 1
Mutual Friends: (40704, 41039), # Common Friends: 1
Mutual Friends: (22196, 76473), # Common Friends: 0
Mutual Friends: (33884, 34046), # Common Friends: 0
Mutual Friends: (89222, 89350), # Common Friends: 0
Mutual Friends: (93260, 93427), # Common Friends: 0
Mutual Friends: (33884, 34101), # Common Friends: 0
Mutual Friends: (33099, 62167), # Common Friends: 0
Mutual Friends: (58783, 58875), # Common Friends: 0
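
The two stages described in the task can be sketched compactly with adjacency sets: the first stage collects each user's friends, and the second intersects the friend sets of every linked pair. This is an illustrative sketch on a made-up undirected edge list, not the code that produced the output above; `build_adjacency` and `common_friend_counts` are hypothetical names.

```python
from collections import defaultdict


def build_adjacency(pairs):
    """Stage 1: map each user to the set of their friends."""
    friends = defaultdict(set)
    for u, v in pairs:
        friends[u].add(v)
        friends[v].add(u)
    return friends


def common_friend_counts(pairs):
    """Stage 2: for each linked pair, count the friends both users share."""
    friends = build_adjacency(pairs)
    counts = {(u, v): len(friends[u] & friends[v]) for u, v in pairs}
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)


# Users 1, 2, 3 form a triangle; user 4 is only linked to user 3.
sample_pairs = [(1, 2), (1, 3), (2, 3), (3, 4)]
result = common_friend_counts(sample_pairs)
print(result)
```

In this toy graph, every pair inside the triangle shares exactly one friend, which is the same pattern as the three pairs with one common friend in the output above.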
