# **ETL process from S3 to PostgreSQL using Google Colab and PySpark**

# Load PySpark

In [532]:
# Install PySpark
import os
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:2 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:5 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:6 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:7 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:8 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:9 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Ign:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:11 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:12 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:13 https://developer.download.nvidia.com/compute/machine-le

In [533]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
# !wget https://jdbc.postgresql.org/download/postgresql-42.3.1.jar

--2022-01-24 03:41:45--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.5’


2022-01-24 03:41:45 (6.60 MB/s) - ‘postgresql-42.2.16.jar.5’ saved [1002883/1002883]



In [534]:
# Start Spark session
# The Postgres JDBC jar downloaded into /content must be on the driver classpath,
# otherwise the "org.postgresql.Driver" class requested by the JDBC write cannot be loaded.
# NOTE: the classpath is only read when the JVM starts; if a session already exists,
# restart the runtime before re-running this cell.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("DataFrameBasics")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [535]:
# Load SparkFiles module
from pyspark import SparkFiles

# **ETL Process for all 12 datasets**

# 1 - country_list_df

## Extract

In [536]:
# Fetch the UN country list CSV from the S3 bucket and parse it
# (first row is the header; column types are inferred)
url = "https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/UN_Country.csv"
spark.sparkContext.addFile(url)
country_list_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("UN_Country.csv"))
)

In [537]:
# Preview the first 20 rows of the country frame, then list each column's type
country_list_df.show(n=20, truncate=True)
country_list_df.dtypes

+------------+-------------------+
|country_code|       country_name|
+------------+-------------------+
|           4|        Afghanistan|
|           8|            Albania|
|          12|            Algeria|
|          16|     American Samoa|
|          20|            Andorra|
|          24|             Angola|
|         660|           Anguilla|
|          28|Antigua and Barbuda|
|          32|          Argentina|
|          51|            Armenia|
|         533|              Aruba|
|          36|          Australia|
|          40|            Austria|
|          31|         Azerbaijan|
|          44|            Bahamas|
|          48|            Bahrain|
|          50|         Bangladesh|
|          52|           Barbados|
|         112|            Belarus|
|          56|            Belgium|
+------------+-------------------+
only showing top 20 rows



[('country_code', 'int'), ('country_name', 'string')]

In [538]:
# Number of rows and number of columns before cleaning
n_rows, n_cols = country_list_df.count(), len(country_list_df.columns)
print(n_rows, n_cols)

235 2


## Transform

In [539]:
# Remove every row that contains a null in any column
country_list_df = country_list_df.na.drop(how="any")

In [540]:
# Number of rows and number of columns after cleaning
n_rows, n_cols = country_list_df.count(), len(country_list_df.columns)
print(n_rows, n_cols)

235 2


In [541]:
# Preview the cleaned country frame and its column types
country_list_df.show(n=20, truncate=True)
country_list_df.dtypes

+------------+-------------------+
|country_code|       country_name|
+------------+-------------------+
|           4|        Afghanistan|
|           8|            Albania|
|          12|            Algeria|
|          16|     American Samoa|
|          20|            Andorra|
|          24|             Angola|
|         660|           Anguilla|
|          28|Antigua and Barbuda|
|          32|          Argentina|
|          51|            Armenia|
|         533|              Aruba|
|          36|          Australia|
|          40|            Austria|
|          31|         Azerbaijan|
|          44|            Bahamas|
|          48|            Bahrain|
|          50|         Bangladesh|
|          52|           Barbados|
|         112|            Belarus|
|          56|            Belgium|
+------------+-------------------+
only showing top 20 rows



[('country_code', 'int'), ('country_name', 'string')]

## Load

In [635]:
# Prompt for the RDS password at run time so the secret is never saved in the notebook.
# (Input is not echoed; the value lives only in kernel memory.)
from getpass import getpass
password = getpass('Enter PW')

In [636]:
# Connection settings for the RDS PostgreSQL instance:
# write mode, JDBC endpoint, and the connection properties passed to the driver
mode = "overwrite"
jdbc_url = "jdbc:postgresql://group-project-v2.ctg85syqs9uc.us-east-1.rds.amazonaws.com:5432/AWS-group-project-v2"
config = dict(
    user="postgres",
    password=password,
    driver="org.postgresql.Driver",
)

In [637]:
# Write country_list_df to the 'country' table in RDS
country_list_df.write.jdbc(
    url=jdbc_url,
    table='country',
    mode=mode,
    properties=config,
)

Py4JJavaError: ignored



---



# 2 - demoracy_index_df

## Extract

In [545]:
# Fetch the Democracy Index CSV from the S3 bucket and parse it
# (first row is the header; column types are inferred)
url = "https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/Democracy_Index_2019.csv"
spark.sparkContext.addFile(url)
demoracy_index_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("Democracy_Index_2019.csv"))
)

In [546]:
# Preview the raw Democracy Index frame and its column types
demoracy_index_df.show(n=20, truncate=True)
demoracy_index_df.dtypes

+------------+--------------------+----+---------------------+
|country_code|        country_name|  yr|Democracy index (EIU)|
+------------+--------------------+----+---------------------+
|           4|         Afghanistan|2019|                 28.5|
|           8|             Albania|2019|                 58.9|
|          12|             Algeria|2019|                 40.1|
|          24|              Angola|2019|                 37.2|
|          32|           Argentina|2019|                 70.2|
|          51|             Armenia|2019|                 55.4|
|          36|           Australia|2019|                 90.9|
|          40|             Austria|2019|                 82.9|
|          31|          Azerbaijan|2019|                 27.5|
|          48|             Bahrain|2019|                 25.5|
|          50|          Bangladesh|2019|                 58.8|
|         112|             Belarus|2019|                 24.8|
|          56|             Belgium|2019|               

