## Install required packages

```
pip install -r requirements.txt
```

## Check if you have Java

!java -version 

If you do not have java installed,
run 
```
sudo apt-get update
sudo apt-get install openjdk-8-jdk
```

## Working with Spark

First, load pyspark

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkExample.com').getOrCreate()

# We load the csv into an RDD (Resilient Distributed Dataset) named tweetsCSV
sc = spark.sparkContext.textFile("training.1600000.processed.noemoticon.csv")
def process_string(s):
    split = s.split(',')
    if len(split) != 6:
        split[5] = ''.join(split[5:])
        split = split[:6]
    
    for i in range(6):
        split[i] = split[i][1:-1]
    return split
sc = sc.map(lambda s: process_string(s))

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/13 18:54:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/13 18:54:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/06/13 18:54:27 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
# "take" the first 5 items
sc.take(5)

                                                                                

[['0',
  '1467810369',
  'Mon Apr 06 22:19:45 PDT 2009',
  'NO_QUERY',
  '_TheSpecialOne_',
  "@switchfoot http://twitpic.com/2y1zl - Awww that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D"],
 ['0',
  '1467810672',
  'Mon Apr 06 22:19:49 PDT 2009',
  'NO_QUERY',
  'scotthamilton',
  "is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!"],
 ['0',
  '1467810917',
  'Mon Apr 06 22:19:53 PDT 2009',
  'NO_QUERY',
  'mattycus',
  '@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds'],
 ['0',
  '1467811184',
  'Mon Apr 06 22:19:57 PDT 2009',
  'NO_QUERY',
  'ElleCTF',
  'my whole body feels itchy and like its on fire '],
 ['0',
  '1467811193',
  'Mon Apr 06 22:19:57 PDT 2009',
  'NO_QUERY',
  'Karoli',
  "@nationwideclass no it's not behaving at all. i'm mad. why am i here? because I can't see you all over there. "]]

In [3]:
deptColumns = ["target","ids", "date", "flag", "user", "text"]
# Create data frame
tweetsDF = sc.toDF(deptColumns)
tweetsDF.printSchema()
tweetsDF.show(truncate=True)

