## Read Parquet and Normalize

The goal of this notebook is to ingest the Parquet files and generate an artificial key on each table so that the two can be joined.

#### Imports

In [1]:
import os 

# Move the working directory up one level so the etl package can be imported
os.chdir('..')

from pyspark.sql import SparkSession
from etl.read_normalize import (
    ingest_parquet,
    create_country_ids,
    create_country_code_ids,
    join_country_and_olympics,
    fuzzy_match,
)



In [2]:
spark = SparkSession.builder.appName("OlympicCountryDataPipeline").getOrCreate()

# Raise the log level from WARN to ERROR to suppress warnings about window specs without partitioning
spark.sparkContext.setLogLevel("ERROR")

25/01/16 07:06:44 WARN Utils: Your hostname, Coles-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.235 instead (on interface en0)
25/01/16 07:06:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/16 07:06:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Reading Parquet files

In [3]:
# Declare countries path and ingest
countries_input = "datasets/countries.parquet"
df_countries = ingest_parquet(input_path = countries_input, spark = spark)

# Declare olympics path and ingest
olympics_input = "datasets/olympic_combined.parquet"
df_olympics = ingest_parquet(input_path = olympics_input, spark = spark)

                                                                                

In [4]:
df_countries.show(10)

                                                                                

+------------------+--------------------+-----------+----------+---------------------+--------------------------+-------------+--------------------------------+--------------+----------------+---------------+--------------+-------------+-------------+-------+---------+---------+-----------+--------+-------+
|      Country_Name|              Region| Population|Area_sq_mi|Pop_Density_per_sq_mi|Coastline_coast_area_ratio|Net_migration|Infant_mortality_per_1000_births|GDP_per_capita|Literacy_percent|Phones_per_1000|Arable_percent|Crops_percent|Other_percent|Climate|Birthrate|Deathrate|Agriculture|Industry|Service|
+------------------+--------------------+-----------+----------+---------------------+--------------------------+-------------+--------------------------------+--------------+----------------+---------------+--------------+-------------+-------------+-------+---------+---------+-----------+--------+-------+
|      Afghanistan |ASIA (EX. NEAR EA...|3.1056997E7|  647500.0|         

#### Country Code Mapping

The code below does not need to be run in this notebook, because the join_country_and_olympics function calls these functions internally. It is included to show what the intermediate ID tables look like.

Importantly, these functions sort the distinct country names and country codes alphabetically and assign each one a monotonically increasing integer as an ID. Since there are more country names than distinct country codes, matching on alphabetical rank pairs many codes with the wrong country, so the resulting join is not very accurate; however, it showcases the goal of any join, which is to have a common key shared by both tables. See the fuzzy matching section further down for a more flexible way of joining.

In [5]:
# Add Country_ID to df_countries
df_countries_id = create_country_ids(df_countries)

# Add Country_Code_ID to df_olympics
df_olympics_id = create_country_code_ids(df_olympics)


In [6]:
df_olympics_id.show(5)
df_countries_id.show(5)

                                                                                

+------------+---------------+
|Country_Code|Country_Code_ID|
+------------+---------------+
|         AFG|              1|
|         AIN|              2|
|         ALB|              3|
|         ALG|              4|
|         ARG|              5|
+------------+---------------+
only showing top 5 rows

+---------------+----------+
|   Country_Name|Country_ID|
+---------------+----------+
|   Afghanistan |         1|
|       Albania |         2|
|       Algeria |         3|
|American Samoa |         4|
|       Andorra |         5|
+---------------+----------+
only showing top 5 rows
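
The ID assignment described in this section can be pictured in plain Python. This is only a sketch under stated assumptions: `assign_ids` is a hypothetical helper, not the Spark code in `etl.read_normalize`, and the names are the five rows shown above with trailing whitespace trimmed. It also shows why the IDs are fragile: they depend only on alphabetical rank, so removing one value shifts every later ID.

```python
def assign_ids(values):
    """Sort the distinct values alphabetically and number them from 1.

    Hypothetical stand-in for the ID functions; not the Spark implementation.
    """
    distinct_sorted = sorted(set(values))
    return {value: idx for idx, value in enumerate(distinct_sorted, start=1)}


# Names taken from the output above (trimmed), with one duplicate added
# to show that only distinct values receive an ID.
names = ["Andorra", "Albania", "Afghanistan", "Algeria", "American Samoa", "Albania"]

name_ids = assign_ids(names)

# Dropping a single name changes the ID of everything sorted after it.
shifted_ids = assign_ids(n for n in names if n != "Albania")

print(name_ids)
print(shifted_ids)
```

Because an ID here is just a position in a sorted list, two tables only line up when their sorted lists happen to correspond row for row.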



## Joining Countries and Olympics

The cell below calls the join function on the original tables. Within this function, the ID functions above are called and the two tables are joined on the resulting monotonically increasing IDs.

In [7]:
df_countries_olympics = join_country_and_olympics(df_countries, df_olympics)
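
As a rough picture of what joining on these IDs produces, the sketch below pairs the two ID tables printed earlier in this notebook. It is plain Python rather than Spark, `join_on_id` is a hypothetical helper, and the inner-join behaviour is an assumption; the actual join type used by `join_country_and_olympics` is not shown here.

```python
# ID tables copied from the first five rows printed earlier in this notebook
# (country names trimmed of trailing whitespace).
code_ids = [("AFG", 1), ("AIN", 2), ("ALB", 3), ("ALG", 4), ("ARG", 5)]
country_ids = [
    ("Afghanistan", 1),
    ("Albania", 2),
    ("Algeria", 3),
    ("American Samoa", 4),
    ("Andorra", 5),
]


def join_on_id(left, right):
    """Pair each (code, id) row with the (name, id) row that has the same id.

    Assumed inner-join semantics: rows without a matching id are dropped.
    """
    name_by_id = {row_id: name for name, row_id in right}
    return [(code, name_by_id[row_id]) for code, row_id in left if row_id in name_by_id]


joined = join_on_id(code_ids, country_ids)
for code, name in joined:
    print(code, "->", name)
```

Of these five rows only AFG is paired with its own country; ALB is paired with Algeria and ALG with American Samoa, which is the inaccuracy of alphabetical IDs noted earlier.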

In [8]:
df_countries_olympics.show(10)

+------------+----+------+------+-----+----+---------------+-----------------+--------------------+-----------+----------+---------------------+--------------------------+-------------+--------------------------------+--------------+----------------+---------------+--------------+-------------+-------------+-------+---------+---------+-----------+--------+-------+----------+
|Country_Code|Gold|Silver|Bronze|Total|Year|Country_Code_ID|     Country_Name|              Region| Population|Area_sq_mi|Pop_Density_per_sq_mi|Coastline_coast_area_ratio|Net_migration|Infant_mortality_per_1000_births|GDP_per_capita|Literacy_percent|Phones_per_1000|Arable_percent|Crops_percent|Other_percent|Climate|Birthrate|Deathrate|Agriculture|Industry|Service|Country_ID|
+------------+----+------+------+-----+----+---------------+-----------------+--------------------+-----------+----------+---------------------+--------------------------+-------------+--------------------------------+--------------+-----------

## Extra - Fuzzy Matching! For Fun

The code below shows how you could build your own dictionary that pairs three-letter country codes with their names. It is not perfect, but it gets closer to the expected result!
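
To make the idea concrete, here is a minimal sketch of dictionary-plus-fuzzy matching in plain Python. Everything in it is an assumption for illustration: `CODE_TO_NAME` and `match_country` are hypothetical, the standard library's `difflib` stands in for whatever matcher `fuzzy_match` actually uses, and the cutoff of 0.8 is arbitrary. The dataset names keep the trailing space visible in the tables above, which is the kind of small difference fuzzy matching tolerates.

```python
from difflib import get_close_matches

# Hypothetical hand-built dictionary from three-letter code to country name.
CODE_TO_NAME = {"AFG": "Afghanistan", "ALB": "Albania", "ALG": "Algeria"}


def match_country(code, dataset_names, cutoff=0.8):
    """Return the dataset name closest to the code's dictionary name, or None."""
    expected = CODE_TO_NAME.get(code)
    if expected is None:
        return None  # code is not in the dictionary, so there is nothing to match
    matches = get_close_matches(expected, dataset_names, n=1, cutoff=cutoff)
    return matches[0] if matches else None


# Country names as they appear in the countries table, trailing space included.
dataset_names = ["Afghanistan ", "Albania ", "Algeria ", "American Samoa ", "Andorra "]

matched = {code: match_country(code, dataset_names) for code in ["AFG", "AIN", "ALB", "ALG"]}
print(matched)
```

Codes without a dictionary entry, such as AIN in this sketch, come back as `None` instead of being paired with an unrelated country.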

In [10]:
# Use fuzzy matching to get closer to the real country pairings

df_countries_olympics_fuzzy = fuzzy_match(
    spark,
    df_countries,
    df_olympics
)

df_countries_olympics_fuzzy.show(15)

+------------+----+------+------+-----+----+------------+--------------------+-----------+----------+---------------------+--------------------------+-------------+--------------------------------+--------------+----------------+---------------+--------------+-------------+-------------+-------+---------+---------+-----------+--------+-------+
|Country_Code|Gold|Silver|Bronze|Total|Year|Country_Name|              Region| Population|Area_sq_mi|Pop_Density_per_sq_mi|Coastline_coast_area_ratio|Net_migration|Infant_mortality_per_1000_births|GDP_per_capita|Literacy_percent|Phones_per_1000|Arable_percent|Crops_percent|Other_percent|Climate|Birthrate|Deathrate|Agriculture|Industry|Service|
+------------+----+------+------+-----+----+------------+--------------------+-----------+----------+---------------------+--------------------------+-------------+--------------------------------+--------------+----------------+---------------+--------------+-------------+-------------+-------+---------+--

In [11]:
# Write the fuzzy joined data as parquet

df_countries_olympics_fuzzy.write.parquet("country_olympics_join.parquet")

                                                                                