## Pandas exercises
#### The first part oftutorial is intended for windows users, which failed / not able to install the VM in the course "big-data-integration-processing" in coursera. This part will focus on how to load data with pySpark on windows, and focus on ever 
#### 

In [1]:
from pyspark import SparkConf, SparkContext, SQLContext
# Create a spark configuration with 20 threads.
# This code will run locally on master
conf = (SparkConf ()
        . setMaster("local[20]")
        . setAppName("sample app for reading streaming sources")
        . set("spark.executor.memory", "2g"))

sc = SparkContext(conf=conf)

In [2]:
import os
# absolute path to this file
cwd = os.getcwd()
sqlContext = SQLContext(sc)

#df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('final-project/country-list.csv')
country_lines = sc.textFile(os.path.join (cwd, "big-data-3/final-project/country-list.csv"))

In [3]:
# Convert each line into a pair of words
words = country_lines.map(lambda line: line.split(","))
#words = country_lines.flatMap(lambda line: line.split(","))


In [4]:
# Convert each pair of words into a tuple
#Assign initial count value to each word. Next, we will create tuples for each word with an initial count of 1
#country_tuples = words.map(lambda word : (word, 1))

In [5]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(words, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code=' AFG'),
 Row(country='Albania', code=' ALB'),
 Row(country='Algeria', code=' ALG')]

In [6]:
# Read tweets CSV file into RDD of lines
tweet_lines = sc.textFile(os.path.join (cwd, "big-data-3/final-project/tweets.csv"))


In [7]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
#Method 1
tweet_lines_clean = tweet_lines.filter(lambda row: row != '') 

In [8]:
tweet_lines_clean.count()

13391

In [9]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
tweet_words = tweet_lines_clean.flatMap(lambda line: line.split(" "))
tweet_tuples = tweet_words.map(lambda word : (word, 1))
tweet_counts = tweet_tuples.reduceByKey(lambda a,b : (a+b)).sortBy(lambda a: a[1], False)

In [24]:
# Create the DataFrame of tweet word count
import pandas as pd
tweetDF = sqlContext.createDataFrame(tweet_counts, ["word", "count"])
#tweetDF["count"] = pd.to_numeric(tweetDF["count"])
tweetDF.printSchema()
tweetDF.take(5)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



[Row(word='to', count=5808),
 Row(word='RT', count=4257),
 Row(word='the', count=4245),
 Row(word='when', count=3906),
 Row(word='FIFA', count=3339)]

In [26]:
# Join the country and tweet data frames (on the appropriate column)
join_table = tweetDF.join(countryDF, tweetDF.word == countryDF.country, how='inner').sort(desc("count"))
#tweetDF["count"] = pd.to_numeric(tweetDF["count"])
join_table.printSchema()
join_table.show()

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)

+-----------+-----+-----------+----+
|       word|count|    country|code|
+-----------+-----+-----------+----+
|     Norway|   52|     Norway| NOR|
|    Nigeria|   49|    Nigeria| NGA|
|     France|   42|     France| FRA|
|   Slovakia|   30|   Slovakia| SVK|
|    England|   25|    England| ENG|
|    Germany|   20|    Germany| GER|
|      Wales|   19|      Wales| WAL|
|     Russia|   15|     Russia| RUS|
|     Brazil|   13|     Brazil| BRA|
|Netherlands|   13|Netherlands| NED|
|     Canada|   11|     Canada| CAN|
|Switzerland|   10|Switzerland| SUI|
|       Chad|    9|       Chad| CHA|
|   Portugal|    8|   Portugal| POR|
|      Spain|    8|      Spain| ESP|
|     Guinea|    8|     Guinea| GUI|
|     Jordan|    6|     Jordan| JOR|
|       Iraq|    6|       Iraq| IRQ|
|    Georgia|    5|    Georgia| GEO|
|      Japan|    5|      Japan| JP

In [12]:
# Question 1: number of distinct countries mentioned
join_table.count()

44

In [19]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
#join_table.rdd.map(lambda x: (1,x[1])).reduceByKey(lambda x,y: x + y).collect()[0][1]
join_table.describe()

DataFrame[summary: string, word: string, count: string, country: string, code: string]

In [14]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
join_table.show()

DataFrame[word: string, count: bigint, country: string, code: string]

In [15]:
# Table 2: counts for Wales, Iceland, and Japan.
