ETL de datos de importación de productos

Instalación de librerías base

In [1]:
import pandas as pd
from sqlalchemy import create_engine

In [2]:
import os
postgres_key = os.environ.get('POSTGRES_PASSWORD')

In [3]:
url_postgres = f"postgresql+psycopg2://postgres:{postgres_key}@localhost:5432/postgres"

In [4]:
engine = create_engine(url_postgres)
df_trades = pd.read_sql_query('SELECT * FROM trades', engine)

In [5]:
df_countries = pd.read_json('scr/country-data.json')

In [6]:
df_codes = pd.read_csv('scr/hs_codes.csv')

In [7]:
df_parents = df_codes[df_codes['Level']==2].copy()

Transform

Clean Codes

In [8]:
df_codes = df_codes[df_codes['Code_comm'].notnull()]

In [9]:
def clean_code(text):
    text = str(text)
    parent_code = None
    if len(text) == 11:
        code = text[:5]
        parent_code = text[:1]
    else:
        code = text[:6]
        parent_code = text[:2]
    try:
        parent = df_parents[df_parents['Code_comm']==parent_code]['Description'].values[0]
    except:
        parent = None
    return (code, parent)

In [10]:
df_codes[['clean_code', 'parent_description']] = df_codes.apply(lambda x: clean_code(x['Code']), axis=1, result_type='expand')

In [11]:
df_codes = df_codes[df_codes['clean_code'].notnull()][['clean_code','Description','parent_description']]

In [12]:
df_codes['id_code'] = df_codes.index + 1 

In [13]:
df_codes['clean_code'] = df_codes['clean_code'].astype('int64')

clear countries

In [14]:
df_countries = df_countries[['alpha-3','country','region','sub-region']]

In [15]:
df_countries = df_countries[df_countries['alpha-3'].notnull()]

In [16]:
df_countries['id_country'] = df_countries.index + 1 

Merge

In [17]:
df_trades_clean = df_trades.merge(df_codes[['clean_code','id_code']], how='left', left_on='comm_code', right_on='clean_code')

In [18]:
df_trades_clean = df_trades_clean.merge(df_countries[['alpha-3','id_country']], how='left', left_on='country_code', right_on='alpha-3')

In [19]:
df_trades_clean.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code,alpha-3,id_country
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929,SYC,155
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931,SYC,155
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933,SYC,155


In [20]:
df_trades.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items


In [21]:
df_codes.head()

Unnamed: 0,clean_code,Description,parent_description,id_code
1,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS,2
2,10021,LIVE ANIMALS,LIVE ANIMALS,3
3,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS,4
5,10121,Pure-bred breeding horses,LIVE ANIMALS,6
6,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS,7


In [22]:
df_countries.head()

Unnamed: 0,alpha-3,country,region,sub-region,id_country
0,AFG,Afghanistan,Asia,Southern Asia,1
1,ALB,Albania,Europe,Southern Europe,2
2,DZA,Algeria,Africa,Northern Africa,3
3,AND,Andorra,Europe,Southern Europe,4
4,AGO,Angola,Africa,Sub-Saharan Africa,5


Clean Trades

In [23]:
def create_dimension(data, id_name):
    list_keys = []
    value = 1
    for _ in data:
        list_keys.append(value)
        value+=1
    return pd.DataFrame({id_name: list_keys, 'values':data})

In [24]:
df_quantity = create_dimension(df_trades_clean['quantity_name'].unique(), 'id_quantity')
df_flow = create_dimension(df_trades_clean['flow'].unique(), 'id_flow')
df_year = create_dimension(df_trades_clean['year'].unique(), 'id_year')

In [25]:
df_quantity.head()

Unnamed: 0,id_quantity,values
0,1,Number of items
1,2,Weight in kilograms
2,3,No Quantity
3,4,Volume in litres
4,5,Number of pairs


In [26]:
df_flow.head()

Unnamed: 0,id_flow,values
0,1,Import
1,2,Export
2,3,Re-Export
3,4,Re-Import


In [27]:
df_year.head()

Unnamed: 0,id_year,values
0,1,1998
1,2,1997
2,3,1996
3,4,1995
4,5,1994


In [28]:
df_trades_clean = df_trades_clean.merge(df_quantity, how='left', left_on='quantity_name', right_on='values')
df_trades_clean = df_trades_clean.merge(df_flow, how='left', left_on='flow', right_on='values')
df_trades_clean = df_trades_clean.merge(df_year, how='left', left_on='year', right_on='values')

In [29]:
df_trades_clean.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code,alpha-3,id_country,id_quantity,values_x,id_flow,values_y,id_year,values
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929,SYC,155,1,Number of items,1,Import,1,1998
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931,SYC,155,1,Number of items,1,Import,1,1998
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155,1,Number of items,2,Export,1,1998
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155,1,Number of items,3,Re-Export,1,1998
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933,SYC,155,1,Number of items,1,Import,1,1998


In [30]:
df_trades_clean['id_trades'] = df_trades_clean.index + 1

In [31]:
df_trades_final = df_trades_clean[['id_trades','trade_usd','kg','quantity','id_code','id_country','id_quantity','id_flow','id_year']].copy()

In [32]:
df_trades_final.head()

Unnamed: 0,id_trades,trade_usd,kg,quantity,id_code,id_country,id_quantity,id_flow,id_year
0,1,1431426.0,0.0,23000.0,6929,155,1,1,1
1,2,31406.0,0.0,2545.0,6931,155,1,1,1
2,3,950.0,0.0,300.0,6931,155,1,2,1
3,4,950.0,0.0,300.0,6931,155,1,3,1
4,5,18251.0,0.0,450.0,6933,155,1,1,1


In [33]:
df_countries = df_countries[['id_country','alpha-3','country','region','sub-region']]
df_countries.head()

Unnamed: 0,id_country,alpha-3,country,region,sub-region
0,1,AFG,Afghanistan,Asia,Southern Asia
1,2,ALB,Albania,Europe,Southern Europe
2,3,DZA,Algeria,Africa,Northern Africa
3,4,AND,Andorra,Europe,Southern Europe
4,5,AGO,Angola,Africa,Sub-Saharan Africa


In [34]:
df_codes = df_codes[['id_code','clean_code','Description','parent_description']]
df_codes.head()

Unnamed: 0,id_code,clean_code,Description,parent_description
1,2,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS
2,3,10021,LIVE ANIMALS,LIVE ANIMALS
3,4,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS
5,6,10121,Pure-bred breeding horses,LIVE ANIMALS
6,7,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS


Load

In [35]:
df_codes.to_sql('codes', engine, if_exists='replace', index=False)
df_countries.to_sql('countries', engine, if_exists='replace', index=False)
df_year.to_sql('year', engine, if_exists='replace', index=False)
df_flow.to_sql('flow', engine, if_exists='replace', index=False)
df_quantity.to_sql('quantity', engine, if_exists='replace', index=False)
df_trades_final.to_sql('trades_processed', engine, if_exists='replace', index=False)

353