[('country_code', 'int'),
 ('country_name', 'string'),
 ('yr', 'int'),
 ('Democracy index (EIU)', 'double')]

In [547]:
# Number of rows and number of columns before cleaning
n_rows, n_cols = demoracy_index_df.count(), len(demoracy_index_df.columns)
print(n_rows, n_cols)

1864 4


## Transform

In [548]:
# Keep just the country key and the index value
demoracy_index_df = demoracy_index_df.select("country_code", "Democracy index (EIU)")

In [549]:
# Rename Columns
# This dataset already names its key column "country_code", so only the metric column
# needs renaming. (The previous rename of "Region/Country/Area" was a silent no-op:
# that column does not exist in this frame.)
demoracy_index_df = demoracy_index_df.withColumnRenamed("Democracy index (EIU)", "democracy_index")

In [550]:
# Remove every row that contains a null in any column
demoracy_index_df = demoracy_index_df.na.drop(how="any")

In [551]:
# Number of rows and number of columns after cleaning
n_rows, n_cols = demoracy_index_df.count(), len(demoracy_index_df.columns)
print(n_rows, n_cols)

165 2


In [552]:
# Preview the cleaned Democracy Index frame and its column types
demoracy_index_df.show(n=20, truncate=True)
demoracy_index_df.dtypes

+------------+---------------+
|country_code|democracy_index|
+------------+---------------+
|           4|           28.5|
|           8|           58.9|
|          12|           40.1|
|          24|           37.2|
|          32|           70.2|
|          51|           55.4|
|          36|           90.9|
|          40|           82.9|
|          31|           27.5|
|          48|           25.5|
|          50|           58.8|
|         112|           24.8|
|          56|           76.4|
|         204|           50.9|
|          64|           53.0|
|          68|           48.4|
|          70|           48.6|
|          72|           78.1|
|          76|           68.6|
|         100|           70.3|
+------------+---------------+
only showing top 20 rows



[('country_code', 'int'), ('democracy_index', 'double')]

## Load



---



# 3 - happiness_score_df

## Extract

In [553]:
# Fetch the Happiness Score CSV from the S3 bucket and parse it
# (first row is the header; column types are inferred)
url = 'https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/Happiness_Scores_2019.csv'
spark.sparkContext.addFile(url)
happiness_score_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("Happiness_Scores_2019.csv"))
)

In [554]:
# Preview the raw Happiness Score frame and its column types
happiness_score_df.show(n=20, truncate=True)
happiness_score_df.dtypes

+------------+--------------------+----+-----------+
|country_code|        country_name|  yr|Life Ladder|
+------------+--------------------+----+-----------+
|           4|         Afghanistan|2019|      2.375|
|           8|             Albania|2019|      4.995|
|          12|             Algeria|2019|      4.745|
|          32|           Argentina|2019|      6.086|
|          51|             Armenia|2019|      5.488|
|          36|           Australia|2019|      7.234|
|          40|             Austria|2019|      7.195|
|          31|          Azerbaijan|2019|      5.173|
|          48|             Bahrain|2019|      7.098|
|          50|          Bangladesh|2019|      5.114|
|         112|             Belarus|2019|      5.821|
|          56|             Belgium|2019|      6.772|
|         204|               Benin|2019|      4.976|
|          68|Bolivia (Plurin. ...|2019|      5.674|
|          70|Bosnia and Herzeg...|2019|      6.016|
|          72|            Botswana|2019|      

[('country_code', 'int'),
 ('country_name', 'string'),
 ('yr', 'int'),
 ('Life Ladder', 'double')]

In [555]:
# Number of rows and number of columns before cleaning
n_rows, n_cols = happiness_score_df.count(), len(happiness_score_df.columns)
print(n_rows, n_cols)

140 4


## Transform

In [556]:
# Keep just the country key and the happiness score
happiness_score_df = happiness_score_df.select("country_code", "Life Ladder")

In [557]:
# Give the score column a snake_case name
happiness_score_df = happiness_score_df.withColumnRenamed(
    existing="Life Ladder",
    new="life_ladder",
)

In [558]:
# Remove every row that contains a null in any column
happiness_score_df = happiness_score_df.na.drop(how="any")

In [559]:
# Number of rows and number of columns after cleaning
n_rows, n_cols = happiness_score_df.count(), len(happiness_score_df.columns)
print(n_rows, n_cols)

140 2


In [560]:
# Preview the cleaned Happiness Score frame and its column types
happiness_score_df.show(n=20, truncate=True)
happiness_score_df.dtypes

+------------+-----------+
|country_code|life_ladder|
+------------+-----------+
|           4|      2.375|
|           8|      4.995|
|          12|      4.745|
|          32|      6.086|
|          51|      5.488|
|          36|      7.234|
|          40|      7.195|
|          31|      5.173|
|          48|      7.098|
|          50|      5.114|
|         112|      5.821|
|          56|      6.772|
|         204|      4.976|
|          68|      5.674|
|          70|      6.016|
|          72|      3.471|
|          76|      6.451|
|         100|      5.108|
|         854|      4.741|
|         116|      4.998|
+------------+-----------+
only showing top 20 rows



[('country_code', 'int'), ('life_ladder', 'double')]

## Load



---



# 4 - consumer_price_df

## Extract

In [561]:
# Fetch the Consumer Price Index CSV from the S3 bucket and parse it
# (first row is the header; column types are inferred)
url = "https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/UN_Consumer_Price+Index_2019.csv"
spark.sparkContext.addFile(url)
consumer_price_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("UN_Consumer_Price+Index_2019.csv"))
)

