# Data Analysis using Spark
### Data Engineering Capstone Project

#### Project Summary
Goal: Construct a pipeline for building a data lake in S3 with spark for World Development Indicators Data.
The purpose is to make use of technologies learnt in this course and query some useful information out of this project


## Importing libraries required in this project

In [1]:
import os
import configparser
import pandas
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import col

In [2]:
import pyspark.sql.functions as f
from pyspark.sql.functions import udf, monotonically_increasing_id, col
from pyspark.sql import types as t

## Configure AWS credentials to use S3.

In [328]:
config = configparser.ConfigParser()

config.read_file(open('config.cfg'))

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

country_data_path = config['S3']['COUNTRY_DATA']
country_notes_path = config['S3']['COUNTRY_NOTES_DATA']
foot_notes_path = config['S3']['FOOT_NOTES_DATA']
series_path = config['S3']['SERIES_DATA']
series_notes_path = config['S3']['SERIES_NOTES_DATA']
indicators_path = config['S3']['INDICATORS_DATA']
output_path = config['S3']['OUTPUT']


## Setting up enviornment 

In [4]:
os.environ['PYSPARK_PYTHON']='/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON']='/usr/bin/python3'

## Build spark session

In [5]:
spark = SparkSession.builder.config('spark.jars.packages', 
                                    'org.apache.hadoop:hadoop-aws:2.7.0').getOrCreate()

In [7]:
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key",
                                     os.environ['AWS_ACCESS_KEY_ID'])
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 
                                     os.environ['AWS_SECRET_ACCESS_KEY'])
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

### Step 1: Scope the Project and Gather Data

#### Scope 

Basically, my goal is to apply what I have learnt in this entire course. Data modeling, Data cleansing and querying some meaningful information out of it. 
I plan to use spark for this process since this is very large dataset and spark performs best in these situations. Also Building a Data lake in S3

#### Describe and Gather Data 

- Dataset: I choose multiple datasets in this project:
1. Worlds Development Indicators:  
2. Country Data: 
3. Series Data: 
- World Bank contain over a thousand annual indicators of economic development from hundreds of countries around the world. It presents the most current and accurate global development data available, and includes national, regional and global estimates. The dataset contains a wealth of information on country demographics, land use, economies, education, and a vast array of other measures of well-being and development
* Source: https://www.kaggle.com/

### Read the data which comes from AWS s3 Bucket

In [329]:
indicators_data = spark.read.csv(indicators_path, header=True)
country_data = spark.read.csv(country_data_path, header=True)
series_data = spark.read.csv(series_path, header=True)

### Load the datasets

In [330]:
# Read in the data here
indicators_data.distinct().limit(5).toPandas()

Unnamed: 0,CountryName,CountryCode,IndicatorName,IndicatorCode,Year,Value
0,Caribbean small states,CSS,CO2 emissions (metric tons per capita),EN.ATM.CO2E.PC,1960,1.4026407782743702
1,Caribbean small states,CSS,Merchandise exports to economies in the Arab W...,TX.VAL.MRCH.AL.ZS,1960,0.0886682035821954
2,East Asia & Pacific (developing only),EAP,"Population, ages 0-14 (% of total)",SP.POP.0014.TO.ZS,1960,40.3325267343913
3,Europe & Central Asia (all income levels),ECS,"Net bilateral aid flows from DAC donors, Austr...",DC.DAC.AUTL.CD,1960,700000.0
4,European Union,EUU,"Net bilateral aid flows from DAC donors, Unite...",DC.DAC.GBRL.CD,1960,34350000.0


In [331]:
#Counting number of rows
indicators_data.count()

5656458

In [53]:
country_data.limit(5).toPandas()