root
 |-- target: string (nullable = true)
 |-- ids: string (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



[Stage 2:>                                                          (0 + 1) / 1]

+------+----------+--------------------+--------+---------------+--------------------+
|target|       ids|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|     0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|     0|1467811795|Mon Apr 06 22:20:...|NO_

23/06/01 20:39:39 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 2 (TID 2): Attempting to kill Python Worker
                                                                                

In [4]:
# Select-Where query (selecting all the tweets of use "mimismo")
tweetsDF.where(tweetsDF.user=='starkissed').select(tweetsDF.date).collect()

                                                                                

[Row(date='Mon Apr 06 22:20:31 PDT 2009'),
 Row(date='Fri May 01 23:36:21 PDT 2009'),
 Row(date='Sun May 10 19:35:12 PDT 2009'),
 Row(date='Mon May 18 05:57:55 PDT 2009'),
 Row(date='Fri May 29 09:28:23 PDT 2009'),
 Row(date='Fri May 29 16:40:47 PDT 2009'),
 Row(date='Fri May 29 17:14:18 PDT 2009'),
 Row(date='Sat May 30 06:45:57 PDT 2009'),
 Row(date='Sat May 30 09:35:11 PDT 2009'),
 Row(date='Sun May 31 13:30:47 PDT 2009'),
 Row(date='Tue Jun 02 10:24:34 PDT 2009'),
 Row(date='Wed Jun 03 04:26:07 PDT 2009'),
 Row(date='Fri Jun 05 11:58:06 PDT 2009'),
 Row(date='Mon Jun 15 13:45:23 PDT 2009'),
 Row(date='Mon Jun 15 19:52:47 PDT 2009'),
 Row(date='Mon Jun 15 19:58:11 PDT 2009'),
 Row(date='Mon Jun 15 20:07:56 PDT 2009'),
 Row(date='Tue Jun 16 10:54:52 PDT 2009'),
 Row(date='Tue Jun 16 13:27:11 PDT 2009'),
 Row(date='Tue Jun 16 15:58:45 PDT 2009'),
 Row(date='Tue Jun 16 16:02:00 PDT 2009'),
 Row(date='Tue Jun 16 16:29:21 PDT 2009'),
 Row(date='Wed Jun 17 18:09:47 PDT 2009'),
 Row(date='

In [5]:
tweetsDF.createOrReplaceTempView("tweets")

### You can also use SQL queries 

In [6]:

temp_df = spark.sql("SELECT * FROM tweets LIMIT 10")
temp_df.show()

[Stage 4:>                                                          (0 + 1) / 1]

+------+----------+--------------------+--------+---------------+--------------------+
|target|       ids|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|     0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|     0|1467811795|Mon Apr 06 22:20:...|NO_

23/06/01 20:39:44 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 4 (TID 11): Attempting to kill Python Worker
                                                                                

## Introduction to NetworkX


In [7]:
import networkx as nx

In [8]:
G = nx.Graph()

G.add_node(234)
G.add_node("hello")
G.add_edge(234,"hello")

print("Nodes:", G.nodes())
print("Edges:", G.edges())



Nodes: [234, 'hello']
Edges: [(234, 'hello')]


In [9]:
G[234]

AtlasView({'hello': {}})

In [10]:
G['hello']

AtlasView({234: {}})

In [11]:
G.add_edge('Alice', 'Bob', know= 10, friends=5)

In [12]:
print("Nodes:", G.nodes())
print("Edges:", G.edges())

Nodes: [234, 'hello', 'Alice', 'Bob']
Edges: [(234, 'hello'), ('Alice', 'Bob')]


In [13]:
G['Bob']

AtlasView({'Alice': {'know': 10, 'friends': 5}})

In [14]:
G['Alice']

AtlasView({'Bob': {'know': 10, 'friends': 5}})

In [15]:
G['Bob']['Alice']['know'] += 1

In [16]:
G['Alice']

AtlasView({'Bob': {'know': 11, 'friends': 5}})

## Analyzing graphs

In [17]:
G.add_edge('Alice', 'Carlos')
G.add_edge('Carlos', 'Dave')
G.add_edge('Dave', 'Bob')
G.add_edge('Alice', 'Eve')

In [18]:
components = nx.connected_components(G)
list(components)

[{234, 'hello'}, {'Alice', 'Bob', 'Carlos', 'Dave', 'Eve'}]

In [19]:
nx.degree(G)

DegreeView({234: 1, 'hello': 1, 'Alice': 3, 'Bob': 2, 'Carlos': 2, 'Dave': 2, 'Eve': 1})

In [20]:
nx.degree(G,'Bob')

2

In [21]:
nx.has_path(G, 'Alice', 'Dave')

True

In [22]:
nx.has_path(G, 'Alice', 'hello')

False

In [23]:
nx.shortest_path(G, 'Alice', 'Dave')

['Alice', 'Carlos', 'Dave']

## Let's connect graphs and Twitter!

In [24]:
import re
wordTokenizerRegex = re.compile("[ ,.;]")

# Split to words and flatten using flatMap (to have one big RDD with all words). Then, we filter only hashtags (starting with "#")
hashTags = sc.flatMap(lambda xs: [x.split(' ') for x in xs]).flatMap(lambda x: x).filter(lambda w: w.startswith("#"))

In [25]:
# how many hashtags did we find
print(hashTags.count())

# examine the first 100
print(hashTags.take(100))



44108
['#itm', '#therapyfail', '#fb', '#TTSC?', '#24', '#gayforpeavy', '#FML', '#3', '#camerafail', '#', '#...We', '#fb', '#travian', '#wow', '#art', '#fail', '#KrispyKreme', '#fb', '#24', '#the', '#twitter', '#fail', '#CNN', '#1', '#Kulula', '#surface', '#c25k', '#drupal', '#epicfail', '#rails', '#mhbigcatch', '#stuckrecord', '#twitteriffic', '#startrek', '#earthquake', '#php', '#poken', '#error', '#tracy', '#fb', '#dwsr', '#Italy', '#Earthquake', '#fantasysurfer', '#titsuptuesday', '#N85', '#heyxboxlive', '#House', '#babypunch', '#cob', '#ticket', '#fb', '#fb', '#terremoto', '#earthquake', '#italy', '#fail', '#&amp;$(#&amp;$!(@#$(!@#*$(#!(', '#@&amp;$(!@#($*(!@#*$(!@#&amp;$(!@#$&amp;!(@#!#@($...', '#itm', '#AutomationAtaCost', '#gsoc', '#rockbox', '#niceweather', '#Berlin', '#goodlife', '#Ambients', '#theinbetweeners', '#Twilight', '#Summers', '#mmuk09', '#i36', '#spotify', '#fb', '#pman', '#spbpuk', '#dreams', '#dreams', '#dwsr', '#fb', '#wolframalpha', '#search', '#semantic-web', '

                                                                                

In [26]:
# count unique words and sort them in descending order of count
countedHashTags = hashTags.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda tup: tup[1], ascending = False)

                                                                                

In [27]:
#show top 100 words
countedHashTags.take(100)

[('#followfriday', 1861),
 ('#fb', 1728),
 ('#squarespace', 843),
 ('#FF', 503),
 ('#seb-day', 498),
 ('#', 485),
 ('#FollowFriday', 414),
 ('#1', 403),
 ('#musicmonday', 385),
 ('#iranelection', 342),
 ('#ff', 334),
 ('#fail', 288),
 ('#BSB', 273),
 ('#iremember', 263),
 ('#myweakness', 260),
 ('#asot400', 254),
 ('#marsiscoming', 245),
 ('#mcflyforgermany', 230),
 ('#2', 228),
 ('#andyhurleyday', 187),
 ('#inaperfectworld', 177),
 ('#iPhone', 175),
 ('#delongeday', 165),
 ('#haveyouever', 164),
 ('#f1', 163),
 ('#mileymonday', 155),
 ('#trackle', 153),
 ('#tcot', 151),
 ('#spymaster', 149),
 ('#bgt', 146),
 ('#e3', 126),
 ('#iphone', 122),
 ('#followfriday!', 120),
 ('#IranElection', 112),
 ('#E3', 111),
 ('#hhrs', 105),
 ('#clothdiapers', 104),
 ('#dontyouhate', 102),
 ('#goodsex', 102),
 ('#bradiewebb', 102),
 ('#mmwanted', 97),
 ('#flylady', 94),
 ("#tag'", 92),
 ('#asylm', 91),
 ('#BGT', 89),
 ('#chesterday', 87),
 ('#F1', 86),
 ('#hoppusday', 84),
 ('#andyclemmensen', 84),
 ('#e

## Filtering tweets with selected hashtag #musicmonday

In [28]:
temp_df = spark.sql("SELECT * FROM tweets WHERE LOWER(text) LIKE '%#musicmonday%'")
temp_df = temp_df.toPandas()
temp_df.head()

                                                                                

Unnamed: 0,target,ids,date,flag,user,text
0,0,1565096048,Mon Apr 20 05:53:13 PDT 2009,NO_QUERY,_whatshername,wanted to post Butch Walker's &quot;#1 Summer ...
1,0,1565528649,Mon Apr 20 06:58:21 PDT 2009,NO_QUERY,breezydayz,Listening to some Morningwood &quot;Sugarbaby&...
2,0,1833334416,Sun May 17 22:58:17 PDT 2009,NO_QUERY,dimabm,yay for #musicmonday !!! Too bad I can't tweet...
3,0,1833583078,Sun May 17 23:45:36 PDT 2009,NO_QUERY,Galiiit,#musicmonday Green Day 21st Century Breakdown....
4,0,1833589369,Sun May 17 23:46:51 PDT 2009,NO_QUERY,SketchStudios,@MyMelodie Thats a bummer for people who have ...


Once we have the required dataframe - we save it (to avoid having to re-run spark again next time)

In [29]:
temp_df.to_csv("musicmonday.csv")

Now, let's construct a graph

In [30]:
def addMentionedColumn(df):
    
    def mentionsList(txt):
        allWords = [word.strip(""" ,.:'\";""").lower() for word in txt.split()]
        allNames = [word.strip("@") for word in allWords if word.startswith("@")]
        uniqueNames = list(set(allNames))
        return uniqueNames
    
    df["mentioned"] = df["text"].apply(mentionsList)

In [31]:
addMentionedColumn(temp_df)

In [32]:
temp_df.head(10)

Unnamed: 0,target,ids,date,flag,user,text,mentioned
0,0,1565096048,Mon Apr 20 05:53:13 PDT 2009,NO_QUERY,_whatshername,wanted to post Butch Walker's &quot;#1 Summer ...,[]
1,0,1565528649,Mon Apr 20 06:58:21 PDT 2009,NO_QUERY,breezydayz,Listening to some Morningwood &quot;Sugarbaby&...,[]
2,0,1833334416,Sun May 17 22:58:17 PDT 2009,NO_QUERY,dimabm,yay for #musicmonday !!! Too bad I can't tweet...,[]
3,0,1833583078,Sun May 17 23:45:36 PDT 2009,NO_QUERY,Galiiit,#musicmonday Green Day 21st Century Breakdown....,[]
4,0,1833589369,Sun May 17 23:46:51 PDT 2009,NO_QUERY,SketchStudios,@MyMelodie Thats a bummer for people who have ...,[mymelodie]
5,0,1834974520,Mon May 18 04:46:48 PDT 2009,NO_QUERY,acchanosaurus,#musicmonday now listening to @katyperry - thi...,[katyperry]
6,0,1835193221,Mon May 18 05:23:20 PDT 2009,NO_QUERY,piroteknix,I'd really like to say I'm not listening to NI...,[]
7,0,1835316387,Mon May 18 05:42:13 PDT 2009,NO_QUERY,carmelabalboa,I Really Want You - James Blunt #musicmonday T...,[]
8,0,1835708717,Mon May 18 06:35:11 PDT 2009,NO_QUERY,GlmmC,workingworking working need a holiday!!! #mus...,[]
9,0,1835998304,Mon May 18 07:10:25 PDT 2009,NO_QUERY,samuy,#musicmonday NO MONDAY playlist yet. [suggest...,[]


In [33]:
def mentionGraph(df):
    g = nx.Graph()
    
    for  (index, target, ids, date, flag, user, text, mentionedUsers) in df.itertuples():
        for mentionedUser in mentionedUsers: 
            if (user in g) and (mentionedUser in g[user]):
                g[user][mentionedUser]["numberMentions"] += 1
            else:
                g.add_edge(user, mentionedUser, numberMentions=1)
    
    return g

In [34]:
musicGraph = mentionGraph(temp_df)

In [35]:
print("# nodes:", len(musicGraph.nodes()))
print("# edges:", len(musicGraph.edges()))

# nodes: 173
# edges: 98


## Visualize Mention Graph

In [36]:

from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
from plotly.graph_objs import *

init_notebook_mode(connected=True)

In [37]:
import random
def addRandomPositions(graph):
    posDict = dict((node,(random.gauss(0,10),random.gauss(0,10))) for node in graph.nodes())
    nx.set_node_attributes(graph, posDict, "pos")

In [38]:
addRandomPositions(musicGraph)

In [39]:
nx.get_node_attributes(musicGraph, 'pos')['acchanosaurus']

(-13.779285036744607, 5.898142850240584)

In [40]:
def plotNetwork(graph):
    scatters=[]

    for (node1, node2) in graph.edges():
        x0, y0 = graph.nodes[node1]['pos']
        x1, y1 = graph.nodes[node2]['pos']
        edgeWidth = graph[node1][node2]['numberMentions']
        s = Scatter(
                x=[x0, x1],
                y=[y0, y1],
                hoverinfo='none',
                mode='lines', 
                line=Line(width=1 ,color='#888'))
        scatters.append(s)



    for node in graph.nodes():
        xPos, yPos = graph.nodes[node]['pos']
        s = Scatter(
                x=[xPos], 
                y=[yPos], 
                hoverinfo='none',
                mode='markers', 
                marker=dict(
                    color="#888", 
                    size=10,         
                    line=dict(width=2)))
        scatters.append(s)
    
    layout = Layout(showlegend=False)
    fig = Figure(data=scatters, layout=layout)
    iplot(fig, show_link=False )

In [41]:
plotNetwork(musicGraph)


plotly.graph_objs.Line is deprecated.
Please replace it with one of the following more specific types
  - plotly.graph_objs.scatter.Line
  - plotly.graph_objs.layout.shape.Line
  - etc.


