In [1]:
# Create the SQLContext that later cells use to build DataFrames from RDDs.
# NOTE(review): `sc` is not defined in any visible cell; presumably it is the
# SparkContext pre-created by the PySpark notebook kernel -- confirm.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [21]:
# Load the country list as an RDD of text lines (one element per line of the
# file) and preview the first five elements.
COUNTRY_LIST_URI = 'file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv'
country_lines = sc.textFile(COUNTRY_LIST_URI)
country_lines.take(5)


['Afghanistan, AFG',
 'Albania, ALB',
 'Algeria, ALG',
 'American Samoa, ASA',
 'Andorra, AND']

In [25]:
# Split every element of country_lines on newline characters and flatten the
# result. Each element here is still a single "Country, CODE" string: the
# first five elements shown below are identical to the input lines, so the
# actual separation into country and code happens in the next cell.
def split_on_newlines(line):
    """Return the newline-separated pieces of one input line."""
    return line.split("\n")

pair_of_words = country_lines.flatMap(split_on_newlines)
pair_of_words.take(5)

['Afghanistan, AFG',
 'Albania, ALB',
 'Algeria, ALG',
 'American Samoa, ASA',
 'Andorra, AND']

In [28]:
# Convert each "Country, CODE" line into a (country, code) tuple.
def parse_country_line(line):
    """Split one country-list line into a ``(country, code)`` tuple.

    The line is split exactly once, at its last comma, so the code is always
    the final field even when the country name itself contains a comma.
    Whitespace around both fields is stripped, so "X,Y", "X, Y" and "X ,  Y"
    all produce ("X", "Y"). The previous version split the line twice with two
    different delimiters (',' and ', '), which raised IndexError when the
    comma was not followed by exactly one space-separated field.

    Raises:
        ValueError: if the line contains no comma at all (malformed record).
    """
    country, code = line.rsplit(',', 1)
    return (country.strip(), code.strip())

country_tuples = pair_of_words.map(parse_country_line)
country_tuples.take(5)

[('Afghanistan', 'AFG'),
 ('Albania', 'ALB'),
 ('Algeria', 'ALG'),
 ('American Samoa', 'ASA'),
 ('Andorra', 'AND')]

In [29]:
# Build the country DataFrame from the (country, code) tuples, naming the two
# columns explicitly, then inspect the inferred schema and the first three rows.
country_columns = ["country", "code"]
countryDF = sqlContext.createDataFrame(country_tuples, country_columns)
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [30]:
# Load the tweets file as an RDD of text lines and count how many lines it has.
TWEETS_URI = 'file:///home/cloudera/Desktop/tweets.csv'
tweet_texts = sc.textFile(TWEETS_URI)
tweet_texts.count()


13995

In [33]:
# Clean the data: drop every zero-length line so only tweets with some text
# remain, then count the surviving lines.
non_empty_tweet_texts = tweet_texts.filter(lambda tweet: tweet != "")
non_empty_tweet_texts.count()


13391

In [39]:
# WordCount over the cleaned tweets: tokenize on single spaces, emit a
# (token, 1) pair per token, then add up the ones for each distinct token.
def split_on_spaces(tweet):
    """Return the tokens of one tweet, split on single space characters.

    Consecutive spaces yield empty-string tokens, which is why '' appears as
    a counted "word" in the results.
    """
    return tweet.split(" ")

def pair_with_one(token):
    """Tag a token with an initial count of 1."""
    return (token, 1)

def add_counts(left, right):
    """Combine two partial counts for the same token."""
    return left + right

tweet_words = non_empty_tweet_texts.flatMap(split_on_spaces)
tweet_tuples = tweet_words.map(pair_with_one)
word_counts = tweet_tuples.reduceByKey(add_counts)
word_counts.take(5)

[('', 3292),
 ('https://t.co/fQftAwGAad', 1),
 ('mobile', 1),
 ('#FridayNightTouchdown', 1),
 ('circle', 7)]

