In [1]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sqlContext.sql("set spark.sql.caseSensitive=true")

DataFrame[key: string, value: string]

In [2]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')

In [3]:
# Convert each line into a pair of words
country_words = country_lines.flatMap(lambda line : line.split("\n"))

In [4]:
country_words.take(5)

['Afghanistan, AFG',
 'Albania, ALB',
 'Algeria, ALG',
 'American Samoa, ASA',
 'Andorra, AND']

In [5]:
# Convert each pair of words into a tuple
def split(line):
    words = line.split(", ")
    return [(words[0], 0), (words[1], 1)]
country_tuples = country_words.flatMap(split)

In [6]:
country_tuples.take(10)

[('Afghanistan', 0),
 ('AFG', 1),
 ('Albania', 0),
 ('ALB', 1),
 ('Algeria', 0),
 ('ALG', 1),
 ('American Samoa', 0),
 ('ASA', 1),
 ('Andorra', 0),
 ('AND', 1)]

In [7]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(15)

root
 |-- country: string (nullable = true)
 |-- code: long (nullable = true)



[Row(country='Afghanistan', code=0),
 Row(country='AFG', code=1),
 Row(country='Albania', code=0),
 Row(country='ALB', code=1),
 Row(country='Algeria', code=0),
 Row(country='ALG', code=1),
 Row(country='American Samoa', code=0),
 Row(country='ASA', code=1),
 Row(country='Andorra', code=0),
 Row(country='AND', code=1),
 Row(country='Angola', code=0),
 Row(country='ANG', code=1),
 Row(country='Anguilla', code=0),
 Row(country='AIA', code=1),
 Row(country='Antigua and Barbuda', code=0)]

In [8]:
country_tuples.count()

422

In [9]:
# Read tweets CSV file into RDD of lines
tweets = sc.textFile('file:///home/cloudera/Downloads/big-data-3/mongodb/tweets.csv')

In [10]:
tweets.take(5)

['tweet_text',
 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL',
 'RT @NiallOfficial: @Louis_Tomlinson @socceraid when I retired from playing because of my knee . I went and did my uefa A badges in Dublin',
 'RT @GameSeek: Follow & Retweet for your chance to win a copy of FIFA 17 Deluxe Edition (platform of your choice) in our #giveaway! https://…',
 '@CIVARAGI ...I was putting ffs but it autocorrected it too FIFA']

In [11]:
tweets.count()

13995

In [12]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
filtered_tweets = tweets.filter(lambda line: line != '')

In [13]:
filtered_tweets.take(5)

['tweet_text',
 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL',
 'RT @NiallOfficial: @Louis_Tomlinson @socceraid when I retired from playing because of my knee . I went and did my uefa A badges in Dublin',
 'RT @GameSeek: Follow & Retweet for your chance to win a copy of FIFA 17 Deluxe Edition (platform of your choice) in our #giveaway! https://…',
 '@CIVARAGI ...I was putting ffs but it autocorrected it too FIFA']

In [14]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
tweet_words = filtered_tweets.flatMap(lambda line: line.split(" "))
tweet_tuples = tweet_words.map(lambda word: (word, 1))
tweet_word_count = tweet_tuples.reduceByKey(lambda a, b: a + b)

In [15]:
tweet_word_count.take(15)

[('', 3292),
 ('https://t.co/fQftAwGAad', 1),
 ('mobile', 1),
 ('#FridayNightTouchdown', 1),
 ('circle', 7),
 ('#thfc', 2),
 ('reinstated', 4),
 ('like?"', 1),
 ('Bellow', 1),
 ('now"', 1),
 ('https://t.co/W4QluWGyeq', 1),
 ('https://t.co/qMkpvzgr0Y', 1),
 ('NINTENDO', 1),
 ('year-', 1),
 ('belt', 1)]

In [16]:
# Create the DataFrame of tweet word counts
tweetsDF = sqlContext.createDataFrame(tweet_word_count, ["country", "count"])
tweetsDF.printSchema()
tweetsDF.take(10)

root
 |-- country: string (nullable = true)
 |-- count: long (nullable = true)



[Row(country='', count=3292),
 Row(country='https://t.co/fQftAwGAad', count=1),
 Row(country='mobile', count=1),
 Row(country='#FridayNightTouchdown', count=1),
 Row(country='circle', count=7),
 Row(country='#thfc', count=2),
 Row(country='reinstated', count=4),
 Row(country='like?"', count=1),
 Row(country='Bellow', count=1),
 Row(country='now"', count=1)]

In [17]:
# Join the country and tweet data frames (on the appropriate column)
merge = tweetsDF.join(countryDF.filter(countryDF["code"] == 0), 'country')

In [18]:
merge.printSchema()

root
 |-- country: string (nullable = true)
 |-- count: long (nullable = true)
 |-- code: long (nullable = true)



In [19]:
merge.take(15)

[Row(country='Thailand', count=1, code=0),
 Row(country='Iceland', count=2, code=0),
 Row(country='Mexico', count=1, code=0),
 Row(country='Wales', count=19, code=0),
 Row(country='Denmark', count=1, code=0),
 Row(country='India', count=4, code=0),
 Row(country='Portugal', count=8, code=0),
 Row(country='Poland', count=1, code=0),
 Row(country='Norway', count=52, code=0),
 Row(country='Guinea', count=8, code=0),
 Row(country='Slovakia', count=30, code=0),
 Row(country='Canada', count=11, code=0),
 Row(country='Netherlands', count=13, code=0),
 Row(country='Kenya', count=3, code=0),
 Row(country='Oman', count=1, code=0)]

In [20]:
# Question 1: number of distinct countries mentioned
merge.distinct().count()

44

In [21]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
merge.select("count").describe().show()
merge.select("count").agg({'count' : 'sum'}).show()

+-------+-----------------+
|summary|            count|
+-------+-----------------+
|  count|               44|
|   mean|9.022727272727273|
| stddev|12.62977036076866|
|    min|                1|
|    max|               52|
+-------+-----------------+

+----------+
|sum(count)|
+----------+
|       397|
+----------+



In [22]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
merge.orderBy(desc("count")).show(5)

+--------+-----+----+
| country|count|code|
+--------+-----+----+
|  Norway|   52|   0|
| Nigeria|   49|   0|
|  France|   42|   0|
|Slovakia|   30|   0|
| England|   25|   0|
+--------+-----+----+
only showing top 5 rows



In [23]:
# Table 2: counts for Wales, Iceland, and Japan.
merge.filter(merge["country"] == "Wales").show(1)
merge.filter(merge["country"] == "Netherlands").show(1)
merge.filter(merge["country"] == "Kenya").show(1)
merge.filter(merge["country"] == "Iceland").show(1)
merge.filter(merge["country"] == "Japan").show(1)

+-------+-----+----+
|country|count|code|
+-------+-----+----+
|  Wales|   19|   0|
+-------+-----+----+

+-----------+-----+----+
|    country|count|code|
+-----------+-----+----+
|Netherlands|   13|   0|
+-----------+-----+----+

+-------+-----+----+
|country|count|code|
+-------+-----+----+
|  Kenya|    3|   0|
+-------+-----+----+

+-------+-----+----+
|country|count|code|
+-------+-----+----+
|Iceland|    2|   0|
+-------+-----+----+

+-------+-----+----+
|country|count|code|
+-------+-----+----+
|  Japan|    5|   0|
+-------+-----+----+



In [24]:
merge.select("count").agg({'count' : 'avg'}).show()

+-----------------+
|       avg(count)|
+-----------------+
|9.022727272727273|
+-----------------+

