In [1]:
# Wrap the SparkContext `sc` in an SQLContext; later cells use `sqlContext`
# to turn RDDs into DataFrames.
# NOTE(review): `sc` is not defined in this notebook — presumably provided by the
# pyspark kernel/shell; confirm before running elsewhere.
from pyspark.sql import SQLContext

sqlContext = SQLContext(sparkContext=sc)

In [2]:
# Load the country list as an RDD with one raw text line per element.
# NOTE(review): absolute path on the course VM; adjust for other environments.
COUNTRY_LIST_PATH = 'file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv'
country_lines = sc.textFile(COUNTRY_LIST_PATH)

In [4]:
# Peek at the first raw line to check the "Name, CODE" layout before parsing it.
country_lines.take(num=1)

['Afghanistan, AFG']

In [5]:
# Split each "Name, CODE" line into a [name, code] list.
# rsplit(",", 1) splits only on the LAST comma, so a country name that itself
# contains a comma still produces exactly two fields (a plain split(", ") would
# produce three and break createDataFrame later). strip() removes the space after
# the comma plus any stray surrounding whitespace (e.g. a trailing "\r").
# For well-formed lines such as "Afghanistan, AFG" the result is unchanged:
# ["Afghanistan", "AFG"].
country_pairs = country_lines.map(
    lambda line: [field.strip() for field in line.rsplit(",", 1)]
)

In [8]:
# Turn each [name, code] list into a (name, code) tuple, the row shape
# passed to createDataFrame below.
country_tuples = country_pairs.map(tuple)

In [9]:
# Build a two-column (country, code) DataFrame from the tuples, then print its
# schema and show the first three rows.
country_columns = ["country", "code"]
countryDF = sqlContext.createDataFrame(country_tuples, schema=country_columns)
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [13]:
# Read tweets CSV file into RDD of lines
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/mongodb/tweets.csv')

In [14]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
country_lines_ne = country_lines.filter(lambda x: bool(x))

In [15]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
words = country_lines_ne.flatMap(lambda x: x.split())
tuples = words.map(lambda w: (w, 1))
counts = tuples.reduceByKey(lambda a, b: a + b)

In [20]:
# Build a DataFrame of (word, count) rows from the tweet word counts.
# The word column is named "country" so that it lines up with countryDF's
# "country" column for the join below.
wc_columns = ["country", "count"]
wcDF = sqlContext.createDataFrame(counts, schema=wc_columns)
wcDF.printSchema()
wcDF.take(3)

root
 |-- country: string (nullable = true)
 |-- count: long (nullable = true)



[Row(country='https://t.co/fQftAwGAad', count=1),
 Row(country='mobile', count=1),
 Row(country='#FridayNightTouchdown', count=1)]

In [22]:
# Inner-join the country list with the tweet word counts on the shared "country"
# column. Each resulting row is a country whose exact name appeared as a tweet token,
# together with its count.
# Caveat: matching is exact and case-sensitive. Because tokens come from a
# whitespace split, a country name that contains a space can never match and
# drops out here.
merge = countryDF.join(wcDF, on="country", how="inner")

In [28]:
# Question 1: number of distinct countries mentioned at least once.
# (Every count produced by reduceByKey is >= 1, so the > 0 filter is a safeguard
# rather than a real restriction.)
mentioned = merge.filter(merge["count"] > 0)
mentioned.select("country").distinct().count()

44

In [32]:
# Question 2: total number of country mentions across all tweets, i.e. the sum
# of the per-country counts (Question 1 counts distinct countries instead).
# Spark's sum is imported under an alias so that Python's built-in sum() is not
# shadowed for the rest of the notebook. The result column is still labelled
# "sum(count)".
from pyspark.sql.functions import sum as spark_sum

merge.select(spark_sum("count")).collect()

[Row(sum(count)=397)]

In [34]:
# Table 1: top three countries by mention count.
# "country" is a secondary sort key so that ties in count are broken
# deterministically. Sorting on count alone leaves the order of tied rows, and
# therefore which rows take(3) returns, unspecified between runs.
from pyspark.sql.functions import desc

merge.sort(desc("count"), "country").take(3)

[Row(country='Norway', code='NOR', count=52),
 Row(country='Nigeria', code='NGA', count=49),
 Row(country='France', code='FRA', count=42)]

In [36]:
# Table 2: mention counts for Kenya, Wales, and Netherlands.
# NOTE(review): this header previously said "Wales, Iceland, and Japan", which did
# not match the code or its output; confirm which countries the assignment asks for.
# Countries that were never mentioned are absent from `merge` (inner join), so they
# are missing from this result rather than shown with count 0.
table2_countries = ["Kenya", "Wales", "Netherlands"]
merge.filter(merge["country"].isin(table2_countries)).collect()

[Row(country='Wales', code='WAL', count=19),
 Row(country='Netherlands', code='NED', count=13),
 Row(country='Kenya', code='KEN', count=3)]

In [39]:
# Average number of mentions per country. Because `merge` comes from an inner join,
# this averages only over countries mentioned at least once, not over the whole
# country list.
from pyspark.sql.functions import mean

merge.agg(mean("count")).collect()

[Row(avg(count)=9.022727272727273)]