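# ETL de importación de productos

Extrae las transacciones de comercio (`trades`) de PostgreSQL y los catálogos de códigos HS y de países, los transforma en una tabla de hechos con sus dimensiones y los carga en Redshift a través de S3.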

In [2]:
import os
from typing import Tuple
from urllib.parse import quote_plus

import boto3
import numpy as np
import pandas as pd
import redshift_connector
from sqlalchemy import create_engine

## Extracción

In [3]:
# Build the PostgreSQL URL from environment variables instead of hard-coding credentials.
# Only POSTGRES_PASSWORD is mandatory; the other parts default to the previous local setup.
# quote_plus escapes characters such as '@' or ':' that would otherwise break the URL.
# NOTE: the password that used to be written here should be treated as leaked and rotated.
engine = create_engine(
    'postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}'.format(
        user=quote_plus(os.environ.get('POSTGRES_USER', 'postgres')),
        password=quote_plus(os.environ['POSTGRES_PASSWORD']),
        host=os.environ.get('POSTGRES_HOST', 'localhost'),
        port=os.environ.get('POSTGRES_PORT', '5432'),
        db=os.environ.get('POSTGRES_DB', 'postgres'),
    )
)

In [4]:
# Fact data: every row of the source `trades` table, read through the engine above.
df_trades = pd.read_sql("select * from trades", engine)

In [5]:
# Country reference data (names, ISO codes, regions).
df_countries = pd.read_json('src/country_data.json')

In [6]:
# Classification of commodity codes. 'Code_comm' and 'Parent.1' mix section numerals ('I')
# with numeric codes ('1', '1.01'). Reading them explicitly as text avoids per-chunk type
# inference turning part of the column into floats. That would break the string lookup
# of chapter codes done in clean_code.
df_codes = pd.read_csv('src/hs_codes.csv', dtype={'Code_comm': str, 'Parent.1': str})

In [7]:
# Chapter rows (Level 2) are the parents whose descriptions clean_code attaches to each code.
df_parents = df_codes.loc[df_codes['Level'].eq(2)].copy()

In [8]:
# Preview of the chapter-level rows.
df_parents.head()

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
2,1654557,2,10021000090,10011000000.0,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
52,1654607,2,20021000090,10011000000.0,2,I,CHAPTER 2 - MEAT AND EDIBLE MEAT OFFAL,MEAT AND EDIBLE MEAT OFFAL
140,1654695,2,30021000090,10011000000.0,3,I,"CHAPTER 3 - FISH AND CRUSTACEANS, MOLLUSCS AND...","FISH AND CRUSTACEANS, MOLLUSCS AND OTHER AQUAT..."
416,1654971,2,40021000090,10011000000.0,4,I,CHAPTER 4 - DAIRY PRODUCE; BIRDS' EGGS; NATURA...,DAIRY PRODUCE; BIRDS' EGGS; NATURAL HONEY; EDI...
463,1655018,2,50021000090,10011000000.0,5,I,"CHAPTER 5 - PRODUCTS OF ANIMAL ORIGIN, NOT ELS...","PRODUCTS OF ANIMAL ORIGIN, NOT ELSEWHERE SPECI..."


## Transformación

### Limpieza de códigos

In [9]:
# Preview of the raw classification before cleaning.
df_codes.head()

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
0,1654555,1,0,,,,This classification has been uploaded in RAMON...,This classification has been uploaded in RAMON...
1,1654556,1,10011000090,,I,,SECTION I - LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS; ANIMAL PRODUCTS
2,1654557,2,10021000090,10011000000.0,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
3,1654558,3,10100000080,10021000000.0,1.01,1,"Live horses, asses, mules and hinnies","Live horses, asses, mules and hinnies"
4,1654559,4,10121000010,10100000000.0,,1.01,- Horses,


In [10]:
# Keep only rows that carry a commodity code. This drops the descriptive header row and
# un-coded grouping rows such as "- Horses". The explicit .copy() makes this an
# independent frame, since new columns are assigned to it in the following cells.
df_codes = df_codes[df_codes['Code_comm'].notna()].copy()

In [11]:
# Preview after dropping the rows without Code_comm.
df_codes.head()

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
1,1654556,1,10011000090,,I,,SECTION I - LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS; ANIMAL PRODUCTS
2,1654557,2,10021000090,10011000000.0,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
3,1654558,3,10100000080,10021000000.0,1.01,1,"Live horses, asses, mules and hinnies","Live horses, asses, mules and hinnies"
5,1654560,5,10121000080,10121000000.0,101.21,,-- Pure-bred breeding animals,Pure-bred breeding horses
6,1654561,5,10129000080,10121000000.0,101.29,,-- Other,Live horses (excl. pure-bred for breeding)


In [12]:
def clean_code(text:str) -> Tuple[str, str]:
    
    text = str(text)
    parent_code = None
    if len(text) == 11:
        code = text[:5]
        parent_code = text[:1]
    else:
        code = text[:6]
        parent_code = text[:2]
    
    try:
        parent = df_parents[df_parents['Code_comm'] == parent_code]['Description'].values[0]
    except:
        parent = None

    return (code, parent)

In [13]:
# Map clean_code over the single 'Code' column rather than using a row-wise apply, which
# builds a Series for every row. The (code, parent) tuples are then spread into two
# columns aligned on the same index.
df_codes[['clean_code', 'parent_description']] = pd.DataFrame(
    df_codes['Code'].map(clean_code).tolist(),
    index=df_codes.index,
    columns=['clean_code', 'parent_description'],
)

In [14]:
# Keep the cleaned code plus both descriptions. A single .loc with an explicit copy
# replaces chained indexing, because id_code is added to this frame in the next cell.
df_codes = df_codes.loc[
    df_codes['clean_code'].notna(), ['clean_code', 'Description', 'parent_description']
].copy()
df_codes.head()

Unnamed: 0,clean_code,Description,parent_description
1,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS
2,10021,LIVE ANIMALS,LIVE ANIMALS
3,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS
5,10121,Pure-bred breeding horses,LIVE ANIMALS
6,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS


In [15]:
# Surrogate key = original CSV row position + 1, so ids keep the gaps left by the rows
# filtered out earlier. The cleaned code becomes an integer to match trades.comm_code.
df_codes = df_codes.assign(
    id_code=df_codes.index + 1,
    clean_code=df_codes['clean_code'].astype('int64'),
)
df_codes.head()

Unnamed: 0,clean_code,Description,parent_description,id_code
1,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS,2
2,10021,LIVE ANIMALS,LIVE ANIMALS,3
3,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS,4
5,10121,Pure-bred breeding horses,LIVE ANIMALS,6
6,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS,7


### Limpieza de países

In [16]:
# Preview of the raw country reference data.
df_countries.head()

Unnamed: 0,country,images_file,image_url,alpha-2,alpha-3,country-code,iso_3166-2,region,sub-region,intermediate-region,region-code,sub-region-code,intermediate-region-code
0,Afghanistan,Flag_of_Afghanistan.svg,https://upload.wikimedia.org/wikipedia/commons...,AF,AFG,4.0,ISO 3166-2:AF,Asia,Southern Asia,,142.0,34.0,
1,Albania,Flag_of_Albania.svg,https://upload.wikimedia.org/wikipedia/commons...,AL,ALB,8.0,ISO 3166-2:AL,Europe,Southern Europe,,150.0,39.0,
2,Algeria,Flag_of_Algeria.svg,https://upload.wikimedia.org/wikipedia/commons...,DZ,DZA,12.0,ISO 3166-2:DZ,Africa,Northern Africa,,2.0,15.0,
3,Andorra,Flag_of_Andorra.svg,https://upload.wikimedia.org/wikipedia/commons...,AD,AND,20.0,ISO 3166-2:AD,Europe,Southern Europe,,150.0,39.0,
4,Angola,Flag_of_Angola.svg,https://upload.wikimedia.org/wikipedia/commons...,AO,AGO,24.0,ISO 3166-2:AO,Africa,Sub-Saharan Africa,Middle Africa,2.0,202.0,17.0


In [17]:
# Keep the ISO alpha-3 code (join key with trades.country_code) and the descriptive
# columns, dropping rows without an alpha-3 code. One .loc with an explicit copy avoids
# chained selection before the id_country column is assigned.
df_countries = df_countries.loc[
    df_countries['alpha-3'].notna(), ['alpha-3', 'country', 'region', 'sub-region']
].copy()
# Surrogate key from the original row position (gaps are possible after filtering).
df_countries['id_country'] = df_countries.index + 1
df_countries.head()

Unnamed: 0,alpha-3,country,region,sub-region,id_country
0,AFG,Afghanistan,Asia,Southern Asia,1
1,ALB,Albania,Europe,Southern Europe,2
2,DZA,Algeria,Africa,Northern Africa,3
3,AND,Andorra,Europe,Southern Europe,4
4,AGO,Angola,Africa,Sub-Saharan Africa,5


In [18]:
# Attach the surrogate id_code to each trade. validate='many_to_one' stops the run if a
# clean_code appears more than once in df_codes. Otherwise that would silently duplicate
# trade rows (and inflate trade_usd) in the fact table.
df_trades_clean = df_trades.merge(
    df_codes[['clean_code', 'id_code']],
    how='left',
    left_on='comm_code',
    right_on='clean_code',
    validate='many_to_one',
)

In [19]:
# Check that trades picked up their id_code.
df_trades_clean.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933


In [20]:
# Attach the surrogate id_country. validate='many_to_one' stops the run if an alpha-3
# code is repeated in df_countries, instead of silently duplicating trade rows.
df_trades_clean = df_trades_clean.merge(
    df_countries[['alpha-3', 'id_country']],
    how='left',
    left_on='country_code',
    right_on='alpha-3',
    validate='many_to_one',
)

In [21]:
# Check that trades picked up their id_country.
df_trades_clean.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code,alpha-3,id_country
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929,SYC,155
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931,SYC,155
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933,SYC,155


### Limpieza de trades (dimensiones y tabla de hechos)

In [22]:
def create_dimension(data:np.ndarray | pd.DataFrame, id_name:str) -> pd.DataFrame:
    list_keys = []
    value = 1
    for _ in data:
        list_keys.append(value)
        value = value+1
    
    return pd.DataFrame({id_name:list_keys, 'values':data})

In [23]:
# One dimension per low-cardinality fact column; each unique value gets a 1-based key.
df_quantity, df_flow, df_year = (
    create_dimension(df_trades_clean[column].unique(), key_name)
    for column, key_name in (
        ('quantity_name', 'id_quantity'),
        ('flow', 'id_flow'),
        ('year', 'id_year'),
    )
)

In [24]:
# Attach each dimension's surrogate key. Each dimension's 'values' column is renamed to
# the matching fact column, so the merge runs on one shared key. This avoids piling up
# 'values' / 'values_x' / 'values_y' leftovers. validate guards against a repeated
# dimension value multiplying fact rows.
for dimension, key in ((df_quantity, 'quantity_name'), (df_flow, 'flow'), (df_year, 'year')):
    df_trades_clean = df_trades_clean.merge(
        dimension.rename(columns={'values': key}),
        how='left',
        on=key,
        validate='many_to_one',
    )

In [25]:
# Fact-table surrogate key: 1-based row position of the merged frame.
df_trades_clean = df_trades_clean.assign(id_trades=df_trades_clean.index + 1)

In [26]:
# Fact table: the measures plus one foreign key per dimension.
df_trades_final = df_trades_clean.loc[
    :,
    ['id_trades', 'trade_usd', 'kg', 'quantity', 'id_code', 'id_country', 'id_quantity', 'id_flow', 'id_year'],
].copy()

In [27]:
# Preview of the final fact table.
df_trades_final.head()

Unnamed: 0,id_trades,trade_usd,kg,quantity,id_code,id_country,id_quantity,id_flow,id_year
0,1,1431426.0,0.0,23000.0,6929,155,1,1,1
1,2,31406.0,0.0,2545.0,6931,155,1,1,1
2,3,950.0,0.0,300.0,6931,155,1,2,1
3,4,950.0,0.0,300.0,6931,155,1,3,1
4,5,18251.0,0.0,450.0,6933,155,1,1,1


In [28]:
# Surrogate key first. COPY is issued without a column list, so this order presumably
# has to match the target table's columns.
df_countries = df_countries.loc[:, ['id_country', 'alpha-3', 'country', 'region', 'sub-region']]

In [29]:
# Surrogate key first. COPY is issued without a column list, so this order presumably
# has to match the target table's columns.
df_codes = df_codes.loc[:, ['id_code', 'clean_code', 'Description', 'parent_description']]

In [30]:
# Drop the intermediate frames we no longer need, to free memory. Run this only once:
# afterwards these names no longer exist, so re-running this cell (or any earlier cell
# that uses them) fails.
del df_trades, df_trades_clean

## Carga

### Exportación a CSV (en realidad, archivos separados por pipes `|`)

In [51]:
# Export every table as a pipe-delimited CSV with a header row and no index. The file
# name without its extension is used as the Redshift table name when loading.
os.makedirs('./target', exist_ok=True)  # to_csv does not create missing directories
for table, file_name in (
    (df_trades_final, 'trades.csv'),
    (df_countries, 'countries.csv'),
    (df_codes, 'codes.csv'),
    (df_quantity, 'quantity.csv'),
    (df_flow, 'flow.csv'),
    (df_year, 'years.csv'),
):
    table.to_csv(f'./target/{file_name}', index=False, sep='|')

In [67]:
# S3 client used to stage the CSV files, and a Redshift connection used to COPY them in.
# Credentials come from the environment. The access key used to be read from the
# misspelled 'AWS_ACCES_KEY_ID'. The standard 'AWS_ACCESS_KEY_ID' is tried first, and
# the old name is kept as a fallback so existing environments keep working.
client = boto3.client(
    's3',
    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID') or os.environ.get('AWS_ACCES_KEY_ID'),
    aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
)

# The AWS_RDS_* variable names are kept as-is; they describe the Redshift cluster.
# The port can be overridden with AWS_RDS_PORT and defaults to 5439 as before.
conn = redshift_connector.connect(
    host=os.environ.get('AWS_RDS_HOST'),
    database=os.environ.get('AWS_RDS_DB'),
    port=int(os.environ.get('AWS_RDS_PORT', '5439')),
    user=os.environ.get('AWS_RDS_USER'),
    password=os.environ.get('AWS_RDS_PWD'),
)

In [68]:
# Cursor used by load_file to execute the COPY statements.
cursor = conn.cursor()

In [69]:
def load_file(file_name: str,
              bucket: str = 'bi-bucket-prueba-fdup',
              prefix: str = 'course_etl_target',
              region: str = 'us-east-2') -> None:
    """Upload one file from ./target/ to S3 and COPY it into the Redshift table of the same name.

    The table name is the file name up to its first dot (``trades.csv`` -> ``trades``).
    The function uses the notebook-level ``client`` (S3) and ``cursor`` (Redshift).
    Nothing is committed here; the caller commits after all tables are loaded.
    NOTE(review): COPY appends rows, so re-running the load duplicates data unless the
    tables are emptied first.

    Args:
        file_name: name of a pipe-delimited CSV with a header row inside ./target/.
        bucket: S3 bucket used as the staging area.
        prefix: key prefix inside the bucket.
        region: AWS region passed to the COPY command.

    Raises:
        Whatever the COPY raises, after printing which table failed.
    """
    table_name = file_name.split('.')[0]
    key = f'{prefix}/{file_name}'
    client.upload_file(
        Filename=f'./target/{file_name}',
        Bucket=bucket,
        Key=key
    )

    sentence = (
        f'copy "etl-fundamentos-fdup"."public"."{table_name}" '
        f"from 's3://{bucket}/{key}' "
        f"iam_role default csv delimiter '|' region '{region}' ignoreheader 1"
    )

    try:
        cursor.execute(sentence)
    except Exception as exc:
        # Presumably, as on any PostgreSQL-protocol database, a failed statement aborts the
        # open transaction. Every later COPY would then fail too and the final commit would
        # persist nothing, so report the cause and stop instead of continuing silently.
        print('Error en la tabla ' + table_name + ': ' + str(exc))
        raise
    print('Tabla cargada exitosamente! ' + table_name)

In [65]:
# Load every CSV produced by the export cell, in a deterministic order. Other entries
# (e.g. .DS_Store or an .ipynb_checkpoints directory) would otherwise be uploaded and
# COPYed into tables that do not exist.
list_files = sorted(name for name in os.listdir('./target/') if name.endswith('.csv'))
for file_name in list_files:
    load_file(file_name)

Tabla cargada exitosamente!
Tabla cargada exitosamente!
Tabla cargada exitosamente!
Tabla cargada exitosamente!
Tabla cargada exitosamente!
Tabla cargada exitosamente!


In [71]:
# Persist all COPY statements issued by load_file with a single commit, then close the connection.
conn.commit()
conn.close()