# ETL de datos de importación de productos

## Instalación de librerías base

In [1]:
import pandas as pd
from boto3 import client
from sqlalchemy import create_engine

## Extraction

In [4]:
engine = create_engine('postgresql+psycopg2://postgres:Gabrielmaxemin@localhost/postgres')
df_trades = pd.read_sql("select * from trades", engine)

In [3]:
df_countries = pd.read_json('src/country_data.json')

In [5]:
df_codes = pd.read_csv('src/hs_codes.csv')

In [9]:
df_parents = df_codes[df_codes['Level'] == 2].copy()

In [10]:
df_parents

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
2,1654557,2,10021000090,1.001100e+10,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
52,1654607,2,20021000090,1.001100e+10,2,I,CHAPTER 2 - MEAT AND EDIBLE MEAT OFFAL,MEAT AND EDIBLE MEAT OFFAL
140,1654695,2,30021000090,1.001100e+10,3,I,"CHAPTER 3 - FISH AND CRUSTACEANS, MOLLUSCS AND...","FISH AND CRUSTACEANS, MOLLUSCS AND OTHER AQUAT..."
416,1654971,2,40021000090,1.001100e+10,4,I,CHAPTER 4 - DAIRY PRODUCE; BIRDS' EGGS; NATURA...,DAIRY PRODUCE; BIRDS' EGGS; NATURAL HONEY; EDI...
463,1655018,2,50021000090,1.001100e+10,5,I,"CHAPTER 5 - PRODUCTS OF ANIMAL ORIGIN, NOT ELS...","PRODUCTS OF ANIMAL ORIGIN, NOT ELSEWHERE SPECI..."
...,...,...,...,...,...,...,...,...
7238,1661793,2,930021000090,9.300110e+11,93,XIX,CHAPTER 93 - ARMS AND AMMUNITION; PARTS AND AC...,ARMS AND AMMUNITION; PARTS AND ACCESSORIES THE...
7264,1661819,2,940021000090,9.400110e+11,94,XX,"CHAPTER 94 - FURNITURE; BEDDING, MATTRESSES, M...","FURNITURE; BEDDING, MATTRESSES, MATTRESS SUPPO..."
7319,1661874,2,950021000090,9.400110e+11,95,XX,"CHAPTER 95 - TOYS, GAMES AND SPORTS REQUISITES...","TOYS, GAMES AND SPORTS REQUISITES; PARTS AND A..."
7362,1661917,2,960021000090,9.400110e+11,96,XX,CHAPTER 96 - MISCELLANEOUS MANUFACTURED ARTICLES,MISCELLANEOUS MANUFACTURED ARTICLES


## Transform

#### Clean codes

In [13]:
df_codes = df_codes[df_codes['Code_comm'].notnull()]

In [14]:
df_codes

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
1,1654556,1,10011000090,,I,,SECTION I - LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS; ANIMAL PRODUCTS
2,1654557,2,10021000090,1.001100e+10,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
3,1654558,3,10100000080,1.002100e+10,1.01,1,"Live horses, asses, mules and hinnies","Live horses, asses, mules and hinnies"
5,1654560,5,10121000080,1.012100e+10,101.21,,-- Pure-bred breeding animals,Pure-bred breeding horses
6,1654561,5,10129000080,1.012100e+10,101.29,,-- Other,Live horses (excl. pure-bred for breeding)
...,...,...,...,...,...,...,...,...
7432,1661987,3,970200000080,9.700210e+11,9702,97,"Original engravings, prints and lithographs","Original engravings, prints and lithographs"
7433,1661988,3,970300000080,9.700210e+11,9703,97,"Original sculptures and statuary, in any material","Original sculptures and statuary, in any material"
7434,1661989,3,970400000080,9.700210e+11,9704,97,"Postage or revenue stamps, stamp-postmarks, fi...","Postage or revenue stamps, stamp-postmarks, fi..."
7435,1661990,3,970500000080,9.700210e+11,9705,97,Collections and collectors' pieces of zoologic...,Collections and collector's pieces of zoologic...