In [562]:
# Preview the raw Consumer Price Index frame and its column types
consumer_price_df.show(n=20, truncate=True)
consumer_price_df.dtypes

+-------------------+-------------------+----+--------------------+-----+
|Region/Country/Area|                _c1|Year|              Series|Value|
+-------------------+-------------------+----+--------------------+-----+
|                  4|        Afghanistan|2019|Consumer price in...|149.9|
|                  8|            Albania|2019|Consumer price in...|119.1|
|                 12|            Algeria|2019|Consumer price in...|151.4|
|                 20|            Andorra|2019|Consumer price in...|105.8|
|                 24|             Angola|2019|Consumer price in...|378.9|
|                660|           Anguilla|2019|Consumer price in...|107.1|
|                 28|Antigua and Barbuda|2019|Consumer price in...|115.4|
|                 32|          Argentina|2019|Consumer price in...|232.8|
|                 51|            Armenia|2019|Consumer price in...|129.2|
|                533|              Aruba|2019|Consumer price in...|109.2|
|                 36|          Austral

[('Region/Country/Area', 'int'),
 ('_c1', 'string'),
 ('Year', 'int'),
 ('Series', 'string'),
 ('Value', 'double')]

In [563]:
# Number of rows and number of columns before cleaning
n_rows, n_cols = consumer_price_df.count(), len(consumer_price_df.columns)
print(n_rows, n_cols)

187 5


## Transform

In [564]:
# Keep just the country key and the metric value
consumer_price_df = consumer_price_df.select("Region/Country/Area", "Value")

In [565]:
# Give both columns snake_case names
consumer_price_df = (
    consumer_price_df
    .withColumnRenamed("Region/Country/Area", "country_code")
    .withColumnRenamed("Value", "consumer_price_index")
)

In [566]:
# Remove every row that contains a null in any column
consumer_price_df = consumer_price_df.na.drop(how="any")

In [567]:
# Number of rows and number of columns after cleaning
n_rows, n_cols = consumer_price_df.count(), len(consumer_price_df.columns)
print(n_rows, n_cols)

187 2


In [568]:
# Preview the cleaned Consumer Price Index frame and its column types
consumer_price_df.show(n=20, truncate=True)
consumer_price_df.dtypes

+------------+--------------------+
|country_code|consumer_price_index|
+------------+--------------------+
|           4|               149.9|
|           8|               119.1|
|          12|               151.4|
|          20|               105.8|
|          24|               378.9|
|         660|               107.1|
|          28|               115.4|
|          32|               232.8|
|          51|               129.2|
|         533|               109.2|
|          36|               119.8|
|          40|               118.1|
|          31|               156.9|
|          44|               116.2|
|          48|               118.8|
|          50|               179.7|
|          52|               191.7|
|         112|               508.1|
|          56|               117.1|
|          84|               106.2|
+------------+--------------------+
only showing top 20 rows



[('country_code', 'int'), ('consumer_price_index', 'double')]

## Load



---



# 5 - gdp_per_capita_df

## Extract

In [569]:
# Fetch the GDP Per Capita CSV from the S3 bucket and parse it
# (first row is the header; column types are inferred)
url = "https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/UN_GDP_Per_Capita_2019.csv"
spark.sparkContext.addFile(url)
gdp_per_capita_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("UN_GDP_Per_Capita_2019.csv"))
)

In [570]:
# Preview the raw GDP Per Capita frame and its column types
gdp_per_capita_df.show(n=20, truncate=True)
gdp_per_capita_df.dtypes

