# Week 6 - Learning By Doing: Putting Spark to work

# Assignment: Analysis using Spark

In [1]:
# Import SQLContext and create one on top of the existing SparkContext.
# NOTE(review): `sc` is not defined in the cells shown here -- presumably it is the
# SparkContext pre-created by the PySpark kernel/shell; confirm before running elsewhere.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
# Load the country list as an RDD with one element per line of the CSV file.
# The file:// scheme makes Spark read from the local filesystem.
country_csv_uri = 'file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv'
country_lines = sc.textFile(country_csv_uri)

In [3]:
# Peek at the first five raw lines to check the "<country>, <code>" layout.
country_lines.take(5)

['Afghanistan, AFG',
 'Albania, ALB',
 'Algeria, ALG',
 'American Samoa, ASA',
 'Andorra, AND']

In [4]:
# Split every "<country>, <code>" line on the ', ' separator and wrap the
# resulting fields in a tuple, giving one (country, code) pair per line.
country_pair = country_lines.map(lambda row: tuple(row.split(', ')))
country_pair.take(5)

[('Afghanistan', 'AFG'),
 ('Albania', 'ALB'),
 ('Algeria', 'ALG'),
 ('American Samoa', 'ASA'),
 ('Andorra', 'AND')]

In [5]:
# Split every raw "<country>, <code>" line on the ', ' separator.
# Despite the variable name, str.split returns a list, so each element is a
# [country, code] list rather than a tuple; this RDD is what feeds the DataFrame.
country_tuples = country_lines.map(lambda row: row.split(', '))
country_tuples.take(5)

[['Afghanistan', 'AFG'],
 ['Albania', 'ALB'],
 ['Algeria', 'ALG'],
 ['American Samoa', 'ASA'],
 ['Andorra', 'AND']]

In [6]:
# Build a two-column DataFrame from the split country rows and print its schema.
# Only column names are supplied; the column types are inferred from the data.
country_columns = ['country', 'code']
country_df = sqlContext.createDataFrame(country_tuples, country_columns)
country_df.printSchema()

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



In [7]:
# Show the first three rows to confirm the country/code columns were populated.
country_df.take(3)

[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [8]:
# Load the tweet texts as an RDD with one element per line of the CSV file.
# The file:// scheme makes Spark read from the local filesystem.
tweets_csv_uri = 'file:///home/cloudera/Downloads/big-data-3/tweet-text.csv'
tweets_lines = sc.textFile(tweets_csv_uri)

In [9]:
# Peek at the first five lines; the first one is the CSV header ('tweet_text').
tweets_lines.take(5)

['tweet_text',
 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL',
 'RT @NiallOfficial: @Louis_Tomlinson @socceraid when I retired from playing because of my knee . I went and did my uefa A badges in Dublin',
 'RT @GameSeek: Follow & Retweet for your chance to win a copy of FIFA 17 Deluxe Edition (platform of your choice) in our #giveaway! https://…',
 '@CIVARAGI ...I was putting ffs but it autocorrected it too FIFA']

In [10]:
# Number of lines in the tweets file (this includes the 'tweet_text' header line).
tweets_lines.count()

13995

In [11]:
# Word count over the tweet lines, in three steps:
#   1. split every line on single spaces into tokens,
#   2. pair each token with a count of 1,
#   3. add up the counts per distinct token.
# NOTE: no cleaning happens here. Splitting on ' ' keeps empty-string tokens
# (e.g. from consecutive spaces), and the header line 'tweet_text' is counted
# like any other token. Punctuation and letter case are left as-is.
tweets_words = tweets_lines.flatMap(lambda text: text.split(' '))
tweets_tuples = tweets_words.map(lambda token: (token, 1))
tweets_counts = tweets_tuples.reduceByKey(lambda left, right: left + right)
tweets_counts.take(5)

[('', 3896),
 ('mobile', 1),
 ('#FridayNightTouchdown', 1),
 ('Just', 44),
 ('BONUSES,', 1)]

In [12]:
# Turn the (word, count) pairs into a DataFrame with named columns,
# then print the inferred schema and preview the first three rows.
tweet_columns = ['word', 'count']
tweet_df = sqlContext.createDataFrame(tweets_counts, tweet_columns)
tweet_df.printSchema()
tweet_df.take(3)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



[Row(word='', count=3896),
 Row(word='mobile', count=1),
 Row(word='#FridayNightTouchdown', count=1)]

In [13]:
# Join the word counts with the country list, keeping only the words that are
# exactly equal to a country name.
# NOTE: the match is an exact, case-sensitive string comparison on single
# space-separated tokens, so multi-word names such as 'American Samoa' cannot
# match, and neither can names with attached punctuation or different casing.
word_is_country = tweet_df['word'] == country_df['country']
join_df = tweet_df.join(country_df, word_is_country)
join_df.take(5)

[Row(word='Thailand', count=1, country='Thailand', code='THA'),
 Row(word='Iceland', count=2, country='Iceland', code='ISL'),
 Row(word='Mexico', count=1, country='Mexico', code='MEX'),
 Row(word='Wales', count=19, country='Wales', code='WAL'),
 Row(word='Denmark', count=1, country='Denmark', code='DEN')]

In [14]:
# Question 1: number of distinct countries mentioned.
# tweet_df holds one row per distinct word, so every row of the join is one
# country name that appeared as a word at least once.
# (Assumes country names are unique in the country list -- TODO confirm.)
join_df.count()

44

In [15]:
# Question 2: total number of country mentions in the tweets, i.e. the sum of
# the per-country word counts (not the number of countries -- that is Question 1).
# The functions module is imported under an alias instead of
# `from pyspark.sql.functions import sum`, which would replace Python's
# built-in sum() for every cell executed afterwards.
from pyspark.sql import functions as F
join_df.select(F.sum('count')).show()

+----------+
|sum(count)|
+----------+
|       397|
+----------+



In [16]:
# Question 3: the three most-mentioned countries with their counts.
# Order the joined rows by mention count, highest first, and display the top 3.
from pyspark.sql.functions import desc
join_df.orderBy(desc('count')).show(3)

+-------+-----+-------+----+
|   word|count|country|code|
+-------+-----+-------+----+
| Norway|   52| Norway| NOR|
|Nigeria|   49|Nigeria| NGA|
| France|   42| France| FRA|
+-------+-----+-------+----+
only showing top 3 rows



In [17]:
# Question 4: mention counts for Wales, Iceland, and Japan.
# Each country is shown as its own one-row table, in the order listed.
for country_name in ('Wales', 'Iceland', 'Japan'):
    join_df.filter(join_df.country == country_name).show()

+-----+-----+-------+----+
| word|count|country|code|
+-----+-----+-------+----+
|Wales|   19|  Wales| WAL|
+-----+-----+-------+----+

+-------+-----+-------+----+
|   word|count|country|code|
+-------+-----+-------+----+
|Iceland|    2|Iceland| ISL|
+-------+-----+-------+----+

+-----+-----+-------+----+
| word|count|country|code|
+-----+-----+-------+----+
|Japan|    5|  Japan| JPN|
+-----+-----+-------+----+



In [18]:
# Question 5: average number of mentions per country.
# The average is taken over the rows of join_df only, i.e. over countries that
# were mentioned at least once; countries with no matching word are not included.
from pyspark.sql.functions import mean
mean_mentions = mean('count')
join_df.select(mean_mentions).show()

+-----------------+
|       avg(count)|
+-----------------+
|9.022727272727273|
+-----------------+

