# FASE 1 - DATA STORAGE 

## IMPORT DATASETS

In [684]:
%pip install -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [685]:
import pandas as pd
import json
import numpy as np 
import pycountry as pc
import time
from pyspark.sql import SparkSession
from dotenv import load_dotenv
import os
import requests

load_dotenv()
API_KEY = os.getenv("API_KEY")
print(API_KEY)

None


In [686]:
#DATASETS ESCOLHIDOS

dfLabor = pd.read_csv('datasets/world_labor_productivity.csv', delimiter=',')
dfSalary = pd.read_csv('datasets/world_annual_wage.csv', delimiter=',')
dfMental=pd.read_csv('datasets/mental_illness.csv', delimiter=',')
dfCost = pd.read_csv('datasets/cost_of_living.csv', delimiter=',')

In [687]:
dfLabor

Unnamed: 0,Entity,Code,Year,Productivity: output per hour worked
0,Argentina,ARG,1972,5.770799
1,Argentina,ARG,1973,6.049992
2,Argentina,ARG,1974,6.123540
3,Argentina,ARG,1975,6.015602
4,Argentina,ARG,1976,6.029272
...,...,...,...,...
3452,Vietnam,VNM,2015,4.946606
3453,Vietnam,VNM,2016,5.156925
3454,Vietnam,VNM,2017,5.652919
3455,Vietnam,VNM,2018,5.982665


#### Verificação dos missing values

Começamos por verificar se o dataset Labor possui algum missing value e concluímos que não:

In [688]:

missing_values = dfLabor.isnull().sum()
print("Missing values:\n", missing_values)


Missing values:
 Entity                                  0
Code                                    0
Year                                    0
Productivity: output per hour worked    0
dtype: int64


Verificamos se o Mental possui algum missing value:

In [689]:
dfMental

Unnamed: 0,Entity,Code,Year,"DALYs from depressive disorders per 100,000 people in, both sexes aged age-standardized","DALYs from schizophrenia per 100,000 people in, both sexes aged age-standardized","DALYs from bipolar disorder per 100,000 people in, both sexes aged age-standardized","DALYs from eating disorders per 100,000 people in, both sexes aged age-standardized","DALYs from anxiety disorders per 100,000 people in, both sexes aged age-standardized"
0,Afghanistan,AFG,1990,895.22565,138.248250,147.64412,26.471115,440.33000
1,Afghanistan,AFG,1991,893.88434,137.761220,147.56696,25.548681,439.47202
2,Afghanistan,AFG,1992,892.34973,137.080300,147.13086,24.637949,437.60718
3,Afghanistan,AFG,1993,891.51587,136.486020,146.78812,23.863169,436.69104
4,Afghanistan,AFG,1994,891.39160,136.183230,146.58481,23.189074,436.76800
...,...,...,...,...,...,...,...,...
6835,Zimbabwe,ZWE,2015,546.46204,127.108720,115.32073,20.423056,302.20868
6836,Zimbabwe,ZWE,2016,547.27765,127.142105,114.98700,20.647228,302.68216
6837,Zimbabwe,ZWE,2017,547.62270,127.465050,115.32798,20.791725,302.88626
6838,Zimbabwe,ZWE,2018,546.57184,127.681210,115.42796,20.916480,301.58250


In [690]:

missing_values = dfMental.isnull().sum()
print("Missing values:\n", missing_values)

Missing values:
 Entity                                                                                       0
Code                                                                                       690
Year                                                                                         0
DALYs from depressive disorders per 100,000 people in, both sexes aged age-standardized      0
DALYs from schizophrenia per 100,000 people in, both sexes aged age-standardized             0
DALYs from bipolar disorder per 100,000 people in, both sexes aged age-standardized          0
DALYs from eating disorders per 100,000 people in, both sexes aged age-standardized          0
DALYs from anxiety disorders per 100,000 people in, both sexes aged age-standardized         0
dtype: int64


