In [1]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [2]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')
country_lines.take(5)

['Afghanistan, AFG',
 'Albania, ALB',
 'Algeria, ALG',
 'American Samoa, ASA',
 'Andorra, AND']

In [3]:
# Convert each line into a pair of words
country_pairs = country_lines.map(lambda country_line : country_line.split(","))
country_pairs.take(5)

[['Afghanistan', ' AFG'],
 ['Albania', ' ALB'],
 ['Algeria', ' ALG'],
 ['American Samoa', ' ASA'],
 ['Andorra', ' AND']]

In [4]:
# Convert each pair of words into a tuple
country_tuples = country_pairs.map(lambda country_pair: (country_pair[0],country_pair[1]))
country_tuples.take(5)

[('Afghanistan', ' AFG'),
 ('Albania', ' ALB'),
 ('Algeria', ' ALG'),
 ('American Samoa', ' ASA'),
 ('Andorra', ' AND')]

In [5]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code=' AFG'),
 Row(country='Albania', code=' ALB'),
 Row(country='Algeria', code=' ALG')]

In [6]:
# Read tweets CSV file into RDD of lines
tweets = sc.textFile('file:///home/cloudera/Downloads/big-data-3/mongodb/export_twitter.csv')
tweets.count()

13995

In [7]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
tweets = tweets.filter(bool)
tweets.count()

13391

In [8]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
words = tweets.flatMap(lambda tweet: tweet.split(" "))
tuples = words.map(lambda word: (word,1))
tweet_word_counts = tuples.reduceByKey(lambda a,b: (a+b))
tweet_word_counts.take(5)

[('', 3292),
 ('https://t.co/fQftAwGAad', 1),
 ('mobile', 1),
 ('#FridayNightTouchdown', 1),
 ('circle', 7)]

In [9]:
# Create the DataFrame of tweet word counts
tweetDF = sqlContext.createDataFrame(tweet_word_counts, ["word", "count"])
tweetDF.printSchema()
tweetDF.take(10)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



[Row(word='', count=3292),
 Row(word='https://t.co/fQftAwGAad', count=1),
 Row(word='mobile', count=1),
 Row(word='#FridayNightTouchdown', count=1),
 Row(word='circle', count=7),
 Row(word='#thfc', count=2),
 Row(word='reinstated', count=4),
 Row(word='like?"', count=1),
 Row(word='Bellow', count=1),
 Row(word='now"', count=1)]

In [10]:
# Join the country and tweet data frames (on the appropriate column)
from pyspark.sql.functions import col 
joinedDF = countryDF.alias('c').join(tweetDF.alias('t'),
        col('c.country') == col('t.word')).select(col('c.code'),col('c.country'), col('t.count'))
joinedDF.printSchema()
joinedDF.take(5)

root
 |-- code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- count: long (nullable = true)



[Row(code=' THA', country='Thailand', count=1),
 Row(code=' ISL', country='Iceland', count=2),
 Row(code=' MEX', country='Mexico', count=1),
 Row(code=' WAL', country='Wales', count=19),
 Row(code=' DEN', country='Denmark', count=1)]

In [15]:
# Question 1: number of distinct countries mentioned
joinedDF.count()

44

In [29]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
joinedDF.agg(sum("count")).first()

Row(sum(count)=397)

In [31]:
# Average number of times a country is mentioned
from pyspark.sql.functions import mean
joinedDF.agg(mean("count")).first()

Row(avg(count)=9.022727272727273)

In [22]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
joinedDF.sort(desc("count")).take(3)

[Row(code=' NOR', country='Norway', count=52),
 Row(code=' NGA', country='Nigeria', count=49),
 Row(code=' FRA', country='France', count=42)]

In [30]:
# Table 2: counts for Wales, Kenya, and Netherlands.
joinedDF[(joinedDF["country"] == "Wales") |(joinedDF["country"] == "Kenya") | (joinedDF["country"] == "Netherlands")].take(3)

[Row(code=' WAL', country='Wales', count=19),
 Row(code=' NED', country='Netherlands', count=13),
 Row(code=' KEN', country='Kenya', count=3)]