In [1]:
# Build the SQLContext used for all DataFrame work below, on top of the
# SparkContext (`sc`) that the notebook environment already provides.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sparkContext=sc)

In [2]:
# Load the country list CSV from the local filesystem as an RDD of text lines.
COUNTRY_CSV = 'file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv'
country_lines = sc.textFile(COUNTRY_CSV)

In [3]:
# Split every record on newline characters and flatten the pieces into one RDD.
# (sc.textFile already yields one record per line, so each record normally
# passes through unchanged; the comma split happens in the next cell.)
country_words = country_lines.flatMap(lambda record: record.split("\n"))

In [4]:
# Convert each "country, code" record into a (country, code) tuple.
def _parse_country_line(line):
    """Split one country-list record into a ``(country, code)`` tuple.

    The record is split once on the first comma and both parts are stripped,
    so "Name, CODE" and "Name,CODE" parse to the same tuple. Previously the
    line was split twice with two different delimiters (',' and ', '), which
    raised IndexError whenever the space after the comma was missing.

    Raises:
        ValueError: if the record contains no comma at all.
    """
    country, code = line.split(',', 1)
    return (country.strip(), code.strip())

country_tuples = country_words.map(_parse_country_line)

In [5]:
# Build the country DataFrame from the (country, code) tuples, then inspect
# its schema and the first few rows.
country_columns = ["country", "code"]
countryDF = sqlContext.createDataFrame(country_tuples, schema=country_columns)
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [6]:
# Load the tweets CSV as an RDD with one element per line, and report how
# many lines were read.
TWEETS_CSV = 'file:///home/cloudera/Desktop/tweet.csv'
lines = sc.textFile(TWEETS_CSV)
lines.count()

13995

In [7]:
# Data cleaning: drop zero-length lines (empty tweets) and report how many
# lines remain. An empty string is falsy, so truthiness keeps exactly the
# lines whose length is greater than zero.
lines_non_empty = lines.filter(lambda tweet: bool(tweet))
lines_non_empty.count()

13391

In [8]:
# Word count over the cleaned tweets, in three steps:
#   1. split each tweet on single spaces into tokens,
#   2. pair every token with an initial count of 1,
#   3. add up the counts per token.
# NOTE: splitting on a single " " yields empty-string tokens (e.g. where two
# spaces are adjacent); they are counted like any other token, which is why
# '' shows up in the result.
words = lines_non_empty.flatMap(lambda tweet: tweet.split(" "))
tuples = words.map(lambda token: (token, 1))
counts = tuples.reduceByKey(lambda left, right: left + right)
counts.take(5)

[('', 3292),
 ('https://t.co/fQftAwGAad', 1),
 ('mobile', 1),
 ('#FridayNightTouchdown', 1),
 ('circle', 7)]

In [9]:
# Turn the (word, count) pairs into a DataFrame, then inspect its schema and
# the first few rows.
tweet_columns = ["words", "counts"]
tweetDF = sqlContext.createDataFrame(counts, schema=tweet_columns)
tweetDF.printSchema()
tweetDF.take(5)

root
 |-- words: string (nullable = true)
 |-- counts: long (nullable = true)



[Row(words='', counts=3292),
 Row(words='https://t.co/fQftAwGAad', counts=1),
 Row(words='mobile', counts=1),
 Row(words='#FridayNightTouchdown', counts=1),
 Row(words='circle', counts=7)]

In [10]:
# Join the country and tweet data frames: a row is kept when a tweet word is
# exactly equal to a country name (inner join on c.country == t.words).
from pyspark.sql.functions import col
joinedDF = countryDF.alias('c').join(tweetDF.alias('t'), col('c.country') == col('t.words'))
# All four columns (country, code, words, counts) are kept in joinedDF.
# DataFrames are immutable, so a bare `joinedDF.select(...)` whose result is
# not assigned has no effect; that dead statement has been removed.
joinedDF.printSchema()
joinedDF.take(5)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)
 |-- words: string (nullable = true)
 |-- counts: long (nullable = true)



[Row(country='Thailand', code='THA', words='Thailand', counts=1),
 Row(country='Iceland', code='ISL', words='Iceland', counts=2),
 Row(country='Mexico', code='MEX', words='Mexico', counts=1),
 Row(country='Wales', code='WAL', words='Wales', counts=19),
 Row(country='Denmark', code='DEN', words='Denmark', counts=1)]

In [11]:
# Question 1: number of distinct countries mentioned.
# Shown as (rows in the join, distinct country codes); the two agree when
# every matched country appears exactly once in joinedDF.
(
    joinedDF.count(),
    joinedDF.select('code').distinct().count(),
)

(44, 44)

In [12]:
# Question 2: total number of country mentions in tweets, i.e. the sum of the
# per-country counts over all matched countries.
# The functions module is imported under an alias so that Spark's `sum` does
# not replace Python's builtin `sum` in the notebook namespace.
from pyspark.sql import functions as F
joinedDF.select(F.sum("counts")).show()


+-----------+
|sum(counts)|
+-----------+
|        397|
+-----------+



In [13]:
# Table 1: top three countries and their counts.
# The result is deliberately not named `sorted`, which would shadow Python's
# builtin `sorted` for the rest of the notebook session.
from pyspark.sql.functions import desc
countries_by_count = joinedDF.sort(desc("counts"))
countries_by_count.show(3)


+-------+----+-------+------+
|country|code|  words|counts|
+-------+----+-------+------+
| Norway| NOR| Norway|    52|
|Nigeria| NGA|Nigeria|    49|
| France| FRA| France|    42|
+-------+----+-------+------+
only showing top 3 rows



In [14]:
# Table 2: counts for Wales, Iceland, and Japan.
# `isin` keeps the rows whose country equals any of the listed names.
df = joinedDF.where(col("country").isin("Wales", "Iceland", "Japan"))
df.show()

+-------+----+-------+------+
|country|code|  words|counts|
+-------+----+-------+------+
|Iceland| ISL|Iceland|     2|
|  Wales| WAL|  Wales|    19|
|  Japan| JPN|  Japan|     5|
+-------+----+-------+------+



In [15]:
# Counts for Kenya, Wales, and Netherlands.
# `isin` keeps the rows whose country equals any of the listed names.
df = joinedDF.where(col("country").isin("Kenya", "Wales", "Netherlands"))
df.show()

+-----------+----+-----------+------+
|    country|code|      words|counts|
+-----------+----+-----------+------+
|      Wales| WAL|      Wales|    19|
|Netherlands| NED|Netherlands|    13|
|      Kenya| KEN|      Kenya|     3|
+-----------+----+-----------+------+



In [16]:
# Average number of mentions per matched country (mean of `counts` over the
# rows of joinedDF).
# Only `mean` is imported: a wildcard import from pyspark.sql.functions would
# overwrite Python builtins such as sum, max, min, abs and round.
from pyspark.sql.functions import mean
joinedDF.select(mean('counts')).show()

+-----------------+
|      avg(counts)|
+-----------------+
|9.022727272727273|
+-----------------+