E verifica-se que a coluna Code que representa o código dos países apresenta missing values, no entanto:

Como intencionamos dar merge dos datasets usando o país e não o Code, decidimos dar drop dessa coluna visto que contém missing values:

In [691]:
dfMental.drop(columns=['Code'], inplace=True)

In [692]:
missing_values = dfMental.isnull().sum()
print("Missing values:\n", missing_values)

Missing values:
 Entity                                                                                     0
Year                                                                                       0
DALYs from depressive disorders per 100,000 people in, both sexes aged age-standardized    0
DALYs from schizophrenia per 100,000 people in, both sexes aged age-standardized           0
DALYs from bipolar disorder per 100,000 people in, both sexes aged age-standardized        0
DALYs from eating disorders per 100,000 people in, both sexes aged age-standardized        0
DALYs from anxiety disorders per 100,000 people in, both sexes aged age-standardized       0
dtype: int64


Decidimos, também, renomear as colunas de forma a simplificar a sua visualização:

In [693]:
dfMental = dfMental.rename(columns={'DALYs from depressive disorders per 100,000 people in, both sexes aged age-standardized': 'Depressive', 'DALYs from schizophrenia per 100,000 people in, both sexes aged age-standardized': 'Schizophrenia', 'DALYs from bipolar disorder per 100,000 people in, both sexes aged age-standardized': 'Bipolar','DALYs from eating disorders per 100,000 people in, both sexes aged age-standardized':'Eating_Disorders','DALYs from anxiety disorders per 100,000 people in, both sexes aged age-standardized':'Anxiety' })
dfMental

Unnamed: 0,Entity,Year,Depressive,Schizophrenia,Bipolar,Eating_Disorders,Anxiety
0,Afghanistan,1990,895.22565,138.248250,147.64412,26.471115,440.33000
1,Afghanistan,1991,893.88434,137.761220,147.56696,25.548681,439.47202
2,Afghanistan,1992,892.34973,137.080300,147.13086,24.637949,437.60718
3,Afghanistan,1993,891.51587,136.486020,146.78812,23.863169,436.69104
4,Afghanistan,1994,891.39160,136.183230,146.58481,23.189074,436.76800
...,...,...,...,...,...,...,...
6835,Zimbabwe,2015,546.46204,127.108720,115.32073,20.423056,302.20868
6836,Zimbabwe,2016,547.27765,127.142105,114.98700,20.647228,302.68216
6837,Zimbabwe,2017,547.62270,127.465050,115.32798,20.791725,302.88626
6838,Zimbabwe,2018,546.57184,127.681210,115.42796,20.916480,301.58250


In [694]:
dfSalary

Unnamed: 0,COUNTRY,Country,SERIES,Series,TIME,Time,Unit Code,Unit,PowerCode Code,PowerCode,Reference Period Code,Reference Period,Value,Flag Codes,Flags
0,AUS,Australia,CPNCU,Current prices in NCU,2000,2000,AUD,Australian Dollar,0,Units,,,46246.868731,,
1,AUS,Australia,CPNCU,Current prices in NCU,2001,2001,AUD,Australian Dollar,0,Units,,,48315.982391,,
2,AUS,Australia,CPNCU,Current prices in NCU,2002,2002,AUD,Australian Dollar,0,Units,,,50052.758102,,
3,AUS,Australia,CPNCU,Current prices in NCU,2003,2003,AUD,Australian Dollar,0,Units,,,51798.586644,,
4,AUS,Australia,CPNCU,Current prices in NCU,2004,2004,AUD,Australian Dollar,0,Units,,,54199.402711,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2604,CRI,Costa Rica,USDPPP,In 2022 constant prices at 2022 USD PPPs,2017,2017,,,0,Units,,,27020.000000,,
2605,CRI,Costa Rica,USDPPP,In 2022 constant prices at 2022 USD PPPs,2018,2018,,,0,Units,,,27709.000000,,
2606,CRI,Costa Rica,USDPPP,In 2022 constant prices at 2022 USD PPPs,2019,2019,,,0,Units,,,28524.000000,,
2607,CRI,Costa Rica,USDPPP,In 2022 constant prices at 2022 USD PPPs,2020,2020,,,0,Units,,,31341.000000,,


