# ETL de datos de importación de productos

## Instalación de librerías base

In [4]:
import pandas as pd
from sqlalchemy import create_engine

## Extraction

In [5]:
engine = create_engine('postgresql+psycopg2://postgres:ressilient01@localhost/postgres')
'''
    Argumentos de la conexión a la BD
        - Motor BD:     postgresql+psycopg2
        - Usuario BD:   postgres
        - Constraseña:  ressilient01
        - Host:         localhost
        - Nombre BD:    postgres
'''
df_trades = pd.read_sql("select * from trades", engine)
df_trades.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items


In [6]:
df_countries = pd.read_json('src/country_data.json')
df_countries.head()

Unnamed: 0,country,images_file,image_url,alpha-2,alpha-3,country-code,iso_3166-2,region,sub-region,intermediate-region,region-code,sub-region-code,intermediate-region-code
0,Afghanistan,Flag_of_Afghanistan.svg,https://upload.wikimedia.org/wikipedia/commons...,AF,AFG,4.0,ISO 3166-2:AF,Asia,Southern Asia,,142.0,34.0,
1,Albania,Flag_of_Albania.svg,https://upload.wikimedia.org/wikipedia/commons...,AL,ALB,8.0,ISO 3166-2:AL,Europe,Southern Europe,,150.0,39.0,
2,Algeria,Flag_of_Algeria.svg,https://upload.wikimedia.org/wikipedia/commons...,DZ,DZA,12.0,ISO 3166-2:DZ,Africa,Northern Africa,,2.0,15.0,
3,Andorra,Flag_of_Andorra.svg,https://upload.wikimedia.org/wikipedia/commons...,AD,AND,20.0,ISO 3166-2:AD,Europe,Southern Europe,,150.0,39.0,
4,Angola,Flag_of_Angola.svg,https://upload.wikimedia.org/wikipedia/commons...,AO,AGO,24.0,ISO 3166-2:AO,Africa,Sub-Saharan Africa,Middle Africa,2.0,202.0,17.0


In [7]:
df_codes = pd.read_csv('src/hs_codes.csv')
df_codes.head()

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
0,1654555,1,0,,,,This classification has been uploaded in RAMON...,This classification has been uploaded in RAMON...
1,1654556,1,10011000090,,I,,SECTION I - LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS; ANIMAL PRODUCTS
2,1654557,2,10021000090,10011000000.0,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
3,1654558,3,10100000080,10021000000.0,1.01,1,"Live horses, asses, mules and hinnies","Live horses, asses, mules and hinnies"
4,1654559,4,10121000010,10100000000.0,,1.01,- Horses,


In [8]:
df_parents = df_codes[df_codes['Level']==2].copy()
df_parents.head()

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
2,1654557,2,10021000090,10011000000.0,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
52,1654607,2,20021000090,10011000000.0,2,I,CHAPTER 2 - MEAT AND EDIBLE MEAT OFFAL,MEAT AND EDIBLE MEAT OFFAL
140,1654695,2,30021000090,10011000000.0,3,I,"CHAPTER 3 - FISH AND CRUSTACEANS, MOLLUSCS AND...","FISH AND CRUSTACEANS, MOLLUSCS AND OTHER AQUAT..."
416,1654971,2,40021000090,10011000000.0,4,I,CHAPTER 4 - DAIRY PRODUCE; BIRDS' EGGS; NATURA...,DAIRY PRODUCE; BIRDS' EGGS; NATURAL HONEY; EDI...
463,1655018,2,50021000090,10011000000.0,5,I,"CHAPTER 5 - PRODUCTS OF ANIMAL ORIGIN, NOT ELS...","PRODUCTS OF ANIMAL ORIGIN, NOT ELSEWHERE SPECI..."


## Transform

#### Clean product codes