In [15]:
def clean_code(text):
    text = str(text)
    parent_code = None
    if len(text) == 11:
        code = text[:5]
        parent_code = text[:1]
    else:
        code = text[:6]
        parent_code = text[:2]
    try:
        parent = df_parents[df_parents['Code_comm'] == parent_code]['Description'].values[0]
    except:
        parent = None
    return (code, parent)


In [16]:
df_codes[['clean_code','parent_description']] = df_codes.apply(lambda x : clean_code(x['Code']), axis=1, result_type='expand')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_codes[['clean_code','parent_description']] = df_codes.apply(lambda x : clean_code(x['Code']), axis=1, result_type='expand')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_codes[['clean_code','parent_description']] = df_codes.apply(lambda x : clean_code(x['Code']), axis=1, result_type='expand')


In [18]:
df_codes[df_codes['clean_code'].notnull()][['clean_code', 'Description', 'parent_description']]

Unnamed: 0,clean_code,Description,parent_description
1,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS
2,10021,LIVE ANIMALS,LIVE ANIMALS
3,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS
5,10121,Pure-bred breeding horses,LIVE ANIMALS
6,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS
...,...,...,...
7432,970200,"Original engravings, prints and lithographs","WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"
7433,970300,"Original sculptures and statuary, in any material","WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"
7434,970400,"Postage or revenue stamps, stamp-postmarks, fi...","WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"
7435,970500,Collections and collector's pieces of zoologic...,"WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"


In [20]:
df_codes['id_code'] = df_codes.index + 1

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_codes['id_code'] = df_codes.index + 1


In [21]:
df_codes['clean_code'] =  df_codes['clean_code'].astype('int64')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_codes['clean_code'] =  df_codes['clean_code'].astype('int64')


In [22]:
df_codes

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description,clean_code,parent_description,id_code
1,1654556,1,10011000090,,I,,SECTION I - LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS; ANIMAL PRODUCTS,10011,LIVE ANIMALS,2
2,1654557,2,10021000090,1.001100e+10,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS,10021,LIVE ANIMALS,3
3,1654558,3,10100000080,1.002100e+10,1.01,1,"Live horses, asses, mules and hinnies","Live horses, asses, mules and hinnies",10100,LIVE ANIMALS,4
5,1654560,5,10121000080,1.012100e+10,101.21,,-- Pure-bred breeding animals,Pure-bred breeding horses,10121,LIVE ANIMALS,6
6,1654561,5,10129000080,1.012100e+10,101.29,,-- Other,Live horses (excl. pure-bred for breeding),10129,LIVE ANIMALS,7
...,...,...,...,...,...,...,...,...,...,...,...
7432,1661987,3,970200000080,9.700210e+11,9702,97,"Original engravings, prints and lithographs","Original engravings, prints and lithographs",970200,"WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES",7433
7433,1661988,3,970300000080,9.700210e+11,9703,97,"Original sculptures and statuary, in any material","Original sculptures and statuary, in any material",970300,"WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES",7434
7434,1661989,3,970400000080,9.700210e+11,9704,97,"Postage or revenue stamps, stamp-postmarks, fi...","Postage or revenue stamps, stamp-postmarks, fi...",970400,"WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES",7435
7435,1661990,3,970500000080,9.700210e+11,9705,97,Collections and collectors' pieces of zoologic...,Collections and collector's pieces of zoologic...,970500,"WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES",7436


### Clean Countries

In [24]:
df_countries = df_countries[['alpha-3', 'country', 'region', 'sub-region']]

In [25]:
df_countries = df_countries[df_countries['alpha-3'].notnull()]

In [26]:
df_countries['id_country'] = df_countries.index + 1

In [27]:
df_countries