Unnamed: 0,CountryCode,ShortName,TableName,LongName,Alpha2Code,CurrencyUnit,SpecialNotes,Region,IncomeGroup,Wb2Code,...,GovernmentAccountingConcept,ImfDataDisseminationStandard,LatestPopulationCensus,LatestHouseholdSurvey,SourceOfMostRecentIncomeAndExpenditureData,VitalRegistrationComplete,LatestAgriculturalCensus,LatestIndustrialData,LatestTradeData,LatestWaterWithdrawalData
0,AFG,Afghanistan,Afghanistan,Islamic State of Afghanistan,AF,Afghan afghani,Fiscal year end: March 20; reporting period fo...,South Asia,Low income,AF,...,Consolidated central government,General Data Dissemination System (GDDS),1979,"Multiple Indicator Cluster Survey (MICS), 2010/11","Integrated household survey (IHS), 2008",,2013/14,,2013.0,2000.0
1,ALB,Albania,Albania,Republic of Albania,AL,Albanian lek,,Europe & Central Asia,Upper middle income,AL,...,Budgetary central government,General Data Dissemination System (GDDS),2011,"Demographic and Health Survey (DHS), 2008/09",Living Standards Measurement Study Survey (LSM...,Yes,2012,2011.0,2013.0,2006.0
2,DZA,Algeria,Algeria,People's Democratic Republic of Algeria,DZ,Algerian dinar,,Middle East & North Africa,Upper middle income,DZ,...,Budgetary central government,General Data Dissemination System (GDDS),2008,"Multiple Indicator Cluster Survey (MICS), 2012","Integrated household survey (IHS), 1995",,,2010.0,2013.0,2001.0
3,ASM,American Samoa,American Samoa,American Samoa,AS,U.S. dollar,,East Asia & Pacific,Upper middle income,AS,...,,,2010,,,Yes,2007,,,
4,ADO,Andorra,Andorra,Principality of Andorra,AD,Euro,,Europe & Central Asia,High income: nonOECD,AD,...,,,2011. Population data compiled from administra...,,,Yes,,,2006.0,


In [22]:
#Counting number of rows
country_data.count()

247

In [333]:
series_data.limit(5).toPandas()


Unnamed: 0,SeriesCode,Topic,IndicatorName,ShortDefinition,LongDefinition,UnitOfMeasure,Periodicity,BasePeriod,OtherNotes,AggregationMethod,LimitationsAndExceptions,NotesFromOriginalSource,GeneralComments,Source,StatisticalConceptAndMethodology,DevelopmentRelevance,RelatedSourceLinks,OtherWebLinks,RelatedIndicators,LicenseType
0,BN.KLT.DINV.CD,Economic Policy & Debt: Balance of payments: C...,"Foreign direct investment, net (BoP, current US$)",,Foreign direct investment are the net inflows ...,,Annual,,,,,,Note: Data are based on the sixth edition of t...,"International Monetary Fund, Balance of Paymen...",,,,,,Open
1,BX.KLT.DINV.WD.GD.ZS,Economic Policy & Debt: Balance of payments: C...,"Foreign direct investment, net inflows (% of GDP)",,Foreign direct investment are the net inflows ...,,Annual,,,Weighted average,FDI data do not give a complete picture of int...,,,,,,,,,
2,The volume of global private financial flows r...,classification of economies,and method used to adjust and disaggregate re...,particularly for debt financing,differences may also reflect how some install...,,,,,,,,,,,,,,,
3,Data on equity flows are shown for all countri...,,Note: Data starting from 2005 are based on the...,"International Monetary Fund, International Fin...",Data on equity flows are based on balance of p...,,,,,,,,,,,,,,,
4,The internationally accepted definition of FDI...,includes the following components: equity inv...,including investment associated with equity t...,based on control and influence. Distinguished...,FDI is made to establish a lasting interest i...,manufacturing facilities,and other permanent or long-term organization...,where the investor starts a new venture in a ...,where the investor enters into a partnership ...,where the investor acquires an existing enter...,and the definition of long-term loans differs...,Private financial flows - equity and debt - ac...,,,,Open,,,,


In [334]:
series_data.count()

3926

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.


In [27]:
# Checking for Null values in all the columns of indicators_data table
for col in list(indicators_data.columns):
    print(col, indicators_data.filter((indicators_data[col] == "") | indicators_data[col].isNull()).count())


CountryName 0
CountryCode 0
IndicatorName 0
IndicatorCode 0
Year 0
Value 0


In [28]:
# Checking for Null values in all the columns of country_data table
for col in list(country_data.columns):
    print(col, country_data.filter((country_data[col] == "") | country_data[col].isNull()).count())

CountryCode 0
ShortName 0
TableName 0
LongName 0
Alpha2Code 3
CurrencyUnit 33
SpecialNotes 83
Region 33
IncomeGroup 33
Wb2Code 1
NationalAccountsBaseYear 42
NationalAccountsReferenceYear 192
SnaPriceValuation 49
LendingCategory 103
OtherGroups 188
SystemOfNationalAccounts 33
AlternativeConversionFactor 199
PppSurveyYear 57
BalanceOfPaymentsManualInUse 66
ExternalDebtReportingStatus 124
SystemOfTrade 47
GovernmentAccountingConcept 86
ImfDataDisseminationStandard 64
LatestPopulationCensus 34
LatestHouseholdSurvey 100
SourceOfMostRecentIncomeAndExpenditureData 89
VitalRegistrationComplete 134
LatestAgriculturalCensus 105
LatestIndustrialData 134
LatestTradeData 62
LatestWaterWithdrawalData 67


In [30]:
# Checking for Null values in all the columns of series_data table
for col in list(series_data.columns):
    print(col, series_data.filter((series_data[col] == "") | series_data[col].isNull()).count())

SeriesCode 0
Topic 315
IndicatorName 711
ShortDefinition 2292
LongDefinition 1325
UnitOfMeasure 2864
Periodicity 1680
BasePeriod 3303
OtherNotes 3495
AggregationMethod 2589
LimitationsAndExceptions 2956
NotesFromOriginalSource 3764
GeneralComments 3627
Source 2932
StatisticalConceptAndMethodology 3491
DevelopmentRelevance 3791
RelatedSourceLinks 3852
OtherWebLinks 3897
RelatedIndicators 3897
LicenseType 3264


## Cleaning Steps
### Cleaning data as required for this project
1. Cleaning indicators data whose countryname coloumn has invalid entries. 
2. Cleaning Series data specified column which has null values
3. Cleaning Country data specified column which has null values

* Note: Since, indicators data has 0 null values in all the columns, Therefore no need to clean null values.

In [64]:
#These are the list of names which are not countries and are present in countryName column 
list = ['Arab World', 'Caribbean small states', 'Central Europe and the Baltics',
 'East Asia & Pacific (all income levels)',
 'East Asia & Pacific (developing only)', 'Euro area',
 'Europe & Central Asia (all income levels)',
 'Europe & Central Asia (developing only)', 'European Union',
 'Fragile and conflict affected situations',
 'Heavily indebted poor countries (HIPC)', 'High income',
 'High income: nonOECD', 'High income: OECD',
 'Latin America & Caribbean (all income levels)',
 'Latin America & Caribbean (developing only)',
 'Least developed countries: UN classification', 'Low & middle income',
 'Low income', 'Lower middle income',
 'Middle East & North Africa (all income levels)',
 'Middle East & North Africa (developing only)', 'Middle income',
 'North America' 'OECD members' ,'Other small states',
 'Pacific island small states', 'Small states', 'South Asia',
 'Sub-Saharan Africa (all income levels)',
 'Sub-Saharan Africa (developing only)' ,'Upper middle income' ,'World', 'North America', 'OECD members', 'Russian Federation']

In [339]:
#Filtering out the invalid names in the CountryName column
indicators_data_clean = indicators_data.filter(col("CountryName").isin(list))
indicators_data_clean.distinct().limit(5).toPandas()

Unnamed: 0,CountryName,CountryCode,IndicatorName,IndicatorCode,Year,Value
0,Caribbean small states,CSS,CO2 emissions (metric tons per capita),EN.ATM.CO2E.PC,1960,1.4026407782743702
1,Caribbean small states,CSS,Merchandise exports to economies in the Arab W...,TX.VAL.MRCH.AL.ZS,1960,0.0886682035821954
2,East Asia & Pacific (developing only),EAP,"Population, ages 0-14 (% of total)",SP.POP.0014.TO.ZS,1960,40.3325267343913
3,Europe & Central Asia (all income levels),ECS,"Net bilateral aid flows from DAC donors, Austr...",DC.DAC.AUTL.CD,1960,700000.0
4,European Union,EUU,"Net bilateral aid flows from DAC donors, Unite...",DC.DAC.GBRL.CD,1960,34350000.0


In [449]:
#filtering out specified coloumn with null values
series_data_clean = series_data.filter(col('LongDefinition').isNotNull() | col('IndicatorName').isNotNull())
# series_data_clean = series_data_clean.filter(series_data_clean['IndicatorName'].isNotNull() | series_data_clean['LongDefinition'].isNotNull())
series_data_clean.limit(5).toPandas()


Unnamed: 0,SeriesCode,Topic,IndicatorName,ShortDefinition,LongDefinition,UnitOfMeasure,Periodicity,BasePeriod,OtherNotes,AggregationMethod,LimitationsAndExceptions,NotesFromOriginalSource,GeneralComments,Source,StatisticalConceptAndMethodology,DevelopmentRelevance,RelatedSourceLinks,OtherWebLinks,RelatedIndicators,LicenseType
0,BN.KLT.DINV.CD,Economic Policy & Debt: Balance of payments: C...,"Foreign direct investment, net (BoP, current US$)",,Foreign direct investment are the net inflows ...,,Annual,,,,,,Note: Data are based on the sixth edition of t...,"International Monetary Fund, Balance of Paymen...",,,,,,Open
1,BX.KLT.DINV.WD.GD.ZS,Economic Policy & Debt: Balance of payments: C...,"Foreign direct investment, net inflows (% of GDP)",,Foreign direct investment are the net inflows ...,,Annual,,,Weighted average,FDI data do not give a complete picture of int...,,,,,,,,,
2,The volume of global private financial flows r...,classification of economies,and method used to adjust and disaggregate re...,particularly for debt financing,differences may also reflect how some install...,,,,,,,,,,,,,,,
3,Data on equity flows are shown for all countri...,,Note: Data starting from 2005 are based on the...,"International Monetary Fund, International Fin...",Data on equity flows are based on balance of p...,,,,,,,,,,,,,,,
4,The internationally accepted definition of FDI...,includes the following components: equity inv...,including investment associated with equity t...,based on control and influence. Distinguished...,FDI is made to establish a lasting interest i...,manufacturing facilities,and other permanent or long-term organization...,where the investor starts a new venture in a ...,where the investor enters into a partnership ...,where the investor acquires an existing enter...,and the definition of long-term loans differs...,Private financial flows - equity and debt - ac...,,,,Open,,,,


In [336]:
#filtering out specified coloumn with null values
country_data_clean = country_data.filter(col('Region').isNotNull() | col('IncomeGroup').isNotNull())
country_data_clean.limit(5).toPandas()

Unnamed: 0,CountryCode,ShortName,TableName,LongName,Alpha2Code,CurrencyUnit,SpecialNotes,Region,IncomeGroup,Wb2Code,...,GovernmentAccountingConcept,ImfDataDisseminationStandard,LatestPopulationCensus,LatestHouseholdSurvey,SourceOfMostRecentIncomeAndExpenditureData,VitalRegistrationComplete,LatestAgriculturalCensus,LatestIndustrialData,LatestTradeData,LatestWaterWithdrawalData
0,AFG,Afghanistan,Afghanistan,Islamic State of Afghanistan,AF,Afghan afghani,Fiscal year end: March 20; reporting period fo...,South Asia,Low income,AF,...,Consolidated central government,General Data Dissemination System (GDDS),1979,"Multiple Indicator Cluster Survey (MICS), 2010/11","Integrated household survey (IHS), 2008",,2013/14,,2013.0,2000.0
1,ALB,Albania,Albania,Republic of Albania,AL,Albanian lek,,Europe & Central Asia,Upper middle income,AL,...,Budgetary central government,General Data Dissemination System (GDDS),2011,"Demographic and Health Survey (DHS), 2008/09",Living Standards Measurement Study Survey (LSM...,Yes,2012,2011.0,2013.0,2006.0
2,DZA,Algeria,Algeria,People's Democratic Republic of Algeria,DZ,Algerian dinar,,Middle East & North Africa,Upper middle income,DZ,...,Budgetary central government,General Data Dissemination System (GDDS),2008,"Multiple Indicator Cluster Survey (MICS), 2012","Integrated household survey (IHS), 1995",,,2010.0,2013.0,2001.0
3,ASM,American Samoa,American Samoa,American Samoa,AS,U.S. dollar,,East Asia & Pacific,Upper middle income,AS,...,,,2010,,,Yes,2007,,,
4,ADO,Andorra,Andorra,Principality of Andorra,AD,Euro,,Europe & Central Asia,High income: nonOECD,AD,...,,,2011. Population data compiled from administra...,,,Yes,,,2006.0,


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
#### Map out the conceptual data model and explain why you chose that model

After loading, accessing and preprocessing the multiple data sources, we need to create data model connecting relations between them using Primary Key, Foreign key constraints. In my case the Primary key is IndicatorsName and Country Code. Which connects the three tables with meaningfull relations.

| Table | Description |
--- | ---
| indicators_data      | Staging table for indicators
| country_data         | Staging table for country data
| series_data          | Staging table for Series data
| indicators_table    | Fact table containing world development information derived from the staging tables mentioned above ie; country_data and series_data 
| series_table        | Dimension table containing Long Description of Indicator Codes derived from Indicators_data
| country_table       | Dimension table containing data country code, region and IncomeGroup data derived from country_data

Using the data model above we can finally answer query's like which country has the higest population, which country Annual GDP rate was higest in particular year or Which IncomeGroup category the country belong to and many more.

#### The idea behind this pipeline is to combine different sources of data for the following purposes:
1. Identifying Rich Countries
2. GDP growth of India in selected years range ?
3. Comparing GDP anual growth of India with other countries.
4. % of firms experiencing at least one bribe payment request.

#### 3.2 Mapping Out Data Pipelines
    - The Facts table would be the indicators table
    - Dimension table would be country table, series table
    - Will be Mapping indicators table with seies and country table.
    

In [305]:
country_data_clean.createOrReplaceTempView('country_data_clean')

In [210]:
indicators_data_clean.createOrReplaceTempView('indicators_data_clean')

In [438]:
# Identifying Rich countries with specific indicatorCode and year 1982 and above and displaying top 10 rows with higesh values
rich_countries = spark.sql(""" SELECT  Distinct(CountryName), Value, Year
                            FROM indicators_data_clean
                            WHERE IndicatorCode = 'SP.ADO.TFRT'
                            AND Year >= 1982
                            ORDER BY cast(Value as int) DESC""")
rich_countries.distinct().limit(10).toPandas()

Unnamed: 0,CountryName,Value,Year
0,Niger,203.604,2014
1,Cameroon,196.1244,1983
2,Liberia,185.0282,1986
3,Malawi,161.423,2000
4,Liberia,159.5558,1993
5,"Gambia, The",158.3904,1991
6,Belize,151.2,1982
7,Cameroon,147.0562,2003
8,Benin,123.8332,1988
9,Nepal,117.7428,2000


In [346]:
#GDP growth of India Year Wise

GDP_growth = spark.sql("""SELECT Year, Value, IndicatorCode, (CountryCode)
               FROM indicators_data_clean
               WHERE IndicatorName='GDP growth (annual %)'
               AND CountryName = 'India'
               AND Year>=1990 AND Year <= 2010
              ORDER BY Year""")
GDP_growth.limit(20).toPandas()

Unnamed: 0,Year,Value,IndicatorCode,CountryCode
0,1990,5.53345456321573,NY.GDP.MKTP.KD.ZG,IND
1,1991,1.0568314322797498,NY.GDP.MKTP.KD.ZG,IND
2,1992,5.48239602225368,NY.GDP.MKTP.KD.ZG,IND
3,1993,4.75077621944588,NY.GDP.MKTP.KD.ZG,IND
4,1994,6.6589240670105,NY.GDP.MKTP.KD.ZG,IND
5,1995,7.57449184104719,NY.GDP.MKTP.KD.ZG,IND
6,1996,7.54952224818398,NY.GDP.MKTP.KD.ZG,IND
7,1997,4.04982084912471,NY.GDP.MKTP.KD.ZG,IND
8,1998,6.18441582075693,NY.GDP.MKTP.KD.ZG,IND
9,1999,8.84575556124574,NY.GDP.MKTP.KD.ZG,IND


In [434]:
#Comparing and displaying GDP anual growth of 3 countries ordered by highest value first in year (2000 and above)
GDP_growth = spark.sql("""SELECT  (CountryName), Year, Value, IndicatorCode
               FROM indicators_data_clean
               WHERE IndicatorName='GDP growth (annual %)'
               AND CountryName IN ("India", "Belgium" ,"United States")
               AND Year>=2000
              ORDER BY Year, cast(Value as int) """)
GDP_growth.toPandas()


Unnamed: 0,CountryName,Year,Value,IndicatorCode
0,Belgium,2000,3.63381720812023,NY.GDP.MKTP.KD.ZG
1,India,2000,3.84099115681246,NY.GDP.MKTP.KD.ZG
2,United States,2000,4.09217644881066,NY.GDP.MKTP.KD.ZG
3,Belgium,2001,0.811463997377572,NY.GDP.MKTP.KD.ZG
4,United States,2001,0.975981833932124,NY.GDP.MKTP.KD.ZG
5,India,2001,4.82396626420316,NY.GDP.MKTP.KD.ZG
6,Belgium,2002,1.78045214903302,NY.GDP.MKTP.KD.ZG
7,United States,2002,1.78612768745552,NY.GDP.MKTP.KD.ZG
8,India,2002,3.8039753212355696,NY.GDP.MKTP.KD.ZG
9,Belgium,2003,0.774536137034218,NY.GDP.MKTP.KD.ZG


In [431]:
## % of firms experiencing at least one bribe payment request.
bribery = spark.sql("""SELECT CountryName, Value, Region, IncomeGroup
               FROM indicators_data_clean as i
               LEFT JOIN country_table as c
               ON i.CountryCode = c.CountryCode 
               WHERE IndicatorCode='IC.FRM.BRIB.ZS'
               AND Year >=1990 AND Year <= 2010
              ORDER BY cast(Value as int) DESC""")



In [435]:
bribery.limit(10).toPandas()

Unnamed: 0,CountryName,Value,Region,IncomeGroup
0,Cameroon,70.4,Sub-Saharan Africa,Lower middle income
1,Liberia,70.5,Sub-Saharan Africa,Low income
2,Syrian Arab Republic,69.6,Middle East & North Africa,Lower middle income
3,"Congo, Dem. Rep.",68.9,Sub-Saharan Africa,Low income
4,"Yemen, Rep.",68.9,Middle East & North Africa,Lower middle income
5,Bangladesh,60.4,South Asia,Lower middle income
6,Pakistan,60.2,South Asia,Lower middle income
7,Guinea,60.7,Sub-Saharan Africa,Low income
8,Cambodia,59.5,East Asia & Pacific,Low income
9,"Congo, Dem. Rep.",57.2,Sub-Saharan Africa,Low income


### The above results prove that most of the top countries belong to Low income or Lower middle income category who are inolved in bribery.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

## Building Series Table

In [109]:
# Write code here
series_data.printSchema()

root
 |-- SeriesCode: string (nullable = true)
 |-- Topic: string (nullable = true)
 |-- IndicatorName: string (nullable = true)
 |-- ShortDefinition: string (nullable = true)
 |-- LongDefinition: string (nullable = true)
 |-- UnitOfMeasure: string (nullable = true)
 |-- Periodicity: string (nullable = true)
 |-- BasePeriod: string (nullable = true)
 |-- OtherNotes: string (nullable = true)
 |-- AggregationMethod: string (nullable = true)
 |-- LimitationsAndExceptions: string (nullable = true)
 |-- NotesFromOriginalSource: string (nullable = true)
 |-- GeneralComments: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- StatisticalConceptAndMethodology: string (nullable = true)
 |-- DevelopmentRelevance: string (nullable = true)
 |-- RelatedSourceLinks: string (nullable = true)
 |-- OtherWebLinks: string (nullable = true)
 |-- RelatedIndicators: string (nullable = true)
 |-- LicenseType: string (nullable = true)



In [450]:
series_data_clean.createOrReplaceTempView('series_data_clean')


In [289]:
series_table = spark.sql(""" SELECT IndicatorName as Indicator, LongDefinition
                        FROM series_data_clean
                        """)
series_table.printSchema()

root
 |-- Indicator: string (nullable = true)
 |-- LongDefinition: string (nullable = true)



In [348]:
series_table.limit(5).toPandas()

Unnamed: 0,Indicator,LongDefinition
0,"Foreign direct investment, net (BoP, current US$)",Foreign direct investment are the net inflows ...
1,"Foreign direct investment, net inflows (% of GDP)",Foreign direct investment are the net inflows ...
2,and method used to adjust and disaggregate re...,differences may also reflect how some install...
3,Note: Data starting from 2005 are based on the...,Data on equity flows are based on balance of p...
4,including investment associated with equity t...,FDI is made to establish a lasting interest i...


## Building Country Table

In [267]:
country_data.printSchema()

root
 |-- CountryCode: string (nullable = true)
 |-- ShortName: string (nullable = true)
 |-- TableName: string (nullable = true)
 |-- LongName: string (nullable = true)
 |-- Alpha2Code: string (nullable = true)
 |-- CurrencyUnit: string (nullable = true)
 |-- SpecialNotes: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- IncomeGroup: string (nullable = true)
 |-- Wb2Code: string (nullable = true)
 |-- NationalAccountsBaseYear: string (nullable = true)
 |-- NationalAccountsReferenceYear: string (nullable = true)
 |-- SnaPriceValuation: string (nullable = true)
 |-- LendingCategory: string (nullable = true)
 |-- OtherGroups: string (nullable = true)
 |-- SystemOfNationalAccounts: string (nullable = true)
 |-- AlternativeConversionFactor: string (nullable = true)
 |-- PppSurveyYear: string (nullable = true)
 |-- BalanceOfPaymentsManualInUse: string (nullable = true)
 |-- ExternalDebtReportingStatus: string (nullable = true)
 |-- SystemOfTrade: string (nullable = true)


In [317]:
country_table = spark.sql(""" SELECT CountryCode, Region, IncomeGroup
                        FROM country_data_clean
                        """)
country_table.printSchema()


root
 |-- CountryCode: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- IncomeGroup: string (nullable = true)



In [350]:
country_table.limit(5).toPandas()

Unnamed: 0,CountryCode,Region,IncomeGroup
0,AFG,South Asia,Low income
1,ALB,Europe & Central Asia,Upper middle income
2,DZA,Middle East & North Africa,Upper middle income
3,ASM,East Asia & Pacific,Upper middle income
4,ADO,Europe & Central Asia,High income: nonOECD


## Building Indicators table
### Left Join Indicators table with Series table to provide the long description for very indicator name

In [390]:
# indicators_table = spark.sql("""SELECT i.CountryName as Country,
#                                 i.CountryCode as Code,
#                                 i.IndicatorName as IndicatorName,
#                                 i.IndicatorCode as IndicatorCode,
#                                 i.Value as Value,
#                                 i.Year as Year,
#                                 s.LongDefinition as LongDefinition
#                                 FROM indicators_data_clean as i
#                                 LEFT JOIN series_table as s, country_table as c
#                                 ON i.IndicatorName = s.IndicatorName i.CountryCode = c.CountryCode
#                                 WHERE i.IndicatorName LIKE 'Population, total'
#                                 """)

# joined_df = indicators_data_clean.join(series_table, col('IndicatorName') == col('Indicator'), 'left') \
# .join(country_data_clean, col('CountryCode') == col('Code'), 'left') \
# .select('CountryName','CountryCode','IndicatorName','IndicatorCode','Value','Year', 'series_data_clean.LongDefinition', 'country_data_clean.Region', 'country_data_clean.IncomeGroup')

indicators_table = spark.sql("""SELECT i.CountryName as Country,
                                i.CountryCode as Code,
                                i.IndicatorName as IndicatorName,
                                i.IndicatorCode as IndicatorCode,
                                i.Value as Value,
                                i.Year as Year,
                                s.LongDefinition as LongDefinition
                                FROM indicators_data_clean as i
                                LEFT JOIN series_table as s
                                ON i.IndicatorName = s.IndicatorName
                                                                """)


In [316]:
indicators_table.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Code: string (nullable = true)
 |-- IndicatorName: string (nullable = true)
 |-- IndicatorCode: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- LongDefinition: string (nullable = true)



In [320]:
indicators_table.createOrReplaceTempView('indicators_table')

### After Combining with series table we can look at the Long Desicription for every rows

In [392]:
indicators_table.limit(10).toPandas()

Unnamed: 0,Country,Code,IndicatorName,IndicatorCode,Value,Year,LongDefinition
0,Afghanistan,AFG,Adjusted savings: net forest depletion (% of GNI),NY.ADJ.DFOR.GN.ZS,3.78890103077157,1973,Net forest depletion is calculated as the prod...
1,Afghanistan,AFG,"Age dependency ratio, old (% of working-age po...",SP.POP.DPND.OL,4.73881614634687,1981,"Age dependency ratio, old, is the ratio of old..."
2,Afghanistan,AFG,Agricultural raw materials exports (% of merch...,TX.VAL.AGRI.ZS.UN,33.6726530924795,1975,Agricultural raw materials comprise SITC secti...
3,Afghanistan,AFG,Agricultural raw materials exports (% of merch...,TX.VAL.AGRI.ZS.UN,36.8364501194565,1977,Agricultural raw materials comprise SITC secti...
4,Afghanistan,AFG,Exports of goods and services (current LCU),NE.EXP.GNFS.CN,10099999744.0,1973,Exports of goods and services represent the va...
5,Afghanistan,AFG,Exports of goods and services (current US$),NE.EXP.GNFS.CD,68888923.0222222,1963,Exports of goods and services represent the va...
6,Afghanistan,AFG,External balance on goods and services (curren...,NE.RSB.GNFS.CN,-699999616.0,1960,External balance on goods and services (former...
7,Afghanistan,AFG,Food exports (% of merchandise exports),TX.VAL.FOOD.ZS.UN,40.8758669399891,1967,Food comprises the commodities in SITC section...
8,Afghanistan,AFG,GDP (current LCU),NY.GDP.MKTP.CN,62999998464.0,1966,GDP at purchaser's prices is the sum of gross ...
9,Afghanistan,AFG,Gross capital formation (% of GDP),NE.GDI.TOTL.ZS,12.4153493357792,1977,"""Gross capital formation (formerly gross domes..."


### Query to display the Top 10 Country names with the higest world population in year 2010 and which region and IncomeGroup it belongs to.
### Mapping country table with indicator table to display the below results

In [388]:
Worldpolulation = spark.sql("""SELECT i.CountryName, i.Year, i.Value, c.CountryCode, c.Region, c.IncomeGroup
                                FROM indicators_data_clean as i
                                LEFT JOIN country_table as c
                                ON i.CountryCode = c.CountryCode 
                                WHERE Year = 2010
                                AND IndicatorCode = 'SP.POP.TOTL'
                                ORDER BY cast(Value as int) DESC
                                """)
Worldpolulation.limit(10).toPandas()

Unnamed: 0,CountryName,Year,Value,CountryCode,Region,IncomeGroup
0,China,2010,1337705000.0,CHN,East Asia & Pacific,Upper middle income
1,India,2010,1230984504.0,IND,South Asia,Lower middle income
2,United States,2010,309347057.0,USA,North America,High income: OECD
3,Indonesia,2010,241613126.0,IDN,East Asia & Pacific,Lower middle income
4,Brazil,2010,198614208.0,BRA,Latin America & Caribbean,Upper middle income
5,Pakistan,2010,170043918.0,PAK,South Asia,Lower middle income
6,Nigeria,2010,159424742.0,NGA,Sub-Saharan Africa,Lower middle income
7,Bangladesh,2010,151616777.0,BGD,South Asia,Lower middle income
8,Japan,2010,128070000.0,JPN,East Asia & Pacific,High income: OECD
9,Mexico,2010,118617542.0,MEX,Latin America & Caribbean,Upper middle income


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

## Indicator Data Quality check

In [396]:
# Perform quality checks here
Raw_data_indicators = indicators_data.count()
Processed_data = indicators_data_clean.count()
facts_table_data = indicators_table.count()

In [411]:
print(f"Raw_data_indicators: {Raw_data_indicators}")
print(f"Processed_data:  {Processed_data}")
print(f"facts_table_data: {facts_table_data}")
print("---------------------------------------------------------------------------------------------")
print(f"had {Raw_data_indicators-Processed_data} invalid values in CountryName column successfully removed all")
print(f"Data loss after creating fact table {Processed_data - facts_table_data} ")
print(f" This implies that we lost some of data after the successful query")

Raw_data_indicators: 5656458
Processed_data:  747288
facts_table_data: 4909170
---------------------------------------------------------------------------------------------
had 4909170 invalid values in CountryName column successfully removed all
Data loss after creating fact table -4161882 
 This implies that we lost some of data after the successful query


## Country Data Quality Check

In [402]:
Raw_Country_data = country_data.count()
processed_country_data = country_data_clean.count()
dim_country_data = country_table.count()

In [407]:
print(f"Raw_Country_data: {Raw_Country_data}")
print(f"processed_country_data:  {processed_country_data}")
print(f"dim_country_data: {dim_country_data}")
print("---------------------------------------------------------------------------------------------")
print(f"had {Raw_Country_data-dim_country_data} null values in region column successfully removed all")
print(f"Data loss after creating fact table {processed_country_data - dim_country_data} ")

Raw_Country_data: 247
processed_country_data:  214
dim_country_data: 214
---------------------------------------------------------------------------------------------
had 33 null values in region column successfully removed all
Data loss after creating fact table 0 


## Series Data Quality check

In [408]:
raw_series_data = series_data.count()
processed_series_data = series_data_clean.count()
dim_series_data = series_table.count()

In [409]:
print(f"raw_series_data: {raw_series_data}")
print(f"processed_series_data: {processed_series_data}")
print(f"dim_series_data: {dim_series_data}")
print("---------------------------------------------------------------------------------------------")
print(f"had {raw_series_data-processed_series_data} null values in  columns successfully removed all")
print(f"Data loss after creating fact table {processed_series_data - dim_series_data} ")

raw_series_data: 3926
processed_series_data: 3316
dim_series_data: 3316
---------------------------------------------------------------------------------------------
had 610 null values in  columns successfully removed all
Data loss after creating fact table 0 


## Data Dictionary

### indicator table
    - CountryName: Country Name
    - CountryCode: Country's Code Ex: India = IND
    - IndicatorCode Unique codes for Indicators Ex: 'SP.POP.TOTL' Tells us the world poluation
    - IndicatorName: Name for every Indicator Code
    - Year: Year
    - Value: Value's for every indicators Ex: Total population, Average amount, GDP % etc.

In [417]:
Indicators = spark.sql("""SELECT DISTINCT(IndicatorCode), IndicatorName
                        FROM indicators_data_clean""")

In [419]:
Indicators.limit(20).toPandas()

Unnamed: 0,IndicatorCode,IndicatorName
0,TM.VAL.MRCH.R5.ZS,Merchandise imports from developing economies ...
1,CM.MKT.LCAP.CD,Market capitalization of listed domestic compa...
2,DT.AXR.PRVT.CD,"Principal rescheduled, private (current US$)"
3,DT.ODA.OATL.CD,Net official aid received (current US$)
4,IC.EXP.COST.CD,Cost to export (US$ per container)
5,SH.STA.DIAB.ZS,Diabetes prevalence (% of population ages 20 t...
6,DT.DOD.DLXF.CD,"External debt stocks, long-term (DOD, current ..."
7,SL.SRV.EMPL.ZS,Employment in services (% of total employment)
8,EN.POP.EL5M.ZS,Population living in areas where elevation is ...
9,EN.ATM.CO2E.PP.GD,CO2 emissions (kg per PPP $ of GDP)


### Country table
    - Country Code : Country's short code
    - Region : Which geographic region this country belongs to
    - Income Group: Which category this country falls in

### Series table
    - Indicator Name : Tells what series code's name is
    - Long Description: Describes the indicator name detail 

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project:
    - I choose to use S3 to store and retreive my data, which performed really well. ETL processes occur in a write-once, read-many fashion and can produce many S3 objects that collectively are stored and referenced as a Parquet file. The Data is stored in Data lake which makes queryable at different patients. 
    - Spark is a general purpose distributed data processing and unified analytics big data engine. It lets us process big data faster by splitting the work up into chunks and assigning those chunks across computational resources. Since my dataset contained 5.6 Million rows, I needed some powerfull tool to process such huge data and query with it, Spark made my task easier because, of its high data querying advantage. Works really well with distributed data such as S3 seamless.  
* Propose how often the data should be updated and why.
    - This data holds the world's development details like every country's GDP, Population, Age Group, Teacher's in secondary education, Co2 emission etc. So I feel this data should be updated on Bi-weekly or montly basis.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     - I would use MongoDB to store this data. Since it supports really well for text based data.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     - There are lots of information which can be used for this purpose using Worlds Development indicator data. I would use scheduler to automatically update for me.
 * The database needed to be accessed by 100+ people.
     - S3 would still hold better, becuase AWS provides massive scalable cloud storage.The data is distributed accross data centers. Amazon claims "you could scale your read performance to 55,000 read requests per second." it also handles errorst and has no impact on downstream users who might be querying the data from S3.