# **Data Exploration**

In this notebook we'll look at one of the first steps in any data engineering project: getting to know what the inputs look like.

This notebook was adapted from a Netflix data engineering workshop which you can find here: https://github.com/NFLX-WIBD/WIBD-Workshops-2018.


### Running PySpark in Colab

To run Spark in Colab, we first need to install its dependencies in the Colab environment: Apache Spark 3.0.0-preview2 built for Hadoop 3.2, Java 8, and findspark, which locates the Spark installation on the system. The installation can be carried out from within the Colab notebook itself. Run the following cell to install the dependencies:

In [0]:
%%capture
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop3.2.tgz
!tar -xvf spark-3.0.0-preview2-bin-hadoop3.2.tgz  
!pip install -q findspark

Now that Spark and Java are installed in Colab, set the environment variables that let PySpark find them. Set the locations of Java and Spark by running the following code:

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-preview2-bin-hadoop3.2"

Run a local Spark session to test your installation:

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Congrats! Your Colab is ready to run PySpark. Let's explore some data. First, unpack the archive:

In [4]:
!unzip data.zip

Archive:  data.zip
   creating: data/
  inflating: data/.DS_Store          
   creating: __MACOSX/
   creating: __MACOSX/data/
  inflating: __MACOSX/data/._.DS_Store  
  inflating: data/WDICountry.csv     
  inflating: __MACOSX/data/._WDICountry.csv  
  inflating: data/WDIData.csv        
  inflating: __MACOSX/data/._WDIData.csv  
  inflating: data/WDISeries.csv      
  inflating: __MACOSX/data/._WDISeries.csv  


### Step 1 - What inputs do you have?


In [5]:
!ls data

WDICountry.csv	WDIData.csv  WDISeries.csv


### Step 2 - Get your tools set up

We will use [PySpark](http://spark.apache.org/docs/2.1.1/api/python/index.html) to inspect the data and gather some basic details about it.

In [0]:
import pandas as pd

from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
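# Helper for pretty formatting of Spark DataFrames (rendered through pandas)
def showDF(df, limitRows=20, truncate=True):
    # None disables column-width truncation; the older -1 value is deprecated in recent pandas releases
    pd.set_option('display.max_colwidth', 50 if truncate else None)
    pd.set_option('display.max_rows', limitRows)
    # Only the first limitRows rows are collected to the driver
    display(df.limit(limitRows).toPandas())
    # Restore the pandas defaults so other output is unaffected
    pd.reset_option('display.max_rows')
    pd.reset_option('display.max_colwidth')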

In [0]:
#Creating a spark session
spark = SparkSession.builder.appName("Data Exploration").getOrCreate()

### Step 3 - Look inside your data

We need to look at how our data is composed:
1. Format
2. Structure
3. Size
4. Dimensions

In this example our input is a CSV file with a header. Let's see what the data looks like.
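Before loading anything into Spark, these properties can be sanity-checked with Python's standard library. The following is a minimal sketch that is not part of the original workshop: the `profile_csv` helper and the inline sample text are hypothetical, and in the notebook you would pass an open handle to `data/WDICountry.csv` instead.

```python
import csv
import io

def profile_csv(handle, dimension):
    """Summarise a CSV stream: column names, row count, distinct values of one column."""
    reader = csv.DictReader(handle)
    rows = list(reader)  # loads everything into memory, so only suitable for small files
    return {
        "columns": reader.fieldnames,                             # structure
        "n_rows": len(rows),                                      # size
        "distinct": sorted({row[dimension] for row in rows}),     # one dimension
    }

# Tiny made-up sample standing in for a real file
sample = io.StringIO(
    "Country Code,Short Name,Region\n"
    "ABW,Aruba,Latin America & Caribbean\n"
    "AFG,Afghanistan,South Asia\n"
)
profile = profile_csv(sample, "Region")
print(profile)
```

For files too large for memory, the same questions are better answered with Spark, as the rest of this step does.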

#### Read The Data

In [0]:
#Read the file into a Spark Data Frame
country = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/WDICountry.csv")

#### Inspect the schema of the file you just read

In [10]:
country.printSchema()

root
 |-- Country Code: string (nullable = true)
 |-- Short Name: string (nullable = true)
 |-- Table Name: string (nullable = true)
 |-- Long Name: string (nullable = true)
 |-- 2-alpha code: string (nullable = true)
 |-- Currency Unit: string (nullable = true)
 |-- Special Notes: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Income Group: string (nullable = true)
 |-- WB-2 code: string (nullable = true)
 |-- National accounts base year: string (nullable = true)
 |-- National accounts reference year: integer (nullable = true)
 |-- SNA price valuation: string (nullable = true)
 |-- Lending category: string (nullable = true)
 |-- Other groups: string (nullable = true)
 |-- System of National Accounts: string (nullable = true)
 |-- Alternative conversion factor: string (nullable = true)
 |-- PPP survey year: string (nullable = true)
 |-- Balance of Payments Manual in use: string (nullable = true)
 |-- External debt Reporting status: string (nullable = true)
 |-- Sys

#### Take a look at some sample data

You can run `<dataframe>.show()` to look at sample data. However, its output is not well formatted, so we will use our helper function instead.

In [11]:
showDF(country, truncate = False)

Unnamed: 0,Country Code,Short Name,Table Name,Long Name,2-alpha code,Currency Unit,Special Notes,Region,Income Group,WB-2 code,National accounts base year,National accounts reference year,SNA price valuation,Lending category,Other groups,System of National Accounts,Alternative conversion factor,PPP survey year,Balance of Payments Manual in use,External debt Reporting status,System of trade,Government Accounting concept,IMF data dissemination standard,Latest population census,Latest household survey,Source of most recent Income and expenditure data,Vital registration complete,Latest agricultural census,Latest industrial data,Latest trade data,Latest water withdrawal data
0,ABW,Aruba,Aruba,Aruba,AW,Aruban florin,SNA data for 2000-2011 are updated from official government statistics; 1994-1999 from UN databases. Base year has changed from 1995 to 2000.,Latin America & Caribbean,High income,AW,2000,,Value added at basic prices (VAB),,,Country uses the 1993 System of National Accounts methodology,,2011,BPM5 (Converted into BPM6 by IMF),,General trade system,,Enhanced General Data Dissemination System (e-GDDS),2010,,,Yes,,,2016.0,
1,AFG,Afghanistan,Afghanistan,Islamic State of Afghanistan,AF,Afghan afghani,"Fiscal year end: March 20; reporting period for national accounts data is calendar year, estimated to insure consistency between national accounts and fiscal data. National accounts data are sourced from the IMF and differ from the Central Statistics Organization numbers due to exclusion of the opium economy.",South Asia,Low income,AF,2002/03,,Value added at basic prices (VAB),IDA,HIPC,Country uses the 1993 System of National Accounts methodology,,,BPM6,Actual,General trade system,Consolidated central government,Enhanced General Data Dissemination System (e-GDDS),1979,"Demographic and Health Survey, 2015","Integrated household survey (IHS), 2011",,,,2016.0,2000.0
2,AGO,Angola,Angola,People's Republic of Angola,AO,Angolan kwanza,,Sub-Saharan Africa,Lower middle income,AO,2002,,Value added at producer prices (VAP),IBRD,,Country uses the 1993 System of National Accounts methodology,1991–96,2011,BPM6,Actual,Special trade system,Budgetary central government,Enhanced General Data Dissemination System (e-GDDS),2014,"Demographic and Health Survey, 2015/16","Integrated household survey (IHS), 2008/09",,,,2016.0,2005.0
3,ALB,Albania,Albania,Republic of Albania,AL,Albanian lek,,Europe & Central Asia,Upper middle income,AL,Original chained constant price data are rescaled.,1996.0,Value added at basic prices (VAB),IBRD,,Country uses the 2008 System of National Accounts methodology,,Rolling,BPM6,Actual,Special trade system,Consolidated central government,Enhanced General Data Dissemination System (e-GDDS),2011,"Demographic and Health Survey, 2008/09","Living Standards Measurement Study Survey (LSMS), 2012",Yes,2012,2013.0,2016.0,2006.0
4,AND,Andorra,Andorra,Principality of Andorra,AD,Euro,WB-3 code changed from ADO to AND to align with ISO code.,Europe & Central Asia,High income,AD,2000,,Value added at basic prices (VAB),,,Country uses the 1993 System of National Accounts methodology,,,,,General trade system,,,2011. Population data compiled from administrative registers.,,,Yes,,,,
5,ARB,Arab World,Arab World,Arab World,1A,,Arab World aggregate. Arab World is composed of members of the League of Arab States.,,,1A,,,,,,,,,,,,,,,,,,,,2016.0,
6,ARE,United Arab Emirates,United Arab Emirates,United Arab Emirates,AE,U.A.E. dirham,,Middle East & North Africa,High income,AE,2010,,Value added at basic prices (VAB),,,Country uses the 1993 System of National Accounts methodology,,2011,,,Special trade system,Consolidated central government,Enhanced General Data Dissemination System (e-GDDS),2010,"World Health Survey, 2003",,,2012,1985.0,2016.0,2005.0
7,ARG,Argentina,Argentina,Argentine Republic,AR,Argentine peso,"National Institute of Statistics and Census revised national accounts from 2004-2015. Argentina, which was temporarily unclassified in July 2016 pending release of revised national accounts statistics, is classified as upper middle income for FY17 as of September 29, 2016.",,,,,,,,,,,,,,,,,,,,,,,,
8,ARM,Armenia,Armenia,Republic of Armenia,AM,Armenian dram,,Europe & Central Asia,Lower middle income,AM,Original chained constant price data are rescaled.,1996.0,Value added at basic prices (VAB),IBRD,,Country uses the 2008 System of National Accounts methodology,1990–95,2011,BPM6,Actual,General trade system,Consolidated central government,Special Data Dissemination Standard (SDDS),2011,"Demographic and Health Survey, 2015/16","Integrated household survey (IHS), 2015",Yes,2014,,2016.0,2012.0
9,ASM,American Samoa,American Samoa,American Samoa,AS,U.S. dollar,New base Year 2009,East Asia & Pacific,Upper middle income,AS,2009,,,,,Country uses the 2008 System of National Accounts methodology,,2011 (household consumption only).,,,Special trade system,,,2010,,,Yes,2008,,2016.0,


#### Get Some Basic Stats

In [12]:
#Count the number of records in the dataframe
country.count()

263

#### Examining Dimensions
##### How many different regions do the various countries belong to?

In [13]:
showDF(country.select('Region').distinct(), truncate = False)

Unnamed: 0,Region
0,Latin America & Caribbean
1,South Asia
2,Sub-Saharan Africa
3,Europe & Central Asia
4,
5,Middle East & North Africa
6,East Asia & Pacific
7,North America


##### How many different income groups do we have across countries?

In [14]:
showDF(country.select('Income Group').distinct(), truncate = False)

Unnamed: 0,Income Group
0,High income
1,Low income
2,Lower middle income
3,Upper middle income
4,


#### By applying the same steps as we did for the "WDICountry.csv" dataset, we can see what the rest of the datasets look like

###### WDISeries.csv

In [0]:
series = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/WDISeries.csv")

In [16]:
series.printSchema()

root
 |-- Series Code: string (nullable = true)
 |-- Topic: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Short definition: string (nullable = true)
 |-- Long definition: string (nullable = true)
 |-- Unit of measure: string (nullable = true)
 |-- Periodicity: string (nullable = true)
 |-- Base Period: string (nullable = true)
 |-- Other notes: string (nullable = true)
 |-- Aggregation method: string (nullable = true)
 |-- Limitations and exceptions: string (nullable = true)
 |-- Notes from original source: string (nullable = true)
 |-- General comments: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Statistical concept and methodology: string (nullable = true)
 |-- Development relevance: string (nullable = true)
 |-- Related source links: string (nullable = true)
 |-- Other web links: string (nullable = true)
 |-- Related indicators: string (nullable = true)
 |-- License Type: string (nullable = true)
 |-- _c20: string (nullable = tru

In [17]:
showDF(series)

Unnamed: 0,Series Code,Topic,Indicator Name,Short definition,Long definition,Unit of measure,Periodicity,Base Period,Other notes,Aggregation method,Limitations and exceptions,Notes from original source,General comments,Source,Statistical concept and methodology,Development relevance,Related source links,Other web links,Related indicators,License Type,_c20
0,AG.AGR.TRAC.NO,Environment: Agricultural production,"Agricultural machinery, tractors",,Agricultural machinery refers to the number of...,,Annual,,,Sum,The data are collected by the Food and Agricul...,,,"Food and Agriculture Organization, electronic ...",A tractor provides the power and traction to m...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,
1,AG.CON.FERT.PT.ZS,Environment: Agricultural production,Fertilizer consumption (% of fertilizer produc...,,Fertilizer consumption measures the quantity o...,,Annual,,,Weighted average,The FAO has revised the time series for fertil...,,,"Food and Agriculture Organization, electronic ...",Fertilizer consumption measures the quantity o...,"Factors such as the green revolution, has led ...",,,,CC BY-4.0,
2,AG.CON.FERT.ZS,Environment: Agricultural production,Fertilizer consumption (kilograms per hectare ...,,Fertilizer consumption measures the quantity o...,,Annual,,,Weighted average,The FAO has revised the time series for fertil...,,,"Food and Agriculture Organization, electronic ...",Fertilizer consumption measures the quantity o...,"Factors such as the green revolution, has led ...",,,,CC BY-4.0,
3,AG.LND.AGRI.K2,Environment: Land use,Agricultural land (sq. km),,Agricultural land refers to the share of land ...,,Annual,,,Sum,The data are collected by the Food and Agricul...,,,"Food and Agriculture Organization, electronic ...",Agricultural land constitutes only a part of a...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,
4,AG.LND.AGRI.ZS,Environment: Land use,Agricultural land (% of land area),,Agricultural land refers to the share of land ...,,Annual,,,Weighted average,The data are collected by the Food and Agricul...,,,"Food and Agriculture Organization, electronic ...",Agriculture is still a major sector in many ec...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,
5,AG.LND.ARBL.HA,Environment: Land use,Arable land (hectares),,Arable land (in hectares) includes land define...,,Annual,,,,The Food and Agriculture Organization (FAO) tr...,,,"Food and Agriculture Organization, electronic ...",Temporary fallow land refers to land left fall...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,
6,AG.LND.ARBL.HA.PC,Environment: Land use,Arable land (hectares per person),,Arable land (hectares per person) includes lan...,,Annual,,,Weighted Average,The Food and Agriculture Organization (FAO) tr...,,,"Food and Agriculture Organization, electronic ...",Temporary fallow land refers to land left fall...,Agricultural land covers about one-third of th...,,,,CC BY-4.0,
7,AG.LND.ARBL.ZS,Environment: Land use,Arable land (% of land area),,Arable land includes land defined by the FAO a...,,Annual,,,Weighted average,The Food and Agriculture Organization (FAO) tr...,,,"Food and Agriculture Organization, electronic ...",Temporary fallow land refers to land left fall...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,
8,AG.LND.CREL.HA,Environment: Agricultural production,Land under cereal production (hectares),,Land under cereal production refers to harvest...,,Annual,,,Sum,The data are collected by the Food and Agricul...,,,"Food and Agriculture Organization, electronic ...","Cereals production includes wheat, rice, maize...",The cultivation of cereals varies widely in di...,,,,CC BY-4.0,
9,AG.LND.CROP.ZS,Environment: Land use,Permanent cropland (% of land area),,Permanent cropland is land cultivated with cro...,,Annual,,,Weighted average,The Food and Agriculture Organization (FAO) tr...,,,"Food and Agriculture Organization, electronic ...",The data on Permanent cropland and land area a...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,


In [18]:
series.count()

1593

#### Examining Dimensions
##### What are the different periodicities or aggregation methods we might expect to see in the data?

In [19]:
showDF(series.select('Periodicity').distinct(), truncate = False)

Unnamed: 0,Periodicity
0,Annual
1,
2,"International Civil Aviation Organization, Civil Aviation Statistics of the World and ICAO staff estimates."
3,Quarterly (represented as Annual)
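
The third value above looks like a source citation rather than a periodicity, which suggests that at least one row was split or shifted during parsing. One common cause (an assumption here, not something verified against `WDISeries.csv`) is a quoted field that contains a line break: a reader that treats every physical line as a record will break such a row in two. The sketch below uses Python's standard `csv` module and made-up sample text to contrast the two behaviours.

```python
import csv
import io

# Made-up sample: the quoted definition spans two physical lines
raw = (
    'Series Code,Long definition,Periodicity\n'
    'X.1,"First line of the definition\n'
    'second line, with a comma",Annual\n'
)

# Naive approach: one record per physical line, split on every comma
naive_rows = [line.split(",") for line in raw.splitlines()[1:]]

# Quote-aware approach: the csv module keeps the quoted field intact
parsed_rows = list(csv.reader(io.StringIO(raw)))[1:]

print(len(naive_rows), len(parsed_rows))
print(parsed_rows[0][2])
```

Spark's CSV reader exposes `multiLine`, `quote`, and `escape` options that control this behaviour; whether they resolve the stray value here would need to be checked against the file.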


In [20]:
showDF(series.select('Aggregation Method').distinct(), truncate = False)

Unnamed: 0,Aggregation Method
0,Sum
1,Weighted average
2,
3,Weighted Average
4,Gap-filled total
5,Median
6,Unweighted average
7,Simple average
8,Linear mixed-effect model estimates
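
`Weighted average` and `Weighted Average` show up as separate values only because of letter case. Here is a minimal plain-Python sketch of how normalising case exposes such variants; the list is copied from the output above, with the empty value omitted.

```python
from collections import defaultdict

methods = [
    "Sum", "Weighted average", "Weighted Average", "Gap-filled total",
    "Median", "Unweighted average", "Simple average",
    "Linear mixed-effect model estimates",
]

# Group the raw spellings by their lower-cased form
variants = defaultdict(set)
for method in methods:
    variants[method.lower()].add(method)

# Keep only the normalised values that had more than one raw spelling
inconsistent = {key: sorted(vals) for key, vals in variants.items() if len(vals) > 1}
print(inconsistent)
```

Comparing on a normalised form, for example with `lower(...)` in a Spark filter, avoids missing rows that differ only in case.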


## Exercise

Repeat the same steps for the `WDIData.csv` file and read it into a dataframe called `indicators`.

In [21]:
# Read the data
indicators = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/WDIData.csv")

indicators.count()

420024

In [22]:
# Inspect the schema
indicators.printSchema()

root
 |-- Country Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Indicator Code: string (nullable = true)
 |-- 1960: double (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = true)
 |-- 1982: double (null

In [23]:
# Look at sample records
showDF(indicators)

Unnamed: 0,Country Name,Country Code,Indicator Name,Indicator Code,1960,1961,1962,1963,1964,1965,1966,1967,1968,1969,1970,1971,1972,1973,1974,1975,1976,1977,1978,1979,1980,1981,1982,1983,1984,1985,1986,1987,1988,1989,1990,1991,1992,1993,1994,1995,1996,1997,1998,1999,2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017
0,Arab World,ARB,2005 PPP conversion factor; GDP (LCU per inter...,PA.NUS.PPP.05,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,Arab World,ARB,2005 PPP conversion factor; private consumptio...,PA.NUS.PRVT.PP.05,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2,Arab World,ARB,Access to clean fuels and technologies for coo...,EG.CFT.ACCS.ZS,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,73.675898,74.928059,76.2554,77.129805,78.114145,79.089859,79.883249,80.603401,81.243897,81.844982,82.407647,82.827636,83.169227,83.587141,83.954293,84.23063,84.570425,
3,Arab World,ARB,Access to electricity (% of population),EG.ELC.ACCS.ZS,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,74.384239,74.38222,74.31316,75.349325,75.788522,76.214138,77.20515,77.57373,78.395511,78.965532,78.76233,80.149257,80.359978,81.354788,82.662402,83.687576,85.800296,84.735723,85.432827,85.189815,86.136134,86.782683,87.288244,88.389705,88.076774,88.517967,88.768654,
4,Arab World,ARB,Access to electricity; rural (% of rural popul...,EG.ELC.ACCS.RU.ZS,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,57.770273,61.405699,61.960592,63.047098,63.573115,64.036901,65.528228,66.159344,67.546808,68.441798,66.785593,69.46401,67.164518,68.608013,69.079816,69.08958,74.531572,71.752764,73.4604,73.541696,74.50747,75.652712,76.62832,78.663736,77.439066,78.35552,78.743207,
5,Arab World,ARB,Access to electricity; urban (% of urban popul...,EG.ELC.ACCS.UR.ZS,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,92.26647,92.232237,91.547291,92.363777,92.557607,92.784188,92.986799,93.117865,93.28958,93.42637,94.253532,93.745237,93.780788,94.045639,94.816829,94.783179,95.626115,95.417388,95.609699,95.08829,95.841533,96.033101,96.060627,96.446582,96.481228,96.641407,96.773284,
6,Arab World,ARB,Account ownership at a financial institution o...,FX.OWN.TOTL.ZS,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,22.260538,,,30.27713,,,37.165211
7,Arab World,ARB,Account ownership at a financial institution o...,FX.OWN.TOTL.FE.ZS,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,13.775815,,,22.07935,,,25.635403
8,Arab World,ARB,Account ownership at a financial institution o...,FX.OWN.TOTL.MA.ZS,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,30.377668,,,37.790764,,,48.328518
9,Arab World,ARB,Account ownership at a financial institution o...,FX.OWN.TOTL.OL.ZS,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,25.741285,,,34.216583,,,42.542046


In [24]:
# Get some basic stats
indicators.count()

420024

# Data Transformation and Quality

### Step 4 - Transform the data

#### Transform the Country Dataset
- Select the columns that we will need for our data model
- Rename columns for data ingestion

We'll use several of the operations that a DataFrame supports. The complete list is available [here](http://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame).

In [25]:
countryDim = country \
    .select("2-alpha code", "Country Code", "Short Name", "Long Name", "Region", "Income Group") \
    .withColumnRenamed("2-alpha code", "country_iso_code") \
    .withColumnRenamed("Country Code", "wb_country_code") \
    .withColumnRenamed("Short Name", "country_name") \
    .withColumnRenamed("Long Name", "country_long_name") \
    .withColumnRenamed("Region", "region") \
    .withColumnRenamed("Income Group", "income_group")
    
showDF(countryDim)

Unnamed: 0,country_iso_code,wb_country_code,country_name,country_long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,AF,AFG,Afghanistan,Islamic State of Afghanistan,South Asia,Low income
2,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
3,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
4,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
5,1A,ARB,Arab World,Arab World,,
6,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
7,AR,ARG,Argentina,Argentine Republic,,
8,AM,ARM,Armenia,Republic of Armenia,Europe & Central Asia,Lower middle income
9,AS,ASM,American Samoa,American Samoa,East Asia & Pacific,Upper middle income


#### You can also do similar transformations in SQL
Let's rename the country_name and country_long_name columns

In [0]:
# Register the DataFrame as a temporary view so that SQL queries can refer to it by name
countryDim.createOrReplaceTempView("country")

In [27]:
transformQuery = """
select 
    country_iso_code,
    wb_country_code,
    country_name as name,
    country_long_name as long_name,
    region,
    income_group
from 
    country
"""

showDF(spark.sql(transformQuery))

Unnamed: 0,country_iso_code,wb_country_code,name,long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,AF,AFG,Afghanistan,Islamic State of Afghanistan,South Asia,Low income
2,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
3,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
4,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
5,1A,ARB,Arab World,Arab World,,
6,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
7,AR,ARG,Argentina,Argentine Republic,,
8,AM,ARM,Armenia,Republic of Armenia,Europe & Central Asia,Lower middle income
9,AS,ASM,American Samoa,American Samoa,East Asia & Pacific,Upper middle income


### Step 5 - Check the data quality

##### Do all countries have 2-character country_iso_codes?

We will use some SQL functions supported in PySpark for this exercise. You can find the complete list of supported functions [here](http://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html#module-pyspark.sql.functions).

In [28]:
countryCodeLength = countryDim.select(length("country_iso_code").alias("column_length")) \
    .groupBy("column_length") \
    .agg(count("*").alias("cnt")) \
    .filter("cnt > 1") 
showDF(countryCodeLength)

Unnamed: 0,column_length,cnt
0,2,262


In [29]:
#You can also do this in SQL
countryCodeLengthQuery = """
select 
    length(country_iso_code) as column_length,
    count(1) as cnt
from 
    country
group by 
    length(country_iso_code)
having 
    count(1) > 1
"""

showDF(spark.sql(countryCodeLengthQuery))

Unnamed: 0,column_length,cnt
0,2,262


## Quick Exercise
Do you notice a difference between the count returned by the quality check for 2-character country_iso_code values and the original row count for `WDICountry.csv`?

In [0]:
# Paste your solution here
# The record counts differ (262 vs. 263) because one row has a null country_iso_code
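
The gap is easy to miss because the `cnt > 1` filter in the quality check drops every length group that holds a single row, including the group for a missing code. A plain-Python sketch with made-up codes (not the real data) shows the effect:

```python
from collections import Counter

# Hypothetical codes: three valid, one missing
codes = ["AW", "AF", "AO", None]

# Group by code length, treating a missing code as its own group
length_counts = Counter(len(code) if code is not None else None for code in codes)

# Same rule as the quality check: keep only groups with more than one row
filtered = {length: cnt for length, cnt in length_counts.items() if cnt > 1}

print(length_counts)
print(filtered)
```

Inspecting the unfiltered counts as well makes single outliers, such as one null or one over-long code, visible instead of silently hiding them.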

##### Do we have duplicate records for any of the key columns?

In [31]:
showDF(countryDim.groupBy("country_iso_code").agg(count("*").alias("cnt")).filter("cnt > 1"))

showDF(countryDim.groupBy("wb_country_code").agg(count("*").alias("cnt")).filter("cnt > 1"))

showDF(countryDim.groupBy("country_name").agg(count("*").alias("cnt")).filter("cnt > 1"))

Unnamed: 0,country_iso_code,cnt


Unnamed: 0,wb_country_code,cnt


Unnamed: 0,country_name,cnt


### Step 6 - Fix any issues with data quality

In [32]:
countryDimFinal = countryDim.filter("country_iso_code is not null")

showDF(countryDimFinal)

Unnamed: 0,country_iso_code,wb_country_code,country_name,country_long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,AF,AFG,Afghanistan,Islamic State of Afghanistan,South Asia,Low income
2,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
3,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
4,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
5,1A,ARB,Arab World,Arab World,,
6,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
7,AR,ARG,Argentina,Argentine Republic,,
8,AM,ARM,Armenia,Republic of Armenia,Europe & Central Asia,Lower middle income
9,AS,ASM,American Samoa,American Samoa,East Asia & Pacific,Upper middle income


In [33]:
countryDimFinal.count()

262

### Step 7 - Write the data to the destination

In [0]:
# Here we are going to write the country dimension to an output csv file
countryDimFinal \
    .coalesce(1) \
    .write.csv('CountryDim', mode='overwrite', header='true')
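
Spark writes `CountryDim` as a directory rather than a single file; with `coalesce(1)` it should hold one `part-*.csv` data file alongside marker files such as `_SUCCESS`. Below is a minimal sketch for locating the data file from Python. The `find_part_files` helper is hypothetical, and the temporary directory merely imitates that layout so the example runs on its own.

```python
import glob
import os
import tempfile

def find_part_files(output_dir):
    """Return the sorted CSV part files inside a Spark-style output directory."""
    return sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))

# Imitate the layout of a Spark output directory (file names are made up)
demo_dir = tempfile.mkdtemp()
for name in ("part-00000-demo.csv", "_SUCCESS"):
    with open(os.path.join(demo_dir, name), "w") as handle:
        handle.write("")

part_files = find_part_files(demo_dir)
print([os.path.basename(path) for path in part_files])
```

In the notebook, calling `find_part_files("CountryDim")` after the write would be the equivalent lookup.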

### Step 8 - Check if the output was written out as expected

In [35]:
!cat CountryDim/*csv | head

country_iso_code,wb_country_code,country_name,country_long_name,region,income_group
AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
AF,AFG,Afghanistan,Islamic State of Afghanistan,South Asia,Low income
AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
1A,ARB,Arab World,Arab World,"",""
AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
AR,ARG,Argentina,Argentine Republic,"",""
AM,ARM,Armenia,Republic of Armenia,Europe & Central Asia,Lower middle income


## Exercise

#### Transform the Series Dataset and make it available in the dataframe seriesDim
- Filter only for series that have Annual periodicity
- Select the following columns and rename them to prepare for further processing

| Name in CSV | New Column Name |
| ------------- |:-------------:|
| Series Code | indicator_code |
| Indicator Name | indicator_name |
| Periodicity | periodicity |
| Aggregation method | aggregation_method |


##### How many series do you end up with?

In [36]:
# Your solution here
# Note: "Short Definition" is carried along in addition to the four columns listed above
seriesDim = series \
    .select("Series Code", "Indicator Name", "Short Definition", "Periodicity", "Aggregation Method") \
    .withColumnRenamed("Series Code", "indicator_code") \
    .withColumnRenamed("Indicator Name", "indicator_name") \
    .withColumnRenamed("Periodicity", "periodicity") \
    .withColumnRenamed("Aggregation Method", "aggregation_method") \
    .filter(col("periodicity") == "Annual") 
    
showDF(seriesDim)

Unnamed: 0,indicator_code,indicator_name,Short Definition,periodicity,aggregation_method
0,AG.AGR.TRAC.NO,"Agricultural machinery, tractors",,Annual,Sum
1,AG.CON.FERT.PT.ZS,Fertilizer consumption (% of fertilizer produc...,,Annual,Weighted average
2,AG.CON.FERT.ZS,Fertilizer consumption (kilograms per hectare ...,,Annual,Weighted average
3,AG.LND.AGRI.K2,Agricultural land (sq. km),,Annual,Sum
4,AG.LND.AGRI.ZS,Agricultural land (% of land area),,Annual,Weighted average
5,AG.LND.ARBL.HA,Arable land (hectares),,Annual,
6,AG.LND.ARBL.HA.PC,Arable land (hectares per person),,Annual,Weighted Average
7,AG.LND.ARBL.ZS,Arable land (% of land area),,Annual,Weighted average
8,AG.LND.CREL.HA,Land under cereal production (hectares),,Annual,Sum
9,AG.LND.CROP.ZS,Permanent cropland (% of land area),,Annual,Weighted average


In [37]:
seriesDim.count()

1587

#### What data quality checks were you able to perform?

- Are there any duplicate codes?
- Are all indicator_codes following the same pattern?
- Is the case on columns consistent?
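
The first two questions can be sketched in plain Python as follows. The sample codes are taken from the outputs in this notebook, plus one deliberate duplicate and one deliberately malformed entry. The regular expression is an assumption about what a well-formed code looks like (dot-separated groups of upper-case letters and digits), not a documented rule.

```python
import re
from collections import Counter

codes = ["AG.AGR.TRAC.NO", "SP.POP.TOTL", "IT.CEL.SETS", "SP.POP.TOTL", "bad code"]

# Duplicate check: any code that occurs more than once
duplicates = sorted(code for code, cnt in Counter(codes).items() if cnt > 1)

# Pattern check: assumed shape is GROUP.GROUP[.GROUP...] in upper case
CODE_PATTERN = re.compile(r"[A-Z0-9]+(\.[A-Z0-9]+)+")
malformed = sorted({code for code in codes if not CODE_PATTERN.fullmatch(code)})

print(duplicates)
print(malformed)
```

On the real data, the same checks could be expressed with `groupBy(...).count()` and a regular-expression filter in Spark.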

### Complex Transformations
##### Problem Statement
We want to measure the cellular and broadband penetration in comparison to the population demographics for every country.  It'll also be helpful to get some insights on annual global aggregates.

###### Our dataset has multiple types of metrics.  The only ones that we care about are simple aggregates.

In [38]:
simpleAggIndicators = seriesDim \
    .filter("lower(aggregation_method) = 'sum'") \
    .select("indicator_code", "indicator_name") \
    .orderBy("indicator_code")

showDF(simpleAggIndicators, limitRows = 500, truncate = False)

Unnamed: 0,indicator_code,indicator_name
0,AG.AGR.TRAC.NO,"Agricultural machinery, tractors"
1,AG.LND.AGRI.K2,Agricultural land (sq. km)
2,AG.LND.CREL.HA,Land under cereal production (hectares)
3,AG.LND.EL5M.RU.K2,Rural land area where elevation is below 5 meters (sq. km)
4,AG.LND.EL5M.UR.K2,Urban land area where elevation is below 5 meters (sq. km)
5,AG.LND.FRST.K2,Forest area (sq. km)
6,AG.LND.TOTL.K2,Land area (sq. km)
7,AG.LND.TOTL.RU.K2,Rural land area (sq. km)
8,AG.LND.TOTL.UR.K2,Urban land area (sq. km)
9,AG.PRD.CREL.MT,Cereal production (metric tons)


##### Keep only the indicators relevant to the requirements, i.e. population indicators and cellular and broadband penetration

In [39]:
targetIndicators = simpleAggIndicators \
    .filter("lower(indicator_name) like '%population%total%' " + 
            " or lower(indicator_name) like '%cellular%' " +
            " or lower(indicator_name) like '%broadband%'") \
    .filter("lower(indicator_name) not like '%refugee%'")

showDF(targetIndicators)

Unnamed: 0,indicator_code,indicator_name
0,IT.CEL.SETS,Mobile cellular subscriptions
1,IT.NET.BBND,Fixed broadband subscriptions
2,SP.POP.0014.TO,"Population ages 0-14, total"
3,SP.POP.1564.TO,"Population ages 15-64, total"
4,SP.POP.65UP.TO,"Population ages 65 and above, total"
5,SP.POP.TOTL,"Population, total"
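
In SQL `LIKE`, `%` matches any run of characters, so `'%population%total%'` requires `population` to appear before `total`. The sketch below mimics the three-pattern filter in plain Python. The `like` helper is a hypothetical, simplified translation that handles only `%`, and the indicator names are taken from the outputs in this notebook.

```python
import re

def like(value, pattern):
    """Simplified SQL LIKE: only the % wildcard is supported."""
    regex = ".*".join(re.escape(part) for part in pattern.split("%"))
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None

names = [
    "Population, total",
    "Mobile cellular subscriptions",
    "Fixed broadband subscriptions",
    "Land area (sq. km)",
]

patterns = ["%population%total%", "%cellular%", "%broadband%"]
selected = [
    name for name in names
    if any(like(name.lower(), pattern) for pattern in patterns)
]
print(selected)
```

Because order matters, a name such as "total population" would not match the first pattern.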


##### Now that we have identified the various indicators of interest, we can continue with getting the metrics for these indicators

In [0]:
# Keep the columns that are relevant for further transformations
indicatorsData = indicators \
    .withColumnRenamed("Indicator Code", "indicator_code") \
    .withColumnRenamed("Country Code", "wb_country_code") \
    .drop("Indicator Name") \
    .drop("Country Name") \
    .drop("_c62")

In [0]:
# Keep only the indicators that we care about
targetIndicatorsData = indicatorsData \
    .join(targetIndicators, indicatorsData.indicator_code == targetIndicators.indicator_code) \
    .drop(targetIndicators.indicator_code)
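
The join acts as a filter here: because it is an inner join (Spark's default join type), only rows whose `indicator_code` appears in `targetIndicators` survive, and each surviving row gains the `indicator_name` column. A plain-Python sketch of that behaviour, using made-up rows:

```python
# Made-up stand-ins for indicatorsData and targetIndicators
indicator_rows = [
    {"wb_country_code": "ARB", "indicator_code": "SP.POP.TOTL", "2016": 1.0},
    {"wb_country_code": "ARB", "indicator_code": "EG.ELC.ACCS.ZS", "2016": 2.0},
]
target_names = {"SP.POP.TOTL": "Population, total"}

# Inner join: keep matching rows and attach the name from the lookup side
joined = [
    {**row, "indicator_name": target_names[row["indicator_code"]]}
    for row in indicator_rows
    if row["indicator_code"] in target_names
]
print(joined)
```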

In [42]:
showDF(targetIndicatorsData)

Unnamed: 0,wb_country_code,indicator_code,1960,1961,1962,1963,1964,1965,1966,1967,1968,1969,1970,1971,1972,1973,1974,1975,1976,1977,1978,1979,1980,1981,1982,1983,1984,1985,1986,1987,1988,1989,1990,1991,1992,1993,1994,1995,1996,1997,1998,1999,2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,indicator_name
0,ARB,IT.NET.BBND,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,365937.0,861800.0,1545499.0,2811961.0,4235468.0,5845325.0,7123797.0,8570787.0,10323590.0,11799990.0,13834070.0,16604450.0,18526990.0,,Fixed broadband subscriptions
1,ARB,IT.CEL.SETS,0.0,,,,,0.0,,,,,0.0,,,,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2330.0,3564.0,4936.0,7976.0,20756.0,33793.0,39447.0,58750.0,88618.0,129904.0,152281.0,199350.0,263401.0,501551.0,921229.0,1616551.0,2523542.0,4105508.0,9035241.0,16679594.0,25275663.0,35121269.0,52132873.0,84854521.0,125958922.0,174488139.0,214062500.0,265270800.0,312247800.0,351958000.0,381641900.0,407704500.0,415029400.0,417195600.0,412315300.0,,Mobile cellular subscriptions
2,ARB,SP.POP.0014.TO,40064255.0,41518715.0,42987169.0,44458639.0,45910606.0,47331020.0,48923976.0,50456294.0,51945966.0,53411639.0,54857512.0,56453735.0,57991493.0,59525243.0,61129366.0,62859354.0,64785694.0,66828081.0,68955334.0,71106520.0,73234442.0,75469309.0,77659213.0,79818025.0,81972127.0,84131860.0,86494332.0,88809494.0,91075462.0,93259426.0,96277669.0,98289704.0,99510372.0,101223904.0,102711484.0,104452866.0,105564857.0,106523081.0,107354429.0,108131446.0,108884238.0,109554629.0,110230211.0,110952794.0,111773790.0,112736681.0,113853070.0,115077993.0,116467600.0,118075200.0,119921700.0,122165900.0,124673800.0,127344100.0,129987000.0,132465400.0,135007700.0,,"Population ages 0-14, total"
3,ARB,SP.POP.1564.TO,49179481.0,50158016.0,51208328.0,52348368.0,53610121.0,55012095.0,56343748.0,57859656.0,59517129.0,61262878.0,63074009.0,64743285.0,66525027.0,68443969.0,70533559.0,72811620.0,75216468.0,77818854.0,80586837.0,83487468.0,86496611.0,89444551.0,92513772.0,95690461.0,98957605.0,102307492.0,105510462.0,108820444.0,112223266.0,115733643.0,120354864.0,124123536.0,126812158.0,131010463.0,135330556.0,140816007.0,145169463.0,149603044.0,154098379.0,158718005.0,163502786.0,168521819.0,173698045.0,179061404.0,184648505.0,190468825.0,196525245.0,202846731.0,209286800.0,215639500.0,221771600.0,227485600.0,232951100.0,238215400.0,243405400.0,248614900.0,253566100.0,,"Population ages 15-64, total"
4,ARB,SP.POP.65UP.TO,3247196.0,3367766.0,3486797.0,3604069.0,3719175.0,3831873.0,3962869.0,4090982.0,4217070.0,4342025.0,4466853.0,4610399.0,4752855.0,4894204.0,5033836.0,5172324.0,5330216.0,5486119.0,5641553.0,5798500.0,5958437.0,6138090.0,6317099.0,6497341.0,6681024.0,6870949.0,7088973.0,7312611.0,7546043.0,7794333.0,8102913.0,8416628.0,8714650.0,9051724.0,9393890.0,9760798.0,10109142.0,10448950.0,10782338.0,11113418.0,11444991.0,11773908.0,12098319.0,12420321.0,12739734.0,13059222.0,13394950.0,13729073.0,14071130.0,14430470.0,14815540.0,15244330.0,15682050.0,16142610.0,16650640.0,17224680.0,17878930.0,,"Population ages 65 and above, total"
5,ARB,SP.POP.TOTL,92490932.0,95044497.0,97682294.0,100411076.0,103239902.0,106174988.0,109230593.0,112406932.0,115680165.0,119016542.0,122398374.0,125807419.0,129269375.0,132863416.0,136696761.0,140843298.0,145332378.0,150133054.0,155183724.0,160392488.0,165689490.0,171051950.0,176490084.0,182005827.0,187610756.0,193310301.0,199093767.0,204942549.0,210844771.0,216787402.0,224735446.0,230829868.0,235037179.0,241286091.0,247435930.0,255029671.0,260843462.0,266575075.0,272235146.0,277962869.0,283832016.0,289850357.0,296026575.0,302434519.0,309162029.0,316264728.0,323773264.0,331653797.0,339825500.0,348145100.0,356508900.0,364895900.0,373307000.0,381702100.0,390043000.0,398305000.0,406452700.0,,"Population, total"
6,CSS,IT.NET.BBND,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,21197.0,56497.0,91957.0,138826.0,187127.0,253455.0,333308.0,434162.0,473138.0,498285.0,627251.0,674419.0,753368.0,897134.0,981886.0,,Fixed broadband subscriptions
7,CSS,IT.CEL.SETS,0.0,,,,,0.0,,,,,0.0,,,,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2072.0,5526.0,13393.0,23733.0,36161.0,66297.0,85516.0,108633.0,142649.0,261905.0,720054.0,1220489.0,2053418.0,2837910.0,3675512.0,4361812.0,5593660.0,6437567.0,7108319.0,7646708.0,7935923.0,7534114.0,7322041.0,7738401.0,7952805.0,8158698.0,8487436.0,,Mobile cellular subscriptions
8,CSS,SP.POP.0014.TO,1766884.0,1816372.0,1864013.0,1908732.0,1949651.0,1986537.0,2027526.0,2065067.0,2099075.0,2129958.0,2157574.0,2161662.0,2166614.0,2170393.0,2170412.0,2165879.0,2146121.0,2123720.0,2102017.0,2086263.0,2078845.0,2070572.0,2071997.0,2078306.0,2081406.0,2076689.0,2076933.0,2066207.0,2050451.0,2039142.0,2036976.0,2030740.0,2034151.0,2042579.0,2048202.0,2047008.0,2040088.0,2029026.0,2015946.0,2004340.0,1995212.0,1980547.0,1965839.0,1950837.0,1935177.0,1918729.0,1895746.0,1873815.0,1851614.0,1828309.0,1804177.0,1783918.0,1761456.0,1738418.0,1717638.0,1700765.0,1687271.0,,"Population ages 0-14, total"
9,CSS,SP.POP.1564.TO,2151060.0,2173846.0,2198714.0,2225619.0,2253715.0,2281832.0,2303230.0,2324023.0,2345303.0,2368421.0,2394815.0,2444417.0,2493596.0,2543802.0,2597023.0,2653995.0,2721778.0,2791258.0,2860226.0,2925392.0,2985929.0,3053494.0,3115516.0,3172934.0,3229088.0,3285917.0,3329091.0,3375049.0,3420931.0,3462524.0,3499178.0,3547648.0,3591934.0,3634556.0,3680063.0,3730018.0,3780408.0,3832769.0,3885271.0,3935289.0,3982553.0,4036795.0,4090689.0,4144400.0,4198175.0,4251978.0,4309485.0,4365620.0,4421571.0,4477802.0,4533534.0,4585728.0,4638828.0,4691000.0,4739145.0,4781112.0,4816676.0,,"Population ages 15-64, total"


#### The current output isn't ideal from a modeling perspective.  
A well-modeled dataset should be easy to augment. For example, instead of having one column per year (wide format), we would prefer one row per year (long format), so that new years arrive as new rows rather than as schema changes. This wide-to-long transformation is also a common preprocessing step when building models.

We want something similar to the output of the following code block:

In [43]:
indicatorsSample = targetIndicatorsData \
    .select(col("wb_country_code"),
            col("indicator_code"),
            lit("1960").alias("year"),
            col("1960").alias("indicator_value")) \
    .filter("indicator_value >= 0.0")

showDF(indicatorsSample)

Unnamed: 0,wb_country_code,indicator_code,year,indicator_value
0,ARB,IT.CEL.SETS,1960,0.0
1,ARB,SP.POP.0014.TO,1960,40064255.0
2,ARB,SP.POP.1564.TO,1960,49179481.0
3,ARB,SP.POP.65UP.TO,1960,3247196.0
4,ARB,SP.POP.TOTL,1960,92490932.0
5,CSS,IT.CEL.SETS,1960,0.0
6,CSS,SP.POP.0014.TO,1960,1766884.0
7,CSS,SP.POP.1564.TO,1960,2151060.0
8,CSS,SP.POP.65UP.TO,1960,169157.0
9,CSS,SP.POP.TOTL,1960,4198307.0
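
To make the wide-to-long idea concrete outside Spark, here is a minimal plain-Python sketch that reshapes one wide record into long records. The values are the ARB / IT.CEL.SETS figures shown earlier (1961 is missing in the source); `wide_to_long` is a hypothetical helper, not part of the notebook.

```python
def wide_to_long(row, id_columns, year_columns):
    """Yield one long-format record per year column holding a non-negative value."""
    for year in year_columns:
        value = row.get(year)
        # Mirrors filter("indicator_value >= 0"): missing values are dropped too
        if value is not None and value >= 0:
            record = {c: row[c] for c in id_columns}
            record["year"] = year
            record["indicator_value"] = value
            yield record

wide_row = {
    "wb_country_code": "ARB",
    "indicator_code": "IT.CEL.SETS",
    "1960": 0.0,
    "1961": None,  # no value reported for this year
    "1965": 0.0,
}

long_rows = list(wide_to_long(wide_row,
                              ["wb_country_code", "indicator_code"],
                              ["1960", "1961", "1965"]))
for record in long_rows:
    print(record)
```

Adding a new year then means appending rows, with no change to the set of columns.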


##### Let us start by getting the list of years that we have metrics for

In [44]:
yearList = [x for x in targetIndicatorsData.schema.names \
             if x != 'wb_country_code' and x != 'indicator_code' and x != 'indicator_name'] 

print(yearList)

['1960', '1961', '1962', '1963', '1964', '1965', '1966', '1967', '1968', '1969', '1970', '1971', '1972', '1973', '1974', '1975', '1976', '1977', '1978', '1979', '1980', '1981', '1982', '1983', '1984', '1985', '1986', '1987', '1988', '1989', '1990', '1991', '1992', '1993', '1994', '1995', '1996', '1997', '1998', '1999', '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016', '2017']


In [45]:
# Trick: an always-false filter yields an empty DataFrame that keeps the sample's schema
indicatorsDF = indicatorsSample.filter('1 = 0')

#Iterate through the list of years and store the rows in the DataFrame we created above
for indicatorYear in yearList:
    print("Processing indicators for " + indicatorYear)
    yearIndicatorDF = targetIndicatorsData \
        .select(col("wb_country_code")
                , col("indicator_code")
                , lit(indicatorYear).alias("year")
                , col(indicatorYear).alias("indicator_value")) \
        .filter("indicator_value >= 0")
    indicatorsDF = indicatorsDF.union(yearIndicatorDF)    

Processing indicators for 1960
Processing indicators for 1961
Processing indicators for 1962
Processing indicators for 1963
Processing indicators for 1964
Processing indicators for 1965
Processing indicators for 1966
Processing indicators for 1967
Processing indicators for 1968
Processing indicators for 1969
Processing indicators for 1970
Processing indicators for 1971
Processing indicators for 1972
Processing indicators for 1973
Processing indicators for 1974
Processing indicators for 1975
Processing indicators for 1976
Processing indicators for 1977
Processing indicators for 1978
Processing indicators for 1979
Processing indicators for 1980
Processing indicators for 1981
Processing indicators for 1982
Processing indicators for 1983
Processing indicators for 1984
Processing indicators for 1985
Processing indicators for 1986
Processing indicators for 1987
Processing indicators for 1988
Processing indicators for 1989
Processing indicators for 1990
Processing indicators for 1991
...
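
The loop above builds one small DataFrame per year and unions them. A possible alternative is Spark SQL's `stack` generator, which unpivots all year columns in a single `selectExpr`. The sketch below only builds the expression string, so it runs without Spark; `build_stack_expr` is a hypothetical helper, and the commented usage assumes every year column has the same data type, which `stack` requires.

```python
def build_stack_expr(year_columns, year_alias="year", value_alias="indicator_value"):
    """Build a Spark SQL stack(...) expression that turns year columns into rows."""
    if not year_columns:
        raise ValueError("year_columns must not be empty")
    # Each pair is a string literal (the year label) followed by the backticked column
    pairs = ", ".join("'{0}', `{0}`".format(year) for year in year_columns)
    return "stack({0}, {1}) as ({2}, {3})".format(
        len(year_columns), pairs, year_alias, value_alias)

print(build_stack_expr(["1960", "1961"]))

# Intended use with the notebook's DataFrame (assumption, not executed here):
# indicatorsDF = targetIndicatorsData \
#     .selectExpr("wb_country_code", "indicator_code", build_stack_expr(yearList)) \
#     .filter("indicator_value >= 0")
```

This keeps the query plan to a single projection instead of a chain of 58 unions, at the cost of a longer SQL expression.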

#### Let's cache the dataset to iterate over it

In [46]:
# Cache the DataFrame so later queries reuse the unioned rows instead of recomputing every per-year select
indicatorsDF.cache()

# cache() is lazy; running an action such as count() materializes it
indicatorsDF.count()

70303

In [47]:
#Check the indicator counts per year
showDF(indicatorsDF.groupBy('year').agg(count("*")).orderBy("year"), limitRows=100)

Unnamed: 0,year,count(1)
0,1960,1220
1,1961,974
2,1962,974
3,1963,974
4,1964,974
5,1965,1220
6,1966,974
7,1967,974
8,1968,974
9,1969,974


#### Getting yearly indicator totals

In [0]:
yearPivot = indicatorsDF.groupBy('year').pivot('indicator_code').sum('indicator_value') 

In [49]:
showDF(yearPivot.orderBy('year'))

Unnamed: 0,year,IT.CEL.SETS,IT.NET.BBND,SP.POP.0014.TO,SP.POP.1564.TO,SP.POP.65UP.TO,SP.POP.TOTL
0,1960,0.0,,11640430000.0,17646560000.0,1419169000.0,30720520000.0
1,1961,,,11860660000.0,17807070000.0,1451321000.0,31133690000.0
2,1962,,,12153580000.0,18033400000.0,1483492000.0,31685500000.0
3,1963,,,12498410000.0,18341100000.0,1517104000.0,32372040000.0
4,1964,,,12817220000.0,18681780000.0,1549219000.0,33064040000.0
5,1965,0.0,,13104550000.0,19071430000.0,1581083000.0,33773270000.0
6,1966,,,13437310000.0,19431870000.0,1634468000.0,34520250000.0
7,1967,,,13717670000.0,19843420000.0,1687816000.0,35265910000.0
8,1968,,,13969240000.0,20297060000.0,1741146000.0,36024840000.0
9,1969,,,14225160000.0,20782730000.0,1795141000.0,36820820000.0
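
As a plain-Python illustration of what `groupBy('year').pivot('indicator_code').sum(...)` computes, the sketch below sums long-format rows into one record per year, with `None` where a year has no rows for an indicator. `pivot_sum` is a hypothetical helper; the sample values are the ARB and CSS figures shown earlier. Note that the pivot sums over every `wb_country_code`, including aggregate codes such as ARB (Arab World), so yearly totals can count the same population more than once.

```python
from collections import defaultdict

def pivot_sum(rows, group_key, pivot_key, value_key):
    """Group rows by group_key and sum value_key into one column per pivot_key value."""
    totals = defaultdict(dict)
    pivot_values = set()
    for row in rows:
        cells = totals[row[group_key]]
        cells[row[pivot_key]] = cells.get(row[pivot_key], 0.0) + row[value_key]
        pivot_values.add(row[pivot_key])
    wide = []
    for group in sorted(totals):
        record = {group_key: group}
        for column in sorted(pivot_values):
            record[column] = totals[group].get(column)  # None where no rows exist
        wide.append(record)
    return wide

long_rows = [
    {"year": "1960", "indicator_code": "SP.POP.TOTL", "indicator_value": 92490932.0},  # ARB
    {"year": "1960", "indicator_code": "SP.POP.TOTL", "indicator_value": 4198307.0},   # CSS
    {"year": "1960", "indicator_code": "IT.CEL.SETS", "indicator_value": 0.0},         # ARB
    {"year": "1961", "indicator_code": "SP.POP.TOTL", "indicator_value": 95044497.0},  # ARB
]

wide_rows = pivot_sum(long_rows, "year", "indicator_code", "indicator_value")
for record in wide_rows:
    print(record)
```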


In [50]:
yearPivot.printSchema()

root
 |-- year: string (nullable = false)
 |-- IT.CEL.SETS: double (nullable = true)
 |-- IT.NET.BBND: double (nullable = true)
 |-- SP.POP.0014.TO: double (nullable = true)
 |-- SP.POP.1564.TO: double (nullable = true)
 |-- SP.POP.65UP.TO: double (nullable = true)
 |-- SP.POP.TOTL: double (nullable = true)



In [0]:
yearPivotDF = yearPivot.orderBy('year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population')

#### Data Quality Checkpoint 

In [52]:
# Cache the pivoted totals, since each check below queries this DataFrame again
yearPivotDF.cache()

# cache() is lazy; count() forces the data to be cached
yearPivotDF.count()

57

In [53]:
yearPivotDF.filter('population_age_0_to_14 < 0').count()

0

In [54]:
yearPivotDF.filter('population_age_15_64 < 0').count()

0

In [56]:
yearPivotDF.filter('population_age_65_and_above < 0').count()

0

In [57]:
yearPivotDF.filter('population < 0').count()

0

In [58]:
yearPivotDF.filter('cellular_subscriptions < 0').count()

0

In [59]:
yearPivotDF.filter('broadband_subscriptions < 0').count()

0

In [60]:
yearPivotDF.filter('population_age_0_to_14 > population').count()

0

In [61]:
yearPivotDF.filter('population_age_15_64 > population').count()

0

In [62]:
yearPivotDF.filter('population_age_65_and_above > population').count()

0

In [63]:
yearPivotDF.filter('(population_age_0_to_14 + population_age_15_64 + population_age_65_and_above) > population').count()

0
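
The checks above follow a pattern: no negative values, no age band larger than the total, and the bands together not exceeding the total. One way to keep them in one place is to generate the filter expressions from the column names and loop over them. The sketch below only builds the expressions, so it runs without Spark; `build_quality_checks` is a hypothetical helper and the commented loop is an assumption about how it might be used.

```python
def build_quality_checks(part_columns, total_column, other_columns=()):
    """Return SQL filter expressions that should each match zero rows."""
    checks = []
    # Nothing should be negative
    for column in list(part_columns) + [total_column] + list(other_columns):
        checks.append("{0} < 0".format(column))
    # No part may exceed the total
    for column in part_columns:
        checks.append("{0} > {1}".format(column, total_column))
    # The parts together may not exceed the total
    checks.append("({0}) > {1}".format(" + ".join(part_columns), total_column))
    return checks

checks = build_quality_checks(
    ["population_age_0_to_14", "population_age_15_64", "population_age_65_and_above"],
    "population",
    ["cellular_subscriptions", "broadband_subscriptions"],
)
for check in checks:
    print(check)

# Intended use (assumption, not executed here):
# failures = {check: yearPivotDF.filter(check).count() for check in checks}
# assert all(n == 0 for n in failures.values()), failures
```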

In [0]:
#Write the yearly totals to a CSV File
yearPivotDF \
    .select(col('year')
            , col('population').cast(DecimalType(38, 2))
            , col('population_age_0_to_14').cast(DecimalType(38, 2))
            , col('population_age_15_64').cast(DecimalType(38, 2))
            , col('population_age_65_and_above').cast(DecimalType(38, 2))
            , col('broadband_subscriptions').cast(DecimalType(38, 2))
            , col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('YearlyStats', mode='overwrite', header='true')

#### Getting yearly regional totals

In [0]:
regionalIndicators = indicatorsDF.join(countryDimFinal
                                       , indicatorsDF.wb_country_code == countryDimFinal.wb_country_code
                                       , "inner") \
    .select(countryDimFinal.region
            , indicatorsDF.wb_country_code
            , indicatorsDF.year
            , indicatorsDF.indicator_code
            , indicatorsDF.indicator_value)

In [66]:
showDF(regionalIndicators)

Unnamed: 0,region,wb_country_code,year,indicator_code,indicator_value
0,,ARB,1960,IT.CEL.SETS,0.0
1,,ARB,1960,SP.POP.0014.TO,40064255.0
2,,ARB,1960,SP.POP.1564.TO,49179481.0
3,,ARB,1960,SP.POP.65UP.TO,3247196.0
4,,ARB,1960,SP.POP.TOTL,92490932.0
5,,CSS,1960,IT.CEL.SETS,0.0
6,,CSS,1960,SP.POP.0014.TO,1766884.0
7,,CSS,1960,SP.POP.1564.TO,2151060.0
8,,CSS,1960,SP.POP.65UP.TO,169157.0
9,,CSS,1960,SP.POP.TOTL,4198307.0


In [0]:
regionalPivot = regionalIndicators.groupBy('region', 'year').pivot('indicator_code').sum('indicator_value')

In [68]:
showDF(regionalPivot.orderBy('region', 'year'), limitRows=100)

Unnamed: 0,region,year,IT.CEL.SETS,IT.NET.BBND,SP.POP.0014.TO,SP.POP.1564.TO,SP.POP.65UP.TO,SP.POP.TOTL
0,,1960,0.0,,10529180000.0,15913790000.0,1270274000.0,27726200000.0
1,,1961,,,10728830000.0,16057890000.0,1299015000.0,28098950000.0
2,,1962,,,10995080000.0,16261950000.0,1327810000.0,28598410000.0
3,,1963,,,11308760000.0,16539850000.0,1357916000.0,29220450000.0
4,,1964,,,11598830000.0,16847780000.0,1386660000.0,29847550000.0
5,,1965,0.0,,11860240000.0,17200400000.0,1415175000.0,30490450000.0
6,,1966,,,12163110000.0,17526690000.0,1463169000.0,31167950000.0
7,,1967,,,12418380000.0,17899580000.0,1511139000.0,31844460000.0
8,,1968,,,12647600000.0,18311040000.0,1559115000.0,32533460000.0
9,,1969,,,12880630000.0,18751020000.0,1607638000.0,33255360000.0


In [0]:
#Write the regional-yearly totals to a CSV File
regionalPivot.filter('region is not null') \
    .orderBy('region','year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population') \
    .select(col('region')
            , col('year')
            , col('population').cast(DecimalType(38, 2))
            , col('population_age_0_to_14').cast(DecimalType(38, 2))
            , col('population_age_15_64').cast(DecimalType(38, 2))
            , col('population_age_65_and_above').cast(DecimalType(38, 2))
            , col('broadband_subscriptions').cast(DecimalType(38, 2))
            , col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('RegionalStats', mode='overwrite', header='true')
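
The same chain of `withColumnRenamed` calls appears in several cells. One option is to keep the code-to-name mapping in a dictionary and apply it with `functools.reduce`. In the sketch below, `rename_columns` and `INDICATOR_COLUMN_NAMES` are hypothetical names, and `FakeFrame` is a stand-in used only so the example runs without Spark; a real DataFrame would be passed in its place.

```python
from functools import reduce

# Indicator code -> friendly column name, as used in the cells above
INDICATOR_COLUMN_NAMES = {
    "IT.CEL.SETS": "cellular_subscriptions",
    "IT.NET.BBND": "broadband_subscriptions",
    "SP.POP.0014.TO": "population_age_0_to_14",
    "SP.POP.1564.TO": "population_age_15_64",
    "SP.POP.65UP.TO": "population_age_65_and_above",
    "SP.POP.TOTL": "population",
}

def rename_columns(df, mapping):
    """Apply withColumnRenamed once per (old, new) pair in mapping."""
    return reduce(lambda acc, item: acc.withColumnRenamed(item[0], item[1]),
                  mapping.items(), df)

class FakeFrame:
    """Stand-in exposing only withColumnRenamed, so the sketch runs without Spark."""
    def __init__(self, columns):
        self.columns = list(columns)

    def withColumnRenamed(self, existing, new):
        # Like Spark, renaming a column that is absent is a no-op
        return FakeFrame([new if c == existing else c for c in self.columns])

renamed = rename_columns(FakeFrame(["region", "year", "IT.CEL.SETS", "SP.POP.TOTL"]),
                         INDICATOR_COLUMN_NAMES)
print(renamed.columns)
```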

## Short Exercise

Becky finds the regional metrics interesting, but she wants to look at them at the country level for each year. Can you adapt the regional pivot that we computed earlier to get the metrics for each year and country?

In [70]:
countryIndicators = indicatorsDF.join(countryDimFinal
                                       , indicatorsDF.wb_country_code == countryDimFinal.wb_country_code
                                       , "inner") \
    .select(indicatorsDF.wb_country_code
            , countryDimFinal.country_iso_code
            , countryDimFinal.country_name
            , indicatorsDF.year
            , indicatorsDF.indicator_code
            , indicatorsDF.indicator_value)

showDF(countryIndicators)

Unnamed: 0,wb_country_code,country_iso_code,country_name,year,indicator_code,indicator_value
0,ARB,1A,Arab World,1960,IT.CEL.SETS,0.0
1,ARB,1A,Arab World,1960,SP.POP.0014.TO,40064255.0
2,ARB,1A,Arab World,1960,SP.POP.1564.TO,49179481.0
3,ARB,1A,Arab World,1960,SP.POP.65UP.TO,3247196.0
4,ARB,1A,Arab World,1960,SP.POP.TOTL,92490932.0
5,CSS,S3,Caribbean small states,1960,IT.CEL.SETS,0.0
6,CSS,S3,Caribbean small states,1960,SP.POP.0014.TO,1766884.0
7,CSS,S3,Caribbean small states,1960,SP.POP.1564.TO,2151060.0
8,CSS,S3,Caribbean small states,1960,SP.POP.65UP.TO,169157.0
9,CSS,S3,Caribbean small states,1960,SP.POP.TOTL,4198307.0


In [0]:
countryPivot = countryIndicators.groupBy('country_iso_code', 'country_name', 'year') \
    .pivot('indicator_code').sum('indicator_value')

In [72]:
showDF(countryPivot.orderBy('country_iso_code', 'country_name', 'year'), limitRows=100)

Unnamed: 0,country_iso_code,country_name,year,IT.CEL.SETS,IT.NET.BBND,SP.POP.0014.TO,SP.POP.1564.TO,SP.POP.65UP.TO,SP.POP.TOTL
0,1A,Arab World,1960,0.0,,40064260.0,49179480.0,3247196.0,92490930.0
1,1A,Arab World,1961,,,41518720.0,50158020.0,3367766.0,95044500.0
2,1A,Arab World,1962,,,42987170.0,51208330.0,3486797.0,97682290.0
3,1A,Arab World,1963,,,44458640.0,52348370.0,3604069.0,100411100.0
4,1A,Arab World,1964,,,45910610.0,53610120.0,3719175.0,103239900.0
5,1A,Arab World,1965,0.0,,47331020.0,55012100.0,3831873.0,106175000.0
6,1A,Arab World,1966,,,48923980.0,56343750.0,3962869.0,109230600.0
7,1A,Arab World,1967,,,50456290.0,57859660.0,4090982.0,112406900.0
8,1A,Arab World,1968,,,51945970.0,59517130.0,4217070.0,115680200.0
9,1A,Arab World,1969,,,53411640.0,61262880.0,4342025.0,119016500.0


In [0]:
#Write the country-yearly totals to a CSV File
countryPivot.filter('country_iso_code is not null') \
    .orderBy('country_iso_code','country_name', 'year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population') \
    .select(col('country_iso_code')
            , col('country_name')
            , col('year')
            , col('population').cast(DecimalType(38, 2))
            , col('population_age_0_to_14').cast(DecimalType(38, 2))
            , col('population_age_15_64').cast(DecimalType(38, 2))
            , col('population_age_65_and_above').cast(DecimalType(38, 2))
            , col('broadband_subscriptions').cast(DecimalType(38, 2))
            , col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('CountryStats', mode='overwrite', header='true')

## Exercise

Kat wants to identify the countries that are conducive to starting a business. She thinks that it would suffice to look at the most recent metrics for the following:
- Gross National Income (GNI)
- Cost of business start-up procedures
- Number of days required to start a business (male, female, and overall)
- Number of start-up procedures to register a business
- GDP
- GDP per capita
- Business Regulatory Environment
- Ease of doing business index (Only available in 2017)

Write the data to a csv file called 'BusinessStartupData'.

In [74]:
# Hint: start by finding indicators whose descriptions match what Kat is looking for, e.g.:
showDF(seriesDim.filter("indicator_name = 'GNI'" +
                        " or lower(indicator_name) like '%business%'" +
                        " or indicator_name like '%GDP per capita%'" +
                        " or indicator_name = 'GDP'").orderBy("indicator_code"), limitRows=500)

Unnamed: 0,indicator_code,indicator_name,Short Definition,periodicity,aggregation_method
0,IC.BUS.DISC.XQ,Business extent of disclosure index (0=less di...,,Annual,Unweighted average
1,IC.BUS.EASE.XQ,Ease of doing business index (1=most business-...,,Annual,
2,IC.BUS.NDNS.ZS,"New business density (new registrations per 1,...",,Annual,Unweighted average
3,IC.BUS.NREG,New businesses registered (number),,Annual,
4,IC.REG.COST.PC.FE.ZS,"Cost of business start-up procedures, female (...",,Annual,Unweighted average
5,IC.REG.COST.PC.MA.ZS,"Cost of business start-up procedures, male (% ...",,Annual,Unweighted average
6,IC.REG.COST.PC.ZS,Cost of business start-up procedures (% of GNI...,,Annual,Unweighted average
7,IC.REG.DURS,Time required to start a business (days),,Annual,Unweighted average
8,IC.REG.DURS.FE,"Time required to start a business, female (days)",,Annual,Unweighted average
9,IC.REG.DURS.MA,"Time required to start a business, male (days)",,Annual,Unweighted average


In [75]:
recentIndicators = indicatorsData \
    .select("wb_country_code", "indicator_code", "2016") \
    .filter(col('indicator_code').isin('IC.REG.COST.PC.ZS', 'IC.REG.DURS', 'IC.REG.DURS.FE', 'IC.REG.DURS.MA', \
        'IC.REG.PROC', 'NY.GNP.ATLS.CD', 'NY.GDP.MKTP.KD', 'NY.GDP.PCAP.KD', 'IQ.CPA.BREG.XQ', 'IC.BUS.EASE.XQ')) \
    .withColumnRenamed("2016", "indicator_value") \
    .withColumn("indicator_value", col("indicator_value").cast(DecimalType(38, 2)))

showDF(recentIndicators)

Unnamed: 0,wb_country_code,indicator_code,indicator_value
0,ARB,IC.REG.COST.PC.ZS,36.47
1,ARB,IQ.CPA.BREG.XQ,2.8
2,ARB,IC.BUS.EASE.XQ,
3,ARB,NY.GDP.MKTP.KD,2616760000000.0
4,ARB,NY.GDP.PCAP.KD,6438.05
5,ARB,NY.GNP.ATLS.CD,2676040000000.0
6,ARB,IC.REG.PROC,8.32
7,ARB,IC.REG.DURS,23.17
8,ARB,IC.REG.DURS.FE,23.55
9,ARB,IC.REG.DURS.MA,22.91


In [0]:
businessIndexIndicators = indicatorsData \
    .select("wb_country_code", "indicator_code", "2017") \
    .filter(col('indicator_code').isin('IC.BUS.EASE.XQ')) \
    .withColumnRenamed("2017", "indicator_value") \
    .withColumn("indicator_value", col("indicator_value").cast(DecimalType(38, 2)))

In [0]:
allIndicators = recentIndicators.union(businessIndexIndicators)

In [0]:
countryBusinessStartupPivot = allIndicators.join(countryDimFinal
                                       , allIndicators.wb_country_code == countryDimFinal.wb_country_code
                                       , "inner") \
    .select(countryDimFinal.country_iso_code
            , countryDimFinal.country_name
            , allIndicators.indicator_code
            , allIndicators.indicator_value) \
    .groupBy('country_iso_code', 'country_name').pivot('indicator_code').sum('indicator_value') \
    .withColumnRenamed('country_iso_code', 'Country ISO Code') \
    .withColumnRenamed('country_name', 'Country Name') \
    .withColumnRenamed('NY.GNP.ATLS.CD', 'GNI') \
    .withColumnRenamed('IC.REG.DURS', 'Startup Time') \
    .withColumnRenamed('IC.REG.DURS.FE', 'Startup Time Female') \
    .withColumnRenamed('IC.REG.DURS.MA', 'Startup Time Male') \
    .withColumnRenamed('IC.REG.PROC', 'Startup Procedures') \
    .withColumnRenamed('IC.REG.COST.PC.ZS', 'Startup Cost Pct of GNI') \
    .withColumnRenamed('NY.GDP.MKTP.KD', 'GDP') \
    .withColumnRenamed('NY.GDP.PCAP.KD', 'GDP Per Capita') \
    .withColumnRenamed('IQ.CPA.BREG.XQ', 'Business Regulation') \
    .withColumnRenamed('IC.BUS.EASE.XQ', 'Ease of Business') \
    .withColumn('Startup Cost', (col('GNI') * col('Startup Cost Pct of GNI') / lit(100.0)).cast(DecimalType(38, 2))) \
    .filter(col('GNI') > 0) \
    .filter(col('Startup Time').isNotNull()) \
    .filter(col('Startup Procedures').isNotNull()) \
    .filter(col('Startup Cost').isNotNull())

In [79]:
showDF(countryBusinessStartupPivot, limitRows = 50)

Unnamed: 0,Country ISO Code,Country Name,Ease of Business,Startup Cost Pct of GNI,Startup Time,Startup Time Female,Startup Time Male,Startup Procedures,Business Regulation,GDP,GDP Per Capita,GNI,Startup Cost
0,BJ,Benin,151.0,3.8,8.5,9.0,8.0,6.0,3.0,9103831278.0,837.34,8938879520.0,339677421.76
1,XC,Euro area,,3.83,9.55,9.55,9.55,5.26,,13377100000000.0,39256.27,12312600000000.0,471572580000.0
2,JM,Jamaica,70.0,4.3,10.0,10.0,10.0,2.0,,13801803130.0,4790.04,13349534142.0,574029968.11
3,KZ,Kazakhstan,36.0,0.3,9.0,9.0,9.0,5.0,,188309000000.0,10582.5,156812000000.0,470436000.0
4,NO,Norway,8.0,0.9,4.0,4.0,4.0,4.0,,472766000000.0,90288.82,429276000000.0,3863484000.0
5,IR,Iran,124.0,1.2,15.5,16.0,15.0,9.0,,540581000000.0,6733.91,438869000000.0,5266428000.0
6,AG,Antigua and Barbuda,107.0,9.4,22.0,22.0,22.0,9.0,,1344373698.0,13315.51,1369440563.0,128727412.92
7,CG,Congo,179.0,61.2,50.0,50.0,50.0,11.0,2.5,14342385958.0,2798.07,8770006555.0,5367244011.66
8,AR,Argentina,117.0,10.8,24.0,24.0,24.0,13.0,,445227000000.0,10154.0,524974000000.0,56697192000.0
9,HT,Haiti,181.0,219.3,97.0,97.0,97.0,12.0,2.0,7910618370.0,729.27,8488280524.0,18614799189.13
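
The `Startup Cost` column is derived as base amount × percentage / 100, rounded to two decimals. The sketch below reproduces the Benin row with Python's `decimal` module; `startup_cost` is a hypothetical helper. One caveat worth checking: the World Bank names `IC.REG.COST.PC.ZS` as a percentage of GNI per capita, so if that reading is right, GNI per capita (a series not loaded in this notebook) would be the more meaningful base than total GNI.

```python
from decimal import Decimal, ROUND_HALF_UP

def startup_cost(base_amount, cost_pct):
    """Return base_amount * cost_pct / 100, rounded half-up to two decimal places."""
    raw = base_amount * cost_pct / Decimal("100")
    return raw.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

# Benin row from the output above: GNI and 'Startup Cost Pct of GNI'
benin_cost = startup_cost(Decimal("8938879520"), Decimal("3.8"))
print(benin_cost)
```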


In [0]:
countryBusinessStartupPivot \
    .select("Country ISO Code", "Country Name", "GDP", "GDP Per Capita", "GNI", \
            "Startup Cost", "Startup Cost Pct of GNI", "Startup Time", \
            "Startup Time Female", "Startup Time Male", "Startup Procedures", \
            "Business Regulation", "Ease of Business") \
    .coalesce(1) \
    .write.csv('BusinessStartupData', mode='overwrite', header='true')

# Modeling

Let's use Spark to do some modeling!

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

Let's take the dataset you created in the previous exercise and predict 'Ease of Business' using other indicators.

In [89]:
#Input all the features in one vector column
features = ["GDP", "GDP Per Capita", "GNI", "Startup Time",
            "Startup Cost", "Startup Cost Pct of GNI", "Startup Procedures", "Business Regulation"]

# Vector Assembler combines raw features and features generated by different feature transformers 
# into the single feature vector needed to train ML models.
assembler = VectorAssembler(inputCols=features, outputCol = 'Attributes')

output = assembler.setHandleInvalid("skip").transform(countryBusinessStartupPivot.filter(col('Ease of Business').isNotNull()))

#'Attributes' are the input features and 'Ease of Business' is the target column.
finalized_data = output.select("Attributes", "Ease of Business")

finalized_data.show()

+--------------------+----------------+
|          Attributes|Ease of Business|
+--------------------+----------------+
|[9.103831278E9,83...|          151.00|
|[1.4342385958E10,...|          179.00|
|[7.91061837E9,729...|          181.00|
|[3.369621899E9,45...|          160.00|
|[2.27748E11,1178....|          147.00|
|[1.6833353304E10,...|          140.00|
|[7.32044136E8,667...|          129.00|
|[1.2424107945E10,...|          180.00|
|[1.627046588E9,35...|          172.00|
|[1.3421822111E10,...|          143.00|
|[6.315715818E9,10...|           77.00|
|[8.458800788E9,96...|          123.00|
|[8.085878853E9,39...|          144.00|
|[7.326645709E9,20...|           44.00|
|[4.062377053E8,37...|           89.00|
|[1.4858345267E10,...|          138.00|
|[1.084315801E9,53...|          146.00|
|[1.790417939E8,33...|          149.00|
|[1.928690844E8,16...|          157.00|
|[3.0547324905E10,...|          182.00|
+--------------------+----------------+
only showing top 20 rows
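
With `setHandleInvalid("skip")`, `VectorAssembler` drops any row that has a null or NaN in one of the input columns. The plain-Python sketch below mimics that behaviour on two rows taken from the table above (Norway has no `Business Regulation` score), using a reduced feature list for brevity; `assemble` is a hypothetical helper, not the Spark implementation.

```python
def assemble(rows, feature_names, label_name):
    """Build (feature_vector, label) pairs, skipping rows with any missing feature."""
    assembled = []
    for row in rows:
        values = [row.get(name) for name in feature_names]
        if any(v is None for v in values):
            continue  # same effect as handleInvalid="skip"
        assembled.append(([float(v) for v in values], row[label_name]))
    return assembled

rows = [
    {"Country Name": "Benin", "Startup Time": 8.5, "Startup Procedures": 6.0,
     "Business Regulation": 3.0, "Ease of Business": 151.0},
    {"Country Name": "Norway", "Startup Time": 4.0, "Startup Procedures": 4.0,
     "Business Regulation": None, "Ease of Business": 8.0},
]

pairs = assemble(rows, ["Startup Time", "Startup Procedures", "Business Regulation"],
                 "Ease of Business")
print(pairs)
```

Because every feature must be present, countries without a `Business Regulation` score never reach the model, which narrows the training data considerably.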



In [91]:
# Split into training and test sets (pass seed=... to randomSplit for a reproducible split)
train_data, test_data = finalized_data.randomSplit([0.8, 0.2])

regressor = LinearRegression(featuresCol='Attributes', labelCol='Ease of Business')

# Fit the model on the training set (this rebinds `regressor` to the fitted model)
regressor = regressor.fit(train_data)

# Evaluate the fitted model on the test set
pred = regressor.evaluate(test_data)

# Show the test-set predictions next to the actual values
pred.predictions.show()

+--------------------+----------------+------------------+
|          Attributes|Ease of Business|        prediction|
+--------------------+----------------+------------------+
|[1.6833353304E10,...|          140.00| 121.2723619115442|
|[8.085878853E9,39...|          144.00|113.02867824748559|
|[1.928690844E8,16...|          157.00|152.27471530136742|
|[7.38041747E8,378...|           87.00| 113.8654191546719|
|[1.035620086E10,4...|          162.00|140.91541281804916|
|[1.9856634053E10,...|          105.00|118.24913879612849|
|[8.867464234E8,14...|          116.00| 140.6478735145592|
|[2.1413614655E10,...|          183.00|146.91456643648525|
|[1.67771E11,1029....|          177.00|147.25354925521165|
|[5.060218541E8,68...|           98.00|100.12359132745794|
|[7.6149479922E10,...|          170.00|148.91975047555985|
|[1.1102149875E10,...|          141.00| 99.22452006961993|
|[2.234759342E9,28...|           75.00|120.16114635444366|
|[1.192299136E9,93...|          178.00|167.7258793093201
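
`randomSplit` assigns rows to splits at random, so the split sizes only approximate the 80/20 weights and the rows in each split change between runs unless a `seed` is passed. The plain-Python sketch below illustrates the idea with a seeded generator; `random_split` is a hypothetical helper and does not reproduce Spark's actual sampling.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split independently, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        index = len(weights) - 1  # fallback guards against floating-point round-off
        for i, upper in enumerate(bounds):
            if draw < upper:
                index = i
                break
        splits[index].append(row)
    return splits

rows = list(range(100))
train_a, test_a = random_split(rows, [0.8, 0.2], seed=42)
train_b, test_b = random_split(rows, [0.8, 0.2], seed=42)

print(len(train_a), len(test_a))  # close to 80/20, not necessarily exact
print(train_a == train_b)         # same seed, same split
```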

In [92]:
pd.set_option('display.float_format', lambda x: '%.5f' % x)

# summary.pValues lists the coefficients' p-values in feature order, with the
# intercept's p-value last, so move the last element to the front to match the rows.
pvalues = list(regressor.summary.pValues)
summary = pd.DataFrame({'feature': ['Intercept'] + features,
                        'coefficient': [regressor.intercept] + regressor.coefficients.tolist(),
                        'pvalue': pvalues[-1:] + pvalues[:-1]})
summary

Unnamed: 0,feature,coefficient,pvalue
0,Intercept,235.93695,0.0
1,GDP,-0.0,0.49581
2,GDP Per Capita,0.00098,0.65339
3,GNI,0.0,0.62237
4,Startup Time,-0.37788,0.12556
5,Startup Cost,0.0,0.25113
6,Startup Cost Pct of GNI,0.10033,0.39788
7,Startup Procedures,2.56238,0.1701
8,Business Regulation,-38.09088,7e-05


In [93]:
from pyspark.ml.evaluation import RegressionEvaluator
# Named `evaluator` to avoid shadowing Python's built-in eval()
evaluator = RegressionEvaluator(labelCol="Ease of Business", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = evaluator.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})
print("r2: %.3f" % r2)

RMSE: 29.115
MSE: 847.686
MAE: 25.363
r2: 0.430
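
For reference, the four metrics can be written out in a few lines of plain Python. The sketch below uses the standard definitions (RMSE is the square root of MSE, and r2 is one minus the ratio of residual to total sum of squares) on the first three prediction rows shown earlier; `regression_metrics` is a hypothetical helper, and three rows are far too few to say anything about the model itself.

```python
import math

def regression_metrics(labels, predictions):
    """Compute RMSE, MSE, MAE and r2 from paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    ss_res = sum(e * e for e in errors)                # residual sum of squares
    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)  # total sum of squares
    mse = ss_res / n
    return {
        "rmse": math.sqrt(mse),
        "mse": mse,
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1.0 - ss_res / ss_tot,
    }

labels = [140.0, 144.0, 157.0]
predictions = [121.2723619115442, 113.02867824748559, 152.27471530136742]
metrics = regression_metrics(labels, predictions)
for name in ("rmse", "mse", "mae", "r2"):
    print("%s: %.3f" % (name, metrics[name]))
```

The reported values are consistent with these definitions: 29.115 squared is roughly 847.7, matching the MSE above.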
