In [3]:
# Bring in SQLContext and build one around the SparkContext `sc`
# (`sc` is not defined in the visible cells; it must already exist in the session).
from pyspark.sql import SQLContext
sqlContext = SQLContext(sparkContext=sc)

In [4]:
# Load the country list from the local filesystem as an RDD of text lines.
country_csv_path = 'file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv'
country_lines = sc.textFile(country_csv_path)

In [5]:
# Break every CSV line into its comma-separated fields (one list per line).
# NOTE(review): the fields are not stripped, so each code keeps the space that
# follows the comma (' AFG' in the sample output) -- confirm whether that is intended.
def split_fields(row):
    """Return the comma-separated fields of one CSV line as a list."""
    return row.split(",")

country_pair = country_lines.map(split_fields)
country_pair.take(5)

[['Afghanistan', ' AFG'],
 ['Albania', ' ALB'],
 ['Algeria', ' ALG'],
 ['American Samoa', ' ASA'],
 ['Andorra', ' AND']]

In [6]:
# Turn each list of fields into a tuple; these tuples are the rows later
# passed to createDataFrame.
country_tuples = country_pair.map(tuple)
country_tuples.take(5)

[('Afghanistan', ' AFG'),
 ('Albania', ' ALB'),
 ('Algeria', ' ALG'),
 ('American Samoa', ' ASA'),
 ('Andorra', ' AND')]

In [7]:
# Build the country DataFrame with named columns, then inspect its schema
# and its first three rows.
country_columns = ["country", "code"]
countryDF = sqlContext.createDataFrame(country_tuples, schema=country_columns)
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code=' AFG'),
 Row(country='Albania', code=' ALB'),
 Row(country='Algeria', code=' ALG')]

In [13]:
# Load the exported tweet texts from the local filesystem as an RDD of lines.
tweet_csv_path = "file:///home/cloudera/Downloads/big-data-3/mongodb/tweet_text.csv"
tweet_lines = sc.textFile(tweet_csv_path)

In [14]:
# Clean the data: drop lines that are the empty string (tweets with no text).
# Only zero-length lines are removed; a line made up solely of spaces is kept.
tweet_lines = tweet_lines.filter(lambda text: len(text) > 0)

In [17]:
# WordCount over the cleaned tweet texts, in three steps.
# 1) Split each line on single spaces and flatten into one RDD of tokens.
# NOTE(review): splitting on " " yields empty-string tokens wherever spaces are
# adjacent or lead/trail a line (the '' row in the DataFrame output further down)
# -- confirm whether those should be counted.
words = tweet_lines.flatMap(lambda text: text.split(" "))
# 2) Pair every token with an initial count of 1.
tuples = words.map(lambda token: (token, 1))
# 3) Add up the counts of identical tokens.
counts = tuples.reduceByKey(lambda left, right: left + right)

In [19]:
# Build the DataFrame of (word, occurrences) pairs from the word counts.
# The word column is named "country" so it shares a column name with countryDF.
tweet_columns = ["country", "count"]
tweetDF = sqlContext.createDataFrame(counts, schema=tweet_columns)
tweetDF.printSchema()
tweetDF.take(3)

root
 |-- country: string (nullable = true)
 |-- count: long (nullable = true)



[Row(country='', count=3292),
 Row(country='mobile', count=1),
 Row(country='#FridayNightTouchdown', count=1)]

In [20]:
# Combine the word counts with the country list, matching rows on the shared
# "country" column. No join type is passed, so Spark's default join type applies.
merge = tweetDF.join(countryDF, on="country")

In [21]:
# Question 1: how many distinct countries are mentioned.
# Duplicate rows of the joined DataFrame are removed before counting.
merge.dropDuplicates().count()

44

In [22]:
# Question 2: total number of country mentions in the tweets, i.e. the sum of
# the per-country counts in the joined DataFrame (not the number of countries).
# Spark's aggregate is reached through the `F` alias so that Python's built-in
# sum() is not shadowed for the rest of the notebook session.
from pyspark.sql import functions as F
merge.select(F.sum('count')).show()

+----------+
|sum(count)|
+----------+
|       397|
+----------+



In [30]:
# Table 1: the three countries with the highest counts.
# Sort the joined rows by "count" in descending order and print the first three.
from pyspark.sql.functions import desc
by_count_desc = merge.sort(desc("count"))
by_count_desc.show(n=3)

+-------+-----+----+
|country|count|code|
+-------+-----+----+
| Norway|   52| NOR|
|Nigeria|   49| NGA|
| France|   42| FRA|
+-------+-----+----+
only showing top 3 rows



In [27]:
# Table 2: show the joined rows for Wales, Iceland, and Japan.
table2_countries = ['Wales', 'Iceland', 'Japan']
merge.filter(merge.country.isin(table2_countries)).show()

+-------+-----+----+
|country|count|code|
+-------+-----+----+
|Iceland|    2| ISL|
|  Wales|   19| WAL|
|  Japan|    5| JPN|
+-------+-----+----+



In [28]:
# Quiz Question 5: show the joined rows for Kenya, Wales, and Netherlands.
quiz5_countries = ['Kenya', 'Wales', 'Netherlands']
merge.filter(merge.country.isin(quiz5_countries)).show()

+-----------+-----+----+
|    country|count|code|
+-----------+-----+----+
|      Wales|   19| WAL|
|Netherlands|   13| NED|
|      Kenya|    3| KEN|
+-----------+-----+----+



In [29]:
# Quiz Question 6: average of the "count" column over all joined rows.
from pyspark.sql.functions import mean
merge.agg(mean('count')).show()

+-----------------+
|       avg(count)|
+-----------------+
|9.022727272727273|
+-----------------+