Unnamed: 0,alpha-3,country,region,sub-region,id_country
0,AFG,Afghanistan,Asia,Southern Asia,1
1,ALB,Albania,Europe,Southern Europe,2
2,DZA,Algeria,Africa,Northern Africa,3
3,AND,Andorra,Europe,Southern Europe,4
4,AGO,Angola,Africa,Sub-Saharan Africa,5
...,...,...,...,...,...
268,UMI,United States Minor Outlying Islands,Oceania,Micronesia,269
269,VGB,Virgin Islands (British),Americas,Latin America and the Caribbean,270
270,VIR,Virgin Islands (U.S.),Americas,Latin America and the Caribbean,271
271,WLF,Wallis and Futuna,Oceania,Polynesia,272


### Merge

In [28]:
df_trades_clean = df_trades.merge(df_codes[['clean_code','id_code']], how = 'left', left_on= 'comm_code', right_on='clean_code')

In [29]:
df_trades_clean

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933
...,...,...,...,...,...,...,...,...,...,...
6216348,SYC,1999,890590,Import,3399.0,0.0,283.0,Number of items,890590,6940
6216349,SYC,1999,890600,Import,816.0,0.0,199.0,Number of items,890600,6941
6216350,SYC,1999,890710,Import,31387.0,0.0,1325.0,Number of items,890710,6945
6216351,SYC,1999,890790,Import,8749.0,0.0,1566.0,Number of items,890790,6946


### Clean trades

In [30]:
df_trades_clean = df_trades_clean.merge(df_countries[['alpha-3','id_country']], how = 'left', left_on= 'country_code', right_on='alpha-3')


In [31]:
df_trades_clean

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code,alpha-3,id_country
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,890200,6929,SYC,155
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,890310,6931,SYC,155
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,890310,6931,SYC,155
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,890391,6933,SYC,155
...,...,...,...,...,...,...,...,...,...,...,...,...
6216348,SYC,1999,890590,Import,3399.0,0.0,283.0,Number of items,890590,6940,SYC,155
6216349,SYC,1999,890600,Import,816.0,0.0,199.0,Number of items,890600,6941,SYC,155
6216350,SYC,1999,890710,Import,31387.0,0.0,1325.0,Number of items,890710,6945,SYC,155
6216351,SYC,1999,890790,Import,8749.0,0.0,1566.0,Number of items,890790,6946,SYC,155


In [32]:
def create_dimension(data, id_name):
    list_keys = []
    value = 1
    for _ in data:
        list_keys.append(value)
        value += 1
    return pd.DataFrame({id_name:list_keys, 'value':data})

In [35]:
df_quantity = create_dimension(df_trades_clean['quantity_name'].unique(), 'id_quantity')

df_flow = create_dimension(df_trades_clean['flow'].unique(), 'id_flow')

df_year  = create_dimension(df_trades_clean['year'].unique(), 'id_year')

In [37]:
df_trades_clean = df_trades_clean.merge(df_quantity, how = 'left', left_on = 'quantity_name', right_on = 'value')

df_trades_clean = df_trades_clean.merge(df_flow, how = 'left', left_on = 'flow', right_on = 'value')

df_trades_clean = df_trades_clean.merge(df_year, how = 'left', left_on = 'year', right_on = 'value')

In [39]:
df_trades_clean['id_trades']= df_trades_clean.index + 1

In [40]:
df_trades_final = df_trades_clean[['id_trades', 'trade_usd', 'kg', 'quantity', 'id_code', 'id_country', 'id_quantity', 'id_flow', 'id_year']].copy()

In [41]:
df_trades_final

Unnamed: 0,id_trades,trade_usd,kg,quantity,id_code,id_country,id_quantity,id_flow,id_year
0,1,1431426.0,0.0,23000.0,6929,155,1,1,1
1,2,31406.0,0.0,2545.0,6931,155,1,1,1
2,3,950.0,0.0,300.0,6931,155,1,2,1
3,4,950.0,0.0,300.0,6931,155,1,3,1
4,5,18251.0,0.0,450.0,6933,155,1,1,1
...,...,...,...,...,...,...,...,...,...
6216348,6216349,3399.0,0.0,283.0,6940,155,1,1,23
6216349,6216350,816.0,0.0,199.0,6941,155,1,1,23
6216350,6216351,31387.0,0.0,1325.0,6945,155,1,1,23
6216351,6216352,8749.0,0.0,1566.0,6946,155,1,1,23


