## CSCE 676 :: Data Mining and Analysis :: Texas A&M University :: Fall 2019


# Homework 2

- **100 points [10% of your final grade]**
- **Due Saturday, October 19 by 11:59pm**

**Goals of this homework:** This homework has five objectives:

* Become familiar with Apache Spark and working in a distributed environment in the cloud
* Get hands-on experience designing and running a simple MapReduce data transformation job
* Get hands-on experience using Spark built-in functions; namely, LDA and PageRank
* Design a Pregel algorithm to find tree depth in a network
* Understand and implement the Trawling algorithm to find user communities

*Submission instructions:* You should post your notebook to ecampus (look for the homework 2 assignment there). Name your submission **your-uin_hw2.ipynb**, so for example, my submission would be something like **555001234_hw2.ipynb**. Your notebook should be fully executed when you submit ... so run all the cells for us so we can see the output, then submit that. Follow the AWS guide to create a Hadoop/Spark cluster and create an empty Notebook. Copy all the cells in this notebook to the AWS notebook and continue working on your notebook in AWS. When you are done, download your notebook from AWS (navigate to the location on S3 where your notebook is saved and click download) and submit it to ecampus.

## Introduction to the Dataset
We will use a dataset of tweets concerning members of the US Congress. The data spans almost a year (from October 3rd, 2018 to September 25th, 2019) and covers 577 of the members. Any tweet or retweet posted by the 577 members, or directed to them by other Twitter users, was collected.

The data is on S3 in a bucket named s3://us-congress-tweets that you can access. There are 277,744,063 tweets. This is a huge dataset so we will not be working directly on this data all the time. Rather we will work on samples or subsets of this data but in some cases, we will ask you to execute your task on the whole dataset.

Below is a summary of all datasets used for this homework:

| Dataset                | Location in S3                                      | Description |
| :---                   | :---                                                | :--- |
| Congress members       | s3://us-congress-tweets/congress_members.csv        | 577 twitter ids and screen names |
| Raw tweets             | s3://us-congress-tweets/raw/\*.snappy               | the whole json objects of the tweets|
| Sample tweets          | s3://us-congress-tweets/congress-sample-10k.json.gz | 10k sample tweets|
| Trimmed tweets         | s3://us-congress-tweets/trimmed/\*.parquet          | selected fields for all tweets|
| User hashtags          | s3://us-congress-tweets/user_hashtags.csv           | all pairs of <user, hashtag>|
| User replies           | s3://us-congress-tweets/reply_network.csv           | all pairs of <reply_tweet, replied_to_tweet> |
| User mentions          | s3://us-congress-tweets/user_mentions.csv           | all pairs of <src_user_id, src_dest_id, frequency> |

Let's run some exploration below!

In [1]:
# First let's read Twitter ids and screen names of the 577 US congress members

congress_members = spark.read.csv("s3://us-congress-tweets/congress_members.csv", header=True)
congress_members.show()
print("Number of congress members tracked:", congress_members.count())

+------------------+---------------+
|            userid|    screen_name|
+------------------+---------------+
|         776664410|  RepCartwright|
|         240363117|   RepTomMarino|
|837722935095930883| RepScottTaylor|
|        1069124515|     RepLaMalfa|
|818460870573441028|  RepTomGarrett|
|         163570705|     repcleaver|
|          19739126|      GOPLeader|
|          33563161| RepJoseSerrano|
|        2861616083|USRepGaryPalmer|
|        1074518754| SenatorBaldwin|
|         305620929|  Call_Me_Dutch|
|         381152398| RepTerriSewell|
|         834069080| RepDavidRouzer|
|         249787913|  SenatorCarper|
|         188019606|        Clyburn|
|         217543151|SenatorTimScott|
|          39249305| USRepMikeDoyle|
|          33537967|   amyklobuchar|
|         249410485|  SanfordBishop|
|          23124635|    TomColeOK04|
+------------------+---------------+
only showing top 20 rows

('Number of congress members tracked:', 577)


We can use `spark.read.json(...)` without a schema to load the tweets into a dataframe, but this will be slow for two reasons:
* First, Spark makes one pass over the data to infer a schema, then a second pass to read and parse the content into the dataframe.
* Second, it reads every field of each tweet JSON object, although we need only a few fields for a given task.

Thus we define our own schema, something like the following:

In [2]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
twitter_date_format="EEE MMM dd HH:mm:ss ZZZZZ yyyy"

user_schema = StructType([
    StructField('created_at',TimestampType(),True),
    StructField('followers_count',LongType(),True),
    StructField('id',LongType(),True),
    StructField('name',StringType(),True),
    StructField('screen_name',StringType(),True)
])

hashtag_schema = ArrayType(StructType([StructField('text',StringType(),True)]))
user_mentions_schema = ArrayType(StructType([StructField('id',LongType(),True),
                                             StructField('screen_name',StringType(),True)]))
entities_schema = StructType([
    StructField('hashtags',hashtag_schema,True),
    StructField('user_mentions',user_mentions_schema,True)
    ])

retweeted_status_schema =StructType([        
        StructField("id", LongType(), True),
        StructField("in_reply_to_user_id", LongType(), True),
        StructField("in_reply_to_status_id", LongType(), True),
        StructField("created_at", TimestampType(), True),
        StructField("user", user_schema)
    ])

tweet_schema =StructType([
        StructField("text", StringType(), True),
        StructField("id", LongType(), True),
        StructField("in_reply_to_user_id", LongType(), True),
        StructField("in_reply_to_status_id", LongType(), True),
        StructField("created_at", TimestampType(), True),
        StructField("user", user_schema),
        StructField("entities", entities_schema),
        StructField("retweeted_status", retweeted_status_schema)
    ])
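
The `twitter_date_format` pattern above describes timestamps such as `Wed Oct 03 14:05:09 +0000 2018`. As a rough cross-check outside Spark, the same layout can be parsed in plain Python. This is only a sketch: the `strptime` pattern, the helper name, and the sample value are assumptions made for illustration, not part of the assignment.

```python
from datetime import datetime, timezone

# Assumed strptime counterpart of the Java-style pattern
# "EEE MMM dd HH:mm:ss ZZZZZ yyyy" used in the schema cell.
PY_TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"


def parse_twitter_timestamp(raw):
    """Parse a Twitter-style created_at string into a timezone-aware datetime."""
    return datetime.strptime(raw, PY_TWITTER_FORMAT)


# Hypothetical sample value, not taken from the dataset.
sample = "Wed Oct 03 14:05:09 +0000 2018"
parsed = parse_twitter_timestamp(sample)
print(parsed.isoformat())
```

If a timestamp column comes back as all nulls in Spark, a mismatch between the pattern and the raw strings is one of the first things worth checking.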

Now we are ready to read the tweets with `spark.read.json` passing our own schema as follows:

In [3]:
tweets = spark.read.option("timestampFormat", twitter_date_format)\
                   .json('s3://us-congress-tweets/congress-sample-10k.json.gz', tweet_schema)\
                   .withColumn('user_id',F.col('user.id'))
tweets.printSchema()

root
 |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- in_reply_to_status_id: long (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- created_at: timestamp (nullable = true)
 |    |-- followers_count: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- screen_name: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- user_mentions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- screen_name: string (nullable = true)
 |-- retweeted_status: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- in_reply_to_user_id: long (nul

## (6 points) Part 1a: Exploratory Data Analysis (Small Scale)

How many unique users and original tweets (i.e. not retweets) are there? 

In [4]:
# your code here for unique users
tweets.select(tweets.user.id).distinct().count()

9735

In [5]:
# your code here for original tweets
tweets.filter(tweets.retweeted_status.isNull()).count()

3327

Who are the ten most mentioned users in the sample?

In [6]:
# code and output here
tweets.select(F.col("user.screen_name"),
              F.explode(tweets.entities.user_mentions.screen_name).alias("mention"))\
      .groupby("mention").count().sort(F.desc("count")).show()

+---------------+-----+
|        mention|count|
+---------------+-----+
|     SenSchumer|  776|
|realDonaldTrump|  751|
|  RepAdamSchiff|  739|
|     marcorubio|  695|
|  SpeakerPelosi|  505|
|    NancyPelosi|  368|
|       RandPaul|  272|
|  ChrisMurphyCT|  223|
|  SenGillibrand|  220|
|   RepMattGaetz|  220|
|     CoryBooker|  214|
|  ChuckGrassley|  206|
|      JeffFlake|  202|
|   SteveScalise|  179|
|   amyklobuchar|  176|
|      GOPLeader|  164|
| RepJerryNadler|  162|
|          POTUS|  159|
|SenKamalaHarris|  152|
| SenJeffMerkley|  141|
+---------------+-----+
only showing top 20 rows
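
The query above follows an explode-then-count pattern: each tweet's list of mentions is flattened to one row per mention, and the rows are grouped and counted. A minimal plain-Python sketch of the same idea, using made-up tweets and a hypothetical `top_mentions` helper, might look like this:

```python
from collections import Counter

# Hypothetical miniature tweets holding only the fields needed here.
sample_tweets = [
    {"user": "a", "mentions": ["SenSchumer", "marcorubio"]},
    {"user": "b", "mentions": ["SenSchumer"]},
    {"user": "c", "mentions": []},
    {"user": "a", "mentions": ["marcorubio", "SenSchumer"]},
]


def top_mentions(tweets, n):
    """Flatten every mention list ("explode") and count occurrences."""
    counts = Counter(m for tweet in tweets for m in tweet["mentions"])
    return counts.most_common(n)


print(top_mentions(sample_tweets, 2))
```

Tweets with no mentions contribute nothing, which matches how `F.explode` drops rows whose array is empty.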



What are the top hashtags used?

In [7]:
# code and output here
tweets.select(F.explode(tweets.entities.hashtags.text).alias("hashtag"))\
      .groupby("hashtag").count().sort(F.desc("count"))\
      .show()

+-----------------+-----+
|          hashtag|count|
+-----------------+-----+
|        Venezuela|  102|
|    TrumpShutdown|   42|
|     MaduroRegime|   29|
|           Maduro|   20|
|             MAGA|   20|
|      NancyPelosi|   19|
|    MuellerReport|   17|
|     ForThePeople|   14|
|      TrumpResign|   14|
|     BuildTheWall|   14|
|        Kavanaugh|   14|
|     GreenNewDeal|   13|
| MyHouseMyAmerica|   12|
|        transport|   12|
|             Cuba|   10|
|        Democrats|   10|
|          Florida|   10|
|    BrowardCounty|    9|
|             EEUU|    8|
|MaduroCrimeFamily|    8|
+-----------------+-----+
only showing top 20 rows



## (4 points) Part 1b: Exploratory Data Analysis (Large Scale)
Repeat the above queries but now against the whole dataset defined in the dataframe below. 

In [8]:
trimmed_files = [x[0] for x in spark.read.csv("s3://us-congress-tweets/trimmed/files.txt").collect()]
tweets_all = spark.read.parquet(*trimmed_files)
tweets_all.printSchema()

root
 |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- in_reply_to_status_id: long (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- created_at: timestamp (nullable = true)
 |    |-- followers_count: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- screen_name: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- user_mentions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- screen_name: string (nullable = true)
 |-- retweeted_status: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- in_reply_to_user_id: long (nul

In [9]:
# your code here for unique users
tweets_all.select(tweets_all.user.id).distinct().count()

10749403

In [10]:
# your code here for original tweets
tweets_all.filter(tweets_all.retweeted_status.isNull()).count()

96887299

In [11]:
# Top mentioned users code and output here
tweets_all.select(F.col("user.screen_name"),
                  F.explode(tweets_all.entities.user_mentions.screen_name).alias("mention"))\
      .groupby("mention").count().sort(F.desc("count")).show()

+---------------+--------+
|        mention|   count|
+---------------+--------+
|realDonaldTrump|22849070|
|LindseyGrahamSC|13420958|
|   senatemajldr|12698227|
|  RepAdamSchiff|12604975|
|     SenSchumer|11879180|
|  SpeakerPelosi|11104065|
|     marcorubio| 9693611|
|     Jim_Jordan| 8450458|
|     SenSanders| 6565118|
|    RepSwalwell| 5890806|
| RepMarkMeadows| 4602132|
|          POTUS| 4534031|
|    RepCummings| 4210449|
| RepJerryNadler| 4195037|
|     CoryBooker| 4122082|
|      GOPLeader| 4052729|
|      SenWarren| 4041194|
|   RepMattGaetz| 3865060|
|       RandPaul| 3734433|
|SenKamalaHarris| 3640507|
+---------------+--------+
only showing top 20 rows



In [12]:
# Top hashtags code and output here
tweets_all.select(F.explode(tweets_all.entities.hashtags.text).alias("hashtag"))\
      .groupby("hashtag").count().sort(F.desc("count"))\
      .show()

+-------------+-------+
|      hashtag|  count|
+-------------+-------+
|    Venezuela|1206418|
|  MoscowMitch|1105708|
|TrumpShutdown| 632069|
|         MAGA| 469507|
|MuellerReport| 405471|
|  NancyPelosi| 359063|
| MaduroRegime| 349757|
|        Trump| 312651|
| BuildTheWall| 311186|
| GreenNewDeal| 272289|
|     BREAKING| 254382|
|    Kavanaugh| 224196|
|    Democrats| 211403|
| ForThePeople| 207904|
|       Maduro| 200107|
|         SOTU| 177483|
|      Mueller| 170592|
|    transport| 160220|
| DoWhatWeSaid| 157293|
|          HR1| 153986|
+-------------+-------+
only showing top 20 rows



## (10 points) Part 2: Textual Analysis (LDA)
Using the LDA algorithm provided by the Spark Machine Learning (ML) library, find the ten most important topics. Use `s3://us-congress-tweets/trimmed/*.parquet` for this task (you can reuse the `tweets_all` dataframe from Part 1b).

You may want to work on a small sample first but report your results on the whole dataset.

Hint: for better results aggregate all tweets for a user into a single document
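
To make the hint concrete, here is a small plain-Python sketch of what "one document per user" means: the tokens of all tweets by the same user are concatenated into a single list. The function name and the sample tweets are hypothetical, and whitespace splitting is only a stand-in for real tokenization.

```python
from collections import defaultdict


def build_user_documents(tweets):
    """Merge the tokens of every tweet by the same user into one document."""
    docs = defaultdict(list)
    for user_id, text in tweets:
        docs[user_id].extend(text.split())
    return dict(docs)


# Hypothetical (user_id, text) pairs.
sample = [(1, "vote on the bill"), (2, "budget talks"), (1, "bill passed")]
user_docs = build_user_documents(sample)
print(user_docs)
```

Single tweets are very short, so topic models tend to see too few co-occurring words per document; pooling by user gives each document more context.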

In [13]:
# your code here
# Preprocessing - split words, filter out stopwords, group by user ids and aggregate their tweets

# Processing the whole dataset raised an error that I could not resolve,
# even with 8 instances, so I sampled 80% of the data (without replacement)
data = tweets_all.sample(False, 0.8)
# data = tweets_all
# Raw string so that "\s" is passed through as a regex, not treated as a Python escape
user_tweet_words = data.select("user.id", F.split(data.text, r"\s+").alias("text"))

In [14]:
# StopWordsRemover
from pyspark.ml.feature import StopWordsRemover

stopWordsRemover = StopWordsRemover(inputCol="text", outputCol="filteredText")
user_tweet_words = stopWordsRemover.transform(user_tweet_words)

In [15]:
user_tweet_words = user_tweet_words.groupby("id")\
                                   .agg(F.flatten(F.collect_list("filteredText")).alias("aggregated_tweets"))

In [16]:
from pyspark.ml.feature import CountVectorizer

# The maximum and minimum occurrence can be further tuned to get better representative topics
cv = CountVectorizer(inputCol="aggregated_tweets", outputCol="features", maxDF=50, minDF=5)
cvModel = cv.fit(user_tweet_words)
user_tweet_words = cvModel.transform(user_tweet_words)
user_tweet_words.show()

+------+--------------------+--------------------+
|    id|   aggregated_tweets|            features|
+------+--------------------+--------------------+
|  3764|[RT, @AOC:, encou...|      (262144,[],[])|
|  5556|[RT, @kgahlot:, E...|      (262144,[],[])|
| 11938|[@kevburkeie, @pk...|(262144,[239728],...|
| 13518|[RT, @jnewland:, ...|      (262144,[],[])|
| 26543|[Even, young, man...|      (262144,[],[])|
| 35253|[RT, @IsItABikeLa...|(262144,[169427,1...|
| 48763|[@SenSchumer, Dea...|      (262144,[],[])|
| 60033|[RT, @FictiveCame...|      (262144,[],[])|
| 68463|[RT, @RepBrendanB...|      (262144,[],[])|
|193283|[@nostridamusontw...|      (262144,[],[])|
|601963|[@danforhan, GM, ...|(262144,[202644],...|
|660523|[@RepThomasMassie...|      (262144,[],[])|
|734203|[RT, @FlipScreen:...|      (262144,[],[])|
|747203|[RT, @chrisinsili...|      (262144,[],[])|
|763684|[said, say, again...|      (262144,[],[])|
|781066|[RT, @SenGaryPete...|      (262144,[],[])|
|781154|[Listen:, recycle...|  
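
The many empty feature vectors above are consistent with how document-frequency thresholds work. Assuming integer values of `minDF` and `maxDF` are read as absolute document counts, a term is kept only if it appears in at least `minDF` and at most `maxDF` documents, so common words are discarded along with very rare ones. The sketch below imitates that filter in plain Python; `build_vocabulary` and the toy documents are made up, and the alphabetical ordering is chosen only for determinism (it is not how Spark orders its vocabulary).

```python
from collections import Counter


def build_vocabulary(documents, min_df, max_df):
    """Keep terms whose document frequency lies in [min_df, max_df]."""
    doc_freq = Counter()
    for doc in documents:
        doc_freq.update(set(doc))  # count each term once per document
    return sorted(t for t, c in doc_freq.items() if min_df <= c <= max_df)


# Hypothetical tokenized documents.
docs = [
    ["tax", "bill", "vote"],
    ["tax", "vote"],
    ["tax", "wall"],
    ["tax", "bill"],
]
vocab = build_vocabulary(docs, min_df=2, max_df=3)
print(vocab)
```

Here `tax` is dropped for being too common and `wall` for being too rare, which is the same trade-off the `maxDF=50, minDF=5` setting makes on millions of user documents.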

In [17]:
from pyspark.ml.clustering import LDA

lda = LDA(k=10, optimizer='em')
ldaModel = lda.fit(user_tweet_words)
topics = ldaModel.describeTopics(10)
topics.show()

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[2, 3, 4, 5, 6, 8...|[0.00870556566196...|
|    1|[0, 1, 12, 13, 30...|[0.01609675647858...|
|    2|[64, 69, 81, 105,...|[0.00132209943962...|
|    3|[10, 24, 28, 29, ...|[0.00395606202193...|
|    4|[50, 59, 62, 72, ...|[0.00125446835767...|
|    5|[42, 66, 157, 160...|[0.00143052106111...|
|    6|[22, 27, 35, 44, ...|[0.00294871844130...|
|    7|[7, 32, 58, 60, 7...|[0.00504062626150...|
|    8|[43, 49, 75, 78, ...|[0.00151722481547...|
|    9|[14, 23, 41, 63, ...|[0.00290916015885...|
+-----+--------------------+--------------------+



For each topic, print out 10 words to describe it

In [18]:
# your code here
vocab = cvModel.vocabulary

topic_rows = topics.collect()
for topic in topic_rows:
    terms = []
    for termIndice in topic[1]:
        terms.append(vocab[termIndice].encode('ascii','ignore'))
    print("Topic" + str(topic[0]) + " : " + str(terms))

Topic0 : ['@Rutherford_Inst', '@WEXWatchdog', '@CCHR', '@johnalexwood', '@NMPoliticsnet', '@haussamen', '@soljourno', '@nmdoh', 'Hiring:', 'GOPArkansas']
Topic1 : ['@AngelCIraq214', '@news_store_com', '@Jstaskin:', '@lowkell', 'HARIHAR', '@Scooterocket', '@screamguitarman:', '"Ratcliffe', '@franceonu', '@laborers435']
Topic2 : ['@BreakingNews', '@SonOfJmkWalkow:', '@GotTeam:', 'Guidestones', 'Radio', 'Tadler', '#comobilit', '@Cr8rBoi', "Paddlin'", '@andrewscheer']
Topic3 : ['2019-02-20,', '2018-12-20,', 'load:', '#manandvan', '@Padres', '@MLBStats', '@LMErdosSCR_APS', '@Republicist1:', '"Hurd', '(R-San']
Topic4 : ['"Kenny', 'Gangstalking', '#TweetTheMuellerReport', "(R-Coppell)'s'", '@Teamsters', '2018-11-15,', '2018-12-13,', '#OFFICE', '2018-12-22,', '#LOCAL']
Topic5 : ['@DomainLandlords:', '#GOPChairwoman', '@ShayEvaSatchel', '@bent_alsaudia10', '@eqibeat:', '@BellaLettie3', '@sandrasmithfox', '@cherrynchester', '#KamalaForThePeople', '@jphoganorg:']
Topic6 : ['Volg', 'online!', '@HR

## (10 points) Part 3a: MapReduce
In this task, design a MapReduce program in Python that reads all the original tweets (no retweets) in the sample tweets (`congress-sample-10k.json.gz`) and, if a tweet is a reply to another tweet, outputs a record of the form <src_id, src_user, dst_id, dst_user>.

Create a small cluster (2 or 3 nodes) as per the AWS Guide and then ssh to your cluster and use Hadoop streaming to execute your mapreduce program.

Note: the Hadoop streaming jar file can be found at `/usr/lib/hadoop-mapreduce/hadoop-streaming.jar`
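
Before running anything on the cluster, the map logic can be tried locally as an ordinary function. The sketch below is one possible shape for it, not the required solution: `map_reply` is a hypothetical name, and the sample lines are invented JSON using only the field names described in this homework.

```python
import json


def map_reply(line):
    """Return (src_id, src_user, dst_id, dst_user) for an original reply tweet, else None."""
    try:
        tweet = json.loads(line)
    except ValueError:
        return None  # skip malformed lines
    if not isinstance(tweet, dict) or "retweeted_status" in tweet:
        return None  # skip non-objects and retweets
    if tweet.get("in_reply_to_status_id") is None:
        return None  # not a reply
    return (
        tweet["id"],
        tweet["user"]["id"],
        tweet["in_reply_to_status_id"],
        tweet.get("in_reply_to_user_id"),
    )


# Hypothetical input lines.
lines = [
    '{"id": 10, "user": {"id": 1}, "in_reply_to_status_id": 5, "in_reply_to_user_id": 2}',
    '{"id": 11, "user": {"id": 1}, "in_reply_to_status_id": null}',
    '{"id": 12, "user": {"id": 3}, "in_reply_to_status_id": 5, "retweeted_status": {"id": 9}}',
    'not json',
]
records = [r for r in map(map_reply, lines) if r is not None]
print(records)
```

Because every output record depends on a single input line, no reduce step is needed; the job can run map-only.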

In [19]:
# your mapper function

#!/usr/bin/env python
import sys
import json

def get_tweet(line):
    """Parse one JSON line; return an empty dict if the line is not valid JSON."""
    try:
        return json.loads(line.strip())
    except ValueError:
        return {}

for line in sys.stdin:
    tweet = get_tweet(line)

    # Keep original tweets only (retweets carry a "retweeted_status" object)
    if "retweeted_status" not in tweet:
        # Keep replies only: the replied-to tweet id must be present and non-null
        if "in_reply_to_status_id" in tweet and tweet["in_reply_to_status_id"] is not None:
            print("<%s, %s, %s, %s>" % (
                tweet["id"],
                tweet["user"]["id"],
                tweet["in_reply_to_status_id"],
                tweet["in_reply_to_user_id"]
            ))

In [20]:
# your reducer function
# No reducer: this is a map-only job (mapreduce.job.reduces=0)

In [21]:
# your Hadoop job submission command (copy/paste your command from the terminal)
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -input s3://us-congress-tweets/congress-sample-10k.json.gz \
    -output mapreduce/output2 \
    -mapper mapper.py \
    -reducer NONE \
    -file mapper.py

In [None]:
Job job_1572548302598_0005 completed successfully
19/10/31 23:42:12 INFO mapreduce.Job: Counters: 35
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=172914
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=103
		HDFS: Number of bytes written=163816
		HDFS: Number of read operations=4
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
		S3: Number of bytes read=12616508
		S3: Number of bytes written=0
		S3: Number of read operations=0
		S3: Number of large read operations=0
		S3: Number of write operations=0
	Job Counters 
		Launched map tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=577632
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=6017
		Total vcore-milliseconds taken by all map tasks=6017
		Total megabyte-milliseconds taken by all map tasks=18484224
	Map-Reduce Framework
		Map input records=10000
		Map output records=2311
		Input split bytes=103
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=105
		CPU time spent (ms)=5650
		Physical memory (bytes) snapshot=526106624
		Virtual memory (bytes) snapshot=4663517184
		Total committed heap usage (bytes)=449839104
	File Input Format Counters 
		Bytes Read=12616508
	File Output Format Counters 
		Bytes Written=163816
19/10/31 23:42:12 INFO streaming.StreamJob: Output directory: mapreduce/output2

How many reply relationships did you get?

In [1]:
# code to read job output and count
output = spark.read.csv("mapreduce/output2")
output.count()

2311

## (5 points) Part 3b: Going Large-Scale with MapReduce

Rerun the same MapReduce job above but on the whole dataset (`s3://us-congress-tweets/raw/*.snappy`).
All the files under `s3://us-congress-tweets/raw` can be read from the following file:

`s3://us-congress-tweets/raw/files.txt`

Use shell scripting to parse this file and prepare the input to your MapReduce job as a comma-separated string of all the files (e.g. your input should look like this: `s3://us-congress-tweets/raw/part-00000.snappy,s3://us-congress-tweets/raw/part-00001.snappy,s3://us-congress-tweets/raw/part-00002.snappy,...`).
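
The assignment asks for shell here, but the expected shape of the input string can be sanity-checked with a few lines of Python. This is only a cross-check sketch: `comma_separated_inputs` is a hypothetical helper, and the listing reuses the example paths from the paragraph above rather than the real contents of `files.txt`.

```python
def comma_separated_inputs(lines):
    """Join non-empty lines into one comma-separated string."""
    return ",".join(line.strip() for line in lines if line.strip())


# Stand-in for the lines of files.txt, using the example paths above.
listing = [
    "s3://us-congress-tweets/raw/part-00000.snappy\n",
    "s3://us-congress-tweets/raw/part-00001.snappy\n",
    "\n",
]
inputs = comma_separated_inputs(listing)
print(inputs)
```

The result has no trailing comma and no embedded whitespace, both of which are easy to get wrong in a shell loop.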

Inspecting the job logs, how many files did the job operate on? How many input splits were there?

In [2]:
# Your answer here
mapreduce_files = spark.read.csv("s3://us-congress-tweets/raw/files.txt")
mapreduce_files.show()

+--------------------+
|                 _c0|
+--------------------+
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
|s3://us-congress-...|
+--------------------+
only showing top 20 rows



In [None]:
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -mapper mapper.py \
    -file mapper.py \
    -reducer NONE \
    $(./prep_input.sh) \
    -output mapreduce/output

In [23]:
# prep_input.sh
#!/bin/bash

# Ran "aws s3 cp s3://us-congress-tweets/raw/files.txt ." first to copy the file list locally
files='./files.txt'

# Emit one "-input <path>" pair per listed file
while IFS= read -r file; do
    echo '-input' "$file"
done < "$files"

In [None]:
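# make inputs comma-separated
#!/bin/bash
# (bash, not sh: the script relies on "(( ))" arithmetic and "+=")

files='./files.txt'
inputs=''
file_num=0

while IFS= read -r file; do
    # Prepend a comma before every entry except the first
    if (( file_num != 0 )); then
        inputs+=','
    fi
    inputs+=$file
    ((file_num+=1))
done < "$files"

echo "$inputs"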

In [24]:
# job logs

How many reply relationships did you get?

In [23]:
# Number of reply records
output = spark.read.csv("mapreduce/output")
output.count()

85493943


## (30 points) Part 4: Graph Analysis
In this task, we would like to compute the longest path in *tweet reply* graphs and then perform some statistical calculations on the result. We will use the Pregel implementation from GraphFrames for this task. Ignore paths that are longer than 20.

First, construct your tweet reply network using tweet-reply records in this file `s3://us-congress-tweets/reply_network.csv`.
From this file, use src_id and dst_id. The dst_id is the id of the tweet being replied to and the src_id is the id of the replying tweet.

In [5]:
from graphframes import *
from graphframes.lib import Pregel
sc.setCheckpointDir("hdfs:///tmp/graphframes_checkpoint") # this is needed for any GraphFrames operation

In [6]:
# your network construction code here
data = spark.read.csv("s3://us-congress-tweets/reply_network.csv", header=True)
data.show()

+-------------------+-------------------+-------------------+------------------+
|             src_id|           src_user|             dst_id|          dst_user|
+-------------------+-------------------+-------------------+------------------+
|1047536930651611137| 787776811371487233|1047536497052844032|787776811371487233|
|1047537000385929216|1018859733736779777|1047529220065447936|         970207298|
|1047537019386093568|          461479285|1047490615829827585|          19821744|
|1047537318461026304|         2681991511|1047504406646808576|          39344374|
|1047537380981465088| 831305062785937408|1047517885260750853|           1917731|
|1047537448295813120| 924843693047037952|1047465256023445504|          25073877|
|1047537571146944512|         4268534475|1047504990011572229|           7301572|
|1047537751049064448|           20344298|1047534440757579777|980676000152514560|
|1047538150959136772|          532438672|1047529230203047937|          19417492|
|1047538603008696322|1047529

In [9]:
edges = data.select(F.col("src_id").alias("src"), F.col("dst_id").alias("dst")).cache()
vertices = edges.select(F.col("src").alias("id")).union(edges.select("dst")).distinct()

edges.show()
vertices.show()
print("# of edges:", str(edges.count()))
print("# of vertices:", str(vertices.count()))

+-------------------+-------------------+
|                src|                dst|
+-------------------+-------------------+
|1047536930651611137|1047536497052844032|
|1047537000385929216|1047529220065447936|
|1047537019386093568|1047490615829827585|
|1047537318461026304|1047504406646808576|
|1047537380981465088|1047517885260750853|
|1047537448295813120|1047465256023445504|
|1047537571146944512|1047504990011572229|
|1047537751049064448|1047534440757579777|
|1047538150959136772|1047529230203047937|
|1047538603008696322|1047132430098927617|
|1047538630573678592|1047508807675531264|
|1047538803005636608|1047537257727504384|
|1047539317055377410|1047538522289258499|
|1047539502946967553|1047519678749442048|
|1047539625361887234|1047516105567363073|
|1047539874662928385|1047531068738293760|
|1047540275818651649|1047503978534264834|
|1047540523312078849|1046876489113907201|
|1047540591695777792|1047539231181213696|
|1047540640588996609|1047540255409229825|
+-------------------+-------------

In [10]:
graph = GraphFrame(vertices, edges)

What are the top replied-to tweets? (show 20)

In [11]:
# your code here
graph.inDegrees.sort(F.desc("inDegree")).show()

+-------------------+--------+
|                 id|inDegree|
+-------------------+--------+
|1157787985041088513|   94351|
|1048314564826292227|   76396|
|1111289977143545856|   68172|
|1155949756792725510|   65241|
|1137060666223878144|   57764|
|1062461047892787204|   53767|
|1158036816089497601|   50059|
|1144730911889428480|   45905|
|1155949605147648006|   44810|
|1098312693436596226|   41836|
|1150408691713265665|   41825|
|1129831615952236546|   41556|
|1150859069084905472|   39020|
|1144078421670150144|   38687|
|1155132215208161281|   38572|
|1088141172638400512|   37102|
|1155469517092470784|   36961|
|1168938037071482881|   36728|
|1154161356171599877|   36552|
|1131740851909083137|   36386|
+-------------------+--------+
only showing top 20 rows
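
Since every edge points from a replying tweet to the tweet it replies to, the number of replies a tweet received is simply its in-degree. A tiny plain-Python sketch with invented edge ids (the helper name is hypothetical) shows the same computation:

```python
from collections import Counter

# Hypothetical (src, dst) pairs: src is the reply, dst is the tweet replied to.
toy_edges = [("r1", "t1"), ("r2", "t1"), ("r3", "r1"), ("r4", "t2")]


def in_degrees(edges):
    """Count incoming edges (replies received) for every destination tweet."""
    return Counter(dst for _, dst in edges)


# Vertices are every id that appears on either side of an edge.
toy_vertices = {v for edge in toy_edges for v in edge}

print(in_degrees(toy_edges).most_common(1))
print(len(toy_vertices))
```

Tweets that never receive a reply do not appear in the in-degree table at all, which is also why `graph.inDegrees` has fewer rows than the vertex set.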



How many graphs are in the reply network? (Hint: use the connectedComponents function)

In [14]:
# your code here
connectedComponents = graph.connectedComponents()
connectedComponents.show()

+-------------------+---------+
|                 id|component|
+-------------------+---------+
|1018902211320041474|       26|
|1019644844275388417|       29|
|1047565848481681408|      474|
|1047636184011169792|      917|
|1047777710309892096|     1226|
|1047790518422396928|     1697|
|1047818674684485633|      846|
|1047844212517957632|     1913|
|1047856196042932225|     2040|
|1047872894808854528|     2214|
|1047874785362042881|      517|
|1047888044609753088|     2449|
|1047892115215343616|     2135|
|1047893334587584512|     2436|
|1047926063446339584|     1650|
|1047935603764023298|     3091|
|1047963278176149504|     2460|
|1047983667933564928|     3764|
|1048062759928250368|     2826|
|1048154635733803008|     3325|
+-------------------+---------+
only showing top 20 rows



In [39]:
connectedComponents.select("component").distinct().count()

4865213
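
Counting distinct component labels, as above, treats the reply network as undirected and counts its connected pieces. On a small graph the same count can be obtained with a union-find structure. The sketch below is a plain-Python illustration with invented ids; it is not how GraphFrames computes components internally.

```python
def count_components(vertices, edges):
    """Count connected components, ignoring edge direction, via union-find."""
    parent = {v: v for v in vertices}

    def find(v):
        # Follow parent links to the root, halving the path as we go.
        while parent[v] != v:
            parent[v] = parent[parent[v]]
            v = parent[v]
        return v

    for a, b in edges:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_a] = root_b  # merge the two components

    return len({find(v) for v in vertices})


# Hypothetical reply edges (reply, replied_to).
toy_edges = [("r1", "t1"), ("r2", "t1"), ("r3", "r1"), ("r4", "t2")]
toy_vertices = {v for edge in toy_edges for v in edge}
print(count_components(toy_vertices, toy_edges))
```

Each component here corresponds to one conversation thread, so the count is the number of separate reply graphs.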

Now, design and execute a Pregel program that will calculate the longest paths for all reply graphs in the network. Explain your design.

In [12]:
# your pregel code here
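# Since we want to ignore paths longer than 20, we limit the number of iterations to 20.
# We set the initial path length of each vertex to 0.
# In each iteration every reply sends (its own path length + 1) to the tweet it replies to;
# a vertex that receives messages takes the maximum, otherwise it keeps its current value.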

longestPath = graph.pregel\
             .setMaxIter(20)\
             .withVertexColumn("longestPath",
                               F.lit(0),
                               F.coalesce(Pregel.msg(), F.col("longestPath"))
                              )\
             .sendMsgToDst(Pregel.src("longestPath") + 1)\
             .aggMsgs(F.max(Pregel.msg())).run()
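
To check the design on a graph small enough to trace by hand, the same superstep logic can be imitated in plain Python. This is a simulation written for illustration under the assumption that, in every iteration, each edge sends a message computed from the previous iteration's values; `simulate_longest_path` and the toy graph are hypothetical.

```python
def simulate_longest_path(vertices, edges, max_iter=20):
    """Imitate the Pregel program: value = max over incoming (src value + 1)."""
    value = {v: 0 for v in vertices}  # initial path length is 0 everywhere
    for _ in range(max_iter):
        inbox = {}
        for src, dst in edges:
            msg = value[src] + 1  # message built from the previous superstep
            inbox[dst] = max(inbox.get(dst, msg), msg)  # aggregate with max
        # Vertices without messages keep their old value (the coalesce step).
        value = {v: inbox.get(v, value[v]) for v in vertices}
    return value


# Hypothetical thread: c replies to b, b replies to a, d replies to a.
toy_edges = [("c", "b"), ("b", "a"), ("d", "a")]
toy_vertices = {v for edge in toy_edges for v in edge}
depths = simulate_longest_path(toy_vertices, toy_edges)
print(depths["a"], depths["b"], depths["c"], depths["d"])
```

The root tweet `a` ends with the length of the longest reply chain below it, and leaves stay at 0. Because the loop stops after `max_iter` rounds, any chain longer than 20 is reported as 20, which is how the iteration limit implements "ignore paths longer than 20".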

In [13]:
longestPath.show()  

+-------------------+-----------+
|                 id|longestPath|
+-------------------+-----------+
|1000097150066348033|          1|
|1000610279376457728|          1|
|1004461558666125314|          1|
|1004552938445066240|          1|
|1005074779672629249|          3|
|1005096402660274176|          1|
|1006976555032219649|          1|
|1007196442971267074|          1|
|1007385497633779712|          1|
|1007520380263780352|          1|
|1007653218682318849|          6|
|1007755524505587712|          1|
|1009465122618839042|          1|
|1010679848828669952|          1|
|1011750104666001408|          1|
|1011825432289796096|          1|
|1012416740448448513|          1|
|1014581448354000903|          1|
|1015396907689103360|          1|
|1015557994271117314|          1|
+-------------------+-----------+
only showing top 20 rows



What is the average longest path length for all reply graphs in the network?

In [15]:
# your code here
longestPath = longestPath.join(connectedComponents, ["id"])
longestPath.show()

+-------------------+-----------+---------+
|                 id|longestPath|component|
+-------------------+-----------+---------+
|1000097150066348033|          1|        0|
|1000610279376457728|          1|        1|
|1004461558666125314|          1|        2|
|1004552938445066240|          1|        3|
|1005074779672629249|          3|        4|
|1005096402660274176|          1|        5|
|1006976555032219649|          1|        6|
|1007196442971267074|          1|        7|
|1007385497633779712|          1|        8|
|1007520380263780352|          1|        9|
|1007653218682318849|          6|       10|
|1007755524505587712|          1|       11|
|1009465122618839042|          1|       12|
|1010679848828669952|          1|       13|
|1011750104666001408|          1|       14|
|1011825432289796096|          1|       15|
|1012416740448448513|          1|       16|
|1014581448354000903|          1|       17|
|1015396907689103360|          1|       18|
|1015557994271117314|          1

In [22]:
componentLongestPath = longestPath.groupby("component").max("longestPath")
componentLongestPath.show()
componentLongestPath.sort(F.desc("max(longestPath)")).show()

+-----------+----------------+
|  component|max(longestPath)|
+-----------+----------------+
|51539608871|               2|
|51539609457|               1|
|51539609521|               2|
|51539609867|               1|
| 8589936972|              16|
|51539610594|               4|
|      62612|               8|
|51539611367|               1|
|       3764|               8|
|51539612216|               2|
| 8589939696|               1|
|51539612235|               1|
|51539612488|               1|
|51539612954|               4|
|34359744093|              14|
|       7747|              16|
|51539613657|               1|
|51539615099|               1|
| 8589942632|               3|
|      15437|              20|
+-----------+----------------+
only showing top 20 rows

+----------+----------------+
| component|max(longestPath)|
+----------+----------------+
|8590015831|              20|
|    149406|              20|
|     81798|              20|
|8589937624|              20|
|     81875|        

In [21]:
componentLongestPath.agg({"max(longestPath)" : "avg"}).show()

+---------------------+
|avg(max(longestPath))|
+---------------------+
|   1.5533420633382342|
+---------------------+
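
The aggregation above (take the deepest path inside each component, then average those maxima) can be sketched in plain Python. This is only an illustration: the rows below are hypothetical toy values, not taken from the dataset, and `average_component_depth` is a made-up helper name.

```python
from collections import defaultdict

# Hypothetical (vertex id, longestPath, component) rows, shaped like the joined table above.
rows = [
    ("a", 1, 0),
    ("b", 3, 0),
    ("c", 1, 1),
    ("d", 6, 2),
    ("e", 2, 2),
]

def average_component_depth(rows):
    """Take the max longestPath within each component, then average those maxima."""
    deepest = defaultdict(int)
    for _vertex, path_len, component in rows:
        deepest[component] = max(deepest[component], path_len)
    return sum(deepest.values()) / len(deepest)

avg_depth = average_component_depth(rows)
print(avg_depth)
```

Averaging the per-vertex values directly would weight large reply graphs more heavily, which is why the maxima are taken per component first.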



## (30 points) Part 5: Community Detection
User-hashtag relations have been extracted and saved in the file `s3://us-congress-tweets/user_hashtags.csv`. If a user uses a hashtag there will be a record with the userid and the hashtag.

Use the Trawling algorithm discussed in class to find potential user communities in the dataset. (Hint: use FPGrowth in the Spark ML package). Explore different values for the support parameter.

In [2]:
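# your code here. Explain all steps.
# Read the (userid, hashtag) records. No header option is set, so Spark names the columns _c0 and _c1.
data = spark.read.csv("s3://us-congress-tweets/user_hashtags.csv")
# Build one "basket" per user: the list of all hashtags that user has used.
data = data.select(F.col("_c0").alias("id"), F.col("_c1").alias("hashtag"))\
           .groupby("id").agg(F.collect_list("hashtag").alias("hashtags"))
data.show()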

+-------------------+--------------------+
|                 id|            hashtags|
+-------------------+--------------------+
|1000056825981603841| [believeallscience]|
|         1000074318|             [Doral]|
|1000341730582061056|[GIVEAWAY, Summer...|
|         1000356566|            [clowns]|
|1000365529406853121|            [Careem]|
|1000460805366796288|[Management, Lead...|
|1000463993499156481|[MAGA, FreeSpeech...|
|1000480351905632258|[NancyPelosi, TAV...|
|          100048155|    [HungertoHealth]|
|1000785956298018817|[crowdstrike, BRE...|
|1000870665577152513|[StarWarsGalaxysE...|
|1001087313718296578|  [ChowkidaroKaScam]|
|1001099799633022976|[13NovBalochMarty...|
|          100136328|[BlacksforTrump20...|
|1001619711744987142|      [BTSatWembley]|
|1001699058501775362|[ShamInvestigatio...|
|1001838583425175552|  [AyudaHumanitaria]|
|1001846696958185472|[ebike, CleanAir,...|
|1001877127115915264|[MoscowMitch, Mos...|
|1001894911283691520|         [Koya, BTS]|
+----------

In [3]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(minSupport=0.05, minConfidence=0.05, itemsCol='hashtags')
fpGrowthModel = fpGrowth.fit(data)

In [4]:
fpGrowthModel.freqItemsets.show()
fpGrowthModel.associationRules.show()

+---------------+------+
|          items|  freq|
+---------------+------+
|    [Venezuela]|228140|
|  [MoscowMitch]|194200|
|[TrumpShutdown]|169301|
|[MuellerReport]|164781|
+---------------+------+

+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
+----------+----------+----------+----+



In [5]:
fpGrowth = FPGrowth(minSupport=0.02, minConfidence=0.02, itemsCol='hashtags')
fpGrowthModel = fpGrowth.fit(data)
fpGrowthModel.freqItemsets.show()
fpGrowthModel.associationRules.show()

+--------------------+------+
|               items|  freq|
+--------------------+------+
|         [Venezuela]|228140|
|       [MoscowMitch]|194200|
|     [TrumpShutdown]|169301|
|[TrumpShutdown, M...| 68271|
|     [MuellerReport]|164781|
|              [MAGA]|153253|
|       [NancyPelosi]|144953|
|      [GreenNewDeal]|137423|
|         [Kavanaugh]|124906|
|          [BREAKING]|112978|
|             [Trump]|106536|
|              [SOTU]|100974|
|           [Mueller]| 90451|
|               [HR1]| 88041|
|      [ForThePeople]| 87395|
|         [Democrats]| 83774|
|      [MaduroRegime]| 80650|
|[MaduroRegime, Ve...| 68564|
|      [BuildTheWall]| 77876|
|            [Maduro]| 69136|
+--------------------+------+
only showing top 20 rows

+---------------+---------------+-------------------+------------------+
|     antecedent|     consequent|         confidence|              lift|
+---------------+---------------+-------------------+------------------+
| [MaduroRegime]|    [Venezuela]| 0
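
The confidence of a rule can be checked by hand from the itemset frequencies printed above. The sketch below assumes that the truncated itemset `[MaduroRegime, Ve...` is `[MaduroRegime, Venezuela]`; the output does not show this in full, so treat the result as illustrative only.

```python
# Frequency of [MaduroRegime], copied from the freqItemsets output above.
freq_maduro_regime = 80650
# Assumption: the truncated row "[MaduroRegime, Ve..." is [MaduroRegime, Venezuela].
freq_pair = 68564

# confidence(X -> Y) = freq(X and Y together) / freq(X)
confidence = freq_pair / freq_maduro_regime
print(round(confidence, 4))
```

Lift would additionally need the total number of users (baskets), which is not shown in the output, so it is left out here.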

In [6]:
fpGrowthModel.transform(data).filter(F.size(F.col("prediction")) > 0).show()

+-------------------+--------------------+--------------------+
|                 id|            hashtags|          prediction|
+-------------------+--------------------+--------------------+
|1001877127115915264|[MoscowMich, Leni...|     [TrumpShutdown]|
|1002410259447799808|[DNC, RecallFeins...|      [MaduroRegime]|
|          100311168|[orteguismo, NICA...|      [MaduroRegime]|
|1003808157775245312|[TrumpShutdown, M...|       [MoscowMitch]|
|1004865478274244610|[FirstResponders,...|     [TrumpShutdown]|
|         1005065702|         [Venezuela]|      [MaduroRegime]|
|         1005243704|[MerrickGarland, ...|       [MoscowMitch]|
|1005475619566706688|         [Venezuela]|      [MaduroRegime]|
|1006736266401144832|[Urgente, Caracas...|      [MaduroRegime]|
|1007154784430968834|[NoGlobalism, Men...|     [TrumpShutdown]|
|1007386606217637888|[CalDay, NoIranWa...|      [MaduroRegime]|
|         1007629891|[TreasonCaucus, J...|[MaduroRegime, Mo...|
|         1007762215|       [MoscowMitch

List two user communities you think are interesting. Explain why they are reasonable communities.

You can use https://twitter.com/intent/user?user_id=? to find out more info about the users

In [16]:
import pyspark.sql.types as t

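def get_community(community_hashtags):
    """Return the users in `data` whose hashtag list contains every hashtag in community_hashtags."""
    def find_all(user_hashtags):
        # True when the community's hashtags are a subset of this user's hashtags.
        return community_hashtags.issubset(set(user_hashtags))
    
    filter_udf = F.udf(find_all, t.BooleanType())
    community = data.filter(filter_udf("hashtags"))
    
    return community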

I chose these two communities because their hashtag pairs appeared frequently in the association rules above.

In [18]:
# community 1
hashtags1 = set(["MoscowMitch", "TrumpShutdown"])

community1 = get_community(hashtags1)
community1.show()

+-------------------+--------------------+
|                 id|            hashtags|
+-------------------+--------------------+
|1002752931341299712|[JETMOMforCongres...|
|1006046436272713728|[SundayMotivation...|
|1007386606217637888|[Trump, Impeachme...|
|         1019032993|[HR8, InherentCon...|
|          102259828|[ImpeachTrumpNowA...|
|         1028571146|[WorldCup2019, me...|
|          102871670|[KochBrothers, Jo...|
|1028949579075600384|[SpeakerPelosi, B...|
|          103123025|[HR7, opioids, AG...|
|1033154968465928192|[MoscowMitchMcTra...|
|1035573557546426368|[TrumpShutdown, T...|
|1037646284210008064|[RuleOfLaw, 19thA...|
|1039926455461916673|[ReleaseTheReport...|
|1080427519126528005|[EricGarner, Prot...|
|1081583969194442752|[EndCorruptionNow...|
|1082472422853496832|[BarrHearing, imp...|
|1087237626426359811|[NationalEmergenc...|
|         1107755834|[DianneFeinstein,...|
|          113182207|[JerryNadler, spe...|
|         1151933252|[FreeYoel, FreeMu...|
+----------

In [19]:
# community 2
hashtags2 = set(["MaduroRegime", "Venezuela"])

community2 = get_community(hashtags2)
community2.show()

+-------------------+----------------------+
|                 id|              hashtags|
+-------------------+----------------------+
|1007154784430968834|  [BarrHearings, Ju...|
|1014566827907788800|  [DíaDeLaJuventud,...|
|1017582915377561606|  [Venezuela, terri...|
|1018743778868383744|  [EnVivo, Rusia, R...|
|1024271180826660864|  [Venezuela, Green...|
|1032693985301721090|  [Russia, VENEZUEL...|
|          103607533|  [Venezuela, Colom...|
|1039517587569340416|  [caguaripanolibre...|
|1041801626867118080|  [FNS, Venezuelan,...|
|          104790247|  [23F, Maduro, SEB...|
|          105262982|  [Venezuela, Madur...|
|1054504408787177472|  [Venezuela, Madur...|
|1055358147576127489|[TrishRegan, 党媒, ...|
|1057220887022366720|  [Venezuela, 23Feb...|
|         1061852376|  [URGENTE, HISTÓRI...|
|         1061869964|  [Retador, 23F, AH...|
|1067089074006237185|  [MaduroCrimeFamil...|
|         1068787399|  [ConfirmKavanaugh...|
|         1071723313|  [UltimaHora, Vene...|
|10749651138

What value for support did you choose and why?

##### Answer here
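I tried support values of 0.05 and 0.02. At 0.05, FPGrowth returned only four single-hashtag itemsets and no association rules, which says nothing about which hashtags are used together. At 0.02, it returned itemsets with more than one hashtag and a non-empty set of association rules (for example, the rule from [MaduroRegime] to [Venezuela]), so I chose 0.02 as the better support value.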

## (10 points) Part 6: Personalized PageRank
Assume you are given a task to recommend Twitter users for the speaker of the House to engage with.

Construct a user-mentions network using relations in `s3://us-congress-tweets/user_mentions.csv`

Run Personalized PageRank with source (id=15764644) and find out top accounts to recommend.
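
For intuition, Personalized PageRank can be sketched as ordinary PageRank in which every random jump returns to the source node instead of a uniformly random node. The pure-Python power iteration below is a textbook-style sketch on a hypothetical toy graph. The `personalized_pagerank` function, its parameters, and the edge list are illustrative, and its scores are not expected to match the values GraphFrames reports, since that implementation may scale scores differently.

```python
def personalized_pagerank(edges, source, reset_probability=0.15, max_iter=50):
    """Power iteration where every teleport jumps back to `source`."""
    nodes = {source}
    out_links = {}
    for src, dst in edges:
        nodes.update((src, dst))
        out_links.setdefault(src, []).append(dst)

    # Start with all of the walk's mass on the source.
    rank = {node: 0.0 for node in nodes}
    rank[source] = 1.0
    for _ in range(max_iter):
        nxt = {node: 0.0 for node in nodes}
        for node, score in rank.items():
            targets = out_links.get(node)
            if targets:
                # Follow an outgoing edge with probability 1 - reset_probability.
                share = (1.0 - reset_probability) * score / len(targets)
                for dst in targets:
                    nxt[dst] += share
            else:
                # Dangling node: send its walk mass back to the source.
                nxt[source] += (1.0 - reset_probability) * score
        # Teleport step: the reset mass always lands on the source.
        nxt[source] += reset_probability * sum(rank.values())
        rank = nxt
    return rank

# Hypothetical toy mention graph: (mentioning user, mentioned user).
toy_edges = [("s", "a"), ("s", "b"), ("a", "b"), ("b", "s"), ("c", "a")]
scores = personalized_pagerank(toy_edges, source="s")
ranking = sorted(scores, key=scores.get, reverse=True)
print(ranking)
```

Nodes that cannot be reached from the source (such as `c` here) end up with a score of zero, which is why the results are specific to the chosen source account.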

In [44]:
# your network construction code here
edges = spark.read.csv("s3://us-congress-tweets/user_mentions.csv", header=True)
vertices = spark.read.csv("s3://us-congress-tweets/congress_members.csv", header=True)
vertices = vertices.selectExpr("userid as id", "screen_name as name")

edges.show()
vertices.show()
print("# of edges:", str(edges.count()))
print("# of vertices:", str(vertices.count()))

graph = GraphFrame(vertices, edges)

+------------------+------------------+-----+
|               src|               dst|count|
+------------------+------------------+-----+
|917194889275699201|        1249982359|    1|
|917570582555779072|         251918778|    1|
|         633674091|         432895323|   48|
|913222391416934402|         432895323|  261|
|         217574712|          47203904|    1|
|931278145679847424|         320757267|    1|
|897332217646522368|          92186819|    8|
|        2308265716|          18061669|    4|
|822826488500088832|        1249982359|  132|
|728645535566008320|958064770019741696|    1|
|827323185058033665|          14247236|   11|
|          36566383|826629809954553856|    2|
|913125313172918272|         432895323|   90|
|860643294367318016|         432895323|   52|
|866790059802079234|         432895323|  148|
|888055635828502530|          16056306|    2|
|        3141820397|         112006107|    1|
|836681162638532608|         262756641|    1|
|         157952243|         10239

In [46]:
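# your Personalized PageRank code here
# Rebuild the graph so its vertices are all users that mention someone
# (graph.outDegrees has one row per such user), not only the congress members.
graph = GraphFrame(graph.outDegrees, edges)
# Personalized PageRank: random jumps always return to the source account (id=15764644).
pageranks = graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="15764644")
pageranks.vertices.sort(F.desc("pagerank")).show()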

+------------------+---------+--------------------+
|                id|outDegree|            pagerank|
+------------------+---------+--------------------+
|          15764644|      342| 0.16019027695661536|
|          25073877|      117|0.005388416407924...|
|        1249982359|       58|0.004304160304646484|
|          15808765|      691|0.003816526962310861|
|822215679726100480|       37|0.003261404382909...|
|          17494010|      270|0.003066113870118036|
|          29450962|       29|0.002780351747696328|
|         787373558|      148|0.002642578668230067|
|          72198806|      389|0.002578709615593...|
|          29501253|       31|0.002499457112435968|
|          15745368|      432|0.002462101382807848|
|          10615232|        6|0.002417871994118461|
|         970207298|      263|0.002337188857503...|
|         432895323|      321|0.002314610927849...|
|          33537967|      458|0.002236878744709...|
|         476256944|      111|0.002183393909708038|
|        188

In [None]:
# Top 10 accounts to recommend 
# You can use https://twitter.com/intent/user?user_id=? to find out more info about the users
Nancy Pelosi @SpeakerPelosi 15764644
Donald J. Trump @realDonaldTrump 25073877
Leader McConnell @senatemajldr 1249982359
Gory BOOker 👻 @CoryBooker 15808765
President Trump @POTUS 822215679726100480
Chuck Schumer @SenSchumer 17494010
John Lewis @repjohnlewis 29450962
Elijah E. Cummings @RepCummings 787373558
Kirsten Gillibrand @SenGillibrand 72198806
Adam Schiff @RepAdamSchiff 29501253

# Troubleshooting Tips

* If you get a "spark not available" error, the kernel is most likely Python rather than PySpark. Change the kernel to PySpark and it should work.


* If your notebook seems stuck (this can happen if you force-stop a cell), you may need to ssh to your master node and kill the Spark application associated with the notebook.
    Use `yarn application -list` to find the application id and then `yarn application -kill <application_id>` to kill it. After that, restart your notebook from the browser.


* If you like, you may also ssh to the master node and run `pyspark` and execute your code directly in the shell.

* If you have difficulty accessing the job web pages (for example, to see logs), open the needed ports (e.g. 8088) when you create the cluster.

* If you want to see logs for a MapReduce job from the terminal, use the following command:

    `yarn logs -applicationId <application_id>`


* To kill a MapReduce job use:

    `yarn application -kill <application_id>`