In [1]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [4]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')


In [14]:
# Convert each line into a pair of words

mapped = country_lines.flatMap(lambda line : line.split("\n"))
mapped.take(5)

['Afghanistan, AFG',
 'Albania, ALB',
 'Algeria, ALG',
 'American Samoa, ASA',
 'Andorra, AND']

In [16]:
# Convert each pair of words into a tuple
tupled = mapped.map(lambda tup: (tup.split(",")[0], tup.split(",")[1]))
tupled.take(5)

[('Afghanistan', ' AFG'),
 ('Albania', ' ALB'),
 ('Algeria', ' ALG'),
 ('American Samoa', ' ASA'),
 ('Andorra', ' AND')]

In [18]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(tupled, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code=' AFG'),
 Row(country='Albania', code=' ALB'),
 Row(country='Algeria', code=' ALG')]

In [27]:
# Read tweets CSV file into RDD of lines
tweetlines = sc.textFile('file:///home/cloudera/Downloads/tweets.csv')
tweetlines.take(1)
tweetlines.count()

13995

In [28]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
removed_nas = tweetlines.filter(lambda tweet: len(tweet) > 0)
removed_nas.take(2)
removed_nas.count()

13391

In [32]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
splitted = removed_nas.flatMap(lambda line: line.split(" "))
tupled = splitted.map(lambda word: (word,1))
word_count = tupled.reduceByKey(lambda one,two: (one+two))
word_count.take(4)

[('', 3292),
 ('https://t.co/fQftAwGAad', 1),
 ('mobile', 1),
 ('#FridayNightTouchdown', 1)]

In [34]:
# Create the DataFrame of tweet word counts
dataframe = sqlContext.createDataFrame(word_count,["word", "count"])
dataframe.printSchema()


root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



In [36]:
dataframe.head(3)

[Row(word='', count=3292),
 Row(word='https://t.co/fQftAwGAad', count=1),
 Row(word='mobile', count=1)]

In [39]:
from pyspark.sql.functions import col
# Join the country and tweet data frames (on the appropriate column)
joined = countryDF.alias("country").join(dataframe.alias("tweets"), col('country.country')  == col('tweets.word')).select(col("country.code"), col("country.country"), col("tweets.count"))
joined.take(5)

[Row(code=' THA', country='Thailand', count=1),
 Row(code=' ISL', country='Iceland', count=2),
 Row(code=' MEX', country='Mexico', count=1),
 Row(code=' WAL', country='Wales', count=19),
 Row(code=' DEN', country='Denmark', count=1)]

In [41]:
# Question 1: number of distinct countries mentioned
print(joined.count())

44


In [42]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
joined.agg(sum("count")).first()

Row(sum(count)=397)

In [45]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
descending = joined.sort(desc("count"))
descending.take(3)

[Row(code=' NOR', country='Norway', count=52),
 Row(code=' NGA', country='Nigeria', count=49),
 Row(code=' FRA', country='France', count=42)]

In [73]:
#How many times was France mentioned in a tweet
france = joined.where((col("country") =="France"))
france.show()


+----+-------+-----+
|code|country|count|
+----+-------+-----+
| FRA| France|   42|
+----+-------+-----+



In [67]:
# Table 2: counts for Kenya, Wales, and Netherlands.
a = joined.where((col("country") =="Kenya"))
a.show()

+----+-------+-----+
|code|country|count|
+----+-------+-----+
| KEN|  Kenya|    3|
+----+-------+-----+



In [70]:
b = joined.where((col("country") =="Wales"))
b.show()

+----+-------+-----+
|code|country|count|
+----+-------+-----+
| WAL|  Wales|   19|
+----+-------+-----+



In [69]:
c = joined.where((col("country") =="Netherlands"))
c.show()

+----+-----------+-----+
|code|    country|count|
+----+-----------+-----+
| NED|Netherlands|   13|
+----+-----------+-----+



In [71]:
#All together
joined_count = joined.where((col("country") =="Kenya")|(col("country") =="Wales")|(col("country") =="Netherlands"))
joined_count = joined_count.sort(desc("count"))

In [66]:
joined_count.show()

+----+-------+-----+
|code|country|count|
+----+-------+-----+
| WAL|  Wales|   19|
| JPN|  Japan|    5|
| ISL|Iceland|    2|
+----+-------+-----+



In [74]:
from pyspark.sql.functions import avg
joined.agg(avg("count")).first()

Row(avg(count)=9.022727272727273)