In [None]:
# SQL entry point wrapping the notebook's SparkContext.
# Assumes `sc` is a SparkContext already created by the PySpark kernel -- it is not
# defined in this notebook. NOTE(review): SQLContext is deprecated in Spark 3.x in
# favour of SparkSession; kept for compatibility with the existing environment.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sparkContext=sc)

In [None]:
# Country list: an RDD with one element per text line of the CSV file.
COUNTRY_LIST_PATH = 'file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv'
country_lines = sc.textFile(COUNTRY_LIST_PATH)

In [None]:
# Parse each CSV line of the country list into its list of fields.
# csv.reader honours quoted fields, so a quoted name containing a comma
# (e.g. "Korea, Republic of") stays a single field instead of being cut at the
# embedded comma as a plain str.split(',') would do. Unquoted lines parse exactly
# as before.
import csv

def parse_csv_line(line):
    """Return the list of fields in one CSV line (an empty list for a blank line)."""
    return next(csv.reader([line]), [])

words = country_lines.map(parse_csv_line)
words.take(3)

In [None]:
# Convert each parsed line into a (country, code) tuple.
# Lines with fewer than two fields (e.g. a trailing blank line) are dropped:
# indexing fields[1] on them raised IndexError once the DataFrame was built.
# Fields are stripped so a "Name, CODE" layout does not leave a leading space
# in the code column (or in a name that must match tweet words exactly).
country_tuples = (
    words
    .filter(lambda fields: len(fields) >= 2)
    .map(lambda fields: (fields[0].strip(), fields[1].strip()))
)

In [None]:
# Build the country DataFrame with columns (country, code) and lower-case the
# country names so the later join with tweet words is case-insensitive.
from pyspark.sql.functions import lower, col

countryDF = (
    sqlContext
    .createDataFrame(country_tuples, ["country", "code"])
    .withColumn('country', lower(col('country')))
)
countryDF.printSchema()
countryDF.show(3)

In [None]:
# Tweet export: an RDD with one element per text line of the file.
TWEETS_PATH = 'file:///home/cloudera/Downloads/big-data-3/final-project/TweetExport.csv'
tweet_lines = sc.textFile(TWEETS_PATH)
tweet_lines.take(3)

In [None]:
# Clean the data: extract the text of every "tweet_text" field and drop empty tweets.
# - The raw line is no longer split on ',' first; that truncated any tweet whose
#   text contained a comma.
# - The regex captures the string value WITHOUT its surrounding double quotes.
#   Previously the quotes stayed attached, so the first/last word of each tweet
#   ('"japan', 'japan"') could never match a country name.
# Assumes each line holds JSON-style "tweet_text":"..." pairs, as the earlier
# split on '"tweet_text":' implied -- TODO confirm against TweetExport.csv.
# Non-string values (e.g. "tweet_text":null) are skipped as empty tweets.
import json
import re

TWEET_TEXT_RE = re.compile(r'"tweet_text"\s*:\s*"((?:[^"\\]|\\.)*)"')

def decode_json_string(raw):
    """Unescape a JSON string body (\\" , \\n, \\uXXXX); fall back to the raw text if invalid."""
    try:
        return json.loads('"' + raw + '"', strict=False)
    except ValueError:
        return raw

def extract_tweet_texts(line):
    """Return the decoded text of every "tweet_text" string field found in `line`."""
    return [decode_json_string(raw) for raw in TWEET_TEXT_RE.findall(line)]

tweet_words = tweet_lines.flatMap(extract_tweet_texts)
tweet_words = tweet_words.filter(lambda text: text.strip() != '')
tweet_words.take(5)

In [None]:
# Perform WordCount on the cleaned tweet texts.
# Words are lower-cased BEFORE counting. The country join is case-insensitive
# (both sides are lower-cased), so counting "Japan" and "japan" separately used to
# produce one joined row per spelling, inflating the distinct-country count,
# splitting Table 1 entries, and skewing the average.
# str.split() with no argument splits on any whitespace run and never yields ''.
# NOTE(review): punctuation is not stripped, so tokens like "japan!" or "#japan"
# do not match, and multi-word names ("new zealand") cannot match a single token.
tweet_tokens = tweet_words.flatMap(lambda text: text.lower().split())
tweet_tuples = tweet_tokens.map(lambda word: (word, 1))
tweet_counts = tweet_tuples.reduceByKey(lambda a, b: a + b)
tweet_counts.take(5)

In [None]:
# Create the DataFrame of tweet word counts: (country = the word, cuenta = its count).
# The word column is named "country" so it can be joined directly with countryDF.
from pyspark.sql import functions as F

wordDF = sqlContext.createDataFrame(tweet_counts, ["country", "cuenta"])
# Lower-casing can map several spellings ("Japan", "japan") onto one key, so the
# counts are re-summed per key. Otherwise the join emits one row per spelling.
wordDF = (
    wordDF
    .withColumn('country', F.lower(F.col('country')))
    .groupBy('country')
    .agg(F.sum('cuenta').alias('cuenta'))
)
wordDF.printSchema()
wordDF.show(3)

In [None]:
# Inner-join countries with tweet word counts on the shared "country" column;
# only words exactly equal to a (lower-cased) country name survive.
merge = countryDF.join(wordDF, on='country', how='inner')
merge.printSchema()
merge.show(5)

In [None]:
# Question 1: number of distinct countries mentioned in the tweets.
# The count runs on the executors instead of collecting every joined row to the
# driver. distinct() guarantees each country is counted once, even if it appears in
# several joined rows (e.g. case variants of the same word, or duplicate list entries).
merge.where('cuenta > 0').select('country').distinct().count()

In [None]:
# Total number of rows in the joined table (one per matched country/word key).
merge.select('country').count()

In [None]:
# Question 2: total number of country mentions across all tweets
# (the sum of the per-country counts, not the number of distinct countries).
# Imported under an alias: a bare `from pyspark.sql.functions import sum` would
# shadow Python's builtin sum() for every later cell in this kernel.
from pyspark.sql.functions import sum as spark_sum

merge.agg(spark_sum("cuenta")).collect()[0][0]

In [None]:
# Table 1: top three countries by number of mentions, with their counts.
from pyspark.sql.functions import desc

TOP_N = 3
# The secondary sort on the country name makes ties deterministic across runs.
merge.sort(desc('cuenta'), 'country').show(TOP_N)

In [None]:
# Table 2: mention counts for Wales, Kenya, and the Netherlands.
# NOTE(review): this heading previously read "Wales, Iceland, and Japan" while the
# query selected wales/kenya/netherlands. The heading now matches the query; confirm
# which set of countries the assignment actually asks for.
from pyspark.sql.functions import col

TABLE2_COUNTRIES = ['wales', 'kenya', 'netherlands']
merge.where(col('country').isin(TABLE2_COUNTRIES)).show(len(TABLE2_COUNTRIES))

In [None]:
# Mean of the `cuenta` mention counts over the rows of the joined table
# (None if the join produced no rows).
from pyspark.sql.functions import avg

merge.agg(avg("cuenta")).first()[0]