In [1]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [18]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')

In [14]:
# Convert each line into a pair of words
words = country_lines.map(lambda line : line.split(", ")).filter(lambda line: len(line)>1)
words.take(3)

[['Afghanistan', 'AFG'], ['Albania', 'ALB'], ['Algeria', 'ALG']]

In [15]:
# Convert each pair of words into a tuple
country_tuples = words.map(lambda word : (word[0], word[1]))
country_tuples.take(2)

[('Afghanistan', 'AFG'), ('Albania', 'ALB')]

In [16]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [21]:
# Read tweets CSV file into RDD of lines
tweet_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/users.csv')
tweet_lines.take(2)

['tweet_text',
 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL']

In [23]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
tweet_filtered_lines = tweet_lines.filter(lambda line: len(line)>0)

In [27]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
tweet_words = tweet_filtered_lines.flatMap(lambda line: line.split(" ")).filter(lambda line: len(line)>0)
tuples = tweet_words.map(lambda word : (word,1))
counts = tuples.reduceByKey(lambda a,b: (a+b))
counts.take(5)

[('https://t.co/fQftAwGAad', 1),
 ('mobile', 1),
 ('#FridayNightTouchdown', 1),
 ('circle', 7),
 ('#thfc', 2)]

In [28]:
# Create the DataFrame of tweet word counts
tweetDF = sqlContext.createDataFrame(tuples, ["word", "count"])
tweetDF.printSchema()
tweetDF.take(3)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



[Row(word='tweet_text', count=1),
 Row(word='RT', count=1),
 Row(word='@ochocinco:', count=1)]

In [31]:
# Join the country and tweet data frames (on the appropriate column)
joinDF = countryDF.join(tweetDF, countryDF.country == tweetDF.word)
joinDF.take(3)

[Row(country='Thailand', code='THA', word='Thailand', count=1),
 Row(country='Iceland', code='ISL', word='Iceland', count=1),
 Row(country='Iceland', code='ISL', word='Iceland', count=1)]

In [39]:
# Question 1: number of distinct countries mentioned
joinDF.distinct().count()

44

In [55]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
b = joinDF.agg(sum("count").alias("country_sum"))
b.show()

+-----------+
|country_sum|
+-----------+
|        397|
+-----------+



In [90]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
c = joinDF.groupBy("country").agg(sum("count").alias("count_sum")).sort(desc("count_sum"))
c.take(5)

[Row(country='Norway', count_sum=52),
 Row(country='Nigeria', count_sum=49),
 Row(country='France', count_sum=42),
 Row(country='Slovakia', count_sum=30),
 Row(country='England', count_sum=25)]

In [93]:
# Table 2: counts for Wales, Iceland, and Japan.
c.filter(joinDF.country == "Wales").collect()

[Row(country='Wales', count_sum=19)]

In [89]:
c.groupBy().mean("count_sum").collect()

[Row(avg(count_sum)=9.022727272727273)]