In [56]:
# Build the word-count DataFrame from the (word, count) pairs, naming the two
# columns explicitly, then inspect the inferred schema and the first ten rows.
word_count_columns = ["word", "count"]
tweetDF = sqlContext.createDataFrame(word_counts, word_count_columns)
tweetDF.printSchema()
tweetDF.take(10)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



[Row(word='', count=3292),
 Row(word='https://t.co/fQftAwGAad', count=1),
 Row(word='mobile', count=1),
 Row(word='#FridayNightTouchdown', count=1),
 Row(word='circle', count=7),
 Row(word='#thfc', count=2),
 Row(word='reinstated', count=4),
 Row(word='like?"', count=1),
 Row(word='Bellow', count=1),
 Row(word='now"', count=1)]

In [119]:
# Join countries to tweet word counts: a country row is kept only when some
# tweet word is exactly equal to the country name. The result keeps the
# country code, the country name and that word's count.
from pyspark.sql.functions import col

country_matches_word = col('c.country') == col('t.word')
joinedDF = (
    countryDF.alias('c')
    .join(tweetDF.alias('t'), country_matches_word)
    .select(col('c.code'), col('c.country'), col('t.count'))
)
joinedDF.printSchema()
joinedDF.take(5)



root
 |-- code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- count: long (nullable = true)



[Row(code='THA', country='Thailand', count=1),
 Row(code='ISL', country='Iceland', count=2),
 Row(code='MEX', country='Mexico', count=1),
 Row(code='WAL', country='Wales', count=19),
 Row(code='DEN', country='Denmark', count=1)]

In [91]:
# Question 1: number of distinct countries mentioned.
# Shows (rows in the join, distinct country codes); equal values mean each
# matched country appears exactly once in joinedDF.
matched_rows = joinedDF.count()
distinct_codes = joinedDF.select('code').distinct().count()
(matched_rows, distinct_codes)



(44, 44)

In [112]:
# Question 2: total number of country mentions in the tweets, i.e. the sum of
# the per-country counts (not the number of countries).
# The functions module is imported under an alias instead of
# `from pyspark.sql.functions import sum`, which would rebind the name `sum`
# and hide Python's built-in sum() in every cell run afterwards.
from pyspark.sql import functions as F

joinedDF.agg(F.sum("count")).first()



Row(sum(count)=397)

In [101]:
# Table 1: countries ranked by mention count, most-mentioned first.
# Five rows are displayed; the top three countries are the first three rows.
from pyspark.sql.functions import desc

descSorted = joinedDF.orderBy(desc("count"))
descSorted.show(5)


+----+--------+-----+
|code| country|count|
+----+--------+-----+
| NOR|  Norway|   52|
| NGA| Nigeria|   49|
| FRA|  France|   42|
| SVK|Slovakia|   30|
| ENG| England|   25|
+----+--------+-----+
only showing top 5 rows



In [105]:
# Table 2: mention counts for Wales, Iceland, and Japan only.
selectedDF = joinedDF.where(col("country").isin("Wales", "Iceland", "Japan"))
selectedDF.show()


+----+-------+-----+
|code|country|count|
+----+-------+-----+
| ISL|Iceland|    2|
| WAL|  Wales|   19|
| JPN|  Japan|    5|
+----+-------+-----+



In [116]:
# Mention counts for Kenya, Wales, and Netherlands, highest count first.
# Note: this rebinds selectedDF, replacing the Wales/Iceland/Japan selection.
selectedDF = (
    joinedDF
    .where(col("country").isin("Kenya", "Wales", "Netherlands"))
    .orderBy(desc("count"))
)
selectedDF.show()

+----+-----------+-----+
|code|    country|count|
+----+-----------+-----+
| WAL|      Wales|   19|
| NED|Netherlands|   13|
| KEN|      Kenya|    3|
+----+-----------+-----+



In [118]:
# Average number of mentions per matched country (mean of the count column).
from pyspark.sql.functions import avg

mean_mentions_row = joinedDF.agg(avg("count")).first()
mean_mentions_row

Row(avg(count)=9.022727272727273)