# Homework 3 Instructions
The goal for this homework is for you to interact with python libraries that facilitate data manipulation and analysis. This homework closely follows the 2023/06/05 [pyspark tutorial](https://github.com/jmpark0808/pyspark/blob/hmw3/pyspark.ipynb).

You will be analyzing a 500,010 tweet dataset (10 of which you will make yourself in **Part 1**). You'll be asked to load the data with pyspark, answer questions about the dataset and then finally visualizing connections between your tweets and mentions based on a hashtag.

## Submission Setup
1. Rename this file as "\<student_id\>_\<username\>_H3"
2. Preview this notebook as an HTML file, i.e., File -> Print Preview
3. `Cmd + P` to get the printing prompt but save the file as **PDF**
4. Submit this renamed file to CrowdMark
    - ***Make sure that the submission file includes the cell outputs***


## Tips
- Refer heavily to the 2023/06/05 tutorial code since this homework is a slight derivation of it
- Refer to documentation for useful methods (e.g., [pyspark.sql.DataFrame](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html)) and we have provided some useful links in this notebook
- We are evaluating the cell outputs
- Ask questions in piazza and contribute to other peoples posts (anonymously if you are too shy)


# Part 0: Setup

### Install Required Packages
`pip install -r requirements.txt`

Or I suggest installing these in a python virtual environment
```bash
mkdir envs
cd envs
virtualenv msci436 -p python3
source envs/msci436/bin/activate # activate virtual env
pip install -r requirements.txt
```

### Check if you have Java
#### Linux
`!java -version`

If you do not have java installed, run
```
sudo apt-get update
sudo apt-get install openjdk-8-jdk
```

#### macOS
[Here](https://mkyong.com/java/how-to-install-java-on-mac-osx/#homebrew-install-latest-java-on-macos) is the brief guide but note that if `/usr/local/Cellar` doesn't exist for you, follow the bellow steps

1. Install java `brew install java`
2. View java symbolic link instructions `brew info java`
3. Execute the command under "==> Caveats", (for me it's `sudo ln -sfn /opt/homebrew/opt/openjdk/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk.jdk`)
4. Validate that everything works with `java -version`

# Part 1: Data Creation
In this part you will add 10 more tweets to [hmw3_tweets.zip](https://github.com/jmpark0808/pyspark/blob/hmw3/hmw3_tweets.zip) (download and unzip it) which containes the first 500,000 tweets from the [Sentiment140](https://www.kaggle.com/datasets/kazanova/sentiment140) dataset. 

1. Use your student number as the 'id' (second column) and your username as the 'user' (fifth column)
2. Create tweets (10 total) mentioning (1 mention per tweet):
    - <span style="color:blue">{'Hollywood_Trey', 'MiDesfileNegro', 'TessMorris', 'amazingphoebe', 'kasey79', 'lost_dog', 'nessie111', 'nuttychris', 'sebby_peek', 'tweetpet'}
    - Example; "@Hollywood_Trey We should definitely nominate Eddie and Kimathi for the TA awards"
3. Include hashtag (defined below) in all your tweets 

This will be your personal data csv for this homework

In [1]:
HASHTAG = "#fail"

#Data added manually to hmw3_tweets.csv file
deptColumns = ["target","ids", "date", "flag", "user", "text"]
data = [("0","20837920","Mon Jun 12 12:45:09 EDT 2023","NO_QUERY","nnonta","This is my test tweet for this homework @Hollywood_Trey {}".format(HASHTAG)),
("0","20837920","Mon Jun 12 12:49:14 EDT 2023","NO_QUERY","nnonta","Sup @MiDesfileNegro, did you see that video I shared? {}".format(HASHTAG)),
("0","20837920","Mon Jun 12 12:51:16 EDT 2023","NO_QUERY","nnonta","OMG @TessMorris, you're hilarious {}".format(HASHTAG)),
("0","20837920","Mon Jun 12 12:51:43 EDT 2023","NO_QUERY","nnonta","Those turtles are so cute @amazingphoebe {}".format(HASHTAG)),
("0","20837920","Mon Jun 12 12:51:56 EDT 2023","NO_QUERY","nnonta","There are wayyy too many @kasey79 {}".format(HASHTAG)),
("0","20837920","Mon Jun 12 12:53:16 EDT 2023","NO_QUERY","nnonta","@lost_dog should definitely nominate Eddie and Kimathi for the TA awards, they're so helpful {}".format(HASHTAG)),
("0","20837920","Mon Jun 12 12:54:44 EDT 2023","NO_QUERY","nnonta","@nessie111 check this out {}".format(HASHTAG)),
("0","20837920","Mon Jun 12 12:54:59 EDT 2023","NO_QUERY","nnonta","It was raining and everything was muddy @nuttychris {}".format(HASHTAG)),
("0","20837920","Mon Jun 12 12:56:19 EDT 2023","NO_QUERY","nnonta","I need to try that pizza place @sebby_peek {}".format(HASHTAG)),
("0","20837920","Mon Jun 12 12:58:07 EDT 2023","NO_QUERY","nnonta","Aww that's really adorable @tweetpet {}".format(HASHTAG))]
print(len(data))

10


# Part 2: Load CSV with pyspark
Uncomment lines 13-14 if `sc.take(5)` outputs are double quoted

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkExample.com').getOrCreate()
#rdd = spark.sparkContext.parallelize(data)
# We load the csv into an RDD (Resilient Distributed Dataset) named tweetsCSV
sc = spark.sparkContext.textFile("hmw3_tweets.csv")

def process_string(s):
    split = s.split(',')
    if len(split) != 6:
        split[5] = ''.join(split[5:]).strip('"')
        split = split[:6]
    
#     for i in range(6):
#         split[i] = split[i][1:-1]
    return split
sc = sc.map(lambda s: process_string(s))
# "take" the first 5 items
sc.take(5)

23/06/13 21:41:06 WARN Utils: Your hostname, Nayeemas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.245 instead (on interface en0)
23/06/13 21:41:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/13 21:41:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

[['0',
  '1467810369',
  'Mon Apr 06 22:19:45 PDT 2009',
  'NO_QUERY',
  '_TheSpecialOne_',
  "@switchfoot http://twitpic.com/2y1zl - Awww that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D"],
 ['0',
  '1467810672',
  'Mon Apr 06 22:19:49 PDT 2009',
  'NO_QUERY',
  'scotthamilton',
  "is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!"],
 ['0',
  '1467810917',
  'Mon Apr 06 22:19:53 PDT 2009',
  'NO_QUERY',
  'mattycus',
  '@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds'],
 ['0',
  '1467811184',
  'Mon Apr 06 22:19:57 PDT 2009',
  'NO_QUERY',
  'ElleCTF',
  'my whole body feels itchy and like its on fire '],
 ['0',
  '1467811193',
  'Mon Apr 06 22:19:57 PDT 2009',
  'NO_QUERY',
  'Karoli',
  "@nationwideclass no it's not behaving at all. i'm mad. why am i here? because I can't see you all over there. "]]

# Part 3: Create a pyspark DataFrame

In [3]:
# Define data columns
deptColumns = ["target","ids", "date", "flag", "user", "text"]
# TODO: Create data frame
tweetsDF = sc.toDF(deptColumns)
#Checking the total number of rows in DF
print(tweetsDF.count())



500010


                                                                                

In [4]:
# TODO: Visualize the DataFrame schema `printSchema()`
tweetsDF.printSchema()


root
 |-- target: string (nullable = true)
 |-- ids: string (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [5]:
# TODO: Show the first 5 rows `show(...)`
tweetsDF.show(5,truncate=True)

[Stage 5:>                                                          (0 + 1) / 1]

+------+----------+--------------------+--------+---------------+--------------------+
|target|       ids|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+------+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows



23/06/13 21:41:25 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 5 (TID 5): Attempting to kill Python Worker
                                                                                

In [6]:
# TODO: Print the total number of rows
tweetsDF.count()

                                                                                

500010

# Part 4: Query/Explore your Data
## Part 4.1: Using `pyspark.sql.DataFrame.where`

In [7]:
# TODO: Print out all the tweets (tweetsDF.text) made by you (tweetsDF.user)
print(tweetsDF.where(tweetsDF.user == 'nnonta').show(truncate=False))

                                                                                

+------+--------+----------------------------+--------+------+-------------------------------------------------------------------------------------------------+
|target|ids     |date                        |flag    |user  |text                                                                                             |
+------+--------+----------------------------+--------+------+-------------------------------------------------------------------------------------------------+
|0     |20837920|Mon Jun 12 12:45:09 EDT 2023|NO_QUERY|nnonta|This is my test tweet for this homework @Hollywood_Trey #fail                                    |
|0     |20837920|Mon Jun 12 12:49:14 EDT 2023|NO_QUERY|nnonta|Sup @MiDesfileNegro did you see that video I shared? #fail                                       |
|0     |20837920|Mon Jun 12 12:51:16 EDT 2023|NO_QUERY|nnonta|OMG @TessMorris you're hilarious #fail                                                           |
|0     |20837920|Mon Jun 12 12:51:

                                                                                

In [8]:
# TODO: Print the combined number of tweets made by 'lost_dog' and 'nuttychris'
print(tweetsDF.where(tweetsDF.user.isin(['lost_dog', 'nuttychris'])).count())

[Stage 11:>                                                         (0 + 2) / 2]

440


                                                                                

In [9]:
from pyspark.sql.functions import col # might be useful

# TODO: Print the number of tweets that mention user 'amazingphoebe'
print(tweetsDF.where(tweetsDF.text.contains('amazingphoebe')).count())

[Stage 14:>                                                         (0 + 2) / 2]

44


                                                                                

In [10]:
# TODO: Print the 5 most active twitter users 
from pyspark.sql.functions import desc
df = tweetsDF.groupBy('user').count().orderBy(desc('count'))
df.show(5)

                                                                                

+-----------+-----+
|       user|count|
+-----------+-----+
|   lost_dog|  338|
|   tweetpet|  310|
|    webwoke|  264|
|mcraddictal|  210|
|     wowlew|  210|
+-----------+-----+
only showing top 5 rows



## Part 4.2: Using `spark.sql(...)`

In [11]:
tweetsDF.createOrReplaceTempView("tweets") 
# register a table called 'tweets' for querying `createOrReplaceTempView(...)`

In [12]:
# TODO: Select the first 5 entries from the 'user' column
# Tutorial code
temp_df = spark.sql("SELECT user FROM tweets LIMIT 5")
temp_df.show()

[Stage 20:>                                                         (0 + 1) / 1]

+---------------+
|           user|
+---------------+
|_TheSpecialOne_|
|  scotthamilton|
|       mattycus|
|        ElleCTF|
|         Karoli|
+---------------+



23/06/13 21:41:39 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 20 (TID 25): Attempting to kill Python Worker
                                                                                

In [13]:
# TODO: Get the number of distinct/unique users
# https://www.w3schools.com/sql/sql_distinct.asp
temp_df = spark.sql("SELECT DISTINCT user FROM tweets")
print(temp_df.count())

[Stage 21:>                                                         (0 + 2) / 2]

287378


                                                                                

In [14]:
# TODO: Show first 10 tweets that mention user 'amazingphoebe'
# https://www.w3schools.com/sql/trysql.asp?filename=trysql_op_like
temp_df = spark.sql("SELECT * FROM tweets WHERE LOWER(text) LIKE '%amazingphoebe%' LIMIT 10")
temp_df.show()

[Stage 27:>                                                         (0 + 1) / 1]

+------+----------+--------------------+--------+----------+--------------------+
|target|       ids|                date|    flag|      user|                text|
+------+----------+--------------------+--------+----------+--------------------+
|     0|1468989467|Tue Apr 07 04:44:...|NO_QUERY|sebby_peek|@amazingphoebe i ...|
|     0|1551269588|Sat Apr 18 08:36:...|NO_QUERY|sebby_peek|@amazingphoebe i ...|
|     0|1551546241|Sat Apr 18 09:20:...|NO_QUERY|sebby_peek|@amazingphoebe i ...|
|     0|1551799800|Sat Apr 18 10:00:...|NO_QUERY|sebby_peek|@amazingphoebe i ...|
|     0|1551870959|Sat Apr 18 10:11:...|NO_QUERY|sebby_peek|@amazingphoebe i ...|
|     0|1551939092|Sat Apr 18 10:21:...|NO_QUERY|sebby_peek|@amazingphoebe i ...|
|     0|1556117960|Sat Apr 18 21:42:...|NO_QUERY|sebby_peek|@amazingphoebe ye...|
|     0|1556977080|Sun Apr 19 01:19:...|NO_QUERY|sebby_peek|@amazingphoebe mu...|
|     0|1556996258|Sun Apr 19 01:26:...|NO_QUERY|sebby_peek|@amazingphoebe do...|
|     0|15570027

23/06/13 21:41:46 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 27 (TID 34): Attempting to kill Python Worker
                                                                                

# Part 5: Hashtag Statistics

In [15]:
hashTags = sc.flatMap(lambda xs: [x.split(' ') for x in xs]).flatMap(lambda x: x).filter(lambda w: w.startswith("#"))

# Print the total number of hashtags found in the dataset
hashTags.count()
print(hashTags.count())

[Stage 29:>                                                         (0 + 2) / 2]

10551


                                                                                

In [16]:
# TODO: Count unique hashtags and sort them in descending order of count
#changing each hastag with a tuple, reducing everything and addding the count, then sorting
countedHashTags = hashTags.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda tup: tup[1], ascending = False)

# Print the top 10 hashtags
countedHashTags.take(10)

                                                                                

[('#fb', 586),
 ('#fail', 168),
 ('#', 132),
 ('#asot400', 111),
 ('#squarespace', 102),
 ('#iremember', 88),
 ('#1', 85),
 ('#e3', 76),
 ('#2', 75),
 ('#bgt', 71)]

# Part 6: Create A Mentions Column
In this section you will parse the tweet text for mentions (e.g. @tweetpet) and place them in a separate data column

## Part 6.1: Create a Pandas DataFrame and add Mentions Column

In [17]:
# TODO: Convert to a pandas dataframe `toPandas()`
tweets_PDF = tweetsDF.toPandas()

# Visualize first 5 rows
tweets_PDF.head(5)

                                                                                

Unnamed: 0,target,ids,date,flag,user,text
0,0,1467810369,Mon Apr 06 22:19:45 PDT 2009,NO_QUERY,_TheSpecialOne_,@switchfoot http://twitpic.com/2y1zl - Awww th...
1,0,1467810672,Mon Apr 06 22:19:49 PDT 2009,NO_QUERY,scotthamilton,is upset that he can't update his Facebook by ...
2,0,1467810917,Mon Apr 06 22:19:53 PDT 2009,NO_QUERY,mattycus,@Kenichan I dived many times for the ball. Man...
3,0,1467811184,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,ElleCTF,my whole body feels itchy and like its on fire
4,0,1467811193,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,Karoli,@nationwideclass no it's not behaving at all. ...


In [18]:
# TODO: Create a column which lists the mentions present for each tweet 
# Tutorial code

def addMentionedColumn(df):
    def mentionsList(txt):
        allWords = [word.strip(""" ,.:'\";""").lower() for word in txt.split()]
        allNames = [word.strip("@") for word in allWords if word.startswith("@")]
        uniqueNames = list(set(allNames))
        return uniqueNames
    
    df["mentioned"] = df["text"].apply(mentionsList)

addMentionedColumn(tweets_PDF)
# Visualize the first 5 rows of this new dataset (should include 'mentioned' column)
tweets_PDF.head(5)

Unnamed: 0,target,ids,date,flag,user,text,mentioned
0,0,1467810369,Mon Apr 06 22:19:45 PDT 2009,NO_QUERY,_TheSpecialOne_,@switchfoot http://twitpic.com/2y1zl - Awww th...,[switchfoot]
1,0,1467810672,Mon Apr 06 22:19:49 PDT 2009,NO_QUERY,scotthamilton,is upset that he can't update his Facebook by ...,[]
2,0,1467810917,Mon Apr 06 22:19:53 PDT 2009,NO_QUERY,mattycus,@Kenichan I dived many times for the ball. Man...,[kenichan]
3,0,1467811184,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,ElleCTF,my whole body feels itchy and like its on fire,[]
4,0,1467811193,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,Karoli,@nationwideclass no it's not behaving at all. ...,[nationwideclass]


## Part 6.2: Filter out tweets that don't include `HASHTAGS`
Use [pandas.DataFrame.loc](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.loc.html) method to filter pandas dataframe (e.g.,`new_pdf = old_pdf.loc[(old_pdf['user']=="Karol")]`)

Note that you will need to find out how to search if a column (e.g., tweets_PDF['text']) contains a `HASHTAG`. Refer to [this](https://stackoverflow.com/questions/11350770/filter-pandas-dataframe-by-substring-criteria) stack overflow link

** Search for `HASHTAG + " "` adding trailing space to only search for whole words

In [19]:
# TODO: Only store tweets with HASHTAG
tweets_PDF_filtered = tweets_PDF.loc[tweets_PDF['text'].str.contains(HASHTAG)]

# Print the number of tweets containting the hashtag
print(tweets_PDF_filtered.count())

# Print the first couple tweets
import pandas as pd
pd.set_option('display.max_colwidth', None)
tweets_PDF_filtered.head(5)

target       180
ids          180
date         180
flag         180
user         180
text         180
mentioned    180
dtype: int64


Unnamed: 0,target,ids,date,flag,user,text,mentioned
1171,0,1468099958,Mon Apr 06 23:42:46 PDT 2009,NO_QUERY,hypatiadotca,Slideshare's embed code is annoying me. Sorry about the tiny embed #fail,[]
1444,0,1468156574,Tue Apr 07 00:01:29 PDT 2009,NO_QUERY,rschu,It seems that Twitter lost some updates yesterday - again!! #twitter #fail,[]
3413,0,1468590446,Tue Apr 07 02:35:41 PDT 2009,NO_QUERY,Morledge,Considering new business name which means new logo and website. Finding it very difficult to pick a name though. Been 2 weeks #fail,[]
8121,0,1470072342,Tue Apr 07 08:15:52 PDT 2009,NO_QUERY,mrsnickhodge,Arrrggh #failwhale strikes again,[]
8573,0,1470241061,Tue Apr 07 08:46:10 PDT 2009,NO_QUERY,mdrisser,Major #failwhale for Twitter today,[]


# Part 7: Graph Nodes
## Part 7.1: Create graph of mentions

In [20]:
import networkx as nx
# TODO: Create a graph of user nodes linked by mentions (Tutorial code)

def mentionGraph(df):
    g = nx.Graph()
    
    for  (index, target, ids, date, flag, user, text, mentionedUsers) in df.itertuples():
        for mentionedUser in mentionedUsers: 
            if (user in g) and (mentionedUser in g[user]): #only looking at user and mentioned user
                g[user][mentionedUser]["numberMentions"] += 1
            else:
                g.add_edge(user, mentionedUser, numberMentions=1)
    
    return g

hashtagGraph = mentionGraph(tweets_PDF_filtered)


In [21]:
# Print the number of nodes attached to your username (Tutorial code)
nx.degree(hashtagGraph,'nnonta')

10

## Part 7.1: Visualize graph of mentions

In [43]:
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
from plotly.graph_objs import Scatter, Layout, Figure

init_notebook_mode(connected=True)

import random
random.seed(0) # deterministic
def addRandomPositions(graph):
    posDict = dict((node,(random.gauss(0,10),random.gauss(0,10))) for node in graph.nodes())
    nx.set_node_attributes(graph, posDict, "pos")
    
addRandomPositions(hashtagGraph)

# TODO: Print your node's position using the 'pos' attributed added to the graph (Tutorial code)
nx.get_node_attributes(hashtagGraph, 'pos')['nnonta']

(-8.644280191091848, 0.29434843699984803)

In [44]:
def plotNetwork(graph):
    scatters=[]

    for (node1, node2) in graph.edges():
        # NOTE: node1, and node2 are the usernames
        # TODO: Change the color of the edge line if one of the nodes is yours
        color = 'blue'
        x0, y0 = graph.nodes[node1]['pos']
        x1, y1 = graph.nodes[node2]['pos']
        edgeWidth = graph[node1][node2]['numberMentions']
        s = Scatter(
                x=[x0, x1],
                y=[y0, y1],
                hoverinfo='none',
                mode='lines', 
                line=Line(width=1 ,color=color))
        scatters.append(s)

    for node in graph.nodes():
        # TODO: Change the color of your node
        color = 'red'
        xPos, yPos = graph.nodes[node]['pos']
        s = Scatter(
                x=[xPos], 
                y=[yPos], 
                hoverinfo='name',
                name=node,
                mode='markers', 
                marker=dict(
                    color=color, 
                    size=10,         
                    line=dict(width=2)))
        scatters.append(s)
    
    layout = Layout(showlegend=False)
    fig = Figure(data=scatters, layout=layout)
    iplot(fig, show_link=False )

plotNetwork(hashtagGraph)

ValueError: Mime type rendering requires nbformat>=4.2.0 but it is not installed