In [43]:
df_countries = df_countries[['id_country', 'alpha-3', 'country', 'region', 'sub-region']]

In [44]:
df_codes = df_codes[['id_code', 'clean_code', 'Description', 'parent_description']]

In [None]:
import os
#
# os.environ['AWS_ACCESS_KEY_ID'] = 'TU_ACCESS_KEY'
# os.environ['AWS_SECRET_ACCESS_KEY'] = 'TU_SECRET_KEY'
# os.environ['host'] = 'TU_HOST'
# os.environ['database'] = 'TU_DATABASE'
# os.environ['user'] = 'TU_USER'
# os.environ['password'] = 'TU_PASSWORD'

## Load

In [None]:
import os
#
# Crear el directorio 'target' si no existe
os.makedirs('target', exist_ok=True)

# Guardar los DataFrames en archivos CSV
df_trades_final.to_csv('target/trades.csv', index=False, sep='|')
df_countries.to_csv('target/countries.csv', index=False, sep='|')
df_codes.to_csv('target/codes.csv', index=False, sep='|')
df_quantity.to_csv('target/quantity.csv', index=False, sep='|')
df_flow.to_csv('target/flow.csv', index=False, sep='|')
df_year.to_csv('target/years.csv', index=False, sep='|')

In [72]:
import os
import boto3
import redshift_connector

client = boto3.client(
    's3',
    aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
)

conn = redshift_connector.connect(
    host=os.environ.get('host'),
    database=os.environ.get('database'),
    port=5439,
    user=os.environ.get('user'),
    password=os.environ.get('password'),
)
cursor = conn.cursor()

In [73]:
# Listar los buckets de S3
response = client.list_buckets()
print('Buckets disponibles:')
for bucket in response['Buckets']:
    print(f'  {bucket["Name"]}')

Buckets disponibles:
  cf-templates-hgpaajiewwzl-us-east-1
  gabrielmaxemin-unico-bucket-curso-platzi
  test-gabriel-1995


In [74]:
# Ejecutar una consulta simple en Redshift
cursor.execute("SELECT CURRENT_DATE;")
result = cursor.fetchone()
print(f'Fecha actual en Redshift: {result[0]}')

Fecha actual en Redshift: 2024-12-07


In [None]:
def load_file(file_name):
    table_name = file_name.split('.')[0]
    client.upload_file(
        Filename = 'target/{}'.format(file_name),
        Bucket = 'gabrielmaxemin-unico-bucket-curso-platzi',
        Key = 'course_etl_target/{}'.format(file_name)
    )
    sentence = '''
    COPY public.{}
    FROM 's3://gabrielmaxemin-unico-bucket-curso-platzi/course_etl_target/{}'
    CREDENTIALS 'aws_access_key_id={};aws_secret_access_key={}'
    CSV
    DELIMITER '|'
    REGION 'us-east-1'
    IGNOREHEADER 1;
    '''.format(
        table_name,
        file_name,
        os.environ.get('AWS_ACCESS_KEY_ID'),
        os.environ.get('AWS_SECRET_ACCESS_KEY')
    )

    ## Imprimir la sentencia para depuración
    print('Ejecutando sentencia COPY:')
    print(sentence)

    try:
        cursor.execute(sentence)
        conn.commit()  # Confirmar la transacción
        print('Ok en la tabla ' + table_name)
    except Exception as e:
        conn.rollback()  # Revertir la transacción en caso de error
        print('Error en la tabla ' + table_name)
        print('Detalle del error:', e)

In [None]:
load_file('years.csv')

In [None]:
load_file('trades.csv')
load_file('quantity.csv')
load_file('codes.csv')
load_file('flow.csv')
load_file('countries.csv')

In [86]:
conn.commit()

In [87]:
conn.close()