Pretendemos verificar se há colunas redundantes que serão removidas no tratamento:

In [695]:
unique_values = dfSalary['PowerCode Code'].unique()
print(unique_values)

[0]


In [696]:
unique_values = dfSalary['PowerCode'].unique()
print(unique_values)

['Units']


In [697]:
unique_values = dfSalary['Reference Period Code'].unique()
print(unique_values)

[  nan 2022.]


In [698]:
unique_values = dfSalary['Reference Period'].unique()
print(unique_values)

[  nan 2022.]


In [699]:
unique_values = dfSalary['Flag Codes'].unique()
print(unique_values)

[nan]


In [700]:
unique_values = dfSalary['Flags'].unique()
print(unique_values)

[nan]


In [701]:
missing_values = dfSalary.isnull().sum()
print("Missing values:\n", missing_values)

Missing values:
 COUNTRY                     0
Country                     0
SERIES                      0
Series                      0
TIME                        0
Time                        0
Unit Code                 138
Unit                      138
PowerCode Code              0
PowerCode                   0
Reference Period Code     942
Reference Period          942
Value                       0
Flag Codes               2609
Flags                    2609
dtype: int64


In [702]:
dfCost

Unnamed: 0,Date,Country,Cost of Living,Rent Index,Cost of Living Plus Rent Index,Groceries Index,Restaurant Price Index,Local Purchasing Power Index
0,2023,Bermuda,141.8,96.3,120.0,143.3,142.0,79.5
1,2023,Switzerland,114.2,50.2,83.5,113.9,109.5,118.7
2,2023,Cayman Islands,103.4,75.7,90.1,96.4,92.4,76.7
3,2023,Bahamas,90.9,39.6,66.3,74.1,94.8,43.2
4,2023,Barbados,88.8,21.2,56.4,80.1,82.7,36.8
...,...,...,...,...,...,...,...,...
1626,2009,Bolivia,46.7,21.8,36.8,27.1,37.2,32.5
1627,2009,China,43.5,25.1,36.2,58.4,35.2,35.0
1628,2009,Thailand,39.9,24.5,33.8,53.3,25.3,33.4
1629,2009,India,38.5,10.5,27.4,34.0,29.5,41.4


In [703]:
missing_values = dfCost.isnull().sum()
print("Missing values:\n", missing_values)

Missing values:
 Date                              0
Country                           0
Cost of Living                    0
Rent Index                        0
Cost of Living Plus Rent Index    0
Groceries Index                   0
Restaurant Price Index            0
Local Purchasing Power Index      0
dtype: int64


## TRATAMENTO

In [704]:
print(dfSalary.columns)

Index(['COUNTRY', 'Country', 'SERIES', 'Series', 'TIME', 'Time', 'Unit Code',
       'Unit', 'PowerCode Code', 'PowerCode', 'Reference Period Code',
       'Reference Period', 'Value', 'Flag Codes', 'Flags'],
      dtype='object')


Como é possível observar, o dfSalary tem a feature 'Year' como 'Time' e a feature 'Entity' como 'Country'. De forma a uniformizar e poder fazer um merge correto, decidimos renomear as colunas.
É possível verificar que tem algumas colunas em duplicado pelo que demos drop delas

In [705]:
dfSalary.rename(columns={'Country':'Entity'}, inplace=True)
dfSalary.drop(columns=[ 'COUNTRY' ], inplace=True)

In [706]:
dfCost.rename(columns={'Country':'Entity'}, inplace=True)

In [707]:
dfSalary.rename(columns={'Time':'Year'}, inplace=True)
dfSalary.drop(columns=[ 'TIME' ], inplace=True)

In [708]:
dfCost.rename(columns={'Date':'Year'}, inplace=True)

