In [3]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [4]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')

In [5]:
# Convert each line into a pair of words
words = country_lines.map(lambda line: line.split(", "))
words.take(10)

[['Afghanistan', 'AFG'],
 ['Albania', 'ALB'],
 ['Algeria', 'ALG'],
 ['American Samoa', 'ASA'],
 ['Andorra', 'AND'],
 ['Angola', 'ANG'],
 ['Anguilla', 'AIA'],
 ['Antigua and Barbuda', 'ATG'],
 ['Argentina', 'ARG'],
 ['Armenia', 'ARM']]

In [6]:
# Convert each pair of words into a tuple
country_tuples = words.map(lambda word:tuple(word))
country_tuples.take(10)

[('Afghanistan', 'AFG'),
 ('Albania', 'ALB'),
 ('Algeria', 'ALG'),
 ('American Samoa', 'ASA'),
 ('Andorra', 'AND'),
 ('Angola', 'ANG'),
 ('Anguilla', 'AIA'),
 ('Antigua and Barbuda', 'ATG'),
 ('Argentina', 'ARG'),
 ('Armenia', 'ARM')]

In [7]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [8]:
# Read tweets CSV file into RDD of lines
#import csv
tweet_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/tweet.csv')\
.map(lambda line:line.split(","))\
.map(lambda line:line[8])
# tweet_lines = rdd.mapPartitions(lambda x: csv.reader(x))
#tweet_lines.count()

# tweet_lines = tweet_lines.mapPartitions(lambda x: csv.reader(x))



In [9]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
clean_tweet_lines = tweet_lines.filter(lambda x: len(x)>16)
#is not None).filter(lambda x: x[1] != "")
#clean_tweet_lines.take(3)
clean_tweet_lines.count()

11183

In [10]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
clean_tweet_lines.take(1)
wc_lines = clean_tweet_lines.flatMap(lambda line: line.split(" "))
wc_tuples = wc_lines.map(lambda word:(word, 1))
#wc_tuples.count()
wc_counts = wc_tuples.reduceByKey(lambda a, b: (a+b))
wc_df = wc_counts.coalesce(1)
wc_df.count()
#wc_df.take(3)

24215

In [12]:
# Create the DataFrame of tweet word counts
tweetCountDF = sqlContext.createDataFrame(wc_df, ["word", "count"])
tweetCountDF.take(3)

[Row(word='', count=1885),
 Row(word='mobile', count=1),
 Row(word='\\n*30', count=1)]

In [13]:
# Join the country and tweet data frames (on the appropriate column)
from pyspark.sql.functions import col
retDF = countryDF.alias('c').join(tweetCountDF.alias('t'), col('c.country') == col('t.word')).select(col('c.code'), col('c.country'), col('t.count'))
retDF.printSchema()
retDF.take(3)

root
 |-- code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- count: long (nullable = true)



[Row(code='THA', country='Thailand', count=1),
 Row(code='ISL', country='Iceland', count=2),
 Row(code='MEX', country='Mexico', count=1)]

In [14]:
# Question 1: number of distinct countries mentioned
retDF.count()

40

In [15]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
retDF.agg(sum("count")).first()

Row(sum(count)=312)

In [17]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
rank = retDF.sort(desc("count"))
rank.show(5)

+----+--------+-----+
|code| country|count|
+----+--------+-----+
| FRA|  France|   36|
| SVK|Slovakia|   30|
| NOR|  Norway|   28|
| NGA| Nigeria|   27|
| ENG| England|   26|
+----+--------+-----+
only showing top 5 rows



In [21]:
# Table 2: counts for Wales, Iceland, and Japan.
selectedDF = retDF.where((col("country") == "Iceland") | 
                         (col("country") == "Netherlands") |
                         (col("country") == "Wales"))
selectedDF.show()

+----+-----------+-----+
|code|    country|count|
+----+-----------+-----+
| ISL|    Iceland|    2|
| WAL|      Wales|   17|
| NED|Netherlands|    8|
+----+-----------+-----+

