# 1. Introduction

The key job in data engineering lies in the word 'engineering', which essentially means designing and building things (Black, 2023). Data engineers do exactly this: they design and build pipelines, e.g. ETL (extract, transform, load) pipelines, that transform and transport data from a diverse range of sources to data scientists or other end users. Over the past few decades, data volumes have increased substantially, so the work has had to be split across teams. Consequently, several technologies were established for this purpose, including Apache Hadoop, AWS, Apache Hive and, currently the most widely used, Apache Spark (Wilson, 2021). Accordingly, although this assignment is an individual one, the approach was designed so that the same code can be accessed by a team of users.

The World Bank Group (2021) has re-emphasised the importance of fighting corruption, as corruption has a detrimental impact on a country's poorest and most vulnerable citizens in the dimensions of health, finance, education and justice. Similarly, the United Nations (2004) emphasised that corruption distorts markets, allows organised crime to flourish, and undermines democracy and the rule of law, all of which affect citizens' lives, especially those of the poor. This research therefore focuses on the relationship between corruption and several significant development indicators.

To achieve this objective, I obtained one dataset by web scraping Transparency International, which offers the top 100 countries' corruption ranks and scores. I also retrieved six other datasets from the World Bank, which give insight into how the corruption score or rank relates to development indicators such as rule of law and government effectiveness. Data analysis methods such as regression can then use these indicators to predict the corruption rank or score.

# 2. Structure

## 2.1 Data Storage
To make data storage more manageable and efficient, each dataset is saved in Parquet format after it is retrieved from its source. Parquet is a free, open-source, column-oriented file format that supports complex data types, which allows data engineers to store a diverse range of data, such as images, documents or structured tables (Databricks, 2023). Because Parquet stores and compresses data column by column, it is also well suited to big data: it eases the pressure on server capacity when large volumes of data are retrieved from many sources and consequently reduces the storage costs associated with them. Parquet is therefore a sensible default for both organisations and individuals who need to manage large volumes of data efficiently and affordably.
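
Much of Parquet's storage saving comes from its column-oriented layout: the values of one column sit together, so encodings such as dictionary and run-length encoding can exploit repetition. The sketch below is a toy illustration of that idea in plain Python only; it does not reproduce Parquet's actual on-disk encoding, and the sample rows are illustrative.

```python
from itertools import groupby

# Illustrative rows in (country, country_code, year) form
rows = [
    ("Denmark", "DNK", 2021),
    ("Denmark", "DNK", 2020),
    ("Finland", "FIN", 2021),
    ("Finland", "FIN", 2020),
]


def to_columns(records, names):
    """Pivot row tuples into a dict holding one list per column (columnar layout)."""
    return {name: [record[i] for record in records] for i, name in enumerate(names)}


def run_length_encode(values):
    """Collapse consecutive repeated values into (value, count) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]


columns = to_columns(rows, ["country", "country_code", "year"])
encoded = {name: run_length_encode(values) for name, values in columns.items()}

print(encoded["country"])  # [('Denmark', 2), ('Finland', 2)]
print(encoded["year"])     # [(2021, 1), (2020, 1), (2021, 1), (2020, 1)]
```

Repetitive columns such as country names shrink to a few pairs, while a column that changes on every row gains nothing, which is why Parquet applies encoding and compression column by column rather than row by row.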

As the data will continue to grow annually, a structured schema is designed for the relational database to make it easier to manage. The schema contains six world development indicators, a corruption-score table and a country information table for reference. This structure allows information to be extracted and analysed more efficiently. For example, to find the underlying reasons for a movement in a country's corruption rank or score, the 'indicator' table can quickly identify the change in each of the six development indicators across years, from which a general relationship can be deduced. Similarly, to assess the general time effect (as covariates) on the corruption score, the 'corruption' table offers insight into year-by-year trends for the top 100 countries, which can be incorporated into the analysis of corruption and the indicators.
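
A minimal sketch of how such a schema could look, using Python's built-in `sqlite3` module as a stand-in for the relational database. The table and column names are hypothetical simplifications (only two of the six indicators are shown), and the rows are placeholders, not real CPI or WGI values. One detail the sketch makes explicit: the `country_info` table scraped in Section 3.2 holds alpha-2 codes (e.g. AF), while the World Bank tables return alpha-3 codes (e.g. DNK), so the join key has to be bridged, here through a country table that stores both.

```python
import sqlite3

# Hypothetical, simplified schema: country, corruption and indicator tables
DDL = """
CREATE TABLE country (
    iso2 TEXT PRIMARY KEY,          -- alpha-2 code, as scraped from IBAN
    iso3 TEXT UNIQUE NOT NULL,      -- alpha-3 code, as returned by the World Bank
    country_name TEXT NOT NULL
);
CREATE TABLE corruption (
    iso2 TEXT REFERENCES country(iso2),
    year INTEGER,
    rank INTEGER,
    corrupt_score REAL,
    PRIMARY KEY (iso2, year)
);
CREATE TABLE indicator (
    iso3 TEXT REFERENCES country(iso3),
    year INTEGER,
    corruption_control_score REAL,
    rule_law_score REAL,
    PRIMARY KEY (iso3, year)
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL)

# Placeholder rows for illustration only, not real CPI or WGI values
conn.execute("INSERT INTO country VALUES ('AA', 'AAA', 'Country A')")
conn.executemany("INSERT INTO corruption VALUES ('AA', ?, ?, ?)",
                 [(2020, 5, 80.0), (2021, 3, 82.0)])
conn.executemany("INSERT INTO indicator VALUES ('AAA', ?, ?, ?)",
                 [(2020, 1.9, 1.7), (2021, 2.0, 1.8)])

# Join corruption and indicator facts for the same country and year via the country table
QUERY = """
SELECT c.country_name, k.year, k.corrupt_score, i.corruption_control_score
FROM corruption AS k
JOIN country AS c ON c.iso2 = k.iso2
JOIN indicator AS i ON i.iso3 = c.iso3 AND i.year = k.year
ORDER BY k.year
"""
rows = conn.execute(QUERY).fetchall()
print(rows)
```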

For ease of database management, a DB diagram is drawn to visualise the structured schema for later research with more data.

## 2.2 Source Version Control

Although this is an individual assignment and no teamwork is required, source version control (SVC) is good practice: it ensures that changes to this notebook are tracked and recorded, in case I want to revisit them later. SVC essentially records the evolution of the code. Git is an open-source tool for SVC that integrates easily with GitHub for teamwork when necessary. Each time I complete a version of the notebook, I add, commit and push the changes to my personal GitHub repository. A screenshot of the code that performs these tasks is attached. Note also that Google Colab has Git pre-installed in its runtime and, independently of Git, keeps a revision history for every notebook, available under File > Revision history.
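
The add, commit and push cycle described above can also be scripted from a notebook cell. This is a minimal sketch using Python's `subprocess` module; the helper name `git_snapshot`, the remote name (`origin`), the branch name (`main`) and the file name are assumptions, and by default the helper only returns the commands (a dry run) rather than executing them.

```python
import subprocess


def git_snapshot(message, paths=(".",), remote="origin", branch="main", run=False):
    """Build (and optionally run) the git add/commit/push commands for one notebook version.

    remote and branch are assumed defaults; adjust them to the actual repository.
    """
    commands = [
        ["git", "add", *paths],
        ["git", "commit", "-m", message],
        ["git", "push", remote, branch],
    ]
    if run:
        for command in commands:
            # check=True stops at the first failing step (e.g. nothing to commit)
            subprocess.run(command, check=True)
    return commands


# Dry run with a hypothetical notebook file name
for command in git_snapshot("Add corruption scraping cells", paths=("notebook.ipynb",)):
    print(" ".join(command))
```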

## 2.3 Import necessary packages and modules

In [1]:
!pip install selenium

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting selenium
  Downloading selenium-4.8.3-py3-none-any.whl (6.5 MB)
Collecting trio~=0.17
  Downloading trio-0.22.0-py3-none-any.whl (384 kB)
Collecting trio-websocket~=0.9
  Downloading trio_websocket-0.10.2-py3-none-any.whl (17 kB)
Collecting outcome
  Downloading outcome-1.2.0-py2.py3-none-any.whl (9.7 kB)
Collecting async-generator>=1.9
  Downloading async_generator-1.10-py3-none-any.whl (18 kB)
Collecting wsproto>=0.14
  Downloading wsproto-1.2.0-py3-none-any.whl (24 kB)
Collecting h11<1,>=0.9.0
  Downloading h11-0.14.0-py3-none-any.whl (58 kB)

In [None]:
%%shell
# Add debian buster
cat > /etc/apt/sources.list.d/debian.list <<'EOF'
deb [arch=amd64 signed-by=/usr/share/keyrings/debian-buster.gpg] http://deb.debian.org/debian buster main
deb [arch=amd64 signed-by=/usr/share/keyrings/debian-buster-updates.gpg] http://deb.debian.org/debian buster-updates main
deb [arch=amd64 signed-by=/usr/share/keyrings/debian-security-buster.gpg] http://deb.debian.org/debian-security buster/updates main
EOF

# Add keys
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys DCC9EFBF77E11517
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 648ACFD622F3D138
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 112695A0E562B32A

apt-key export 77E11517 | gpg --dearmour -o /usr/share/keyrings/debian-buster.gpg
apt-key export 22F3D138 | gpg --dearmour -o /usr/share/keyrings/debian-buster-updates.gpg
apt-key export E562B32A | gpg --dearmour -o /usr/share/keyrings/debian-security-buster.gpg

# Prefer debian repo for chromium* packages only
# Note the double-blank lines between entries
cat > /etc/apt/preferences.d/chromium.pref << 'EOF'
Package: *
Pin: release a=eoan
Pin-Priority: 500

Package: *
Pin: origin "deb.debian.org"
Pin-Priority: 300


Package: chromium*
Pin: origin "deb.debian.org"
Pin-Priority: 700
EOF

Executing: /tmp/apt-key-gpghome.7W0JvF6Upo/gpg.1.sh --keyserver keyserver.ubuntu.com --recv-keys DCC9EFBF77E11517
gpg: key DCC9EFBF77E11517: public key "Debian Stable Release Key (10/buster) <debian-release@lists.debian.org>" imported
gpg: Total number processed: 1
gpg:               imported: 1
Executing: /tmp/apt-key-gpghome.AkUuvfATdz/gpg.1.sh --keyserver keyserver.ubuntu.com --recv-keys 648ACFD622F3D138
gpg: key DC30D7C23CBBABEE: public key "Debian Archive Automatic Signing Key (10/buster) <ftpmaster@debian.org>" imported
gpg: Total number processed: 1
gpg:               imported: 1
Executing: /tmp/apt-key-gpghome.Z2JjrXQI6a/gpg.1.sh --keyserver keyserver.ubuntu.com --recv-keys 112695A0E562B32A
gpg: key 4DFAB270CAA96DFA: public key "Debian Security Archive Automatic Signing Key (10/buster) <ftpmaster@debian.org>" imported
gpg: Total number processed: 1
gpg:               imported: 1




In [2]:
!apt-get update
!apt-get install -y chromium chromium-driver
from selenium import webdriver

def web_driver():
    """Return a headless Chromium driver configured to run inside Colab."""
    options = webdriver.ChromeOptions()
    options.add_argument('--verbose')
    options.add_argument('--no-sandbox')             # Chrome refuses to run as root without it
    options.add_argument('--headless')
    options.add_argument('--disable-gpu')
    options.add_argument('--window-size=1920,1200')  # no space inside the value
    options.add_argument('--disable-dev-shm-usage')  # use /tmp instead of the small /dev/shm
    driver = webdriver.Chrome(options=options)
    return driver

Get:1 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease [1,581 B]
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Hit:5 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Get:6 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease [18.1 kB]
Hit:7 http://archive.ubuntu.com/ubuntu focal InRelease
Get:8 http://archive.ubuntu.com/ub

In [3]:
! pip install bs4
! pip install lxml
import re
import time
import random
from datetime import date, timedelta
from urllib.parse import urljoin

import requests
import pandas as pd
from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By

# Two aliases for BeautifulSoup are used in this notebook
soup = bs = BeautifulSoup

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting bs4
  Downloading bs4-0.0.1.tar.gz (1.1 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: bs4
  Building wheel for bs4 (setup.py) ... done
  Created wheel for bs4: filename=bs4-0.0.1-py3-none-any.whl size=1270 sha256=62dff720fea38bd5118c5a50f092931947538705a4b06f77f20367e601b376a8
  Stored in directory: /root/.cache/pip/wheels/73/2b/cb/099980278a0c9a3e57ff1a89875ec07bfa0b6fcbebb9a8cad3
Successfully built bs4
Installing collected packages: bs4
Successfully installed bs4-0.0.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
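# Set environment variables for Java and for the Spark 3.1.1 distribution downloaded above.
# findspark.init() uses SPARK_HOME when it is set; skip this cell to rely on the
# pip-installed pyspark instead (the session below reports version 3.4.0).
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"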

In [4]:
!pip install -q pyspark findspark

  Preparing metadata (setup.py) ... done
  Building wheel for pyspark (setup.py) ... done


https://colab.research.google.com/drive/1G894WS7ltIUTusWWmsCnF_zQhQqZCDOc#scrollTo=tt7ZS1_wGgjn

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# PostgreSQL JDBC driver stored on Google Drive, made available to Spark
spark_jars = '/content/drive/MyDrive/Colab_Notebooks/postgresql-42.2.14.jar'

def get_spark(master="local[*]", name='DataEng'):
  builder = (SparkSession.builder
             .master(master)   # "local[*]" runs Spark locally on all available cores
             .appName(name)
             .config("spark.jars", spark_jars))
  return builder.getOrCreate()

spark = get_spark()
print(f'spark_session:{spark.version}')

spark_session:3.4.0


In [None]:
# Stop the session only when all Spark work is finished; later cells reuse `spark`
# spark.stop()

# 3. Data Mining

## 3.1 Corruption data
The starting point is country-level corruption data, which includes each country's name, rank and corruption score for the corresponding year. For preliminary data analysis, this allows changes in corruption rank and score to be monitored separately. In practical terms, the dataset identifies the focal countries, the top 100 least-corrupt countries, which can lead to further analysis of the underlying features that mitigate corruption in those countries. Currently, the data only covers the corruption rankings from 2017 to 2022, to keep processing costs low on limited machine power. Nevertheless, data from 1995 to 2022 is available on the Transparency International website and can easily be obtained with the web-scraping code applied here.

In [None]:
def class_select(driver, selector):
  """Return the visible text of every element matching the given class selector."""
  return [element.text for element in driver.find_elements(By.CLASS_NAME, selector)]

In [None]:
# Web-scrape six years (2017-2022) of corruption data from Transparency International
def find_info(years):
  feature_names = ['country', 'rank', 'corrupt_score', 'year']
  frames = []
  for year in years:
    corruption_url = 'https://www.transparency.org/en/cpi/{0}'.format(year)
    driver = web_driver()
    try:
      driver.get(corruption_url)
      country = class_select(driver, "flex-1.truncate.pr-2")
      rank = class_select(driver, "w-16")
      rank.remove('Rank')  # drop the column header
      score = class_select(driver, "font-bold.inline-block.mr-4")
    finally:
      driver.quit()  # close the headless browser opened for this year
    year_column = [year] * len(country)
    frames.append(pd.DataFrame(list(zip(country, rank, score, year_column)), columns=feature_names))
  # Most recent year first
  return pd.concat(frames[::-1])

# Corruption table
year_range = range(2017, 2023)
country_corruption_df = find_info(year_range)
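
Because `zip()` silently truncates to the shortest input, a change in the website's class names could misalign or drop rows without raising any error. A minimal sketch of a stricter way to assemble one year's table, assuming the three scraped lists should have equal length once the 'Rank' header has been removed; the helper name `build_year_table` and the sample values are hypothetical.

```python
import pandas as pd


def build_year_table(country, rank, score, year):
    """Assemble one year's scraped lists into a DataFrame, refusing misaligned input."""
    lengths = {"country": len(country), "rank": len(rank), "score": len(score)}
    if len(set(lengths.values())) != 1:
        raise ValueError(f"Scraped lists have different lengths: {lengths}")
    return pd.DataFrame({
        "country": country,
        "rank": rank,
        "corrupt_score": score,
        "year": [year] * len(country),
    })


# Hypothetical sample values, in the same string form the scraper returns
table = build_year_table(["Country A", "Country B"], ["1", "2"], ["90", "85"], 2022)
print(table.shape)  # (2, 4)
```

On Python 3.10 or later, `zip(..., strict=True)` offers the same protection inside the original list construction, raising a `ValueError` when the inputs have different lengths.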

In [None]:
# Convert country_corruption_df to spark dataframe
corruption = spark.createDataFrame(country_corruption_df)



In [None]:
# check schema of the dataset
corruption.printSchema()

root
 |-- country: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- corrupt_score: string (nullable = true)
 |-- year: long (nullable = true)



In [None]:
# Convert the dataframe into parquet
corruption.write.parquet("corruption",mode='overwrite')

## 3.2 Country Code Data
The country code data is obtained from IBAN.com (https://www.iban.com/country-codes), a website that lists country-related information such as ISO country codes. The dataset provides a primary key, 'country_code' (the ISO 3166-1 alpha-2 code), that is used to link the different country-level datasets together.

In [None]:
# Unique country names in the corruption table, kept for later extraction purposes
corruption_countries = country_corruption_df.country.unique()

# Web-scrape ISO codes and country names from IBAN: https://www.iban.com/country-codes
country_url = 'https://www.iban.com/country-codes'
response = requests.get(country_url, timeout=30)
response.raise_for_status()
country_link = soup(response.text, "lxml")
country = country_link.find_all('td')

In [None]:
# The IBAN table has four cells per row: country name, alpha-2, alpha-3 and numeric code
country_name_list = [td.text for td in country]

# Country name list (first cell of each row)
country_name = country_name_list[0::4]

# Country code list (second cell of each row, the alpha-2 code)
country_code = country_name_list[1::4]

# Combine the two lists into a dataframe
merge_country = {'country_name': country_name, 'country_code': country_code}
country_info = pd.DataFrame(merge_country)

# Verify the country information
print(country_info)

          country_name country_code
0          Afghanistan           AF
1        Åland Islands           AX
2              Albania           AL
3              Algeria           DZ
4       American Samoa           AS
..                 ...          ...
244  Wallis and Futuna           WF
245     Western Sahara           EH
246              Yemen           YE
247             Zambia           ZM
248           Zimbabwe           ZW

[249 rows x 2 columns]
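
Since the World Bank tables in Section 3.3 identify countries by alpha-3 code (e.g. DNK), while `country_info` keeps only the alpha-2 column, it may help to keep the alpha-3 column from the same IBAN table. A minimal sketch, assuming the four-cells-per-row layout that the step-of-four indexing above relies on (name, alpha-2, alpha-3, numeric); the function names are hypothetical and the sample cells are typed in by hand.

```python
def rows_of(cells, width=4):
    """Group a flat list of table cells into complete rows of `width` cells."""
    return [cells[i:i + width] for i in range(0, len(cells) - width + 1, width)]


def country_codes(cells):
    """Map country name -> (alpha-2, alpha-3) from IBAN-style table cells."""
    return {name: (alpha2, alpha3) for name, alpha2, alpha3, _numeric in rows_of(cells)}


cells = ["Afghanistan", "AF", "AFG", "004", "Albania", "AL", "ALB", "008"]
codes = country_codes(cells)
print(codes["Albania"])  # ('AL', 'ALB')
```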


In [None]:
# Convert dataframe into spark dataframe and print schema
country_inf = spark.createDataFrame(country_info)
country_inf.printSchema()

root
 |-- country_name: string (nullable = true)
 |-- country_code: string (nullable = true)





In [None]:
# Convert country_inf to the parquet format
country_inf.write.parquet('country_info.parquet', mode='overwrite')

Before proceeding to retrieve data from the World Bank API, it is good practice to obtain all the country codes included in the 'corruption' table by merging it with the 'country_inf' table, so that no countries without related corruption data are retrieved from the API.

In [None]:
# merge corruption table with country table to include the country code 
country_corruption_df_full = country_corruption_df.merge(country_info,left_on='country',right_on='country_name')
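
The merge above is an inner join on exact country names, so any country whose name is spelled differently by Transparency International and IBAN is silently dropped from `country_corruption_df_full`. A minimal sketch of a check using the `indicator=True` option of `pandas.merge`; the helper name is hypothetical and the example names are illustrative, not taken from either website.

```python
import pandas as pd


def unmatched_countries(corruption_df, country_df):
    """Return corruption-table country names with no exact match in country_df."""
    merged = corruption_df[["country"]].drop_duplicates().merge(
        country_df, left_on="country", right_on="country_name",
        how="left", indicator=True,
    )
    return sorted(merged.loc[merged["_merge"] == "left_only", "country"])


corruption_df = pd.DataFrame({"country": ["Denmark", "Finland", "Czechia"]})
country_df = pd.DataFrame({
    "country_name": ["Denmark", "Finland", "Czech Republic"],
    "country_code": ["DK", "FI", "CZ"],
})
print(unmatched_countries(corruption_df, country_df))  # ['Czechia']
```

Any names returned can then be mapped to the IBAN spelling (for example with `Series.replace`) before the merge.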

In [None]:
# Find the list of country code needed for later api query purposes
country_id = list(country_corruption_df_full['country_code'].unique())

## 3.3 World Bank corruption-related data
The third category of datasets is extracted through the World Bank open API. These datasets enrich the superficial patterns of change in corruption rank and score in the dataset from Section 3.1. They act as 'factors' or 'features' that may contribute to changes in corruption status, and the trend in each dataset can be analysed further in future research by incorporating more extensive data into the current datasets.

In [None]:
# Retrieve data from world bank
! pip install wbgapi
import wbgapi as wb
# Next step is trying to obtain the list of id in the database
! pip install epigraphhub
from epigraphhub.data.worldbank import search_in_indicators 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting wbgapi
  Downloading wbgapi-1.0.12-py3-none-any.whl (36 kB)
Installing collected packages: wbgapi
Successfully installed wbgapi-1.0.12
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting epigraphhub
  Downloading epigraphhub-2.0.4-py3-none-any.whl (63 kB)
Collecting typer[all]<0.5.0,>=0.4.0
  Downloading typer-0.4.2-py3-none-any.whl (27 kB)
Collecting pytrends<5.0.0,>=4.7.3
  Downloading pytrends-4.9.2-py3-none-any.whl (15 kB)
Collecting geopandas<0.11.0,>=0.10.2
  Downloading geopandas-0.10.2-py2.py3-none-any.whl (1.0 MB)
Collecting SQLAlchemy<2.0.0,>=1.4.29
  Downloading SQLAlch

In [None]:
# Check what is inside the data
all_world_data = wb.source.info()
print(all_world_data)

# Since we are interested in governance-related metrics, list the series in
# database 3 (Worldwide Governance Indicators)
corruption_db = wb.series.info(db=3)
print(corruption_db)

# Check the economies (countries) in the database
print(wb.economy.info())

id    name                                                                  code      concepts  lastupdated
----  --------------------------------------------------------------------  ------  ----------  -------------
1     Doing Business                                                        DBS              3  2021-08-18
2     World Development Indicators                                          WDI              3  2023-03-30
3     Worldwide Governance Indicators                                       WGI              3  2022-09-23
5     Subnational Malnutrition Database                                     SNM              3  2016-03-21
6     International Debt Statistics                                         IDS              4  2022-12-06
11    Africa Development Indicators                                         ADI              3  2013-02-22
12    Education Statistics                                                  EDS              3  2020-12-20
13    Enterprise Surveys         

In [None]:
# Search database 3 for each keyword to find the corresponding indicator ids for later API queries
def series_list(keywords):
  frames = [search_in_indicators(keyword, db=3) for keyword in keywords]
  return pd.concat(frames)

# Extract the ids from the dataframe
keyword = ['corruption', 'Government', 'Political', 'Rule', 'Regulatory', 'Voice']
series = series_list(keyword)
series_id = list(series['id'])

In [None]:
# For this individual assignment, only the point estimates ("EST" series) in series_id are used,
# without considering their standard errors or confidence intervals
series_analysis_id = [i for i in series_id if "EST" in i]

# Query one indicator for every country in country_id and return it as a long table
def worldbank(series_id_index):
  series = series_analysis_id[series_id_index]
  country_list, code_list, date_list, value_list = [], [], [], []
  for code in country_id:
    url = 'http://api.worldbank.org/countries/{0}/indicators/{1}?source=3'.format(code, series)
    try:
      response = requests.get(url, timeout=(5, 27))
      response.raise_for_status()
    except requests.RequestException:
      continue  # skip a country whose request fails; rows already collected are kept
    file = soup(response.text, "lxml")
    country_list += [tag.text for tag in file.find_all('wb:country')]
    code_list += [tag.text for tag in file.find_all('wb:countryiso3code')]
    date_list += [tag.text for tag in file.find_all('wb:date')]
    value_list += [tag.text for tag in file.find_all('wb:value')]
  # 'corruption_control' is a generic value column, renamed per indicator in Section 4.1
  return pd.DataFrame({'country': country_list, 'country_code': code_list,
                       'year': date_list, 'corruption_control': value_list})
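
As an alternative to parsing XML with BeautifulSoup, the World Bank API (v2) can return JSON. The sketch below assumes the v2 endpoint format `https://api.worldbank.org/v2/country/{code}/indicator/{id}` with the `format`, `per_page`, `date` and `source` query parameters, and a response shaped as `[metadata, records]`; it also assumes the standard WGI estimate ids (e.g. `CC.EST`, `RL.EST`) instead of relying on the keyword order used above. Only URL building and parsing are shown, with a hand-written payload standing in for a real response.

```python
import json
from urllib.parse import urlencode

# Assumed WGI estimate ids; verify them against wb.series.info(db=3) before use
WGI_SERIES = {
    "corruption_control_score": "CC.EST",
    "government_effectiveness_score": "GE.EST",
    "political_stab_score": "PV.EST",
    "regulation_quality_score": "RQ.EST",
    "rule_law_score": "RL.EST",
    "voice_accountability_score": "VA.EST",
}


def indicator_url(country_code, series_id, start=2017, end=2022):
    """Build a v2 API URL asking for JSON, one page of results and a year range."""
    query = urlencode({"format": "json", "per_page": 100,
                       "date": f"{start}:{end}", "source": 3}, safe=":")
    return f"https://api.worldbank.org/v2/country/{country_code}/indicator/{series_id}?{query}"


def parse_records(payload_text, value_column):
    """Turn a [metadata, records] JSON payload into row dicts, skipping missing values."""
    payload = json.loads(payload_text)
    records = payload[1] if len(payload) > 1 and payload[1] else []
    return [
        {"country": r["country"]["value"], "country_code": r["countryiso3code"],
         "year": int(r["date"]), value_column: float(r["value"])}
        for r in records if r["value"] is not None
    ]


# Hand-written payload in the assumed response shape (values are placeholders)
sample = json.dumps([
    {"page": 1, "pages": 1, "per_page": 100, "total": 2},
    [
        {"country": {"id": "DK", "value": "Denmark"}, "countryiso3code": "DNK",
         "date": "2021", "value": 2.0},
        {"country": {"id": "DK", "value": "Denmark"}, "countryiso3code": "DNK",
         "date": "2020", "value": None},
    ],
])
print(indicator_url("DK", WGI_SERIES["corruption_control_score"]))
print(parse_records(sample, "corruption_control_score"))
```

Requesting JSON with an explicit year range and page size also keeps each response small, and the null values that the XML route stores as empty strings are dropped at parse time.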

In [None]:
corruption_con = worldbank(0)
government_effectiveness= worldbank(1)
political_stability = worldbank(2)
rule_of_law = worldbank(3)
regulatory_quality = worldbank(4)
Voice_accountability = worldbank(5)

In [None]:
# Write the dataframes into spark dataframe
corruption_control = spark.createDataFrame(corruption_con)
government_effect= spark.createDataFrame(government_effectiveness)
political_sta = spark.createDataFrame(political_stability)
rule_law = spark.createDataFrame(rule_of_law)
reg_quality = spark.createDataFrame(regulatory_quality)
voice_account = spark.createDataFrame(Voice_accountability)

In [None]:
# print schema for these datasets
corruption_control.printSchema()
government_effect.printSchema()
political_sta.printSchema()
rule_law.printSchema()
reg_quality.printSchema()
voice_account.printSchema()

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: string (nullable = true)
 |-- corruption_control: string (nullable = true)

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: string (nullable = true)
 |-- corruption_control: string (nullable = true)

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: string (nullable = true)
 |-- corruption_control: string (nullable = true)

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: string (nullable = true)
 |-- corruption_control: string (nullable = true)

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: string (nullable = true)
 |-- corruption_control: string (nullable = true)

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: string (nullable = true)
 

In [None]:
# Convert the dataframe into parquet file
corruption_control.write.parquet('corruption-control.parquet',mode='overwrite')
government_effect.write.parquet('govern_effective.parquet',mode='overwrite')
political_sta.write.parquet('political_stab.parquet',mode='overwrite')
rule_law.write.parquet('rule_law.parquet',mode='overwrite')
reg_quality.write.parquet('reg_quality.parquet',mode='overwrite')
voice_account.write.parquet('voice_account.parquet',mode='overwrite')

# 4. Data Transformation

After data mining and basic data cleaning, including merging some tables for country identification, some data transformation is still required to make the datasets ready for analysis. Moreover, the datasets currently stand on their own, which is not ideal if users want to analyse information from several tables together. All datasets should therefore be merged into one database, following the DB diagram, for later management.

## 4.1 Indicators

In [None]:
# First retrieve the parquet data
corrupt_control_score = spark.read.parquet('/content/corruption-control.parquet').toPandas()
government_effectiveness_score = spark.read.parquet('/content/govern_effective.parquet').toPandas()
regulation_quality_score = spark.read.parquet('/content/reg_quality.parquet').toPandas()
rule_law_score = spark.read.parquet('/content/rule_law.parquet').toPandas()
voice_accountability_score = spark.read.parquet('/content/voice_account.parquet').toPandas()
political_stab_score = spark.read.parquet('/content/political_stab.parquet').toPandas()

In [None]:
# Change column name according to DB diagram
corrupt_control_score.rename(columns={'corruption_control':'corruption_control_score'},inplace = True)
government_effectiveness_score.rename(columns={'corruption_control':'government_effectiveness_score'},inplace = True)
regulation_quality_score.rename(columns={'corruption_control':'regulation_quality_score'},inplace = True)
rule_law_score.rename(columns={'corruption_control':'rule_law_score'},inplace = True)
voice_accountability_score.rename(columns={'corruption_control':'voice_accountability_score'},inplace = True)
political_stab_score.rename(columns={'corruption_control':'political_stab_score'},inplace = True)

In [None]:
# Remove empty-string values (missing estimates) before changing the column types to match the DB diagram
corrupt_control_score=corrupt_control_score[(corrupt_control_score['corruption_control_score']!='')]
government_effectiveness_score = government_effectiveness_score[(government_effectiveness_score['government_effectiveness_score']!='')]
regulation_quality_score = regulation_quality_score[(regulation_quality_score['regulation_quality_score']!='')]
rule_law_score = rule_law_score[(rule_law_score['rule_law_score']!='')]
voice_accountability_score=voice_accountability_score[(voice_accountability_score['voice_accountability_score']!='')]
political_stab_score=political_stab_score[(political_stab_score['political_stab_score']!='')]

In [None]:
# Change datatype for all tables
corrupt_control_score = corrupt_control_score.astype({
    'country':'str',
    'country_code' : 'str',
    'year' : 'int',
    'corruption_control_score': 'float'
    })
government_effectiveness_score = government_effectiveness_score.astype({
    'country':'str',
    'country_code' : 'str',
    'year' : 'int',
    'government_effectiveness_score':'float'
})
regulation_quality_score = regulation_quality_score.astype({
    'country':'str',
    'country_code' : 'str',
    'year' : 'int',
    'regulation_quality_score':'float'
})
rule_law_score = rule_law_score.astype({
    'country':'str',
    'country_code' : 'str',
    'year' : 'int',  
    'rule_law_score':'float'
})
voice_accountability_score = voice_accountability_score.astype({
    'country':'str',
    'country_code' : 'str',
    'year' : 'int',    
    'voice_accountability_score':'float'
})
political_stab_score = political_stab_score.astype({
    'country':'str',
    'country_code' : 'str',
    'year' : 'int',    
    'political_stab_score':'float'
})
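
The empty-string filter and the `astype` steps above could be folded into a single helper per table. A minimal sketch using `pandas.to_numeric` with `errors='coerce'`, which turns any non-numeric value (including `''`) into NaN so that it can be dropped; the helper name `clean_indicator` and the sample rows are hypothetical.

```python
import pandas as pd


def clean_indicator(df, value_column):
    """Coerce the value column to float, drop missing rows, and cast year to int."""
    out = df.copy()
    out[value_column] = pd.to_numeric(out[value_column], errors="coerce")
    out = out.dropna(subset=[value_column])
    out["year"] = out["year"].astype(int)
    return out.astype({"country": "str", "country_code": "str"})


# Hypothetical raw rows, with strings as returned by the API parsing step
raw = pd.DataFrame({
    "country": ["Country A", "Country A"],
    "country_code": ["AAA", "AAA"],
    "year": ["2021", "2020"],
    "rule_law_score": ["1.5", ""],
})
clean = clean_indicator(raw, "rule_law_score")
print(clean.dtypes)
```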

In [None]:
# Keep only the years covered by the corruption table (i.e. 2017 to 2022)
def match_year(df):
  year = range(2017,2023)
  df = df[df['year'].isin(year)]
  return df
corrupt_control_score = match_year(corrupt_control_score)
government_effectiveness_score= match_year(government_effectiveness_score)
regulation_quality_score = match_year(regulation_quality_score)
rule_law_score = match_year(rule_law_score)
voice_accountability_score=match_year(voice_accountability_score)
political_stab_score= match_year(political_stab_score)

In [None]:
# Inner-join the six indicator tables on country and year to form the Indicator table
from functools import reduce
merge_columns = ['country', 'country_code', 'year']
indicator_tables = [political_stab_score, voice_accountability_score, rule_law_score,
                    regulation_quality_score, corrupt_control_score, government_effectiveness_score]
Indicator = reduce(lambda left, right: pd.merge(left, right, on=merge_columns), indicator_tables)

In [None]:
# Year-on-year % change of each indicator per country. Rows run from the latest to the earliest
# year, so periods=-1 compares each year with the previous one; the earliest year is filled with 0.
pct_columns = {'corrupt_control_score_pct': 'corruption_control_score',
               'government_effectiveness_score_pct': 'government_effectiveness_score',
               'regulation_quality_score_pct': 'regulation_quality_score',
               'rule_law_score_pct': 'rule_law_score',
               'voice_accountability_score_pct': 'voice_accountability_score',
               'cpolitical_stab_score_pct': 'political_stab_score'}
for pct_column, score_column in pct_columns.items():
  Indicator[pct_column] = Indicator.groupby('country_code')[score_column].pct_change(periods=-1).mul(100).fillna(0).round(2)
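
A small worked example of the year-on-year calculation, using Denmark's `corruption_control_score` values from the output of the next cell. With rows ordered from the latest to the earliest year, `pct_change(periods=-1)` divides each value by the next row (the previous year), e.g. (2.366175 / 2.268327 - 1) * 100 ≈ 4.31 for 2021. Note that WGI estimates range roughly from -2.5 to 2.5, so a percentage change can become very large or flip sign when the previous year's value is close to zero or negative.

```python
import pandas as pd

# Denmark's corruption_control_score, latest year first (from the Indicator output table)
demo = pd.DataFrame({
    "country_code": ["DNK"] * 5,
    "year": [2021, 2020, 2019, 2018, 2017],
    "corruption_control_score": [2.366175, 2.268327, 2.158069, 2.192685, 2.232307],
})

# periods=-1: compare each row with the following row, i.e. the previous year
demo["pct"] = (
    demo.groupby("country_code")["corruption_control_score"]
    .pct_change(periods=-1)
    .mul(100)
    .fillna(0)   # 2017 has no earlier year in the data
    .round(2)
)
print(demo["pct"].tolist())  # [4.31, 5.11, -1.58, -1.77, 0.0]
```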

In [None]:
# Show the aggregated table
Indicator.head(10)

| | country | country_code | year | political_stab_score | voice_accountability_score | rule_law_score | regulation_quality_score | corruption_control_score | government_effectiveness_score | corrupt_control_score_pct | government_effectiveness_score_pct | regulation_quality_score_pct | rule_law_score_pct | voice_accountability_score_pct | cpolitical_stab_score_pct |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Denmark | DNK | 2021 | 0.949907 | 1.557895 | 1.936437 | 1.808821 | 2.366175 | 2.004073 | 4.31 | 6.53 | 0.69 | 4.79 | 2.55 | -0.01 |
| 1 | Denmark | DNK | 2020 | 0.950011 | 1.519086 | 1.847982 | 1.796388 | 2.268327 | 1.881268 | 5.11 | -1.13 | 14.89 | -1.07 | -1.69 | -4.58 |
| 2 | Denmark | DNK | 2019 | 0.995625 | 1.545183 | 1.868054 | 1.563506 | 2.158069 | 1.902856 | -1.58 | 3.21 | -4.38 | 3.44 | -1.05 | 5.22 |
| 3 | Denmark | DNK | 2018 | 0.946251 | 1.56158 | 1.805855 | 1.635127 | 2.192685 | 1.843762 | -1.77 | 3.63 | 0.97 | -1.63 | 2.89 | 8.96 |
| 4 | Denmark | DNK | 2017 | 0.868415 | 1.517765 | 1.835788 | 1.619374 | 2.232307 | 1.779105 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 5 | Finland | FIN | 2021 | 0.981908 | 1.622748 | 2.057868 | 1.898076 | 2.270077 | 1.960442 | 3.09 | 1.09 | 2.02 | -0.41 | 0.54 | -2.31 |
| 6 | Finland | FIN | 2020 | 1.005103 | 1.614038 | 2.066256 | 1.860526 | 2.201975 | 1.939224 | 2.41 | -2.99 | 0.59 | 0.85 | 3.14 | 18.02 |
| 7 | Finland | FIN | 2019 | 0.851657 | 1.564851 | 2.048782 | 1.849625 | 2.150145 | 1.999047 | -2.59 | -2.35 | 3.64 | -0.98 | -0.77 | -6.24 |
| 8 | Finland | FIN | 2018 | 0.908359 | 1.576915 | 2.069103 | 1.784577 | 2.207325 | 2.047129 | 0.18 | 1.87 | -1.84 | 0.58 | 1.13 | -15.96 |
| 9 | Finland | FIN | 2017 | 1.080909 | 1.559358 | 2.057109 | 1.817978 | 2.203369 | 2.009646 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |


In [None]:
# Convert all the pandas dataframes into Spark dataframes
ccs_df = spark.createDataFrame(corrupt_control_score)
ges_df = spark.createDataFrame(government_effectiveness_score)
rqs_df = spark.createDataFrame(regulation_quality_score)
rls_df = spark.createDataFrame(rule_law_score)
vas_df = spark.createDataFrame(voice_accountability_score)
pss_df = spark.createDataFrame(political_stab_score)
Indicator = spark.createDataFrame(Indicator)

In [None]:
# Inspect the inferred Spark schemas
# Indicators, then the combined Indicator table
ccs_df.printSchema()
ges_df.printSchema()
rqs_df.printSchema()
rls_df.printSchema()
vas_df.printSchema()
pss_df.printSchema()
Indicator.printSchema()

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: long (nullable = true)
 |-- corruption_control_score: double (nullable = true)

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: long (nullable = true)
 |-- government_effectiveness_score: double (nullable = true)

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: long (nullable = true)
 |-- regulation_quality_score: double (nullable = true)

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: long (nullable = true)
 |-- rule_law_score: double (nullable = true)

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: long (nullable = true)
 |-- voice_accountability_score: double (nullable = true)

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- year: long (nu

## 4.2 Corruption data

In [None]:
corrupt_df = spark.read.parquet('/content/corruption').toPandas()

In [None]:
print(corrupt_df.info())
# Cast columns to their intended types (rank and corrupt_score were read as object)
corrupt_df= corrupt_df.astype({
    'country':'str',
    'rank' : 'int',
    'year' : 'int',    
    'corrupt_score':'float'
})
print(corrupt_df.dtypes)
# Create a spark dataframe
corruption = spark.createDataFrame(corrupt_df)
corruption.printSchema()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1080 entries, 0 to 1079
Data columns (total 4 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   country        1080 non-null   object
 1   rank           1080 non-null   object
 2   corrupt_score  1080 non-null   object
 3   year           1080 non-null   int64 
dtypes: int64(1), object(3)
memory usage: 33.9+ KB
None
country           object
rank               int64
corrupt_score    float64
year               int64
dtype: object
root
 |-- country: string (nullable = true)
 |-- rank: long (nullable = true)
 |-- corrupt_score: double (nullable = true)
 |-- year: long (nullable = true)
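The `astype({'rank': 'int', ...})` cast succeeds here because every scraped value parsed cleanly; it would raise on a single stray non-numeric string. A more defensive sketch, on hypothetical placeholder rows, uses `pd.to_numeric(..., errors="coerce")` so that unparseable entries become missing values instead of stopping the pipeline:

```python
import pandas as pd

# Hypothetical scraped rows: numbers arrive as strings (dtype object),
# and the last row contains values that cannot be parsed.
raw = pd.DataFrame({
    "country": ["Country A", "Country B", "Country C"],
    "rank": ["1", "2", "n/a"],
    "corrupt_score": ["88", "87.5", ""],
    "year": [2021, 2021, 2021],
})

clean = raw.assign(
    # errors="coerce" turns unparseable strings into NaN instead of raising;
    # "Int64" (capital I) is pandas' nullable integer type, so NaN becomes <NA>
    rank=pd.to_numeric(raw["rank"], errors="coerce").astype("Int64"),
    corrupt_score=pd.to_numeric(raw["corrupt_score"], errors="coerce"),
)
print(clean.dtypes)
```

The coerced rows can then be inspected (for example with `clean[clean["rank"].isna()]`) before deciding whether to drop or repair them.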





In [None]:
# Write all files into parquet format 
# Indicators
ccs_df.write.parquet('corruption-control.parquet',mode='overwrite')
ges_df.write.parquet('govern_effective.parquet',mode='overwrite')
pss_df.write.parquet('political_stab.parquet',mode='overwrite')
rls_df.write.parquet('rule_law.parquet',mode='overwrite')
rqs_df.write.parquet('reg_quality.parquet',mode='overwrite')
vas_df.write.parquet('voice_account.parquet',mode='overwrite')

# Corruption
corruption.write.parquet("corruption",mode='overwrite')

# 5. Write to SQL databases (Azure SQL and PostgreSQL)

In [None]:
!sudo -H pip install pyodbc

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyodbc
  Downloading pyodbc-4.0.39-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (340 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 340.6/340.6 kB 10.4 MB/s eta 0:00:00
Installing collected packages: pyodbc
Successfully installed pyodbc-4.0.39


In [None]:
! pip install pymysql

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pymysql
  Downloading PyMySQL-1.0.3-py3-none-any.whl (43 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 43.7/43.7 kB 2.5 MB/s eta 0:00:00
Installing collected packages: pymysql
Successfully installed pymysql-1.0.3


In [None]:
%%sh

curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -

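# The Colab runtime is Ubuntu 20.04 (focal, see the apt output), so use the matching repository list
curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list > /etc/apt/sources.list.d/mssql-release.list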

sudo apt-get update

sudo ACCEPT_EULA=Y apt-get -q -y install msodbcsql17

OK
Hit:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:3 https://packages.microsoft.com/ubuntu/16.04/prod xenial InRelease
Hit:4 http://archive.ubuntu.com/ubuntu focal InRelease
Hit:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Hit:6 http://security.ubuntu.com/ubuntu focal-security InRelease
Hit:7 http://archive.ubuntu.com/ubuntu focal-updates InRelease
Hit:8 http://archive.ubuntu.com/ubuntu focal-backports InRelease
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:10 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Hit:11 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu focal InRelease
Hit:12 http://ppa.launchpad.net/ubuntugis/ppa/ubuntu focal InRelease
Reading package lists...
Reading package lists...
Building dependency tree...
Reading state information...
Some packages could not be installed. This may

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   983  100   983    0     0   7390      0 --:--:-- --:--:-- --:--:--  7390
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100    79  100    79    0     0    718      0 --:--:-- --:--:-- --:--:--   718
E: Unable to correct problems, you have held broken packages.


CalledProcessError: ignored

In [None]:
! sudo apt-get install -y unixodbc-dev
! pip install pyodbc
! pip install chart-studio

In [None]:
# Connect to the Azure SQL database
import getpass
import pyodbc as odbc
server = 'tcp:dataeng1311.database.windows.net,1433'
Driver = 'ODBC Driver 17 for SQL Server'
database = 'Data Engineering Inidividual'
username = 'dataeng1311'
# Prompt for the password instead of hard-coding it in the notebook
password = getpass.getpass('Azure SQL password: ')
# In the Azure portal, the Colab IPv4 address was added as a firewall rule so that the database is not publicly accessible
cnxn = odbc.connect(driver=Driver, server=server, database=database, UID=username, PWD=password)
cursor = cnxn.cursor()
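Passing keyword arguments to `pyodbc.connect` works, but values containing spaces or `;` (such as the database name here) are safer inside braces in an explicit ODBC connection string. A minimal sketch, assuming the Microsoft ODBC Driver 17 keywords `Encrypt` and `TrustServerCertificate`; the helper names are hypothetical:

```python
def _braced(value: str) -> str:
    """Wrap an ODBC attribute value in braces, doubling any closing brace."""
    return "{" + value.replace("}", "}}") + "}"


def build_azure_sql_conn_str(server: str, database: str, username: str,
                             password: str,
                             driver: str = "ODBC Driver 17 for SQL Server") -> str:
    """Build an ODBC connection string for an Azure SQL database (sketch)."""
    parts = {
        "DRIVER": _braced(driver),
        "SERVER": server,
        "DATABASE": _braced(database),   # the database name contains spaces
        "UID": username,
        "PWD": _braced(password),        # passwords may contain ';' or '}'
        "Encrypt": "yes",                # encrypt traffic to Azure
        "TrustServerCertificate": "no",  # validate the server certificate
    }
    return ";".join(f"{key}={val}" for key, val in parts.items()) + ";"
```

Usage would then be `cnxn = odbc.connect(build_azure_sql_conn_str(server, database, username, password))`, with the password supplied at runtime (for example from a hypothetical `AZURE_SQL_PASSWORD` environment variable) rather than stored in the notebook.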

In [None]:
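# varchar/nvarchar without a length defaults to a length of 1, so give the columns an explicit size
cursor.execute('''
  CREATE TABLE [country_info] (
  [country_code] nvarchar(255) PRIMARY KEY,
  [country_name] nvarchar(255) UNIQUE
)
''')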

<pyodbc.Cursor at 0x7fd0d6196830>

In [None]:
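# Create the schema in the database
# country_name is UNIQUE so that [corruption] can reference it with a foreign key
cursor.execute('''
  DROP TABLE IF EXISTS [country_info];
  CREATE TABLE [country_info] (
  [country_code] nvarchar(255) PRIMARY KEY,
  [country_name] nvarchar(255) UNIQUE
)
''')

# One row per country per year: the key combines country_name and year,
# year is stored as an integer and the score as a float (it is a double in Spark)
cursor.execute('''
  DROP TABLE IF EXISTS [corruption];
  CREATE TABLE [corruption] (
  [country_name] nvarchar(255),
  [country_rank] int,
  [country_score] float,
  [year] int,
  PRIMARY KEY ([country_name], [year])
)
''')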

# Indicator tables: [year] holds an integer year (2017, 2018, ...), not a date
cursor.execute('''
DROP TABLE IF EXISTS [ccs_df];
CREATE TABLE [ccs_df] (
  [country_name] nvarchar(255),
  [country_code] nvarchar(255) PRIMARY KEY,
  [year] int,
  [corruption_control_score] float
)
''')

cursor.execute('''
DROP TABLE IF EXISTS [ges_df];
CREATE TABLE [ges_df] (
  [country_name] nvarchar(255),
  [country_code] nvarchar(255) PRIMARY KEY,
  [year] int,
  [government_effect_score] float
)
''')

cursor.execute('''
DROP TABLE IF EXISTS [pss_df];
CREATE TABLE [pss_df] (
  [country_name] nvarchar(255),
  [country_code] nvarchar(255) PRIMARY KEY,
  [year] int,
  [pol_stability_score] float
)
''')

cursor.execute('''
DROP TABLE IF EXISTS [rls_df];
CREATE TABLE [rls_df] (
  [country_name] nvarchar(255),
  [country_code] nvarchar(255) PRIMARY KEY,
  [year] int,
  [rule_law_score] float
)
''')

cursor.execute('''
DROP TABLE IF EXISTS [rqs_df];
CREATE TABLE [rqs_df] (
  [country_name] nvarchar(255),
  [country_code] nvarchar(255) PRIMARY KEY,
  [year] int,
  [reg_quality_score] float
)
''')

cursor.execute('''
DROP TABLE IF EXISTS [vas_df];
CREATE TABLE [vas_df] (
  [country_name] nvarchar(255),
  [country_code] nvarchar(255) PRIMARY KEY,
  [year] int,
  [voice_account_score] float
)
''')

cursor.execute('''
DROP TABLE IF EXISTS [Indicator];
CREATE TABLE [Indicator] (
  [country_name] nvarchar(255),
  [country_code] nvarchar(255) PRIMARY KEY,
  [year] int,
  [corruption_control_score] float,
  [government_effect_score] float,
  [pol_stability_score] float,
  [rule_law_score] float,
  [reg_quality_score] float,
  [voice_account_score] float,
  [corruption_control_score_pct] float,
  [government_effect_score_pct] float,
  [pol_stability_score_pct] float,
  [rule_law_score_pct] float,
  [reg_quality_score_pct] float,
  [voice_account_score_pct] float
)
''')


<pyodbc.Cursor at 0x7fd4b31cb630>
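The indicator tables above use `country_code` alone as the primary key, so each table can hold only one row per country, while the data has one row per country per year (2017 to 2021). A small sketch using Python's built-in `sqlite3` as a stand-in for SQL Server (the dialects differ, but primary-key uniqueness behaves the same way here) contrasts a single-column key with a composite `(country_code, year)` key. The table names are hypothetical; the two scores are Denmark's `corruption_control_score` values from the Indicator preview.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Hypothetical tables: one keyed on country_code only, one on (country_code, year)
conn.execute("CREATE TABLE single_pk (country_code TEXT PRIMARY KEY, year INTEGER, score REAL)")
conn.execute(
    "CREATE TABLE composite_pk (country_code TEXT, year INTEGER, score REAL, "
    "PRIMARY KEY (country_code, year))"
)

# Two years of Denmark's corruption_control_score from the Indicator preview
rows = [("DNK", 2021, 2.366175), ("DNK", 2020, 2.268327)]


def try_insert(table: str) -> bool:
    """Insert both rows in one transaction; return True if the table accepted them."""
    try:
        with conn:  # commits on success, rolls back if an exception is raised
            conn.executemany(f"INSERT INTO {table} VALUES (?, ?, ?)", rows)
        return True
    except sqlite3.IntegrityError:  # raised on a primary-key violation
        return False


single_ok = try_insert("single_pk")        # the second DNK row repeats the key
composite_ok = try_insert("composite_pk")  # (DNK, 2021) and (DNK, 2020) differ
print(single_ok, composite_ok)  # False True
```

In SQL Server the equivalent key would be `PRIMARY KEY ([country_code], [year])`; any foreign key referencing such a table would then need to reference both columns.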

In [None]:
# Create relationship between tables
cursor.execute('''
CREATE TABLE [ccs_df_Indicator] (
  [ccs_df_country_code] nvarchar(255),
  [Indicator_country_code] nvarchar(255),
  PRIMARY KEY ([ccs_df_country_code], [Indicator_country_code])
);
ALTER TABLE [ccs_df_Indicator] ADD FOREIGN KEY ([ccs_df_country_code]) REFERENCES [ccs_df] ([country_code]);
ALTER TABLE [ccs_df_Indicator] ADD FOREIGN KEY ([Indicator_country_code]) REFERENCES [Indicator] ([country_code]);

CREATE TABLE [ges_df_Indicator] (
  [ges_df_country_code] nvarchar(255),
  [Indicator_country_code] nvarchar(255),
  PRIMARY KEY ([ges_df_country_code], [Indicator_country_code])
);
ALTER TABLE [ges_df_Indicator] ADD FOREIGN KEY ([ges_df_country_code]) REFERENCES [ges_df] ([country_code]);
ALTER TABLE [ges_df_Indicator] ADD FOREIGN KEY ([Indicator_country_code]) REFERENCES [Indicator] ([country_code]);
CREATE TABLE [pss_df_Indicator] (
  [pss_df_country_code] nvarchar(255),
  [Indicator_country_code] nvarchar(255),
  PRIMARY KEY ([pss_df_country_code], [Indicator_country_code])
);
ALTER TABLE [pss_df_Indicator] ADD FOREIGN KEY ([pss_df_country_code]) REFERENCES [pss_df] ([country_code]);
ALTER TABLE [pss_df_Indicator] ADD FOREIGN KEY ([Indicator_country_code]) REFERENCES [Indicator] ([country_code]);
CREATE TABLE [rqs_df_Indicator] (
  [rqs_df_country_code] nvarchar(255),
  [Indicator_country_code] nvarchar(255),
  PRIMARY KEY ([rqs_df_country_code], [Indicator_country_code])
);
ALTER TABLE [rqs_df_Indicator] ADD FOREIGN KEY ([rqs_df_country_code]) REFERENCES [rqs_df] ([country_code]);
ALTER TABLE [rqs_df_Indicator] ADD FOREIGN KEY ([Indicator_country_code]) REFERENCES [Indicator] ([country_code]);
CREATE TABLE [rls_df_Indicator] (
  [rls_df_country_code] nvarchar(255),
  [Indicator_country_code] nvarchar(255),
  PRIMARY KEY ([rls_df_country_code], [Indicator_country_code])
);
ALTER TABLE [rls_df_Indicator] ADD FOREIGN KEY ([rls_df_country_code]) REFERENCES [rls_df] ([country_code]);
ALTER TABLE [rls_df_Indicator] ADD FOREIGN KEY ([Indicator_country_code]) REFERENCES [Indicator] ([country_code]);
CREATE TABLE [vas_df_Indicator] (
  [vas_df_country_code] nvarchar(255),
  [Indicator_country_code] nvarchar(255),
  PRIMARY KEY ([vas_df_country_code], [Indicator_country_code])
);
ALTER TABLE [vas_df_Indicator] ADD FOREIGN KEY ([vas_df_country_code]) REFERENCES [vas_df] ([country_code]);
ALTER TABLE [vas_df_Indicator] ADD FOREIGN KEY ([Indicator_country_code]) REFERENCES [Indicator] ([country_code]);
ALTER TABLE [Indicator] ADD FOREIGN KEY ([country_code]) REFERENCES [country_info] ([country_code]);
ALTER TABLE [corruption] ADD FOREIGN KEY ([country_name]) REFERENCES [country_info] ([country_name]);
''')

cursor.commit()

<pyodbc.Cursor at 0x7fd0d6196830>
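A foreign key can only reference a column that is a primary key or carries a unique constraint, so `[corruption].[country_name]` can point at `[country_info].[country_name]` only if that column is unique. The sketch below again uses `sqlite3` as a stand-in for SQL Server (note that SQLite only enforces foreign keys after `PRAGMA foreign_keys = ON`) to show the constraint rejecting a row whose country is missing from `country_info`; the country "Atlantis" and all score values are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite does not enforce foreign keys by default

conn.execute(
    "CREATE TABLE country_info (country_code TEXT PRIMARY KEY, country_name TEXT UNIQUE)"
)
conn.execute(
    "CREATE TABLE corruption ("
    "country_name TEXT REFERENCES country_info (country_name), "
    "year INTEGER, corrupt_score REAL, "
    "PRIMARY KEY (country_name, year))"
)

conn.execute("INSERT INTO country_info VALUES ('DNK', 'Denmark')")
conn.execute("INSERT INTO corruption VALUES ('Denmark', 2021, 90.0)")  # hypothetical score


def insert_corruption_row(name: str, year: int, score: float) -> bool:
    """Return True if the row was accepted, False if the foreign key rejected it."""
    try:
        conn.execute("INSERT INTO corruption VALUES (?, ?, ?)", (name, year, score))
        return True
    except sqlite3.IntegrityError:  # "FOREIGN KEY constraint failed"
        return False


print(insert_corruption_row("Atlantis", 2021, 50.0))  # False: not in country_info
```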

In [None]:
cursor.commit()

In [None]:
# Read all the tables back from their parquet files
# Indicators
ccs = spark.read.parquet('/content/corruption-control.parquet')
ges = spark.read.parquet('/content/govern_effective.parquet')
pss = spark.read.parquet('/content/political_stab.parquet')
rls = spark.read.parquet('/content/rule_law.parquet')
rqs = spark.read.parquet('/content/reg_quality.parquet')
vas = spark.read.parquet('/content/voice_account.parquet')
# Corruption
corr = spark.read.parquet('/content/corruption')
# Country
country = spark.read.parquet('/content/country_info.parquet')

In [None]:
import socket
hostname = socket.gethostname()
IP_address = socket.gethostbyname(hostname)
print(IP_address)

172.28.0.12
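The address printed above, 172.28.0.12, lies in the private 172.16.0.0/12 range, so it identifies the Colab container on its internal network rather than the public address Azure sees when the notebook connects; a firewall rule has to allow the latter. A small check with Python's standard `ipaddress` module (the helper name is hypothetical) makes that distinction explicit:

```python
import ipaddress


def is_internal_address(addr: str) -> bool:
    """True if addr is in a private range such as 10/8, 172.16/12 or 192.168/16."""
    return ipaddress.ip_address(addr).is_private


print(is_internal_address("172.28.0.12"))  # True
```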


In [None]:
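import pandas as pd
# Load the crime dataset and store it as parquet (overwrite so the cell can be re-run)
crime = pd.read_csv('crime_df.csv')
crime_df = spark.createDataFrame(crime)
crime_df.write.parquet('crime', mode='overwrite')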

In [None]:
crime = spark.read.parquet('/content/crime')

In [None]:
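# psql reads the password from the PGPASSWORD environment variable; -f runs a .sql file (-c expects SQL text)
import os
import getpass
os.environ['PGPASSWORD'] = getpass.getpass('RDS password: ')
!psql -h dataengind.cvsibgtnw8ip.eu-west-2.rds.amazonaws.com -d dataengineer -U dataengineer -f /content/new_db.sql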

/bin/bash: PGPassword: command not found


In [None]:
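import getpass
server_name = 'jdbc:sqlserver://dataeng1311.database.windows.net:1433'
database = 'Data Engineering Inidividual'
# SQL Server JDBC URL: the server address followed by ';'-separated properties
url = server_name + ";" + "databaseName=" + database + ";"
table_name = 'corruption'
username = 'dataeng1311@dataeng1311'
# Prompt for the password instead of hard-coding it in the notebook
password = getpass.getpass('Azure SQL password: ')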

In [None]:
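# Download the PostgreSQL JDBC driver (provides org.postgresql.Driver) from Maven Central.
# The jar must also be on Spark's classpath, e.g. via the spark.jars setting when the SparkSession is created.
!wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.6.0/postgresql-42.6.0.jar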

--2023-04-18 19:28:41--  https://github.com/awslabs/aws-postgresql-jdbc/releases/download/0.2.0/aws-postgresql-jdbc-0.2.0.jar
Resolving github.com (github.com)... 192.30.255.112
Connecting to github.com (github.com)|192.30.255.112|:443... connected.
HTTP request sent, awaiting response... 404 Not Found
2023-04-18 19:28:41 ERROR 404: Not Found.



In [None]:
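# Insert the crime data into the PostgreSQL database.
# mode('append') inserts into the table if it exists (and creates it otherwise);
# the default mode raises an error when the table already exists.
import getpass
crime.write \
    .format("jdbc") \
    .options(
        url='jdbc:postgresql://dataengind.cvsibgtnw8ip.eu-west-2.rds.amazonaws.com:5432/dataengineer',
        dbtable='crime',
        user='dataengineer',
        password=getpass.getpass('RDS password: '),
        driver='org.postgresql.Driver') \
    .mode('append') \
    .save()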


Py4JJavaError: ignored

In [None]:
executesql('/content/db_diagram.sql')

ProgrammingError: ignored

# Limitations and further steps

This database has several limitations, which could be addressed with more time and resources. Firstly, the two main data sources are Transparency International and the World Bank; more data sources could be incorporated into the database, especially corruption statistics for low- and middle-income countries (LMICs). Comparing development indicators between more developed countries and LMICs may yield greater insight into the relationship between corruption and development. Secondly, because of limited resources, only corruption and development data from the past 5 years (since 2017) were selected, whereas the data sources offer more extensive historical data (from 1998). Incorporating more historical data would make a time-series analysis of corruption and development more informative. Thirdly, from a practical perspective, the current SQL database on Microsoft Azure has limited capacity: it has only 1 vCore and a maximum data size of 21 GB, because a more powerful SQL database is more expensive. Although this configuration is sufficient for current use, it will need to be adjusted once the dataset is scaled up. Fourthly, the firewall rules of the SQL database are not restricted to a fixed IP address, which should be done if users access the database from a stationary network address. The firewall rules are currently written this way because the IP address of Google Colab keeps changing.

# Bibliography

1. Black, N. (2023) What is Data Engineering and why is it so important?, QuantHub. Available at: https://www.quanthub.com/what-is-data-engineering-2/ (Accessed: April 9, 2023).

2. Databricks (2023) What is Apache Parquet?, Databricks. Available at: https://www.databricks.com/glossary/what-is-parquet#:~:text=Benefits%20of%20Parquet,columns%20with%20different%20data%20types. (Accessed: April 9, 2023).

3. United Nations (2004) United Nations Convention against Corruption. New York: United Nations.

4. Wilson, Z. (2021) Data & Data Engineering - the past, present, and future, Medium. Available at: https://medium.com/@eczachly/data-data-engineering-the-past-present-and-future-ac3ad5795ddf (Accessed: April 9, 2023).

5. World Bank Group (2023) Combating corruption, World Bank. Available at: https://www.worldbank.org/en/topic/governance/brief/anti-corruption (Accessed: April 9, 2023).

In [None]:
# Second dataset: use the Climate Watch API to extract country-specific emission data
# Import packages
import requests
from urllib.parse import quote_plus as parser
import json
base_url = "https://www.climatewatchdata.org/api/v1/data/historical_emissions"
url = base_url
# Send the request and catch the response: r (raise an error on a non-2xx status)
r = requests.get(url, timeout=30)
r.raise_for_status()
# Decode the JSON data into a dictionary: emission_data
emission_data = r.json()
#start_year_indices = [i for i in range(5)]
#emission_data_extract= emission_data['meta']['years'][start_year_indices]

In [None]:
emission_data_ext = emission_data['data']
start_year_indices = [i for i in range(2017,2023)]