In [709]:
#Lets use see the rows that the unit code is empty or NaN
dfSalary[dfSalary['Unit Code'].isna()]["Entity"].unique()
#Only Turkey, Colombia and Costa Rica had no unitCode, we will manually add the unit code and country code for these countries
dfSalary.loc[dfSalary['Entity'] == 'Türkiye', 'Unit Code'] = 'TRY'
dfSalary.loc[dfSalary['Entity'] == 'Türkiye', 'Unit'] = 'Turkish Lira'
dfSalary.loc[dfSalary['Entity'] == 'Colombia', 'Unit Code'] = 'COP'
dfSalary.loc[dfSalary['Entity'] == 'Colombia', 'Unit'] = 'Colombian Peso'
dfSalary.loc[dfSalary['Entity'] == 'Costa Rica', 'Unit Code'] = 'CRC'
dfSalary.loc[dfSalary['Entity'] == 'Costa Rica', 'Unit'] = 'Costa Rican Colón'
#Now we dont have any countries without unit code
# dfSalary[dfSalary['Unit Code'].isna()]["Entity"].unique()
#Lets use the Unit Code to get the exchange rates for each country
#Lets collect all the Unit Codes for the countries per year
# unitCodesPerYear = {}
# for year in dfSalary['Year'].unique():
#     unitCodesPerYear[year] = dfSalary[dfSalary['Year'] == year]['Unit Code'].unique().tolist()

# #Lets create a new attribute to store the exchange rate for each country and the converted salary
# dfSalary['Exchange Rate'] = np.nan
# dfSalary['Salary in USD'] = np.nan

# #Using the api we will get the exchange rates from the coutries currency to USD
# year=2000
# apiUrl = f"http://api.exchangeratesapi.io/v1/{year}-12-12?access_key={API_KEY}&symbols=USD,{','.join(unitCodesPerYear[year])}"
# print(apiUrl)
# response = requests.post(apiUrl)
# exchangeRates = response.json()

In [710]:
# #Taking into consideration the API limitations, we have to base our analysis in euros, exchange them for dollars and reconvert them to the coutries currrency
# USD = exchangeRates['rates']['USD']
# for row in dfSalary[dfSalary['Year'] == year].iterrows():
#     entity = row[1]['Entity']
#     unitCode = row[1]['Unit Code']
#     salary = row[1]['Value']
#     exchangeRate = exchangeRates['rates'][unitCode]
#     salaryInUSD = salary / exchangeRate
#     salaryInUSD = salaryInUSD * USD
#     print(f"{entity} - {unitCode} - {salary} - {exchangeRate} - {salaryInUSD}")

In [711]:
dfSalary.drop(columns=[ 'Flag Codes', 'Flags' , 'PowerCode' , 'PowerCode Code', 'Reference Period Code','Unit Code', 'Reference Period'], inplace=True)

In [712]:
missing_values = dfSalary.isnull().sum()
print("Missing values:\n", missing_values)

Missing values:
 Entity    0
SERIES    0
Series    0
Year      0
Unit      0
Value     0
dtype: int64


In [713]:
dfSalary

Unnamed: 0,Entity,SERIES,Series,Year,Unit,Value
0,Australia,CPNCU,Current prices in NCU,2000,Australian Dollar,46246.868731
1,Australia,CPNCU,Current prices in NCU,2001,Australian Dollar,48315.982391
2,Australia,CPNCU,Current prices in NCU,2002,Australian Dollar,50052.758102
3,Australia,CPNCU,Current prices in NCU,2003,Australian Dollar,51798.586644
4,Australia,CPNCU,Current prices in NCU,2004,Australian Dollar,54199.402711
...,...,...,...,...,...,...
2604,Costa Rica,USDPPP,In 2022 constant prices at 2022 USD PPPs,2017,Costa Rican Colón,27020.000000
2605,Costa Rica,USDPPP,In 2022 constant prices at 2022 USD PPPs,2018,Costa Rican Colón,27709.000000
2606,Costa Rica,USDPPP,In 2022 constant prices at 2022 USD PPPs,2019,Costa Rican Colón,28524.000000
2607,Costa Rica,USDPPP,In 2022 constant prices at 2022 USD PPPs,2020,Costa Rican Colón,31341.000000


