# Data Exploration

In this notebook we'll look at one of the first steps in any data engineering project: getting to know what the inputs look like.

### Step 1 - What inputs do you have?

In [1]:
%%bash
ls ../data

WDICountry.csv
WDIData.csv
WDISeries.csv
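
The same inventory can be taken from Python, which also reports file sizes. A minimal sketch using only the standard library; the helper name `list_inputs` is hypothetical, and the `../data` path mentioned below is the one used in the cell above.

```python
from pathlib import Path


def list_inputs(data_dir, pattern="*.csv"):
    """Return (file name, size in bytes) pairs for matching files, sorted by name."""
    folder = Path(data_dir)
    return sorted(
        (path.name, path.stat().st_size)
        for path in folder.glob(pattern)
        if path.is_file()
    )
```

Calling `list_inputs("../data")` from the notebook directory should list the three CSV files with their sizes, which helps when deciding how much driver memory to request.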


### Step 2 - Get your tools set up

We will use [PySpark](http://spark.apache.org/docs/2.1.1/api/python/index.html) to inspect the data at hand and gather some basic details about it.

In [2]:
import os
from IPython.display import display, HTML
import pandas as pd

#Locating where pyspark is installed
import findspark
findspark.init()
import pyspark

#Settings for PySpark to work
driver_memory = '4g'
num_executors = 2
executor_memory = '1g'
#pyspark_submit_args = ' --driver-memory ' + driver_memory + ' --executor-memory ' + executor_memory + ' --num-executors ' + str(num_executors) + ' pyspark-shell'
pyspark_submit_args = ' --driver-memory ' + driver_memory + ' pyspark-shell'

#Setting the required parameters to start up PySpark
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

#Import Modules Needed for PySpark
from pyspark.sql import SparkSession

In [3]:
#Helper for pretty formatting of Spark DataFrames
def showDF(df, limitRows=20, truncate=True):
    #None turns off column-width truncation (pandas >= 1.0; older releases used -1)
    pd.set_option('display.max_colwidth', 50 if truncate else None)
    pd.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    #Restore the defaults so later cells are not affected
    pd.reset_option('display.max_rows')
    pd.reset_option('display.max_colwidth')

In [8]:
#Creating a spark session
spark = SparkSession.builder.appName("Data Exploration").getOrCreate()

### Step 3 - Look inside your data

We need to look at how our data is composed:
1. Format
2. Structure
3. Size
4. Dimensions

In this example our input is a CSV file with a header. Let's see what the data looks like.
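
Before handing a file to Spark, it can help to confirm the format and header by reading only the first few records. The following is a minimal sketch with the standard `csv` module; the helper name `peek_csv` and the default `utf-8` encoding are assumptions, not something the dataset documents.

```python
import csv


def peek_csv(path, n_rows=3, encoding="utf-8"):
    """Return (header, first n_rows data rows) without loading the whole file."""
    with open(path, newline="", encoding=encoding) as handle:
        reader = csv.reader(handle)
        header = next(reader, [])
        # zip stops after n_rows, so the rest of the file is never read
        rows = [row for _, row in zip(range(n_rows), reader)]
    return header, rows
```

For example, `peek_csv("../data/WDICountry.csv", n_rows=2)` would return the column names and two records, which is enough to decide whether the `header` option is appropriate.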

#### Read The Data

In [9]:
#Read the file into a Spark Data Frame
country = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("../data/WDICountry.csv")

#### Inspect the schema of the file you just read

In [10]:
country.printSchema()

root
 |-- Country Code: string (nullable = true)
 |-- Short Name: string (nullable = true)
 |-- Table Name: string (nullable = true)
 |-- Long Name: string (nullable = true)
 |-- 2-alpha code: string (nullable = true)
 |-- Currency Unit: string (nullable = true)
 |-- Special Notes: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Income Group: string (nullable = true)
 |-- WB-2 code: string (nullable = true)
 |-- National accounts base year: string (nullable = true)
 |-- National accounts reference year: integer (nullable = true)
 |-- SNA price valuation: string (nullable = true)
 |-- Lending category: string (nullable = true)
 |-- Other groups: string (nullable = true)
 |-- System of National Accounts: string (nullable = true)
 |-- Alternative conversion factor: string (nullable = true)
 |-- PPP survey year: string (nullable = true)
 |-- Balance of Payments Manual in use: string (nullable = true)
 |-- External debt Reporting status: string (nullable = true)
 |-- Sys

#### Take a look at some sample data

You can run `<dataframe>.show()` to look at sample data. However, its plain-text output is hard to read for wide tables, so we will use our helper function instead.

In [12]:
showDF(country, limitRows = 10, truncate = True)

|   | Country Code | Short Name | Table Name | Long Name | 2-alpha code | Currency Unit | Special Notes | Region | Income Group | WB-2 code | ... | Government Accounting concept | IMF data dissemination standard | Latest population census | Latest household survey | Source of most recent Income and expenditure data | Vital registration complete | Latest agricultural census | Latest industrial data | Latest trade data | Latest water withdrawal data |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | ABW | Aruba | Aruba | Aruba | AW | Aruban florin | SNA data for 2000-2011 are updated from offici... | Latin America & Caribbean | High income | AW | ... |  | Enhanced General Data Dissemination System (e-... | 2010 |  |  | Yes |  |  | 2016.0 |  |
| 1 | AFG | Afghanistan | Afghanistan | Islamic State of Afghanistan | AF | Afghan afghani | Fiscal year end: March 20; reporting period fo... | South Asia | Low income | AF | ... | Consolidated central government | Enhanced General Data Dissemination System (e-... | 1979 | Demographic and Health Survey, 2015 | Integrated household survey (IHS), 2011 |  |  |  | 2016.0 | 2000.0 |
| 2 | AGO | Angola | Angola | People's Republic of Angola | AO | Angolan kwanza |  | Sub-Saharan Africa | Lower middle income | AO | ... | Budgetary central government | Enhanced General Data Dissemination System (e-... | 2014 | Demographic and Health Survey, 2015/16 | Integrated household survey (IHS), 2008/09 |  |  |  | 2016.0 | 2005.0 |
| 3 | ALB | Albania | Albania | Republic of Albania | AL | Albanian lek |  | Europe & Central Asia | Upper middle income | AL | ... | Consolidated central government | Enhanced General Data Dissemination System (e-... | 2011 | Demographic and Health Survey, 2008/09 | Living Standards Measurement Study Survey (LSM... | Yes | 2012.0 | 2013.0 | 2016.0 | 2006.0 |
| 4 | AND | Andorra | Andorra | Principality of Andorra | AD | Euro | WB-3 code changed from ADO to AND to align wit... | Europe & Central Asia | High income | AD | ... |  |  | 2011. Population data compiled from administra... |  |  | Yes |  |  |  |  |
| 5 | ARB | Arab World | Arab World | Arab World | 1A |  | Arab World aggregate. Arab World is composed o... |  |  | 1A | ... |  |  |  |  |  |  |  |  | 2016.0 |  |
| 6 | ARE | United Arab Emirates | United Arab Emirates | United Arab Emirates | AE | U.A.E. dirham |  | Middle East & North Africa | High income | AE | ... | Consolidated central government | Enhanced General Data Dissemination System (e-... | 2010 | World Health Survey, 2003 |  |  | 2012.0 | 1985.0 | 2016.0 | 2005.0 |
| 7 | ARG | Argentina | Argentina | Argentine Republic | AR | Argentine peso | National Institute of Statistics and Census re... |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 8 | ARM | Armenia | Armenia | Republic of Armenia | AM | Armenian dram |  | Europe & Central Asia | Lower middle income | AM | ... | Consolidated central government | Special Data Dissemination Standard (SDDS) | 2011 | Demographic and Health Survey, 2015/16 | Integrated household survey (IHS), 2015 | Yes | 2014.0 |  | 2016.0 | 2012.0 |
| 9 | ASM | American Samoa | American Samoa | American Samoa | AS | U.S. dollar | New base Year 2009 | East Asia & Pacific | Upper middle income | AS | ... |  |  | 2010 |  |  | Yes | 2008.0 |  | 2016.0 |  |


#### Get Some Basic Stats

In [13]:
#Count the number of records in the dataframe
country.count()

263

#### Examining Dimensions
##### How many different regions do the various countries belong to?

In [14]:
showDF(country.select('Region').distinct(), truncate = False)

|   | Region |
|---|---|
| 0 | South Asia |
| 1 |  |
| 2 | Sub-Saharan Africa |
| 3 | Europe & Central Asia |
| 4 | North America |
| 5 | East Asia & Pacific |
| 6 | Middle East & North Africa |
| 7 | Latin America & Caribbean |
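
The blank entry in the list of regions means that some rows carry no `Region` at all; in the sample shown earlier, the "Arab World" aggregate is one of them. A quick way to size the problem is to count missing values per column. The sketch below does this in plain Python on rows represented as dictionaries; the helper name `count_missing` and the three sample rows are illustrative only.

```python
def count_missing(rows, columns):
    """Count, per column, the rows whose value is None or blank."""
    counts = {column: 0 for column in columns}
    for row in rows:
        for column in columns:
            value = row.get(column)
            if value is None or not str(value).strip():
                counts[column] += 1
    return counts


# Illustrative rows modelled on the sample output, not read from the file
sample_rows = [
    {"Country Code": "ABW", "Region": "Latin America & Caribbean"},
    {"Country Code": "ARB", "Region": None},
    {"Country Code": "ARG", "Region": ""},
]
print(count_missing(sample_rows, ["Country Code", "Region"]))
```

On the Spark DataFrame itself, an expression along the lines of `country.filter(country['Region'].isNull()).count()` should give the comparable figure for null regions.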


##### How many different income groups do we have across countries?

In [15]:
showDF(country.select('Income Group').distinct(), truncate = False)

|   | Income Group |
|---|---|
| 0 | Lower middle income |
| 1 |  |
| 2 | High income |
| 3 | Upper middle income |
| 4 | Low income |


#### By applying the same steps as we did for the "WDICountry.csv" dataset, we can see what the rest of the datasets look like

###### WDISeries.csv

In [16]:
series = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("../data/WDISeries.csv")

In [17]:
series.printSchema()

root
 |-- Series Code: string (nullable = true)
 |-- Topic: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Short definition: string (nullable = true)
 |-- Long definition: string (nullable = true)
 |-- Unit of measure: string (nullable = true)
 |-- Periodicity: string (nullable = true)
 |-- Base Period: string (nullable = true)
 |-- Other notes: string (nullable = true)
 |-- Aggregation method: string (nullable = true)
 |-- Limitations and exceptions: string (nullable = true)
 |-- Notes from original source: string (nullable = true)
 |-- General comments: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Statistical concept and methodology: string (nullable = true)
 |-- Development relevance: string (nullable = true)
 |-- Related source links: string (nullable = true)
 |-- Other web links: string (nullable = true)
 |-- Related indicators: string (nullable = true)
 |-- License Type: string (nullable = true)
 |-- _c20: string (nullable = true)

In [20]:
showDF(series, limitRows = 10)

|   | Series Code | Topic | Indicator Name | Short definition | Long definition | Unit of measure | Periodicity | Base Period | Other notes | Aggregation method | ... | Notes from original source | General comments | Source | Statistical concept and methodology | Development relevance | Related source links | Other web links | Related indicators | License Type | _c20 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | AG.AGR.TRAC.NO | Environment: Agricultural production | Agricultural machinery, tractors |  | Agricultural machinery refers to the number of... |  | Annual |  |  | Sum | ... |  |  | Food and Agriculture Organization, electronic ... | A tractor provides the power and traction to m... | Agricultural land covers more than one-third o... |  |  |  | CC BY-4.0 |  |
| 1 | AG.CON.FERT.PT.ZS | Environment: Agricultural production | Fertilizer consumption (% of fertilizer produc... |  | Fertilizer consumption measures the quantity o... |  | Annual |  |  | Weighted average | ... |  |  | Food and Agriculture Organization, electronic ... | Fertilizer consumption measures the quantity o... | Factors such as the green revolution, has led ... |  |  |  | CC BY-4.0 |  |
| 2 | AG.CON.FERT.ZS | Environment: Agricultural production | Fertilizer consumption (kilograms per hectare ... |  | Fertilizer consumption measures the quantity o... |  | Annual |  |  | Weighted average | ... |  |  | Food and Agriculture Organization, electronic ... | Fertilizer consumption measures the quantity o... | Factors such as the green revolution, has led ... |  |  |  | CC BY-4.0 |  |
| 3 | AG.LND.AGRI.K2 | Environment: Land use | Agricultural land (sq. km) |  | Agricultural land refers to the share of land ... |  | Annual |  |  | Sum | ... |  |  | Food and Agriculture Organization, electronic ... | Agricultural land constitutes only a part of a... | Agricultural land covers more than one-third o... |  |  |  | CC BY-4.0 |  |
| 4 | AG.LND.AGRI.ZS | Environment: Land use | Agricultural land (% of land area) |  | Agricultural land refers to the share of land ... |  | Annual |  |  | Weighted average | ... |  |  | Food and Agriculture Organization, electronic ... | Agriculture is still a major sector in many ec... | Agricultural land covers more than one-third o... |  |  |  | CC BY-4.0 |  |
| 5 | AG.LND.ARBL.HA | Environment: Land use | Arable land (hectares) |  | Arable land (in hectares) includes land define... |  | Annual |  |  |  | ... |  |  | Food and Agriculture Organization, electronic ... | Temporary fallow land refers to land left fall... | Agricultural land covers more than one-third o... |  |  |  | CC BY-4.0 |  |
| 6 | AG.LND.ARBL.HA.PC | Environment: Land use | Arable land (hectares per person) |  | Arable land (hectares per person) includes lan... |  | Annual |  |  | Weighted Average | ... |  |  | Food and Agriculture Organization, electronic ... | Temporary fallow land refers to land left fall... | Agricultural land covers about one-third of th... |  |  |  | CC BY-4.0 |  |
| 7 | AG.LND.ARBL.ZS | Environment: Land use | Arable land (% of land area) |  | Arable land includes land defined by the FAO a... |  | Annual |  |  | Weighted average | ... |  |  | Food and Agriculture Organization, electronic ... | Temporary fallow land refers to land left fall... | Agricultural land covers more than one-third o... |  |  |  | CC BY-4.0 |  |
| 8 | AG.LND.CREL.HA | Environment: Agricultural production | Land under cereal production (hectares) |  | Land under cereal production refers to harvest... |  | Annual |  |  | Sum | ... |  |  | Food and Agriculture Organization, electronic ... | Cereals production includes wheat, rice, maize... | The cultivation of cereals varies widely in di... |  |  |  | CC BY-4.0 |  |
| 9 | AG.LND.CROP.ZS | Environment: Land use | Permanent cropland (% of land area) |  | Permanent cropland is land cultivated with cro... |  | Annual |  |  | Weighted average | ... |  |  | Food and Agriculture Organization, electronic ... | The data on Permanent cropland and land area a... | Agricultural land covers more than one-third o... |  |  |  | CC BY-4.0 |  |


In [19]:
series.count()

1593

#### Examining Dimensions
##### What are the different periodicities or aggregation methods we might expect to see in the data?

In [21]:
showDF(series.select('Periodicity').distinct(), truncate = False)

|   | Periodicity |
|---|---|
| 0 | Annual |
| 1 |  |
| 2 | Quarterly (represented as Annual) |
| 3 | International Civil Aviation Organization, Civil Aviation Statistics of the World and ICAO staff estimates. |
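
The last value above looks like a source citation rather than a periodicity, which suggests that at least one record was split or shifted during parsing. One plausible cause (an assumption, not verified against the file) is a quoted text field that contains a line break. The sketch below uses a made-up three-column sample to show how a line-by-line reader misplaces values in that situation, while a quote-aware parser keeps the record intact.

```python
import csv
import io

# Hypothetical sample: the "definition" field of the first record
# contains a line break inside double quotes.
SAMPLE = (
    "code,definition,periodicity\n"
    'A.1,"first line\nsecond line",Annual\n'
    "B.2,short,Annual\n"
)


def naive_rows(text):
    """Split on physical lines and commas, ignoring quoting."""
    return [line.split(",") for line in text.splitlines()]


def quoted_rows(text):
    """Parse with csv.reader, which keeps quoted line breaks inside one field."""
    return list(csv.reader(io.StringIO(text)))


print(len(naive_rows(SAMPLE)), "rows when splitting on lines")
print(len(quoted_rows(SAMPLE)), "rows when honouring quotes")
```

In the naive result, the fragment `second line"` becomes a row of its own and `Annual` lands in the second column. Spark 2.2 and later provide a `multiLine` option on the CSV reader for this case; whether it changes the result for `WDISeries.csv` has not been checked here.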


In [22]:
showDF(series.select('Aggregation Method').distinct(), truncate = False)

|   | Aggregation Method |
|---|---|
| 0 |  |
| 1 | Weighted average |
| 2 | Simple average |
| 3 | Gap-filled total |
| 4 | Median |
| 5 | Unweighted average |
| 6 | Linear mixed-effect model estimates |
| 7 | Weighted Average |
| 8 | Sum |
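
Note that the output lists both "Weighted average" and "Weighted Average": the same method spelled with different capitalisation counts as two distinct values. A minimal sketch of normalising such labels before counting them, in plain Python; the helper names and the sample list are illustrative.

```python
from collections import Counter


def normalize_label(value):
    """Collapse whitespace and lower-case a label; map None or blank to None."""
    if value is None:
        return None
    cleaned = " ".join(value.split()).lower()
    return cleaned or None


def distinct_labels(values):
    """Count occurrences of each label after normalisation."""
    return Counter(normalize_label(value) for value in values)


labels = ["Weighted average", "Weighted Average", "Sum", None, " sum "]
print(distinct_labels(labels))
```

In PySpark the same idea can be expressed with the `lower` and `trim` functions from `pyspark.sql.functions` applied to the column before calling `distinct()`.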


In [23]:
showDF(series.select('Topic').distinct(), truncate = False)

|   | Topic |
|---|---|
| 0 | Education: Efficiency |
| 1 | Social Protection & Labor: Labor force structure |
| 2 | Infrastructure: Transportation |
| 3 | Economic Policy & Debt: National accounts: Local currency at constant prices: Other items |
| 4 | Environment: Density & urbanization |
| 5 | Economic Policy & Debt: External debt: Interest |
| 6 | Environment: Agricultural production |
| 7 | Economic Policy & Debt: External debt: Undisbursed debt |
| 8 | Public Sector: Policy & institutions |
| 9 | Environment: Land use |


## Exercise

Repeat the same steps for the `WDIData.csv` file and read it into a dataframe called `indicators`.

In [24]:
# Read the data
indicators = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("../data/WDIData.csv")

In [25]:
# Inspect the schema
indicators.printSchema()

root
 |-- Country Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Indicator Code: string (nullable = true)
 |-- 1960: double (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = true)
 |-- 1982: double (nullable = true)

In [69]:
indicators.count()

420024

In [68]:
# Look at sample records
showDF(indicators.sample(.0001))

|   | Country Name | Country Code | Indicator Name | Indicator Code | 1960 | 1961 | 1962 | 1963 | 1964 | 1965 | ... | 2008 | 2009 | 2010 | 2011 | 2012 | 2013 | 2014 | 2015 | 2016 | 2017 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Latin America & the Caribbean (IDA & IBRD coun... | TLA | Poverty gap at $5.50 a day (2011 PPP) (% of po... | SI.POV.UMIC.GP |  |  |  |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 1 | North America | NAC | Total natural resources rents (% of GDP) | NY.GDP.TOTL.RT.ZS |  |  |  |  |  |  | ... | 2.453188 | 0.9440692 | 1.157976 | 1.431669 | 1.031652 | 1.007627 | 0.9638295 | 0.3556517 | 0.3371314 |  |
| 2 | Post-demographic dividend | PST | Central government debt; total (% of GDP) | GC.DOD.TOTL.GD.ZS |  |  |  |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 3 | South Asia (IDA & IBRD) | TSA | Adjusted net savings; including particulate em... | NY.ADJ.SVNG.GN.ZS |  |  |  |  |  |  | ... | 19.75165 | 23.4788 | 23.52232 | 20.84954 | 20.0228 | 19.50514 | 20.19086 | 18.74313 | 16.34986 |  |
| 4 | South Asia (IDA & IBRD) | TSA | PPG; official creditors (DIS; current US$) | DT.DIS.OFFT.CD |  |  |  |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 5 | World | WLD | Currency composition of PPG debt; Pound sterli... | DT.CUR.UKPS.ZS |  |  |  |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 6 | Australia | AUS | Net flows on external debt; short-term (NFL; c... | DT.NFL.DSTC.CD |  |  |  |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 7 | Barbados | BRB | Final consumption expenditure; etc. (constant ... | NE.CON.TETC.KD |  |  |  |  |  |  | ... |  |  | 4294150000.0 |  |  |  |  |  |  |  |
| 8 | Benin | BEN | Coverage of social safety net programs in 3rd ... | per_sa_allsa.cov_q3_tot |  |  |  |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 9 | Cayman Islands | CYM | Completeness of birth registration; male (%) | SP.REG.BRTH.MA.ZS |  |  |  |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |


In [63]:
# How many distinct country names are there (regional and income aggregates such as "World" included)?
indicators.select("Country Name").distinct().count()

264
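
This file reports 264 distinct names, while `WDICountry.csv` holds 263 rows, so at least one entry has no counterpart. Both schemas contain a `Country Code` column, and a set difference on that key shows which entries are unmatched. A minimal sketch in plain Python; the helper name `unmatched_keys` and the sample codes in the usage note are illustrative.

```python
def unmatched_keys(left_keys, right_keys):
    """Return (keys only on the left, keys only on the right), each sorted."""
    left, right = set(left_keys), set(right_keys)
    return sorted(left - right), sorted(right - left)
```

For instance, `unmatched_keys(["ABW", "AFG", "WLD"], ["ABW", "AFG", "XYZ"])` returns `(["WLD"], ["XYZ"])`. The real key lists could be gathered with something like `[r['Country Code'] for r in indicators.select('Country Code').distinct().collect()]`, and likewise for `country`.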

In [27]:
# List the distinct indicator names
showDF(indicators.select('Indicator Name').distinct(), truncate = False)

|   | Indicator Name |
|---|---|
| 0 | Adjusted savings: education expenditure (current US$) |
| 1 | Cause of death; by injury (% of total) |
| 2 | Completeness of infant death reporting (% of reported infant deaths to estimated infant deaths) |
| 3 | Forest area (% of land area) |
| 4 | Labor force participation rate; total (% of total population ages 15+) (modeled ILO estimate) |
| 5 | People using safely managed sanitation services (% of population) |
| 6 | People with basic handwashing facilities including soap and water; urban (% of urban population) |
| 7 | PPG; bilateral concessional (DOD; current US$) |
| 8 | Coverage of social protection and labor programs (% of population) |
| 9 | Currency composition of PPG debt; Swiss franc (%) |


In [34]:
showDF(indicators.where(indicators['Indicator Name'] == 'Gross domestic savings (% of GDP)')\
       [['Country Name', '2015']])

|   | Country Name | 2015 |
|---|---|---|
| 0 | Arab World | 26.888773 |
| 1 | Caribbean small states | 16.363792 |
| 2 | Central Europe and the Baltics | 26.095708 |
| 3 | Early-demographic dividend | 24.899499 |
| 4 | East Asia & Pacific | 35.196669 |
| 5 | East Asia & Pacific (excluding high income) | 44.804527 |
| 6 | East Asia & Pacific (IDA & IBRD countries) | 44.803684 |
| 7 | Euro area | 24.089543 |
| 8 | Europe & Central Asia | 24.181948 |
| 9 | Europe & Central Asia (excluding high income) | 26.775318 |


In [73]:
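# Note: these names shadow Python's built-in min, max and sum for the rest of the notebook
from pyspark.sql.functions import min, max, mean, sum, col, count

# Calculate the average, minimum, and maximum Gross domestic savings
# (as a % of GDP) across all country and aggregate entries for 2015
groupBy = ["Country Name"]  # not used by the ungrouped aggregation below
aggregate = ["2015"]
funcs = [mean, min, max]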
exprs = [f(col(c)) for f in funcs for c in aggregate]

showDF(indicators.where(indicators['Indicator Name'] == 'Gross domestic savings (% of GDP)')\
       .agg(*exprs), truncate = False)

|   | avg(2015) | min(2015) | max(2015) |
|---|---|---|---|
| 0 | 18.890471 | -66.922321 | 64.30084 |
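
The list comprehension in the cell above builds one expression per (function, column) pair, which is why the result has the columns `avg(2015)`, `min(2015)` and `max(2015)`. Spark's aggregate functions skip null values, so entries with no 2015 figure do not affect these numbers. The sketch below mimics both behaviours in plain Python on a few made-up rows; the helper name `aggregate_columns` and the sample values are illustrative only.

```python
from statistics import mean


def aggregate_columns(rows, columns, funcs):
    """Apply every function to every column, skipping missing (None) values."""
    result = {}
    for name, func in funcs.items():
        for column in columns:
            values = [row[column] for row in rows if row[column] is not None]
            result[f"{name}({column})"] = func(values) if values else None
    return result


# Made-up values, including one missing entry
rows = [{"2015": 10.0}, {"2015": None}, {"2015": -2.0}]
summary = aggregate_columns(rows, ["2015"], {"avg": mean, "min": min, "max": max})
print(summary)
```

Adding a second column name to the list would produce three more entries, in the same function-major order as the comprehension in the notebook.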


In [74]:
showDF(indicators.groupBy("Country Name").agg(count("*").alias("cnt")).filter("cnt > 1"))

|   | Country Name | cnt |
|---|---|---|
| 0 | South Asia | 1591 |
| 1 | Chad | 1591 |
| 2 | Lower middle income | 1591 |
| 3 | Paraguay | 1591 |
| 4 | Low & middle income | 1591 |
| 5 | Heavily indebted poor countries (HIPC) | 1591 |
| 6 | World | 1591 |
| 7 | Senegal | 1591 |
| 8 | East Asia & Pacific (IDA & IBRD countries) | 1591 |
| 9 | Cabo Verde | 1591 |
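
Every name shown above has the same count, 1591, and 264 names times 1591 rows gives exactly the 420024 rows counted earlier. That suggests, although the ten rows displayed do not prove it, that the file is a complete grid with one row per (country, indicator) pair. A minimal sketch of such a check in plain Python; the helper name `is_full_grid` and the toy pairs in the usage note are illustrative.

```python
def is_full_grid(pairs):
    """True when each first key occurs exactly once with each second key."""
    pairs = list(pairs)
    firsts = {first for first, _ in pairs}
    seconds = {second for _, second in pairs}
    # No duplicates, and as many pairs as there are possible combinations
    return len(pairs) == len(set(pairs)) == len(firsts) * len(seconds)


# Counts reported in this notebook: 264 names x 1591 rows each
assert 264 * 1591 == 420024
```

For example, `is_full_grid([("A", "x"), ("A", "y"), ("B", "x"), ("B", "y")])` is `True`, and it becomes `False` as soon as one pair is missing or duplicated.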


In [None]:
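# Generic pattern: `df`, "k" and "v" are placeholders for a DataFrame,
# its grouping column and the column to aggregate
groupBy = ["k"]
aggregate = ["v"]
funs = [mean, sum, max]

# One expression per (function, column) pair
exprs = [f(col(c)) for f in funs for c in aggregate]

# or equivalently df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)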