In [9]:
# Eliminamos las filas con nulos en Code_comm
df_codes = df_codes[df_codes['Code_comm'].notnull()]
df_codes.head()

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
1,1654556,1,10011000090,,I,,SECTION I - LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS; ANIMAL PRODUCTS
2,1654557,2,10021000090,10011000000.0,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
3,1654558,3,10100000080,10021000000.0,1.01,1,"Live horses, asses, mules and hinnies","Live horses, asses, mules and hinnies"
5,1654560,5,10121000080,10121000000.0,101.21,,-- Pure-bred breeding animals,Pure-bred breeding horses
6,1654561,5,10129000080,10121000000.0,101.29,,-- Other,Live horses (excl. pure-bred for breeding)


In [10]:
# Función para limpiar el código
def clean_code(text):
    text = str(text)
    parent_code = None
    if len(text) == 11:
        code = text[:5]
        parent_code = text[:1]
    else:
        code = text[:6]
        parent_code = text[:2]
    try:
        parent = df_parents[df_parents['Code_comm']==parent_code]['Description'].values[0]
    except:
        parent = None
    return (code,parent)

In [11]:
# Expandimos, con la tupla resultado
df_codes[['clean_code','parent_description']] = df_codes.apply(lambda x : clean_code(x['Code']),axis=1, result_type='expand')
df_codes.head()

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description,clean_code,parent_description
1,1654556,1,10011000090,,I,,SECTION I - LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS; ANIMAL PRODUCTS,10011,LIVE ANIMALS
2,1654557,2,10021000090,10011000000.0,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS,10021,LIVE ANIMALS
3,1654558,3,10100000080,10021000000.0,1.01,1,"Live horses, asses, mules and hinnies","Live horses, asses, mules and hinnies",10100,LIVE ANIMALS
5,1654560,5,10121000080,10121000000.0,101.21,,-- Pure-bred breeding animals,Pure-bred breeding horses,10121,LIVE ANIMALS
6,1654561,5,10129000080,10121000000.0,101.29,,-- Other,Live horses (excl. pure-bred for breeding),10129,LIVE ANIMALS


In [12]:
df_codes = df_codes[df_codes['clean_code'].notnull()][['clean_code','Description','parent_description']]
df_codes.head()

Unnamed: 0,clean_code,Description,parent_description
1,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS
2,10021,LIVE ANIMALS,LIVE ANIMALS
3,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS
5,10121,Pure-bred breeding horses,LIVE ANIMALS
6,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS


In [13]:
df_codes['id_code'] = df_codes.index + 1

In [14]:
df_codes['clean_code'] = df_codes['clean_code'].astype('int64')
df_codes.head()

Unnamed: 0,clean_code,Description,parent_description,id_code
1,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS,2
2,10021,LIVE ANIMALS,LIVE ANIMALS,3
3,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS,4
5,10121,Pure-bred breeding horses,LIVE ANIMALS,6
6,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS,7


### Clean Countries

In [15]:
df_countries = df_countries[['alpha-3','country','region','sub-region']]
df_countries.head()

Unnamed: 0,alpha-3,country,region,sub-region
0,AFG,Afghanistan,Asia,Southern Asia
1,ALB,Albania,Europe,Southern Europe
2,DZA,Algeria,Africa,Northern Africa
3,AND,Andorra,Europe,Southern Europe
4,AGO,Angola,Africa,Sub-Saharan Africa


In [16]:
df_countries = df_countries[df_countries['alpha-3'].notnull()]
df_countries.head()

Unnamed: 0,alpha-3,country,region,sub-region
0,AFG,Afghanistan,Asia,Southern Asia
1,ALB,Albania,Europe,Southern Europe
2,DZA,Algeria,Africa,Northern Africa
3,AND,Andorra,Europe,Southern Europe
4,AGO,Angola,Africa,Sub-Saharan Africa


In [17]:
df_countries['id_country'] = df_countries.index + 1
df_countries.head()

Unnamed: 0,alpha-3,country,region,sub-region,id_country
0,AFG,Afghanistan,Asia,Southern Asia,1
1,ALB,Albania,Europe,Southern Europe,2
2,DZA,Algeria,Africa,Northern Africa,3
3,AND,Andorra,Europe,Southern Europe,4
4,AGO,Angola,Africa,Sub-Saharan Africa,5


### Merge

In [18]:
df_trades_clean = df_trades.merge(df_codes[['clean_code','id_code']],how='left', left_on='comm_code',right_on='clean_code')
df_trades_clean.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933