In [714]:
dfCost

Unnamed: 0,Year,Entity,Cost of Living,Rent Index,Cost of Living Plus Rent Index,Groceries Index,Restaurant Price Index,Local Purchasing Power Index
0,2023,Bermuda,141.8,96.3,120.0,143.3,142.0,79.5
1,2023,Switzerland,114.2,50.2,83.5,113.9,109.5,118.7
2,2023,Cayman Islands,103.4,75.7,90.1,96.4,92.4,76.7
3,2023,Bahamas,90.9,39.6,66.3,74.1,94.8,43.2
4,2023,Barbados,88.8,21.2,56.4,80.1,82.7,36.8
...,...,...,...,...,...,...,...,...
1626,2009,Bolivia,46.7,21.8,36.8,27.1,37.2,32.5
1627,2009,China,43.5,25.1,36.2,58.4,35.2,35.0
1628,2009,Thailand,39.9,24.5,33.8,53.3,25.3,33.4
1629,2009,India,38.5,10.5,27.4,34.0,29.5,41.4


In [715]:
print(dfSalary.columns)
print(dfLabor.columns) 
print(dfMental.columns) 
print(dfCost.columns)

Index(['Entity', 'SERIES', 'Series', 'Year', 'Unit', 'Value'], dtype='object')
Index(['Entity', 'Code', 'Year', 'Productivity: output per hour worked'], dtype='object')
Index(['Entity', 'Year', 'Depressive', 'Schizophrenia', 'Bipolar',
       'Eating_Disorders', 'Anxiety'],
      dtype='object')
Index(['Year', 'Entity', 'Cost of Living', 'Rent Index',
       'Cost of Living Plus Rent Index', 'Groceries Index',
       'Restaurant Price Index', 'Local Purchasing Power Index'],
      dtype='object')


## MERGE THE DATASETS USING PANDAS

In [716]:
start_time = time.time()
merged_df = pd.merge(dfCost, dfLabor, on=['Entity', 'Year'], how='inner')
merged_df = pd.merge(merged_df, dfMental, on=['Entity', 'Year'], how='inner')
merged_df = pd.merge(merged_df, dfSalary, on=['Entity', 'Year'], how='inner')
end_time = time.time()
elapsed_time = end_time - start_time

print("Elapsed time:", elapsed_time, "seconds")

Elapsed time: 0.009559869766235352 seconds


In [717]:
merged_df

