In [1]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')

In [22]:
# Convert each line into a pair of words
pairs = country_lines.map(lambda line: line.split(","))

In [23]:
# Convert each pair of words into a tuple
country_tuples = pairs.map(lambda words: (words[0], words[1]))

In [24]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code=' AFG'),
 Row(country='Albania', code=' ALB'),
 Row(country='Algeria', code=' ALG')]

In [40]:
# Read tweets CSV file into RDD of lines
tweets = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/tweet_text.csv')

In [47]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
filteredTweets = tweets.filter(lambda line: line != '' and line != None)

In [49]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
tweetsWord = filteredTweets.flatMap(lambda line: line.split(" "))
tweetsWord = tweetsWord.filter(lambda word: word != "" and word != None)
tuples = tweetsWord.map(lambda word: (word, 1))
counts = tuples.reduceByKey(lambda a, b: (a + b))

[('mobile', 1), ('#FridayNightTouchdown', 1), ('Just', 44)]

In [89]:
# Create the DataFrame of tweet word counts
dfCount = sqlContext.createDataFrame(counts, ["word", "count"])
dfCount.printSchema()
dfCount.show(3)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)

+--------------------+-----+
|                word|count|
+--------------------+-----+
|              mobile|    1|
|#FridayNightTouch...|    1|
|                Just|   44|
+--------------------+-----+
only showing top 3 rows



TypeError: 'DataFrame' object is not callable

In [54]:
# Join the country and tweet data frames (on the appropriate column)
joined = dfCount.join(countryDF, dfCount.word == countryDF.country)
joined.show(3)

+--------+-----+--------+----+
|    word|count| country|code|
+--------+-----+--------+----+
|Thailand|    1|Thailand| THA|
| Iceland|    2| Iceland| ISL|
|  Mexico|    1|  Mexico| MEX|
+--------+-----+--------+----+
only showing top 3 rows



In [55]:
# Question 1: number of distinct countries mentioned
joined.count()

44

In [63]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
joined.select(sum("count")).show()

+----------+
|sum(count)|
+----------+
|       397|
+----------+



In [71]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
joined.orderBy(desc("count"), "country").show()

+-----------+-----+-----------+----+
|       word|count|    country|code|
+-----------+-----+-----------+----+
|     Norway|   52|     Norway| NOR|
|    Nigeria|   49|    Nigeria| NGA|
|     France|   42|     France| FRA|
|   Slovakia|   30|   Slovakia| SVK|
|    England|   25|    England| ENG|
|    Germany|   20|    Germany| GER|
|      Wales|   19|      Wales| WAL|
|     Russia|   15|     Russia| RUS|
|     Brazil|   13|     Brazil| BRA|
|Netherlands|   13|Netherlands| NED|
|     Canada|   11|     Canada| CAN|
|Switzerland|   10|Switzerland| SUI|
|       Chad|    9|       Chad| CHA|
|     Guinea|    8|     Guinea| GUI|
|   Portugal|    8|   Portugal| POR|
|      Spain|    8|      Spain| ESP|
|       Iraq|    6|       Iraq| IRQ|
|     Jordan|    6|     Jordan| JOR|
|    Austria|    5|    Austria| AUT|
|    Georgia|    5|    Georgia| GEO|
+-----------+-----+-----------+----+
only showing top 20 rows



In [77]:
# Table 2: counts for Wales, Iceland, and Japan.
joined.select("word", "count").where(joined.word == "France").show()

+------+-----+
|  word|count|
+------+-----+
|France|   42|
+------+-----+



In [90]:
joined.select(mean("count")).show()

+-----------------+
|       avg(count)|
+-----------------+
|9.022727272727273|
+-----------------+

