# Soccer Tweet Analysis

Código utilizado para responder às questões de uma das provas da especialização em Big Data.

In [1]:
# Create (or reuse) the SparkContext and wrap it in a new SQLContext.
# SparkContext is imported explicitly because nothing else in this notebook
# brings that name into scope.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [2]:
# Load the country list CSV from the local filesystem as an RDD with one
# element per line of text.
country_lines = sc.textFile(
    'file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv'
)

In [3]:
# Flatten every CSV line into its ", "-separated fields, producing one RDD
# element per field.
words = country_lines.flatMap(
    lambda row: row.split(", ")
)

In [29]:
# Turn each CSV line into a (country, code) tuple and collect the result to
# the driver as a plain Python list.
# NOTE(review): the split is on ',' alone, so whatever follows the comma is
# kept verbatim -- the codes come out with a leading blank (e.g. ' AFG').
def _country_code_pair(line):
    """Return the first two comma-separated fields of *line* as a tuple."""
    fields = line.split(',')
    return (fields[0], fields[1])


tuples = country_lines.map(_country_code_pair).collect()

In [30]:
# Build the country DataFrame from the collected (country, code) tuples,
# then print its schema and peek at the first three rows.
countryDF = sqlContext.createDataFrame(
    tuples,
    schema=["country", "code"],
)
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code=' AFG'),
 Row(country='Albania', code=' ALB'),
 Row(country='Algeria', code=' ALG')]

In [37]:
# Load the tweet texts from the local filesystem as an RDD with one element
# per line of text.
tweet_lines = sc.textFile(
    'file:///home/cloudera/Desktop/tweet_texts'
)

In [38]:
# Clean the data: drop empty lines so that only tweets with at least one
# character remain.
tweet_clean = tweet_lines.filter(lambda tweet: len(tweet) != 0)

In [39]:
# Word count over the cleaned tweets, in three steps:
#   1. split every tweet on single spaces into tokens,
#   2. pair every token with an initial count of 1,
#   3. add up the counts per token.
# NOTE: splitting on " " (rather than on runs of whitespace) yields empty
# strings for adjacent spaces, so '' shows up as a "word" in the result.
t_words = tweet_clean.flatMap(lambda tweet: tweet.split(" "))
t_tuples = t_words.map(lambda token: (token, 1))
counts = t_tuples.reduceByKey(lambda left, right: left + right)

In [44]:
# Build the word-count DataFrame straight from the (word, count) RDD, then
# print its schema and peek at the first three rows.
countDF = sqlContext.createDataFrame(
    counts,
    schema=["word", "count"],
)
countDF.printSchema()
countDF.take(3)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



[Row(word='', count=3292),
 Row(word='https://t.co/fQftAwGAad', count=1),
 Row(word='mobile', count=1)]

In [50]:
# Join countries with tweet word counts wherever the country name equals a
# counted word exactly; only matching rows are kept.
merge = countryDF.join(
    countDF,
    countryDF["country"] == countDF["word"],
)
merge.printSchema()
merge.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



[Row(country='Thailand', code=' THA', word='Thailand', count=1),
 Row(country='Iceland', code=' ISL', word='Iceland', count=2),
 Row(country='Mexico', code=' MEX', word='Mexico', count=1)]

In [53]:
# Question 1: number of distinct countries mentioned, i.e. distinct joined
# rows whose mention count is positive.
(
    merge
    .where(merge["count"] > 0)
    .distinct()
    .count()
)

44

In [54]:
# Question 2: total number of country mentions in tweets, i.e. the sum of
# the per-country counts (not the number of countries).
# The aggregate is reached through the module alias so that Python's
# built-in sum() is not shadowed for the rest of the notebook.
from pyspark.sql import functions as F
merge.select(F.sum('count')).show()

+----------+
|sum(count)|
+----------+
|       397|
+----------+



In [56]:
# Table 1: countries ordered by mention count, highest first. show() is
# called without a row limit, so the top three countries are the first
# three rows of the printed table.
from pyspark.sql.functions import desc
merge.orderBy(desc("count")).show()

+-----------+----+-----------+-----+
|    country|code|       word|count|
+-----------+----+-----------+-----+
|     Norway| NOR|     Norway|   52|
|    Nigeria| NGA|    Nigeria|   49|
|     France| FRA|     France|   42|
|   Slovakia| SVK|   Slovakia|   30|
|    England| ENG|    England|   25|
|    Germany| GER|    Germany|   20|
|      Wales| WAL|      Wales|   19|
|     Russia| RUS|     Russia|   15|
|     Brazil| BRA|     Brazil|   13|
|Netherlands| NED|Netherlands|   13|
|     Canada| CAN|     Canada|   11|
|Switzerland| SUI|Switzerland|   10|
|       Chad| CHA|       Chad|    9|
|     Guinea| GUI|     Guinea|    8|
|      Spain| ESP|      Spain|    8|
|   Portugal| POR|   Portugal|    8|
|       Iraq| IRQ|       Iraq|    6|
|     Jordan| JOR|     Jordan|    6|
|      Japan| JPN|      Japan|    5|
|    Austria| AUT|    Austria|    5|
+-----------+----+-----------+-----+
only showing top 20 rows



In [58]:
# Average number of mentions per country: the mean of `count` over the
# joined country/word rows.
# NOTE(review): this cell does not list the individual counts for Wales,
# Iceland, and Japan; those can be read from the sorted table of `merge`.
# Only the function that is needed is imported, instead of a star import
# that would pull every name exported by pyspark.sql.functions into scope.
from pyspark.sql.functions import mean
merge.select(mean('count')).show()

+-----------------+
|       avg(count)|
+-----------------+
|9.022727272727273|
+-----------------+