In [19]:
df_trades_clean = df_trades_clean.merge(df_countries[['alpha-3','id_country']],how='left', left_on='country_code',right_on='alpha-3')
df_trades_clean.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code,alpha-3,id_country
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929,SYC,155
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931,SYC,155
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933,SYC,155


### Clean trades

In [20]:
def create_dimension(data, id_name):
    list_keys = []
    value = 1
    for _ in data:
        list_keys.append(value)
        value += 1
    return pd.DataFrame({id_name:list_keys, 'values':data})


In [21]:
df_quantity =create_dimension(df_trades_clean['quantity_name'].unique(),'id_quantity')
df_quantity.head()

Unnamed: 0,id_quantity,values
0,1,Number of items
1,2,Weight in kilograms
2,3,No Quantity
3,4,Volume in litres
4,5,Number of pairs


In [22]:
df_flow =create_dimension(df_trades_clean['flow'].unique(),'id_flow')
df_flow.head()

Unnamed: 0,id_flow,values
0,1,Import
1,2,Export
2,3,Re-Export
3,4,Re-Import


In [23]:
df_year =create_dimension(df_trades_clean['year'].unique(),'id_year')
df_year.head()

Unnamed: 0,id_year,values
0,1,1998
1,2,1997
2,3,1996
3,4,1995
4,5,1994


In [24]:
df_trades_clean = df_trades_clean.merge(df_quantity, how='left',left_on='quantity_name', right_on='values')
df_trades_clean.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code,alpha-3,id_country,id_quantity,values
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929,SYC,155,1,Number of items
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931,SYC,155,1,Number of items
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155,1,Number of items
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155,1,Number of items
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933,SYC,155,1,Number of items


In [25]:
df_trades_clean = df_trades_clean.merge(df_flow, how='left',left_on='flow', right_on='values')
df_trades_clean.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code,alpha-3,id_country,id_quantity,values_x,id_flow,values_y
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929,SYC,155,1,Number of items,1,Import
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931,SYC,155,1,Number of items,1,Import
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155,1,Number of items,2,Export
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155,1,Number of items,3,Re-Export
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933,SYC,155,1,Number of items,1,Import


In [26]:
df_trades_clean = df_trades_clean.merge(df_year, how='left',left_on='year', right_on='values')
df_trades_clean.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code,alpha-3,id_country,id_quantity,values_x,id_flow,values_y,id_year,values
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929,SYC,155,1,Number of items,1,Import,1,1998
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931,SYC,155,1,Number of items,1,Import,1,1998
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155,1,Number of items,2,Export,1,1998
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155,1,Number of items,3,Re-Export,1,1998
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933,SYC,155,1,Number of items,1,Import,1,1998


In [27]:
df_trades_clean['id_trades'] = df_trades_clean.index + 1
df_trades_clean.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code,alpha-3,id_country,id_quantity,values_x,id_flow,values_y,id_year,values,id_trades
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929,SYC,155,1,Number of items,1,Import,1,1998,1
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931,SYC,155,1,Number of items,1,Import,1,1998,2
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155,1,Number of items,2,Export,1,1998,3
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155,1,Number of items,3,Re-Export,1,1998,4
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933,SYC,155,1,Number of items,1,Import,1,1998,5


In [28]:
df_trades_final = df_trades_clean[['id_trades','trade_usd','kg','quantity','id_code','id_country','id_quantity','id_flow','id_year']].copy()
df_trades_final.head()

Unnamed: 0,id_trades,trade_usd,kg,quantity,id_code,id_country,id_quantity,id_flow,id_year
0,1,1431426.0,0.0,23000.0,6929,155,1,1,1
1,2,31406.0,0.0,2545.0,6931,155,1,1,1
2,3,950.0,0.0,300.0,6931,155,1,2,1
3,4,950.0,0.0,300.0,6931,155,1,3,1
4,5,18251.0,0.0,450.0,6933,155,1,1,1


In [29]:
df_countries = df_countries[['id_country','alpha-3','country','region','sub-region']]
df_countries.head()

