In [1]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1680417626835_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('s3://twitterwordcount/Country.csv')
country_lines.take(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['Afghanistan, AFG', 'Albania, ALB', 'Algeria, ALG', 'American Samoa, ASA', 'Andorra, AND']

In [7]:
# Convert each line into a pair of words
pair_of_words = country_lines.flatMap(lambda line: line.split("\n"))
pair_of_words.take(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['Afghanistan, AFG', 'Albania, ALB', 'Algeria, ALG', 'American Samoa, ASA', 'Andorra, AND']

In [8]:
# Convert each pair of words into a tuple
country_tuples = pair_of_words.map(lambda word: (word.split(',') [0], word.split(', ')[1]))
country_tuples.take(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('Afghanistan', 'AFG'), ('Albania', 'ALB'), ('Algeria', 'ALG'), ('American Samoa', 'ASA'), ('Andorra', 'AND')]

In [9]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)

[Row(country='Afghanistan', code='AFG'), Row(country='Albania', code='ALB'), Row(country='Algeria', code='ALG')]

In [10]:
# Read tweets CSV file into RDD of lines
tweet_texts = sc.textFile('s3://twitterwordcount/Tweet.csv')
tweet_texts.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

13995

In [11]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
non_empty_tweet_texts = tweet_texts.filter(lambda x : len(x) > 0)
non_empty_tweet_texts.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

13391

In [12]:
# Perform WordCount on the cleaned tweet texts.
tweet_words = non_empty_tweet_texts.flatMap(lambda line: line.split(" "))
tweet_tuples  = tweet_words.map(lambda word : (word, 1))
word_counts = tweet_tuples.reduceByKey(lambda a, b :(a + b) )
word_counts.take(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('tweet_text', 1), ('beat', 51), ('them', 70), ('10', 115), ('hours', 59)]

In [13]:
# Create the DataFrame of tweet word counts
tweetDF = sqlContext.createDataFrame(word_counts, ["word", "count"])
tweetDF.printSchema()
tweetDF.take(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)

[Row(word='tweet_text', count=1), Row(word='beat', count=51), Row(word='them', count=70), Row(word='10', count=115), Row(word='hours', count=59), Row(word='#FIFA16KING', count=27), Row(word='', count=3292), Row(word='https://t.co/BFnV6jfkBL', count=27), Row(word='@Louis_Tomlinson', count=3), Row(word='@socceraid', count=3)]

In [14]:
# Join the country and tweet data frames (on the appropriate column)
from pyspark.sql.functions import col 
joinedDF = countryDF.alias('c').join(tweetDF.alias('t'),col('c.country') == col('t.word')).select(col('c.code'), col('c.country'), col('t.count'))
joinedDF.printSchema()
joinedDF.take(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- count: long (nullable = true)

[Row(code='ARG', country='Argentina', count=2), Row(code='ALB', country='Albania', count=1), Row(code='ISR', country='Israel', count=2), Row(code='GER', country='Germany', count=20), Row(code='IND', country='India', count=4)]

In [15]:
# Question 1: number of distinct countries mentioned
joinedDF.count(), joinedDF.select('code').distinct().count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(44, 44)

In [16]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
joinedDF.agg(sum("count")).first()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(sum(count)=397)

In [17]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
descSorted = joinedDF.sort(desc("count"))
descSorted.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------+-----+
|code| country|count|
+----+--------+-----+
| NOR|  Norway|   52|
| NGA| Nigeria|   49|
| FRA|  France|   42|
| SVK|Slovakia|   30|
| ENG| England|   25|
+----+--------+-----+
only showing top 5 rows

In [18]:
# Table 2: counts for Wales, Iceland, and Japan.
selectedDF = joinedDF.where((col("country") == "Wales") | (col("country") == "Iceland") | (col("country") == "Japan"))
selectedDF.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------+-----+
|code|country|count|
+----+-------+-----+
| JPN|  Japan|    5|
| WAL|  Wales|   19|
| ISL|Iceland|    2|
+----+-------+-----+

In [19]:
selectedDF = joinedDF.where((col("country") == "Kenya") | (col("country") == "Wales") |(col("country") == "Netherlands")).sort(desc("count"))
selectedDF.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-----+
|code|    country|count|
+----+-----------+-----+
| WAL|      Wales|   19|
| NED|Netherlands|   13|
| KEN|      Kenya|    3|
+----+-----------+-----+

In [20]:
from pyspark.sql.functions import avg 
joinedDF.agg(avg("count")).first()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(avg(count)=9.022727272727273)