Unnamed: 0,Year,Entity,Cost of Living,Rent Index,Cost of Living Plus Rent Index,Groceries Index,Restaurant Price Index,Local Purchasing Power Index,Code,Productivity: output per hour worked,Depressive,Schizophrenia,Bipolar,Eating_Disorders,Anxiety,SERIES,Series,Unit,Value
0,2019,Switzerland,121.2,50.2,87.1,120.8,123.1,129.7,CHE,82.918655,635.31090,178.67760,202.48970,107.115810,650.54030,CPNCU,Current prices in NCU,Swiss Franc,88703.0
1,2019,Switzerland,121.2,50.2,87.1,120.8,123.1,129.7,CHE,82.918655,635.31090,178.67760,202.48970,107.115810,650.54030,CNPNCU,2022 constant prices and NCU,Swiss Franc,90634.0
2,2019,Switzerland,121.2,50.2,87.1,120.8,123.1,129.7,CHE,82.918655,635.31090,178.67760,202.48970,107.115810,650.54030,USDPPP,In 2022 constant prices at 2022 USD PPPs,US Dollar,71189.0
3,2019,Iceland,101.9,48.3,76.1,92.1,111.8,91.8,ISL,64.244220,485.75662,177.96698,195.80510,96.615060,489.77250,CPNCU,Current prices in NCU,Iceland Krona,10979069.0
4,2019,Iceland,101.9,48.3,76.1,92.1,111.8,91.8,ISL,64.244220,485.75662,177.96698,195.80510,96.615060,489.77250,CNPNCU,2022 constant prices and NCU,Iceland Krona,12652289.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1057,2009,Mexico,54.2,22.4,41.6,40.0,48.7,42.9,MEX,19.709696,610.59094,181.27681,198.06451,50.877518,352.22714,CNPNCU,2022 constant prices and NCU,Mexican Peso,224418.0
1058,2009,Mexico,54.2,22.4,41.6,40.0,48.7,42.9,MEX,19.709696,610.59094,181.27681,198.06451,50.877518,352.22714,USDPPP,In 2022 constant prices at 2022 USD PPPs,US Dollar,18806.0
1059,2009,Poland,53.5,34.8,46.1,40.7,44.6,26.9,POL,25.316256,317.68677,187.14354,119.24043,35.298910,298.42096,CPNCU,Current prices in NCU,Zloty,38459.0
1060,2009,Poland,53.5,34.8,46.1,40.7,44.6,26.9,POL,25.316256,317.68677,187.14354,119.24043,35.298910,298.42096,CNPNCU,2022 constant prices and NCU,Zloty,55418.0


## MERGE THE DATASETS USING PYSPARK AND WRITING ON A PARQUET FILE

In [718]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os
import shutil

start_time = time.time()

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Merge Datasets") \
    .getOrCreate()

# Convert Pandas DataFrames to Spark DataFrames
dfSalary_spark = spark.createDataFrame(dfSalary)
dfCost_spark = spark.createDataFrame(dfCost)
dfMental_spark = spark.createDataFrame(dfMental)
dfLabor_spark = spark.createDataFrame(dfLabor)

# Merge the datasets
merged_df_spark = dfSalary_spark.join(dfCost_spark, ['Year', 'Entity'], 'inner') \
    .join(dfMental_spark, ['Year', 'Entity'], 'inner') \
    .join(dfLabor_spark, ['Year', 'Entity'], 'inner')

# Print the schema of the merged DataFrame
print(merged_df_spark.schema)

# Print the column names of the merged DataFrame
print(merged_df_spark.columns)

if 'SERIES' in merged_df_spark.columns:
    merged_df_spark = merged_df_spark.drop('Series')

# Specify the output path
output_path = "merged_data.parquet"

# Check if the file exists, if yes, delete it
if os.path.exists(output_path):
    shutil.rmtree(output_path)

# Write the merged DataFrame to Parquet format
merged_df_spark.write.parquet(output_path)

# Stop SparkSession
spark.stop()
end_time = time.time()
elapsed_time = end_time - start_time

print("Elapsed time:", elapsed_time, "seconds")


StructType([StructField('Year', LongType(), True), StructField('Entity', StringType(), True), StructField('SERIES', StringType(), True), StructField('Series', StringType(), True), StructField('Unit', StringType(), True), StructField('Value', DoubleType(), True), StructField('Cost of Living', DoubleType(), True), StructField('Rent Index', DoubleType(), True), StructField('Cost of Living Plus Rent Index', DoubleType(), True), StructField('Groceries Index', DoubleType(), True), StructField('Restaurant Price Index', DoubleType(), True), StructField('Local Purchasing Power Index', DoubleType(), True), StructField('Depressive', DoubleType(), True), StructField('Schizophrenia', DoubleType(), True), StructField('Bipolar', DoubleType(), True), StructField('Eating_Disorders', DoubleType(), True), StructField('Anxiety', DoubleType(), True), StructField('Code', StringType(), True), StructField('Productivity: output per hour worked', DoubleType(), True)])
['Year', 'Entity', 'SERIES', 'Series', 'Uni

                                                                                

Elapsed time: 3.5865120887756348 seconds