Unnamed: 0,id_country,alpha-3,country,region,sub-region
0,1,AFG,Afghanistan,Asia,Southern Asia
1,2,ALB,Albania,Europe,Southern Europe
2,3,DZA,Algeria,Africa,Northern Africa
3,4,AND,Andorra,Europe,Southern Europe
4,5,AGO,Angola,Africa,Sub-Saharan Africa


In [30]:
df_codes = df_codes[['id_code','clean_code','Description','parent_description']]
df_codes.head()

Unnamed: 0,id_code,clean_code,Description,parent_description
1,2,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS
2,3,10021,LIVE ANIMALS,LIVE ANIMALS
3,4,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS
5,6,10121,Pure-bred breeding horses,LIVE ANIMALS
6,7,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS


## Load

In [32]:
df_trades_final.to_csv('target/trades.csv',index=False, sep='|')
df_countries.to_csv('target/countries.csv',index=False, sep='|')
df_codes.to_csv('target/codes.csv',index=False, sep='|')
df_quantity.to_csv('target/quantity.csv',index=False, sep='|')
df_flow.to_csv('target/flow.csv',index=False, sep='|')
df_year.to_csv('target/years.csv',index=False, sep='|')

In [None]:
import os
import boto3
import redshift_connector

client = boto3.client(
    's3',
    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
)

conn = redshift_connector.connect(
    host= os.environ.get('redhsift_host'),
    database=os.environ.get('redshift_database'),
    port=5439,
    user=os.environ.get('redshift_user'),
    password=os.environ.get('redshift_pass')
)
cursor = conn.cursor()

In [None]:
def load_file(file_name):
    table_name = file_name.split('.')[0]
    client.upload_file(
        Filename="target/{}".format(file_name),
        Bucket="platzi-etl",
        Key="course_etl_target/{}".format(file_name),
    )

    sentence = '''copy etl_test.{} from 's3://platzi-etl/course_etl_target/{}' credentials 'aws_access_key_id={};aws_secret_access_key={}' csv delimiter '|' region 'us-west-2' ignoreheader 1'''.format(
        table_name, file_name, os.environ.get('AWS_ACCESS_KEY_ID'), os.environ.get('AWS_SECRET_ACCESS_KEY')
    )
    try:
        cursor.execute(sentence)
        print('ok tabla '+ table_name)
    except:
        print('error en la tabla '+table_name)

In [None]:
list_files = os.listdir('target/')
for _ in list_files:
    load_file(_)

In [None]:
conn.commit()

In [None]:
conn.close()

## Nuevo sistema con Postgre SQL local

In [31]:
import sys
import psycopg2
import numpy as np
import psycopg2.extras as extras

In [32]:
# Modificación de los df para que sean totalmente compatibles con las tablas
df_trades=df_trades_final
df_quantity.rename(columns={'values':'quantity'},inplace=True)
df_codes.rename(columns={'clean_code':'code','Desciption':'description'},inplace=True)
df_countries.rename(columns={'alpha-3':'alpha_3','sub-region':'sub_region'},inplace=True)
df_flow.rename(columns={'values':'flow'},inplace=True)
df_years=df_year
df_years.rename(columns={'values':'year'},inplace=True)
# int => int64 no existe en Postgre, pero float64 se puede convertir a real y funciona bien
df_years = df_years.astype({"id_year":"int","year":"float64"})

In [47]:
conn = psycopg2.connect(
    database="datawarehouse", user='postgres', password='ressilient01', host='127.0.0.1', port='5433'
)

In [48]:
def execute_values(conn, df, table):

    tuples = [tuple(x) for x in df.to_numpy()]

    cols = ','.join(list(df.columns))
    # Query SQL a ejecutar
    query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    cursor = conn.cursor()
    try:
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("Se ha cargado la tabla", table)
    cursor.close()

In [None]:
execute_values(conn, df_trades, 'trades')
execute_values(conn, df_countries, 'countries')
execute_values(conn, df_codes, 'codes')
execute_values(conn, df_quantity, 'quantity')
execute_values(conn, df_flow, 'flow')
execute_values(conn, df_years, 'years')         # si no hubiésemos convertido el tipo habría dado error

In [50]:
conn.close()