# 2. Data Transformation, Modeling, and Quality with Spark

After exploring the data, I will transform the datasets (reshaping, joining, filtering) so they are ready for use in a data model.
This notebook continues from the Data Exploration notebook, so it uses the same datasets: WDICountry.csv, WDIData.csv, and WDISeries.csv.

In [71]:
%%bash
ls ./Downloads/WDI_csv/

WDICountry-Series.csv
WDICountry.csv
WDIData.csv
WDIFootNote.csv
WDISeries-Time.csv
WDISeries.csv


In [280]:
import os
from IPython.display import display, HTML
import pandas as pd
import pyspark 
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType

#import modules needed for pyspark
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName("Data Transformation").getOrCreate()

In [90]:
#a function to format Spark DataFrames clearly: converts the first limitRows rows to pandas and displays them
def showDF(df, limitRows=15, truncate=True):
    if truncate:
        pd.set_option('display.max_colwidth', 50)
        display(df.limit(limitRows).toPandas())
    else:
        #None removes the column-width limit (-1 is deprecated in newer pandas versions)
        pd.set_option('display.max_colwidth', None)
        pd.set_option('display.max_rows', limitRows)
        display(df.limit(limitRows).toPandas())
        pd.reset_option('display.max_rows')

#### Read the files into Spark DataFrames

In [17]:
country = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("./Downloads/WDI_csv/WDICountry.csv")

country.count()

263

In [18]:
series = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("./Downloads/WDI_csv/WDISeries.csv")

series.count()

1437

In [21]:
indicators = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("./Downloads/WDI_csv/WDIData.csv")

indicators.count()

377256

### Transforming the data
#### Starting with the Country DataFrame:
- Remove columns that the data model does not need
- Rename columns for data ingestion

In [25]:
countryRenamed = country \
    .select("2-alpha code", "Country Code", "Short Name", "Long Name", "Region", "Income Group") \
    .withColumnRenamed("2-alpha code", "country_iso_code") \
    .withColumnRenamed("Country Code", "wb_country_code") \
    .withColumnRenamed("Short Name", "country_name") \
    .withColumnRenamed("Long Name", "country_long_name") \
    .withColumnRenamed("Region", "region") \
    .withColumnRenamed("Income Group", "income_group")
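The chained `withColumnRenamed` calls above can also be driven by a single mapping. Below is a minimal sketch, assuming you prefer `selectExpr`. `COUNTRY_COLUMNS` and `rename_exprs` are hypothetical names introduced here and are not part of the original notebook.

```python
# Hypothetical mapping from WDICountry.csv headers to data-model column names
COUNTRY_COLUMNS = {
    "2-alpha code": "country_iso_code",
    "Country Code": "wb_country_code",
    "Short Name": "country_name",
    "Long Name": "country_long_name",
    "Region": "region",
    "Income Group": "income_group",
}


def rename_exprs(mapping):
    """Return Spark SQL expressions of the form "`old name` as new_name".

    Backticks let Spark's SQL parser accept column names that contain
    spaces or hyphens, such as "2-alpha code".
    """
    return [f"`{old}` as {new}" for old, new in mapping.items()]
```

Usage: `countryRenamed = country.selectExpr(*rename_exprs(COUNTRY_COLUMNS))` selects and renames the columns in one step, in the order of the dictionary.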

In [183]:
#Register the DataFrame as a temporary view (scoped to this SparkSession) for SQL queries
countryRenamed.createOrReplaceTempView("country")

#### First 15 rows of the Country dataset with renamed columns:

In [30]:
renamedDF = """
select 
    country_iso_code,
    wb_country_code,
    country_name as name,
    country_long_name as long_name,
    region,
    income_group
from 
    country
"""

showDF(spark.sql(renamedDF))

Unnamed: 0,country_iso_code,wb_country_code,name,long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,AF,AFG,Afghanistan,Islamic State of Afghanistan,South Asia,Low income
2,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
3,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
4,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
5,1A,ARB,Arab World,Arab World,,
6,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
7,AR,ARG,Argentina,Argentine Republic,Latin America & Caribbean,Upper middle income
8,AM,ARM,Armenia,Republic of Armenia,Europe & Central Asia,Upper middle income
9,AS,ASM,American Samoa,American Samoa,East Asia & Pacific,Upper middle income


### Quality Check
#### Do all countries have 2 character country_iso_codes?
The number of countries with 2-character ISO codes (262) differs from the number of rows in the Country dataset (263).

In [45]:
isoLength = """
select 
    length(country_iso_code) as characters,
    count(1) as count
from 
    country
group by 
    characters
having 
    count > 1
"""

showDF(spark.sql(isoLength))

Unnamed: 0,characters,count
0,2,262


#### Check each key column for duplicates
There are no duplicates

In [46]:
isoDuplicates = """
select 
    country_iso_code,
    count(1) as count
from
    country
group by
    country_iso_code
having
    count > 1
    
"""
showDF(spark.sql(isoDuplicates))

wbDuplicates = """
select 
    wb_country_code,
    count(1) as count
from
    country
group by
    wb_country_code
having
    count > 1
"""
showDF(spark.sql(wbDuplicates))

nameDuplicates = """
select 
    country_name,
    count(1) as count
from
    country
group by
    country_name
having
    count > 1
"""

showDF(spark.sql(nameDuplicates))

Unnamed: 0,country_iso_code,count


Unnamed: 0,country_iso_code,count


Unnamed: 0,country_iso_code,count
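Writing the three duplicate checks by hand makes copy-and-paste slips easy, because each query must group by its own key. The sketch below shows a small helper that builds the same kind of query for any key column. `duplicate_query` and `KEY_COLUMNS` are hypothetical names that do not appear in the original notebook.

```python
def duplicate_query(table, key):
    """Build a Spark SQL query listing values of `key` that occur more than once in `table`."""
    return (
        f"select {key}, count(1) as count "
        f"from {table} "
        f"group by {key} "
        f"having count(1) > 1"
    )


# Key columns of the country view registered above
KEY_COLUMNS = ["country_iso_code", "wb_country_code", "country_name"]
```

Usage: `for key in KEY_COLUMNS: showDF(spark.sql(duplicate_query("country", key)))` runs one duplicate check per key. Any result row indicates a duplicated value.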


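#### The reason is that one of the ISO codes is null
length(null) is null, so that row forms its own group with a count of 1. The having count > 1 clause in the query above filtered that group out. Without the clause, the null group appears: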

In [47]:
isoLength2 = """
select 
    length(country_iso_code) as characters,
    count(1) as count
from 
    country
group by 
    characters
"""

showDF(spark.sql(isoLength2))

Unnamed: 0,characters,count
0,,1
1,2.0,262
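This behaviour follows from how SQL treats nulls: `length(null)` is null, and `group by` puts all nulls in a single group. The plain-Python sketch below reproduces both query results. It is not Spark code. `length_groups` is a hypothetical helper, and `None` stands in for SQL null.

```python
from collections import Counter


def length_groups(codes, min_count=1):
    """Count codes by length, mimicking
    `select length(code), count(1) ... group by length(code) having count(1) >= min_count`.

    None plays the role of SQL NULL: its "length" is also None, so every
    null code falls into one shared group.
    """
    counts = Counter(len(code) if code is not None else None for code in codes)
    return {length: n for length, n in counts.items() if n >= min_count}
```

With `min_count=2`, the equivalent of `having count > 1`, the single null group disappears. That is why the first query reported only the 262 two-character codes.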


#### After removing the null ISO code, the number of rows (262) matches the number of ISO codes that are exactly 2 characters long

In [61]:
countryFinal = countryRenamed.filter("country_iso_code is not null")

showDF(countryFinal)

Unnamed: 0,country_iso_code,wb_country_code,country_name,country_long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,AF,AFG,Afghanistan,Islamic State of Afghanistan,South Asia,Low income
2,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
3,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
4,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
5,1A,ARB,Arab World,Arab World,,
6,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
7,AR,ARG,Argentina,Argentine Republic,Latin America & Caribbean,Upper middle income
8,AM,ARM,Armenia,Republic of Armenia,Europe & Central Asia,Upper middle income
9,AS,ASM,American Samoa,American Samoa,East Asia & Pacific,Upper middle income


In [63]:
countryFinal.count()

262

#### Write countryFinal as a CSV file to the output folder

In [74]:
countryFinal \
    .coalesce(1) \
    .write.csv('./Downloads/WDI_csv/Final/country', mode='overwrite', header='true')

#### Check if it is written correctly
It is!

In [75]:
%%bash
cat ./Downloads/WDI_csv/Final/country/*csv | head

country_iso_code,wb_country_code,country_name,country_long_name,region,income_group
AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
AF,AFG,Afghanistan,Islamic State of Afghanistan,South Asia,Low income
AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
1A,ARB,Arab World,Arab World,"",""
AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
AR,ARG,Argentina,Argentine Republic,Latin America & Caribbean,Upper middle income
AM,ARM,Armenia,Republic of Armenia,Europe & Central Asia,Upper middle income
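Instead of reading the output of `head`, you can check the written file programmatically. Spark writes the data as `part-*.csv` files inside the target folder (only one file here, because of `coalesce(1)`). The sketch below uses only the standard library and assumes the folder is on the local filesystem. `check_csv_output` is a hypothetical helper.

```python
import csv
import glob
import os


def check_csv_output(folder, expected_header, expected_rows):
    """Return True if the part-*.csv files in `folder` all start with
    `expected_header` and together hold `expected_rows` data rows."""
    paths = sorted(glob.glob(os.path.join(folder, "part-*.csv")))
    if not paths:
        return False
    total = 0
    for path in paths:
        with open(path, newline="") as f:
            reader = csv.reader(f)
            header = next(reader, None)
            if header != expected_header:
                return False
            total += sum(1 for _ in reader)  # count the remaining data rows
    return total == expected_rows
```

Usage: `check_csv_output('./Downloads/WDI_csv/Final/country', countryFinal.columns, 262)` should return `True` if the write succeeded.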


### Next, I will do the same for the WDISeries dataset: transform it and perform a data quality check

#### Rename the columns and keep only series with annual periodicity
After filtering, this dataset has 1416 rows (out of 1437)

In [162]:
seriesAnnual = series \
    .select("Series Code", "Indicator Name", "Periodicity", "Aggregation Method") \
    .withColumnRenamed("Series Code", "indicator_code") \
    .withColumnRenamed("Indicator Name", "indicator_name") \
    .withColumnRenamed("Periodicity", "periodicity") \
    .withColumnRenamed("Aggregation Method", "aggregation_method") \
    .filter(F.col("periodicity") == "Annual") 
    
showDF(seriesAnnual)

seriesAnnual.count()

Unnamed: 0,indicator_code,indicator_name,periodicity,aggregation_method
0,AG.AGR.TRAC.NO,"Agricultural machinery, tractors",Annual,Sum
1,AG.CON.FERT.PT.ZS,Fertilizer consumption (% of fertilizer produc...,Annual,Weighted average
2,AG.CON.FERT.ZS,Fertilizer consumption (kilograms per hectare ...,Annual,Weighted average
3,AG.LND.AGRI.K2,Agricultural land (sq. km),Annual,Sum
4,AG.LND.AGRI.ZS,Agricultural land (% of land area),Annual,Weighted average
5,AG.LND.ARBL.HA,Arable land (hectares),Annual,
6,AG.LND.ARBL.HA.PC,Arable land (hectares per person),Annual,Weighted average
7,AG.LND.ARBL.ZS,Arable land (% of land area),Annual,Weighted average
8,AG.LND.CREL.HA,Land under cereal production (hectares),Annual,Sum
9,AG.LND.CROP.ZS,Permanent cropland (% of land area),Annual,Weighted average


1416

#### I want to measure cellular and broadband penetration relative to the population demographics of each country. The same indicators also provide annual global aggregates

In [163]:
AggIndicators = seriesAnnual \
    .select("indicator_code", "indicator_name") \
    .filter("lower(aggregation_method) = 'sum'") \
    .filter("lower(indicator_name) like '%population%total%' " + 
            " or lower(indicator_name) like '%cellular%' " +
            " or lower(indicator_name) like '%broadband%'") \
    .filter("lower(indicator_name) not like '%refugee%'") \
    .orderBy("indicator_code") 

#SQL equivalent
# AggIndicators = """

# select 
#     indicator_code, 
#     indicator_name
# from 
#     seriesAnnual
# where 
#     aggregation_method = "Sum" and
#     ((indicator_name like 'Population%' and indicator_name like '%total') or
#     indicator_name like '%broadband%' or
#     indicator_name like '%cellular%') and
#     indicator_name not like '%refugee%'
# order by 
#     indicator_code
# """
showDF(AggIndicators, truncate = False)

Unnamed: 0,indicator_code,indicator_name
0,IT.CEL.SETS,Mobile cellular subscriptions
1,IT.NET.BBND,Fixed broadband subscriptions
2,SP.POP.0014.TO,"Population ages 0-14, total"
3,SP.POP.1564.TO,"Population ages 15-64, total"
4,SP.POP.65UP.TO,"Population ages 65 and above, total"
5,SP.POP.TOTL,"Population, total"


#### After identifying the indicators of interest, I can find the metrics for each indicator type

In [189]:
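#keep the columns that are relevant for further transformations from the indicators dataset
#(_c64 is the unnamed extra column Spark creates for the trailing comma at the end of each line in WDIData.csv)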
indicatorsData = indicators \
    .withColumnRenamed("Indicator Code", "indicator_code") \
    .withColumnRenamed("Country Code", "wb_country_code") \
    .drop("Indicator Name") \
    .drop("Country Name") \
    .drop('_c64')

In [202]:
#joining indicatorsData with AggIndicators by indicator_code
AggIndicatorsData = indicatorsData.join(
    AggIndicators, indicatorsData.indicator_code == AggIndicators.indicator_code) \
    .drop(AggIndicators.indicator_code)

#SQL equivalent
# AggIndicatorsData = """
# select 
#     *
# from 
#     AggIndicators
# inner join
#     indicatorsData on AggIndicators.indicator_code == indicatorsData.indicator_code

# """

showDF(AggIndicatorsData, truncate = False)

#the row count should equal the 6 selected indicators x the number of country/aggregate codes in WDIData (1584 = 6 x 264)
AggIndicatorsData.count()

Unnamed: 0,wb_country_code,indicator_code,1960,1961,1962,1963,1964,1965,1966,1967,...,2011,2012,2013,2014,2015,2016,2017,2018,2019,indicator_name
0,ARB,IT.NET.BBND,,,,,,,,,...,8570787.0,10323586.0,8880291.0,10801834.0,18080990.0,20176081.0,28386768.0,29798964.0,,Fixed broadband subscriptions
1,ARB,IT.CEL.SETS,,,,,,,,,...,351957968.0,381641858.0,407704505.0,415029432.0,419014000.0,417212258.0,416484374.0,392247081.0,,Mobile cellular subscriptions
2,ARB,SP.POP.0014.TO,39900284.0,41339703.0,42792877.0,44248946.0,45685285.0,47089854.0,48668380.0,50184681.0,...,121934515.0,124007632.0,126213709.0,128449757.0,130629500.0,133190635.0,135468682.0,137609185.0,,"Population ages 0-14, total"
3,ARB,SP.POP.1564.TO,49063244.0,50032198.0,51072095.0,52200457.0,53449205.0,54836642.0,56150911.0,57648467.0,...,226143306.0,231927215.0,237528428.0,242991668.0,248365400.0,253159511.0,258053167.0,263047867.0,,"Population ages 15-64, total"
4,ARB,SP.POP.65UP.TO,3234225.0,3352609.0,3469470.0,3584776.0,3698270.0,3809935.0,3939319.0,4066216.0,...,15080882.0,15508701.0,15963579.0,16466321.0,17033360.0,17674286.0,18377118.0,19133535.0,,"Population ages 65 and above, total"
5,ARB,SP.POP.TOTL,92197753.0,94724510.0,97334442.0,100034179.0,102832760.0,105736431.0,108758610.0,111899364.0,...,363158703.0,371443547.0,379705719.0,387907748.0,396028300.0,404024433.0,411898965.0,419790588.0,,"Population, total"
6,CSS,IT.NET.BBND,,,,,,,,,...,498285.0,627251.0,674419.0,753368.0,897134.0,971854.0,1003591.0,978363.0,,Fixed broadband subscriptions
7,CSS,IT.CEL.SETS,,,,,,,,,...,7534114.0,7322041.0,7738401.0,7952805.0,8158698.0,8483364.0,8225309.0,7090967.0,,Mobile cellular subscriptions
8,CSS,SP.POP.0014.TO,1764314.0,1813811.0,1861239.0,1905671.0,1946435.0,1983381.0,2024505.0,2062380.0,...,1795979.0,1780360.0,1766078.0,1753161.0,1741356.0,1733031.0,1724190.0,1715606.0,,"Population ages 0-14, total"
9,CSS,SP.POP.1564.TO,2150293.0,2172957.0,2197694.0,2224438.0,2252268.0,2280020.0,2301103.0,2321443.0,...,4572902.0,4625875.0,4676979.0,4725564.0,4771270.0,4810378.0,4847980.0,4882863.0,,"Population ages 15-64, total"


1584

#### The table above is in wide format, with one column per year. A long format, with one row per country, indicator and year, is easier to query and does not need new columns when more years are added. I will start by listing the year columns in this dataset

In [412]:
years = [x for x in AggIndicatorsData.schema.names \
             if x != 'wb_country_code' and x != 'indicator_code' and x != 'indicator_name'] 

print(years)

['1960', '1961', '1962', '1963', '1964', '1965', '1966', '1967', '1968', '1969', '1970', '1971', '1972', '1973', '1974', '1975', '1976', '1977', '1978', '1979', '1980', '1981', '1982', '1983', '1984', '1985', '1986', '1987', '1988', '1989', '1990', '1991', '1992', '1993', '1994', '1995', '1996', '1997', '1998', '1999', '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018', '2019']
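Turning one column per year into one row per year is usually called unpivoting or melting. The next cell does this with one `select` per year column. The plain-Python sketch below shows the row-level logic, including how nulls and negative values are dropped, as `filter("indicator_value >= 0")` does. It is not Spark code, and `melt_years` is a hypothetical helper.

In Spark itself, the SQL `stack` generator can do the same in a single pass. So can `DataFrame.unpivot`, if your Spark version is 3.4 or later.

```python
def melt_years(rows, id_cols, year_cols):
    """Unpivot wide rows into (id columns..., year, indicator_value) records.

    Years are the outer loop, matching the order the Spark cell produces.
    Missing (None) and negative values are skipped.
    """
    long_rows = []
    for year in year_cols:
        for row in rows:
            value = row.get(year)
            if value is None or value < 0:
                continue
            record = {col: row[col] for col in id_cols}
            record["year"] = year
            record["indicator_value"] = value
            long_rows.append(record)
    return long_rows
```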


In [413]:
from functools import reduce

#Build one long-format DataFrame per year column:
#(wb_country_code, indicator_code, year, indicator_value)
yearDFs = []
for indicatorYear in years:
    print("Processing indicators for " + indicatorYear)
    yearIndicatorDF = AggIndicatorsData \
        .select("wb_country_code",
                "indicator_code",
                F.lit(indicatorYear).alias("year"),
                F.col(indicatorYear).alias("indicator_value")) \
        .filter("indicator_value >= 0")  # also drops nulls
    yearDFs.append(yearIndicatorDF)

#Union the per-year DataFrames into a single long DataFrame
indicatorsDF = reduce(lambda left, right: left.union(right), yearDFs)

Processing indicators for 1960
Processing indicators for 1961
Processing indicators for 1962
Processing indicators for 1963
Processing indicators for 1964
Processing indicators for 1965
Processing indicators for 1966
Processing indicators for 1967
Processing indicators for 1968
Processing indicators for 1969
Processing indicators for 1970
Processing indicators for 1971
Processing indicators for 1972
Processing indicators for 1973
Processing indicators for 1974
Processing indicators for 1975
Processing indicators for 1976
Processing indicators for 1977
Processing indicators for 1978
Processing indicators for 1979
Processing indicators for 1980
Processing indicators for 1981
Processing indicators for 1982
Processing indicators for 1983
Processing indicators for 1984
Processing indicators for 1985
Processing indicators for 1986
Processing indicators for 1987
Processing indicators for 1988
Processing indicators for 1989
Processing indicators for 1990
Processing indicators for 1991
...

In [414]:
showDF(indicatorsDF)

#cache the dataset 
indicatorsDF.cache()

#number of rows
indicatorsDF.count()

Unnamed: 0,wb_country_code,indicator_code,year,indicator_value
0,ARB,SP.POP.0014.TO,1960,39900284.0
1,ARB,SP.POP.1564.TO,1960,49063244.0
2,ARB,SP.POP.65UP.TO,1960,3234225.0
3,ARB,SP.POP.TOTL,1960,92197753.0
4,CSS,SP.POP.0014.TO,1960,1764314.0
5,CSS,SP.POP.1564.TO,1960,2150293.0
6,CSS,SP.POP.65UP.TO,1960,168897.0
7,CSS,SP.POP.TOTL,1960,4194710.0
8,CEB,SP.POP.0014.TO,1960,26671417.0
9,CEB,SP.POP.1564.TO,1960,58135802.0


72541

#### Check the indicator counts per year

In [415]:
#Register the DataFrame as a temporary view (scoped to this SparkSession) for SQL queries
indicatorsDF.createOrReplaceTempView("indicatorsDF")

In [254]:
indicatorCount = """

select
    year,
    count(1) as count
from
    indicatorsDF
group by
    year
order by
    year

"""

showDF(spark.sql(indicatorCount))

Unnamed: 0,year,count
0,1960,1174
1,1961,974
2,1962,974
3,1963,974
4,1964,974
5,1965,1174
6,1966,974
7,1967,974
8,1968,974
9,1969,974


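#### Get yearly indicator totals and rename columns
Note that indicatorsDF contains the World Bank's aggregate codes (for example ARB, CSS and CEB) as well as individual economies. These sums therefore count the same people and subscriptions more than once, which is why the 1960 population total is about 30.7 billion. Joining with countryFinal and keeping only rows whose region is not null, as is done for the regional totals below, restricts the sums to individual economies.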

In [416]:
yearPivot = indicatorsDF \
    .groupBy('year') \
    .pivot('indicator_code') \
    .sum('indicator_value') \
    .orderBy('year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population')

showDF(yearPivot)

Unnamed: 0,year,cellular_subscriptions,broadband_subscriptions,population_age_0_to_14,population_age_15_64,population_age_65_and_above,population
0,1960,0.0,,11654800000.0,17610820000.0,1417300000.0,30697260000.0
1,1961,,,11873280000.0,17771360000.0,1449380000.0,31108650000.0
2,1962,,,12163710000.0,17998330000.0,1481571000.0,31658630000.0
3,1963,,,12505600000.0,18306960000.0,1515232000.0,32343200000.0
4,1964,,,12821410000.0,18648500000.0,1547347000.0,33033060000.0
5,1965,0.0,,13106140000.0,19038490000.0,1579097000.0,33739930000.0
6,1966,,,13432820000.0,19402640000.0,1632302000.0,34484340000.0
7,1967,,,13709510000.0,19814640000.0,1685198000.0,35226330000.0
8,1968,,,13959210000.0,20267390000.0,1737935000.0,35981910000.0
9,1969,,,14213200000.0,20752480000.0,1791344000.0,36774800000.0
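`groupBy(...).pivot(...).sum(...)` reverses the reshape above: each distinct `indicator_code` becomes a column holding the sum of its values per group. The plain-Python sketch below shows that logic, with the renames folded into a single mapping. `INDICATOR_COLUMNS` and `pivot_sum` are hypothetical names, not from the original notebook.

```python
# Hypothetical mapping that could be reused for the yearly, regional and country pivots
INDICATOR_COLUMNS = {
    "IT.CEL.SETS": "cellular_subscriptions",
    "IT.NET.BBND": "broadband_subscriptions",
    "SP.POP.0014.TO": "population_age_0_to_14",
    "SP.POP.1564.TO": "population_age_15_64",
    "SP.POP.65UP.TO": "population_age_65_and_above",
    "SP.POP.TOTL": "population",
}


def pivot_sum(rows, group_col, pivot_col, value_col, rename):
    """Group rows by `group_col`, turn each distinct `pivot_col` value into a
    (renamed) column, and sum `value_col` within each cell."""
    table = {}
    for row in rows:
        column = rename.get(row[pivot_col], row[pivot_col])
        cells = table.setdefault(row[group_col], {})
        cells[column] = cells.get(column, 0) + row[value_col]
    # Sort by the group key, like .orderBy('year')
    return {key: table[key] for key in sorted(table)}
```

Unlike Spark, which fills missing group/indicator combinations with null, the sketch leaves those keys out. In PySpark, the same dictionary could replace the repeated `withColumnRenamed` chains, for example by looping over `INDICATOR_COLUMNS.items()` and calling `withColumnRenamed(code, name)` for each pair.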


#### Inspect the schema

In [257]:
yearPivot.printSchema()

root
 |-- year: string (nullable = false)
 |-- cellular_subscriptions: double (nullable = true)
 |-- broadband_subscriptions: double (nullable = true)
 |-- population_age_0_to_14: double (nullable = true)
 |-- population_age_15_64: double (nullable = true)
 |-- population_age_65_and_above: double (nullable = true)
 |-- population: double (nullable = true)



### Data Quality Check

In [259]:
#cache the dataset
yearPivot.cache()

#number of rows
yearPivot.count()

59

#### Check for negative values and for age groups that exceed the total population. Each count below should be 0

In [262]:
yearPivot.filter('population_age_0_to_14 < 0').count()

0

In [263]:
yearPivot.filter('population_age_65_and_above > population').count()

0

In [264]:
yearPivot.filter('population_age_15_64 > population').count()

0

In [265]:
yearPivot.filter('population_age_0_to_14 > population').count()

0

In [266]:
yearPivot.filter('broadband_subscriptions < 0').count()

0

In [267]:
yearPivot.filter('cellular_subscriptions < 0').count()

0

In [268]:
yearPivot.filter('population < 0').count()

0

In [269]:
yearPivot.filter('population_age_65_and_above < 0').count()

0

In [270]:
yearPivot.filter('population_age_15_64 < 0').count()

0

In [417]:
yearPivot.filter('(population_age_0_to_14 + population_age_15_64 + population_age_65_and_above) > population').count()

0
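The checks above follow two patterns: no negative counts, and no age group larger than the total population. You can generate them instead of typing them one by one. `AGE_COLUMNS`, `COUNT_COLUMNS` and `quality_rules` are hypothetical names used for this sketch.

Keep in mind how Spark SQL handles nulls: a comparison involving null evaluates to null, and `filter` drops such rows. Years with no broadband data are therefore never counted as violations.

```python
AGE_COLUMNS = [
    "population_age_0_to_14",
    "population_age_15_64",
    "population_age_65_and_above",
]
COUNT_COLUMNS = ["population", *AGE_COLUMNS,
                 "broadband_subscriptions", "cellular_subscriptions"]


def quality_rules():
    """Return SQL predicates that should match zero rows in yearPivot."""
    rules = [f"{col} < 0" for col in COUNT_COLUMNS]                 # no negative counts
    rules += [f"{col} > population" for col in AGE_COLUMNS]         # no group above the total
    rules.append("(" + " + ".join(AGE_COLUMNS) + ") > population")  # parts must not exceed the whole
    return rules
```

Usage: `failures = {rule: yearPivot.filter(rule).count() for rule in quality_rules()}` runs every check. `assert all(n == 0 for n in failures.values()), failures` then reports any rule that fails.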

#### Write yearPivot as a csv file to the folder

In [418]:
yearPivot \
    .select('year',
            F.col('population').cast(DecimalType(38, 2)),
            F.col('population_age_0_to_14').cast(DecimalType(38, 2)),
            F.col('population_age_15_64').cast(DecimalType(38, 2)),
            F.col('population_age_65_and_above').cast(DecimalType(38, 2)),
            F.col('broadband_subscriptions').cast(DecimalType(38, 2)),
            F.col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('./Downloads/WDI_csv/Final/yearlyStats', mode='overwrite', header='true')

#### Get yearly regional totals

In [371]:
#joining indicatorsDF with countryFinal by wb_country_code
regionalIndicators = indicatorsDF.join(countryFinal,
                                       indicatorsDF.wb_country_code == countryFinal.wb_country_code, "inner") \
    .select(countryFinal.region,
            indicatorsDF.wb_country_code,
            indicatorsDF.year,
            indicatorsDF.indicator_code,
            indicatorsDF.indicator_value)

#SQL equivalent
# select 
#     countryFinal.region,
#     indicatorsDF.wb_country_code,
#     indicatorsDF.year,
#     indicatorsDF.indicator_code,
#     indicatorsDF.indicator_value
# from 
#     countryFinal
# inner join
#     indicatorsDF on countryFinal.wb_country_code == indicatorsDF.wb_country_code

showDF(regionalIndicators)

Unnamed: 0,region,wb_country_code,year,indicator_code,indicator_value
0,,ARB,1960,SP.POP.0014.TO,39900284.0
1,,ARB,1960,SP.POP.1564.TO,49063244.0
2,,ARB,1960,SP.POP.65UP.TO,3234225.0
3,,ARB,1960,SP.POP.TOTL,92197753.0
4,,CSS,1960,SP.POP.0014.TO,1764314.0
5,,CSS,1960,SP.POP.1564.TO,2150293.0
6,,CSS,1960,SP.POP.65UP.TO,168897.0
7,,CSS,1960,SP.POP.TOTL,4194710.0
8,,CEB,1960,SP.POP.0014.TO,26671417.0
9,,CEB,1960,SP.POP.1564.TO,58135802.0
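The empty `region` values above are not missing data. In WDICountry.csv, aggregate codes such as ARB (Arab World) and CSS (Caribbean small states) have no region, while individual economies do. The inner join keeps both, which is why `regionalPivot` is filtered with `region is not null` before writing. The plain-Python sketch below shows the join and that filter. `attach_region` is a hypothetical helper, and `None` stands in for null.

```python
def attach_region(indicator_rows, countries, economies_only=False):
    """Inner-join indicator rows to countries on wb_country_code.

    Rows whose code is not in `countries` are dropped (inner join).
    With economies_only=True, rows whose region is None (aggregates) are
    dropped too, like filter('region is not null').
    """
    region_by_code = {c["wb_country_code"]: c["region"] for c in countries}
    joined = []
    for row in indicator_rows:
        code = row["wb_country_code"]
        if code not in region_by_code:
            continue
        region = region_by_code[code]
        if economies_only and region is None:
            continue
        joined.append({"region": region, **row})
    return joined
```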


#### Pivot the table above to wide format, with one column per indicator

In [372]:
regionalPivot = regionalIndicators \
    .groupBy('region', 'year') \
    .pivot('indicator_code') \
    .sum('indicator_value') \
    .orderBy('region', 'year')

showDF(regionalPivot)

Unnamed: 0,region,year,IT.CEL.SETS,IT.NET.BBND,SP.POP.0014.TO,SP.POP.1564.TO,SP.POP.65UP.TO,SP.POP.TOTL
0,,1960,,,10535080000.0,15867150000.0,1267373000.0,27682550000.0
1,,1961,,,10733030000.0,16011070000.0,1295999000.0,28053300000.0
2,,1962,,,10996910000.0,16215490000.0,1324760000.0,28550710000.0
3,,1963,,,11307830000.0,16493990000.0,1354860000.0,29170590000.0
4,,1964,,,11595090000.0,16802460000.0,1383554000.0,29795360000.0
5,,1965,,,11854110000.0,17155100000.0,1411916000.0,30435750000.0
6,,1966,,,12151370000.0,17484520000.0,1459688000.0,31110550000.0
7,,1967,,,12403210000.0,17857530000.0,1507189000.0,31783270000.0
8,,1968,,,12630680000.0,18267900000.0,1554569000.0,32468840000.0
9,,1969,,,12861920000.0,18707090000.0,1602500000.0,33187560000.0


#### Rename columns and write regionalPivot as a csv to the folder

In [373]:
#Write the regional-yearly totals to a CSV File
regionalPivot \
    .filter('region is not null') \
    .orderBy('region','year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population') \
    .select('region',
            'year',
            F.col('population').cast(DecimalType(38, 2)),
            F.col('population_age_0_to_14').cast(DecimalType(38, 2)),
            F.col('population_age_15_64').cast(DecimalType(38, 2)),
            F.col('population_age_65_and_above').cast(DecimalType(38, 2)),
            F.col('broadband_subscriptions').cast(DecimalType(38, 2)),
            F.col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('./Downloads/WDI_csv/Final/regionalStats', mode='overwrite', header='true')

#### After finding regional statistics, I will do the same for countries

In [374]:
countryIndicators = indicatorsDF.join(countryFinal,
                                      indicatorsDF.wb_country_code == countryFinal.wb_country_code, "inner") \
    .select(indicatorsDF.wb_country_code,
            countryFinal.country_iso_code,
            countryFinal.country_name,
            indicatorsDF.year,
            indicatorsDF.indicator_code,
            indicatorsDF.indicator_value)

# SQL equivalent
# select 
#     indicatorsDF.wb_country_code,
#     countryFinal.country_iso_code,
#     countryFinal.country_name,
#     indicatorsDF.year,
#     indicatorsDF.indicator_code,
#     indicatorsDF.indicator_value
# from 
#     countryFinal
# inner join
#     indicatorsDF on countryFinal.wb_country_code == indicatorsDF.wb_country_code

showDF(countryIndicators)

Unnamed: 0,wb_country_code,country_iso_code,country_name,year,indicator_code,indicator_value
0,ARB,1A,Arab World,1960,SP.POP.0014.TO,39900284.0
1,ARB,1A,Arab World,1960,SP.POP.1564.TO,49063244.0
2,ARB,1A,Arab World,1960,SP.POP.65UP.TO,3234225.0
3,ARB,1A,Arab World,1960,SP.POP.TOTL,92197753.0
4,CSS,S3,Caribbean small states,1960,SP.POP.0014.TO,1764314.0
5,CSS,S3,Caribbean small states,1960,SP.POP.1564.TO,2150293.0
6,CSS,S3,Caribbean small states,1960,SP.POP.65UP.TO,168897.0
7,CSS,S3,Caribbean small states,1960,SP.POP.TOTL,4194710.0
8,CEB,B8,Central Europe and the Baltics,1960,SP.POP.0014.TO,26671417.0
9,CEB,B8,Central Europe and the Baltics,1960,SP.POP.1564.TO,58135802.0


#### Pivot the table to wide format, with one column per indicator

In [375]:
countryPivot = countryIndicators \
    .groupBy('country_iso_code', 'country_name', 'year') \
    .pivot('indicator_code') \
    .sum('indicator_value')

showDF(countryPivot)

Unnamed: 0,country_iso_code,country_name,year,IT.CEL.SETS,IT.NET.BBND,SP.POP.0014.TO,SP.POP.1564.TO,SP.POP.65UP.TO,SP.POP.TOTL
0,NG,Nigeria,2007,40395611.0,53594.0,64226660.0,78070740.0,4042581.0,146340000.0
1,AR,Argentina,2016,63719805.0,7223128.0,10932230.0,27924240.0,4733904.0,43590370.0
2,LS,Lesotho,1970,0.0,,466195.0,517678.0,45053.0,1028926.0
3,CD,Dem. Rep. Congo,1963,,,7154849.0,8829594.0,477387.0,16461830.0
4,FR,France,1971,,,12912230.0,32637160.0,6821956.0,52371340.0
5,GN,Guinea,1980,0.0,,2108557.0,2603388.0,159490.0,4871435.0
6,BA,Bosnia and Herzegovina,1990,0.0,,1078368.0,3091144.0,293911.0,4463423.0
7,4E,East Asia & Pacific (excluding high income),1994,3155452.0,,501177800.0,1101605000.0,93570036.0,1696475000.0
8,TD,Chad,2015,5465797.0,11337.0,6750220.0,7015783.0,344972.0,14110980.0
9,TL,Timor-Leste,2014,1375749.0,1028.0,470727.0,652339.0,51265.0,1174331.0


#### Rename columns and write countryPivot as a csv to the folder

In [376]:
countryPivot.filter('country_iso_code is not null') \
    .orderBy('country_iso_code','country_name', 'year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population') \
    .select('country_iso_code',
            'country_name',
            'year',
            F.col('population').cast(DecimalType(38, 2)),
            F.col('population_age_0_to_14').cast(DecimalType(38, 2)),
            F.col('population_age_15_64').cast(DecimalType(38, 2)),
            F.col('population_age_65_and_above').cast(DecimalType(38, 2)),
            F.col('broadband_subscriptions').cast(DecimalType(38, 2)),
            F.col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('./Downloads/WDI_csv/Final/countryStats', mode='overwrite', header='true')

### Business Startup Metrics

To identify countries that are conducive to starting a business, I will consider the most recent values of the following indicators:
- Gross National Income, GNI (NY.GNP.ATLS.CD)
- Cost of business start-up procedures, as a % of GNI per capita (IC.REG.COST.PC.ZS)
- Number of days required to start a business (IC.REG.DURS)
- Number of start-up procedures to register a business (IC.REG.PROC)
- GDP (NY.GDP.MKTP.KD)
- GDP per capita (NY.GDP.PCAP.KD)
- Business regulatory environment rating (IQ.CPA.BREG.XQ)
- Ease of doing business index (IC.BUS.EASE.XQ)
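The World Bank defines the start-up cost indicator (IC.REG.COST.PC.ZS) as a percentage of GNI per capita. The cost for one business is therefore that percentage applied to GNI per capita, not to total GNI. In this notebook, that would mean loading GNI per capita as well, for example NY.GNP.PCAP.CD (GNI per capita, Atlas method).

The minimal sketch below shows the conversion. It uses `Decimal` to mirror the two-decimal `DecimalType(38, 2)` casts used elsewhere in the notebook. `startup_cost_usd` is a hypothetical helper, and the example numbers are made up.

```python
from decimal import Decimal, ROUND_HALF_UP


def startup_cost_usd(cost_pct_of_gni_per_capita, gni_per_capita_usd):
    """Approximate cost, in US$, of the procedures needed to start one business.

    Assumes the cost is expressed as a percentage of GNI per capita.
    Inputs may be strings or ints (avoid floats to keep exact decimals).
    Returns None when either input is missing, like Spark's null propagation.
    """
    if cost_pct_of_gni_per_capita is None or gni_per_capita_usd is None:
        return None
    cost = Decimal(gni_per_capita_usd) * Decimal(cost_pct_of_gni_per_capita) / Decimal(100)
    # Round to cents, matching the two decimal places used in the CSV outputs
    return cost.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
```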

In [451]:
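#NOTE: IC.BUS.EASE.XQ is also loaded for 2019 in the next cell. If it had values in both 2018 and 2019,
#the pivot's sum() further down would add them together; dropping it from this list guarantees only the 2019 value is used.
recentIndicators = indicatorsData \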
    .select("wb_country_code", "indicator_code", "2018") \
    .filter(F.col('indicator_code').isin('IC.REG.COST.PC.ZS', 'IC.REG.DURS', 'IC.REG.PROC', \
        'NY.GNP.ATLS.CD', 'NY.GDP.MKTP.KD', 'NY.GDP.PCAP.KD', 'IQ.CPA.BREG.XQ', 'IC.BUS.EASE.XQ')) \
    .withColumnRenamed("2018", "indicator_value") \
    .withColumn("indicator_value", F.col("indicator_value").cast(DecimalType(38, 2)))

showDF(recentIndicators)

Unnamed: 0,wb_country_code,indicator_code,indicator_value
0,ARB,IC.REG.COST.PC.ZS,8.61
1,ARB,IQ.CPA.BREG.XQ,2.9
2,ARB,IC.BUS.EASE.XQ,
3,ARB,NY.GDP.MKTP.KD,2708687562213.17
4,ARB,NY.GDP.PCAP.KD,6452.47
5,ARB,NY.GNP.ATLS.CD,2672906114477.03
6,ARB,IC.REG.PROC,7.32
7,ARB,IC.REG.DURS,20.67
8,CSS,IC.REG.COST.PC.ZS,
9,CSS,IQ.CPA.BREG.XQ,3.4


In [452]:
businessIndexIndicators = indicatorsData \
    .select("wb_country_code", "indicator_code", "2019") \
    .filter(F.col('indicator_code').isin('IC.BUS.EASE.XQ')) \
    .withColumnRenamed("2019", "indicator_value") \
    .withColumn("indicator_value", F.col("indicator_value").cast(DecimalType(38, 2)))

showDF(businessIndexIndicators)

Unnamed: 0,wb_country_code,indicator_code,indicator_value
0,ARB,IC.BUS.EASE.XQ,
1,CSS,IC.BUS.EASE.XQ,
2,CEB,IC.BUS.EASE.XQ,
3,EAR,IC.BUS.EASE.XQ,
4,EAS,IC.BUS.EASE.XQ,
5,EAP,IC.BUS.EASE.XQ,
6,TEA,IC.BUS.EASE.XQ,
7,EMU,IC.BUS.EASE.XQ,
8,ECS,IC.BUS.EASE.XQ,
9,ECA,IC.BUS.EASE.XQ,


In [453]:
allIndicators = recentIndicators.union(businessIndexIndicators)
showDF(allIndicators)

Unnamed: 0,wb_country_code,indicator_code,indicator_value
0,ARB,IC.REG.COST.PC.ZS,8.61
1,ARB,IQ.CPA.BREG.XQ,2.9
2,ARB,IC.BUS.EASE.XQ,
3,ARB,NY.GDP.MKTP.KD,2708687562213.17
4,ARB,NY.GDP.PCAP.KD,6452.47
5,ARB,NY.GNP.ATLS.CD,2672906114477.03
6,ARB,IC.REG.PROC,7.32
7,ARB,IC.REG.DURS,20.67
8,CSS,IC.REG.COST.PC.ZS,
9,CSS,IQ.CPA.BREG.XQ,3.4
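The cells above hard-code 2018 for most indicators and 2019 for the ease of doing business index. A more general rule is to take each indicator's most recent non-null year, sketched below in plain Python. `latest_value` is a hypothetical helper. In Spark, one way to express the same idea would be a window over long-format data like indicatorsDF, partitioned by country and indicator and ordered by year descending, keeping the rows where `row_number()` equals 1.

```python
def latest_value(row, year_cols):
    """Return (year, value) for the most recent year with a non-null value.

    `row` maps year strings such as "2018" to numbers or None. Four-digit
    year strings sort the same way as the numbers they represent.
    Returns (None, None) if every year is missing.
    """
    for year in sorted(year_cols, reverse=True):
        value = row.get(year)
        if value is not None:
            return year, value
    return None, None
```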


#### Rename the columns, derive the startup cost, and filter out null values

In [454]:
#NOTE: IC.REG.COST.PC.ZS measures the start-up cost as a percentage of GNI per capita.
#Multiplying it by total GNI, as below, gives a country-level figure rather than the cost for one business;
#a per-business cost would use GNI per capita (e.g. NY.GNP.PCAP.CD, which is not loaded here).
countryBusinessStartupPivot = allIndicators.join(countryFinal
                                       , allIndicators.wb_country_code == countryFinal.wb_country_code
                                       , "inner") \
    .select(countryFinal.country_iso_code
            , countryFinal.country_name
            , allIndicators.indicator_code
            , allIndicators.indicator_value) \
    .groupBy('country_iso_code', 'country_name').pivot('indicator_code').sum('indicator_value') \
    .withColumnRenamed('NY.GNP.ATLS.CD', 'GNI') \
    .withColumnRenamed('IC.REG.DURS', 'Startup Time') \
    .withColumnRenamed('IC.REG.PROC', 'Startup Procedures') \
    .withColumnRenamed('IC.REG.COST.PC.ZS', 'Startup Cost Pct of GNI') \
    .withColumnRenamed('NY.GDP.MKTP.KD', 'GDP') \
    .withColumnRenamed('NY.GDP.PCAP.KD', 'GDP Per Capita') \
    .withColumnRenamed('IQ.CPA.BREG.XQ', 'Business Regulation') \
    .withColumnRenamed('IC.BUS.EASE.XQ', 'Ease of business') \
    .withColumn('Startup Cost', (F.col('GNI') * F.col('Startup Cost Pct of GNI') / F.lit(100.0)).cast(DecimalType(38, 2))) \
    .filter(F.col('GNI') > 0) \
    .filter(F.col('Startup Time').isNotNull()) \
    .filter(F.col('Startup Procedures').isNotNull()) \
    .filter(F.col('Startup Cost').isNotNull())

showDF(countryBusinessStartupPivot)

Unnamed: 0,country_iso_code,country_name,Ease of business,Startup Cost Pct of GNI,Startup Time,Startup Procedures,Business Regulation,GDP,GDP Per Capita,GNI,Startup Cost
0,BJ,Benin,149.0,5.2,8.5,6.0,3.5,10304371091.9,897.2,10029263429.42,521521698.33
1,XC,Euro area,,9.57,10.45,5.32,,13996985489820.4,40952.82,13252466851547.4,1268261077693.09
2,LY,Libya,186.0,26.6,35.0,10.0,,50334718705.94,7536.75,42771026208.7,11377092971.51
3,KZ,Kazakhstan,25.0,0.0,5.0,5.0,,204067060294.54,11165.54,147572879547.56,0.0
4,JM,Jamaica,71.0,0.0,3.0,2.0,,14249490164.85,4855.26,14582302023.57,0.0
5,NO,Norway,9.0,4.6,4.0,4.0,,489331167946.42,92077.57,428375207614.42,19705259550.26
6,AG,Antigua and Barbuda,113.0,0.0,22.0,9.0,,1457277455.93,15134.88,1530376688.84,0.0
7,CG,Congo,180.0,2.9,49.5,11.0,2.0,13906481140.14,2651.7,8613989208.89,249805687.06
8,AR,Argentina,126.0,0.0,11.0,11.0,,446880911357.64,10043.51,551170581779.71,0.0
9,HT,Haiti,179.0,12.4,97.0,12.0,1.5,8123218871.2,730.3,8913582852.36,1105284273.69


#### Save countryBusinessStartupPivot as a csv file to the folder

In [455]:
countryBusinessStartupPivot \
    .select("country_iso_code", "country_name", "GDP", "GDP Per Capita", "GNI", \
            "Startup Cost", "Startup Cost Pct of GNI", "Startup Time", "Startup Procedures", \
            "Business Regulation", "Ease of business") \
    .coalesce(1) \
    .write.csv('./Downloads/WDI_csv/Final/StartupData', mode='overwrite', header='true')