+-------------------+-------------------+----+--------------------+----------+
|Region/Country/Area|                _c1|Year|              Series|     Value|
+-------------------+-------------------+----+--------------------+----------+
|                  4|        Afghanistan|2019|GDP per capita (U...|   $470.00|
|                  8|            Albania|2019|GDP per capita (U...| $5,303.00|
|                 12|            Algeria|2019|GDP per capita (U...| $3,976.00|
|                 20|            Andorra|2019|GDP per capita (U...|$40,887.00|
|                 24|             Angola|2019|GDP per capita (U...| $2,671.00|
|                660|           Anguilla|2019|GDP per capita (U...|$25,529.00|
|                 28|Antigua and Barbuda|2019|GDP per capita (U...|$17,113.00|
|                 32|          Argentina|2019|GDP per capita (U...|$10,041.00|
|                 51|            Armenia|2019|GDP per capita (U...| $4,623.00|
|                533|              Aruba|2019|GDP pe

[('Region/Country/Area', 'int'),
 ('_c1', 'string'),
 ('Year', 'int'),
 ('Series', 'string'),
 ('Value', 'string')]

In [571]:
# Number of rows and number of columns before cleaning
n_rows, n_cols = gdp_per_capita_df.count(), len(gdp_per_capita_df.columns)
print(n_rows, n_cols)

210 5


## Transform

In [572]:
# Keep just the country key and the metric value
gdp_per_capita_df = gdp_per_capita_df.select("Region/Country/Area", "Value")

In [573]:
# Give the key column its snake_case name ("Value" is converted and replaced in a later cell)
gdp_per_capita_df = gdp_per_capita_df.withColumnRenamed(
    existing="Region/Country/Area",
    new="country_code",
)

In [574]:
# Remove every row that contains a null in any column
gdp_per_capita_df = gdp_per_capita_df.na.drop(how="any")

In [575]:
# Preview rows and column types after dropping nulls
gdp_per_capita_df.show(n=20, truncate=True)
gdp_per_capita_df.dtypes

+------------+----------+
|country_code|     Value|
+------------+----------+
|           4|   $470.00|
|           8| $5,303.00|
|          12| $3,976.00|
|          20|$40,887.00|
|          24| $2,671.00|
|         660|$25,529.00|
|          28|$17,113.00|
|          32|$10,041.00|
|          51| $4,623.00|
|         533|$30,975.00|
|          36|$54,763.00|
|          40|$49,701.00|
|          31| $4,782.00|
|          44|$34,864.00|
|          48|$23,504.00|
|          50| $1,846.00|
|          52|$18,149.00|
|         112| $6,674.00|
|          56|$46,198.00|
|          84| $4,815.00|
+------------+----------+
only showing top 20 rows



[('country_code', 'int'), ('Value', 'string')]

In [576]:
# Turn the dollar strings (e.g. "$5,303.00") into doubles:
# strip every "$" and "," character, then cast what remains
import pyspark.sql.functions as f
gdp_per_capita_df = gdp_per_capita_df.withColumn(
    'per_capita_gdp_dollars',
    f.regexp_replace('Value', '[$,]', '').cast('double'),
)
gdp_per_capita_df.show(n=20, truncate=True)

+------------+----------+----------------------+
|country_code|     Value|per_capita_gdp_dollars|
+------------+----------+----------------------+
|           4|   $470.00|                 470.0|
|           8| $5,303.00|                5303.0|
|          12| $3,976.00|                3976.0|
|          20|$40,887.00|               40887.0|
|          24| $2,671.00|                2671.0|
|         660|$25,529.00|               25529.0|
|          28|$17,113.00|               17113.0|
|          32|$10,041.00|               10041.0|
|          51| $4,623.00|                4623.0|
|         533|$30,975.00|               30975.0|
|          36|$54,763.00|               54763.0|
|          40|$49,701.00|               49701.0|
|          31| $4,782.00|                4782.0|
|          44|$34,864.00|               34864.0|
|          48|$23,504.00|               23504.0|
|          50| $1,846.00|                1846.0|
|          52|$18,149.00|               18149.0|
|         112| $6,67

In [577]:
# Keep the country key and the numeric GDP column; the original string "Value" column is left out
gdp_per_capita_df = gdp_per_capita_df.select("country_code", "per_capita_gdp_dollars")

In [578]:
# Preview the cleaned GDP Per Capita frame and its column types
gdp_per_capita_df.show(n=20, truncate=True)
gdp_per_capita_df.dtypes

+------------+----------------------+
|country_code|per_capita_gdp_dollars|
+------------+----------------------+
|           4|                 470.0|
|           8|                5303.0|
|          12|                3976.0|
|          20|               40887.0|
|          24|                2671.0|
|         660|               25529.0|
|          28|               17113.0|
|          32|               10041.0|
|          51|                4623.0|
|         533|               30975.0|
|          36|               54763.0|
|          40|               49701.0|
|          31|                4782.0|
|          44|               34864.0|
|          48|               23504.0|
|          50|                1846.0|
|          52|               18149.0|
|         112|                6674.0|
|          56|               46198.0|
|          84|                4815.0|
+------------+----------------------+
only showing top 20 rows



[('country_code', 'int'), ('per_capita_gdp_dollars', 'double')]

## Load



---



# 6 - gender_ratio_df

## Extract

In [579]:
# Fetch the Gender Ratio CSV from the S3 bucket and parse it
# (first row is the header; column types are inferred)
url = "https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/UN_Gender_Ratio_2019.csv"
spark.sparkContext.addFile(url)
gender_ratio_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("UN_Gender_Ratio_2019.csv"))
)

In [580]:
# Preview the raw Gender Ratio frame and its column types
gender_ratio_df.show(n=20, truncate=True)
gender_ratio_df.dtypes

+-------------------+-------------------+----+--------------------+-----+
|Region/Country/Area|                _c1|Year|              Series|Value|
+-------------------+-------------------+----+--------------------+-----+
|                  4|        Afghanistan|2019|Sex ratio (males ...|105.5|
|                  8|            Albania|2019|Sex ratio (males ...|103.7|
|                 12|            Algeria|2019|Sex ratio (males ...|102.1|
|                 24|             Angola|2019|Sex ratio (males ...| 97.9|
|                 28|Antigua and Barbuda|2019|Sex ratio (males ...| 93.2|
|                 32|          Argentina|2019|Sex ratio (males ...| 95.2|
|                 51|            Armenia|2019|Sex ratio (males ...| 88.8|
|                533|              Aruba|2019|Sex ratio (males ...| 90.3|
|                 36|          Australia|2019|Sex ratio (males ...| 99.2|
|                 40|            Austria|2019|Sex ratio (males ...| 97.0|
|                 31|         Azerbaij

[('Region/Country/Area', 'int'),
 ('_c1', 'string'),
 ('Year', 'int'),
 ('Series', 'string'),
 ('Value', 'double')]

In [581]:
# Number of rows and number of columns before cleaning
n_rows, n_cols = gender_ratio_df.count(), len(gender_ratio_df.columns)
print(n_rows, n_cols)

196 5


## Transform

In [582]:
# Keep just the country key and the metric value
gender_ratio_df = gender_ratio_df.select("Region/Country/Area", "Value")

In [583]:
# Give both columns snake_case names
gender_ratio_df = (
    gender_ratio_df
    .withColumnRenamed("Region/Country/Area", "country_code")
    .withColumnRenamed("Value", "gender_ratio_males_per100_female")
)

In [584]:
# Remove every row that contains a null in any column
gender_ratio_df = gender_ratio_df.na.drop(how="any")

In [585]:
# Number of rows and number of columns after cleaning
n_rows, n_cols = gender_ratio_df.count(), len(gender_ratio_df.columns)
print(n_rows, n_cols)

196 2


In [586]:
# Preview the cleaned Gender Ratio frame and its column types
gender_ratio_df.show(n=20, truncate=True)
gender_ratio_df.dtypes

+------------+--------------------------------+
|country_code|gender_ratio_males_per100_female|
+------------+--------------------------------+
|           4|                           105.5|
|           8|                           103.7|
|          12|                           102.1|
|          24|                            97.9|
|          28|                            93.2|
|          32|                            95.2|
|          51|                            88.8|
|         533|                            90.3|
|          36|                            99.2|
|          40|                            97.0|
|          31|                            99.7|
|          44|                            94.5|
|          48|                           179.9|
|          50|                           102.4|
|          52|                            93.7|
|         112|                            87.1|
|          56|                            98.0|
|          84|                          

[('country_code', 'int'), ('gender_ratio_males_per100_female', 'double')]

## Load



---



# 7 - infant_mortality_df

## Extract

In [587]:
# Fetch the Infant Mortality CSV from the S3 bucket and parse it
# (first row is the header; column types are inferred)
url = "https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/UN_Infant_Mortality_2020.csv"
spark.sparkContext.addFile(url)
infant_mortality_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("UN_Infant_Mortality_2020.csv"))
)

In [588]:
# Preview the raw Infant Mortality frame and its column types
infant_mortality_df.show(n=20, truncate=True)
infant_mortality_df.dtypes

+-------------------+-------------------+----+--------------------+-----+
|Region/Country/Area|                _c1|Year|              Series|Value|
+-------------------+-------------------+----+--------------------+-----+
|                  4|        Afghanistan|2020|Infant mortality ...| 51.7|
|                  8|            Albania|2020|Infant mortality ...|  8.0|
|                 12|            Algeria|2020|Infant mortality ...| 21.2|
|                 24|             Angola|2020|Infant mortality ...| 61.5|
|                 28|Antigua and Barbuda|2020|Infant mortality ...|  5.2|
|                 32|          Argentina|2020|Infant mortality ...| 10.2|
|                 51|            Armenia|2020|Infant mortality ...| 10.8|
|                533|              Aruba|2020|Infant mortality ...| 13.6|
|                 36|          Australia|2020|Infant mortality ...|  3.1|
|                 40|            Austria|2020|Infant mortality ...|  3.2|
|                 31|         Azerbaij

[('Region/Country/Area', 'int'),
 ('_c1', 'string'),
 ('Year', 'int'),
 ('Series', 'string'),
 ('Value', 'double')]

In [589]:
# Number of rows and number of columns before cleaning
n_rows, n_cols = infant_mortality_df.count(), len(infant_mortality_df.columns)
print(n_rows, n_cols)

201 5


## Transform

In [590]:
# Keep just the country key and the metric value
infant_mortality_df = infant_mortality_df.select("Region/Country/Area", "Value")

In [591]:
# Give both columns snake_case names
infant_mortality_df = (
    infant_mortality_df
    .withColumnRenamed("Region/Country/Area", "country_code")
    .withColumnRenamed("Value", "infant_mortality_per1000_births")
)

In [592]:
# Remove every row that contains a null in any column
infant_mortality_df = infant_mortality_df.na.drop(how="any")

In [593]:
# Number of rows and number of columns after cleaning
n_rows, n_cols = infant_mortality_df.count(), len(infant_mortality_df.columns)
print(n_rows, n_cols)

201 2


In [594]:
# Preview the cleaned Infant Mortality frame and its column types
infant_mortality_df.show(n=20, truncate=True)
infant_mortality_df.dtypes

+------------+-------------------------------+
|country_code|infant_mortality_per1000_births|
+------------+-------------------------------+
|           4|                           51.7|
|           8|                            8.0|
|          12|                           21.2|
|          24|                           61.5|
|          28|                            5.2|
|          32|                           10.2|
|          51|                           10.8|
|         533|                           13.6|
|          36|                            3.1|
|          40|                            3.2|
|          31|                           20.8|
|          44|                            5.9|
|          48|                            6.0|
|          50|                           26.8|
|          52|                           10.0|
|         112|                            3.0|
|          56|                            2.8|
|          84|                           12.8|
|         204

[('country_code', 'int'), ('infant_mortality_per1000_births', 'double')]

## Load



---



# 8 - life_expectancy_df

## Extract

In [595]:
# Fetch the Life Expectancy CSV from the S3 bucket and parse it
# (first row is the header; column types are inferred)
url = "https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/UN_Life_Expectancy_2020.csv"
spark.sparkContext.addFile(url)
life_expectancy_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("UN_Life_Expectancy_2020.csv"))
)

In [596]:
# Preview the raw Life Expectancy frame and its column types
life_expectancy_df.show(n=20, truncate=True)
life_expectancy_df.dtypes

+-------------------+-------------------+----+--------------------+-----+
|Region/Country/Area|                _c1|Year|              Series|Value|
+-------------------+-------------------+----+--------------------+-----+
|                  4|        Afghanistan|2020|Life expectancy a...| 64.3|
|                  8|            Albania|2020|Life expectancy a...| 78.4|
|                 12|            Algeria|2020|Life expectancy a...| 76.6|
|                 24|             Angola|2020|Life expectancy a...| 60.5|
|                 28|Antigua and Barbuda|2020|Life expectancy a...| 76.8|
|                 32|          Argentina|2020|Life expectancy a...| 76.4|
|                 51|            Armenia|2020|Life expectancy a...| 74.9|
|                533|              Aruba|2020|Life expectancy a...| 76.1|
|                 36|          Australia|2020|Life expectancy a...| 83.2|
|                 40|            Austria|2020|Life expectancy a...| 81.4|
|                 31|         Azerbaij

[('Region/Country/Area', 'int'),
 ('_c1', 'string'),
 ('Year', 'int'),
 ('Series', 'string'),
 ('Value', 'double')]

In [597]:
# Number of rows and number of columns before cleaning
n_rows, n_cols = life_expectancy_df.count(), len(life_expectancy_df.columns)
print(n_rows, n_cols)

201 5


## Transform

In [598]:
# Keep just the country key and the metric value
life_expectancy_df = life_expectancy_df.select("Region/Country/Area", "Value")

In [599]:
# Give both columns snake_case names
life_expectancy_df = (
    life_expectancy_df
    .withColumnRenamed("Region/Country/Area", "country_code")
    .withColumnRenamed("Value", "life_expectancy")
)

In [600]:
# Remove every row that contains a null in any column
life_expectancy_df = life_expectancy_df.na.drop(how="any")

In [601]:
# Number of rows and number of columns after cleaning
n_rows, n_cols = life_expectancy_df.count(), len(life_expectancy_df.columns)
print(n_rows, n_cols)

201 2


In [602]:
# Preview the cleaned Life Expectancy frame and its column types
life_expectancy_df.show(n=20, truncate=True)
life_expectancy_df.dtypes

+------------+---------------+
|country_code|life_expectancy|
+------------+---------------+
|           4|           64.3|
|           8|           78.4|
|          12|           76.6|
|          24|           60.5|
|          28|           76.8|
|          32|           76.4|
|          51|           74.9|
|         533|           76.1|
|          36|           83.2|
|          40|           81.4|
|          31|           72.8|
|          44|           73.7|
|          48|           77.1|
|          50|           72.2|
|          52|           79.0|
|         112|           74.5|
|          56|           81.4|
|          84|           74.4|
|         204|           61.3|
|          64|           71.3|
+------------+---------------+
only showing top 20 rows



[('country_code', 'int'), ('life_expectancy', 'double')]

## Load



---



# 9 - pop_density_df

## Extract

In [603]:
# Fetch the Population Density CSV from the S3 bucket and parse it
# (first row is the header; column types are inferred)
url = "https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/UN_Population_Density_2019.csv"
spark.sparkContext.addFile(url)
pop_density_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("UN_Population_Density_2019.csv"))
)

In [604]:
# Preview the raw Population Density frame and its column types
pop_density_df.show(n=20, truncate=True)
pop_density_df.dtypes

+-------------------+-------------------+----+------------------+------+
|Region/Country/Area|                _c1|Year|            Series| Value|
+-------------------+-------------------+----+------------------+------+
|                  4|        Afghanistan|2019|Population density|  58.3|
|                  8|            Albania|2019|Population density| 105.1|
|                 12|            Algeria|2019|Population density|  18.1|
|                 16|     American Samoa|2019|Population density| 276.6|
|                 20|            Andorra|2019|Population density| 164.1|
|                 24|             Angola|2019|Population density|  25.5|
|                660|           Anguilla|2019|Population density| 165.2|
|                 28|Antigua and Barbuda|2019|Population density| 220.7|
|                 32|          Argentina|2019|Population density|  16.4|
|                 51|            Armenia|2019|Population density| 103.9|
|                533|              Aruba|2019|Popul

[('Region/Country/Area', 'int'),
 ('_c1', 'string'),
 ('Year', 'int'),
 ('Series', 'string'),
 ('Value', 'double')]

In [605]:
# Number of rows and number of columns before cleaning
n_rows, n_cols = pop_density_df.count(), len(pop_density_df.columns)
print(n_rows, n_cols)

227 5


## Transform

In [606]:
# Keep just the country key and the metric value
pop_density_df = pop_density_df.select("Region/Country/Area", "Value")

In [607]:
# Give both remaining columns descriptive snake_case names
pop_density_df = (
    pop_density_df
    .withColumnRenamed("Region/Country/Area", "country_code")
    .withColumnRenamed("Value", "population_density")
)

In [608]:
# Discard every row that has a null in any column
pop_density_df = pop_density_df.na.drop(how="any")

In [609]:
# Shape after cleaning, printed as "<rows> <columns>"
print(f"{pop_density_df.count()} {len(pop_density_df.columns)}")

227 2


In [610]:
# Preview the cleaned frame (first 20 rows) and report its column dtypes
pop_density_df.show(n=20, truncate=True)
pop_density_df.dtypes

+------------+------------------+
|country_code|population_density|
+------------+------------------+
|           4|              58.3|
|           8|             105.1|
|          12|              18.1|
|          16|             276.6|
|          20|             164.1|
|          24|              25.5|
|         660|             165.2|
|          28|             220.7|
|          32|              16.4|
|          51|             103.9|
|         533|             590.6|
|          36|               3.3|
|          40|             108.7|
|          31|             121.6|
|          44|              38.9|
|          48|            2159.4|
|          50|            1252.6|
|          52|             667.5|
|         112|              46.6|
|          56|             381.1|
+------------+------------------+
only showing top 20 rows



[('country_code', 'int'), ('population_density', 'double')]

## Load



---



# 10 - women_seats_df

## Extract

In [611]:
# Extract: register the Women Seats in Government CSV hosted on S3 with Spark,
# then parse the fetched copy (first row is the header, column types are inferred).
url = "https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/UN_Seats_Held_by_Women_2019.csv"
spark.sparkContext.addFile(url)
women_seats_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("UN_Seats_Held_by_Women_2019.csv"))
)

In [612]:
# Preview the first 20 rows (values truncated for display); the cell's output
# value is the list of (column name, dtype) pairs.
women_seats_df.show(n=20, truncate=True)
women_seats_df.dtypes

+-------------------+--------------------+----+--------------------+------------------+---------------------------+-----+
|Region/Country/Area|                 _c1|Year|              Series|Last Election Date|Last Election Date footnote|Value|
+-------------------+--------------------+----+--------------------+------------------+---------------------------+-----+
|                  8|             Albania|2019|Seats held by wom...|           2017-06|                       null| 29.3|
|                 12|             Algeria|2019|Seats held by wom...|           2017-05|                       null| 25.8|
|                 20|             Andorra|2019|Seats held by wom...|           2019-04|                       null| 32.1|
|                 24|              Angola|2019|Seats held by wom...|           2017-08|                       null| 30.0|
|                 28| Antigua and Barbuda|2019|Seats held by wom...|           2018-03|                       null| 11.1|
|                 32|   

[('Region/Country/Area', 'int'),
 ('_c1', 'string'),
 ('Year', 'int'),
 ('Series', 'string'),
 ('Last Election Date', 'string'),
 ('Last Election Date footnote', 'string'),
 ('Value', 'double')]

In [613]:
# Shape before cleaning, printed as "<rows> <columns>"
print(f"{women_seats_df.count()} {len(women_seats_df.columns)}")

192 7


## Transform

In [614]:
# Keep just the numeric code column and the measured value; the name, year,
# series and election-date columns are dropped.
women_seats_df = women_seats_df.select("Region/Country/Area", "Value")

In [615]:
# Give both remaining columns descriptive snake_case names
women_seats_df = (
    women_seats_df
    .withColumnRenamed("Region/Country/Area", "country_code")
    .withColumnRenamed("Value", "seats_held_by_women_pct")
)

In [616]:
# Discard every row that has a null in any column
women_seats_df = women_seats_df.na.drop(how="any")

In [617]:
# Shape after cleaning, printed as "<rows> <columns>"
print(f"{women_seats_df.count()} {len(women_seats_df.columns)}")

192 2


In [618]:
# Preview the cleaned frame (first 20 rows) and report its column dtypes
women_seats_df.show(n=20, truncate=True)
women_seats_df.dtypes

+------------+-----------------------+
|country_code|seats_held_by_women_pct|
+------------+-----------------------+
|           8|                   29.3|
|          12|                   25.8|
|          20|                   32.1|
|          24|                   30.0|
|          28|                   11.1|
|          32|                   38.8|
|          51|                   24.2|
|          36|                   30.0|
|          40|                   37.2|
|          31|                   16.8|
|          44|                   12.8|
|          48|                   15.0|
|          50|                   20.7|
|          52|                   20.0|
|         112|                   34.5|
|          56|                   38.0|
|          84|                    9.4|
|         204|                    7.2|
|          64|                   14.9|
|          68|                   53.1|
+------------+-----------------------+
only showing top 20 rows



[('country_code', 'int'), ('seats_held_by_women_pct', 'double')]

## Load



---



# 11 - unemployment_rate_df



## Extract

In [619]:
# Extract: register the Unemployment Rate CSV hosted on S3 with Spark, then
# parse the fetched copy (first row is the header, column types are inferred).
url = "https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/UN_Unemployment_Rate_2020.csv"
spark.sparkContext.addFile(url)
unemployment_rate_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("UN_Unemployment_Rate_2020.csv"))
)

In [620]:
# Preview the first 20 rows (values truncated for display); the cell's output
# value is the list of (column name, dtype) pairs.
unemployment_rate_df.show(n=20, truncate=True)
unemployment_rate_df.dtypes

+-------------------+--------------------+----+--------------------+-----+
|Region/Country/Area|                 _c1|Year|              Series|Value|
+-------------------+--------------------+----+--------------------+-----+
|                  4|         Afghanistan|2020|Unemployment rate...| 11.2|
|                  8|             Albania|2020|Unemployment rate...| 12.8|
|                 12|             Algeria|2020|Unemployment rate...| 11.5|
|                 24|              Angola|2020|Unemployment rate...|  6.8|
|                 32|           Argentina|2020|Unemployment rate...| 10.4|
|                 51|             Armenia|2020|Unemployment rate...| 16.6|
|                 36|           Australia|2020|Unemployment rate...|  5.3|
|                 40|             Austria|2020|Unemployment rate...|  4.8|
|                 31|          Azerbaijan|2020|Unemployment rate...|  6.0|
|                 44|             Bahamas|2020|Unemployment rate...| 11.3|
|                 48|    

[('Region/Country/Area', 'int'),
 ('_c1', 'string'),
 ('Year', 'int'),
 ('Series', 'string'),
 ('Value', 'double')]

In [621]:
# Shape before cleaning, printed as "<rows> <columns>"
print(f"{unemployment_rate_df.count()} {len(unemployment_rate_df.columns)}")

188 5


## Transform

In [622]:
# Keep just the numeric code column and the measured value; the name, year
# and series columns are dropped.
unemployment_rate_df = unemployment_rate_df.select("Region/Country/Area", "Value")

In [623]:
# Give both remaining columns descriptive snake_case names
unemployment_rate_df = (
    unemployment_rate_df
    .withColumnRenamed("Region/Country/Area", "country_code")
    .withColumnRenamed("Value", "unemployment_rate")
)

In [624]:
# Discard every row that has a null in any column
unemployment_rate_df = unemployment_rate_df.na.drop(how="any")

In [625]:
# Shape after cleaning, printed as "<rows> <columns>"
print(f"{unemployment_rate_df.count()} {len(unemployment_rate_df.columns)}")

188 2


In [626]:
# Preview the cleaned frame (first 20 rows) and report its column dtypes
unemployment_rate_df.show(n=20, truncate=True)
unemployment_rate_df.dtypes

+------------+-----------------+
|country_code|unemployment_rate|
+------------+-----------------+
|           4|             11.2|
|           8|             12.8|
|          12|             11.5|
|          24|              6.8|
|          32|             10.4|
|          51|             16.6|
|          36|              5.3|
|          40|              4.8|
|          31|              6.0|
|          44|             11.3|
|          48|              0.8|
|          50|              4.2|
|          52|             10.9|
|         112|              4.6|
|          56|              5.7|
|          84|              6.4|
|         204|              2.0|
|          64|              2.4|
|          68|              3.5|
|          70|             18.4|
+------------+-----------------+
only showing top 20 rows



[('country_code', 'int'), ('unemployment_rate', 'double')]

## Load



---



# 12 - water_services_df

## Extract

In [627]:
# Extract: register the Water Services CSV hosted on S3 with Spark, then
# parse the fetched copy (first row is the header, column types are inferred).
url = "https://corti-gw-group-project.s3.amazonaws.com/Unprocessed+Data/UN_Water_Services_2020.csv"
spark.sparkContext.addFile(url)
water_services_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("UN_Water_Services_2020.csv"))
)

In [628]:
# Preview the first 20 rows (values truncated for display); the cell's output
# value is the list of (column name, dtype) pairs.
water_services_df.show(n=20, truncate=True)
water_services_df.dtypes

+-------------------+--------------------+----+--------------------+-----+
|Region/Country/Area|                 _c1|Year|              Series|Value|
+-------------------+--------------------+----+--------------------+-----+
|                  4|         Afghanistan|2020|Safely managed dr...| 27.6|
|                  8|             Albania|2020|Safely managed dr...| 70.7|
|                 12|             Algeria|2020|Safely managed dr...| 72.4|
|                 16|      American Samoa|2020|Safely managed dr...| 98.4|
|                 20|             Andorra|2020|Safely managed dr...| 90.6|
|                 51|             Armenia|2020|Safely managed dr...| 86.9|
|                 40|             Austria|2020|Safely managed dr...| 98.9|
|                 31|          Azerbaijan|2020|Safely managed dr...| 88.3|
|                 48|             Bahrain|2020|Safely managed dr...| 99.0|
|                 50|          Bangladesh|2020|Safely managed dr...| 58.5|
|                112|    

[('Region/Country/Area', 'int'),
 ('_c1', 'string'),
 ('Year', 'int'),
 ('Series', 'string'),
 ('Value', 'double')]

In [629]:
# Shape before cleaning, printed as "<rows> <columns>"
print(f"{water_services_df.count()} {len(water_services_df.columns)}")

138 5


## Transform

In [630]:
# Keep just the numeric code column and the measured value; the name, year
# and series columns are dropped.
water_services_df = water_services_df.select("Region/Country/Area", "Value")

In [631]:
# Give both remaining columns descriptive snake_case names
water_services_df = (
    water_services_df
    .withColumnRenamed("Region/Country/Area", "country_code")
    .withColumnRenamed("Value", "safe_drinking_water_access_pct")
)

In [632]:
# Discard every row that has a null in any column
water_services_df = water_services_df.na.drop(how="any")

In [633]:
# Shape after cleaning, printed as "<rows> <columns>"
print(f"{water_services_df.count()} {len(water_services_df.columns)}")

138 2


In [634]:
# Preview the cleaned frame (first 20 rows) and report its column dtypes
water_services_df.show(n=20, truncate=True)
water_services_df.dtypes

+------------+------------------------------+
|country_code|safe_drinking_water_access_pct|
+------------+------------------------------+
|           4|                          27.6|
|           8|                          70.7|
|          12|                          72.4|
|          16|                          98.4|
|          20|                          90.6|
|          51|                          86.9|
|          40|                          98.9|
|          31|                          88.3|
|          48|                          99.0|
|          50|                          58.5|
|         112|                          94.6|
|          56|                          99.9|
|          64|                          36.6|
|          70|                          88.9|
|          76|                          85.8|
|         100|                          97.6|
|         116|                          27.8|
|         124|                          99.0|
|         140|                    

[('country_code', 'int'), ('safe_drinking_water_access_pct', 'double')]

## Load