In [159]:
import os

import pandas as pd
from sqlalchemy import create_engine

## Extraction

In [160]:
# Connect to the source PostgreSQL database. The URL can be overridden with the
# POSTGRES_URL environment variable so credentials need not live in the
# notebook; the fallback is the local development database used so far.
# NOTE: `.env` is only loaded further down (Redshift cell), so POSTGRES_URL must
# already be set in the kernel's environment to take effect here.
POSTGRES_URL = os.getenv(
    "POSTGRES_URL",
    "postgresql://root:root@localhost:5432/posgres-db",
)
engine = create_engine(POSTGRES_URL)
# Read the whole trades table from the database
df_trades = pd.read_sql("SELECT * FROM trades", engine)

In [161]:
# Load the country reference data from the JSON file
df_countries = pd.read_json('src/country_data.json')

In [162]:
# Commodity code table (hs_codes.csv), read from the local CSV source
df_codes = pd.read_csv(filepath_or_buffer='src/hs_codes.csv')

In [163]:
# Level-2 rows act as the parent categories; clean_code() looks up their
# descriptions by Code_comm.
df_parents = df_codes.loc[df_codes['Level'] == 2].copy()

## Transformation

### Clean df_codes

In [164]:
# Drop rows without a commodity code. `.copy()` makes df_codes an independent
# frame, so the column assignments in the following cells do not raise
# SettingWithCopyWarning.
df_codes = df_codes[df_codes['Code_comm'].notnull()].copy()

In [165]:
def clean_code(text, parents=None) -> tuple:
    """Split a raw commodity code into (normalized code, parent description).

    Args:
        text: Raw code value; it is converted with ``str()`` first.
        parents: Frame with ``Code_comm`` and ``Description`` columns used to
            resolve the parent description. Defaults to the module-level
            ``df_parents`` (the Level-2 rows of the code table).

    Returns:
        ``(code, parent)`` where ``code`` is the first 5 characters of an
        11-character input or the first 6 characters otherwise, and
        ``parent`` is the ``Description`` of the first row whose
        ``Code_comm`` equals the 1- or 2-character prefix, or ``None`` when no
        row matches.
    """
    if parents is None:
        parents = df_parents
    text = str(text)
    # An 11-character value uses a 1-character parent prefix; everything else
    # uses a 2-character one (presumably a leading zero was lost upstream -
    # TODO confirm against the source data).
    if len(text) == 11:
        code, parent_code = text[:5], text[:1]
    else:
        code, parent_code = text[:6], text[:2]
    matches = parents.loc[parents['Code_comm'] == parent_code, 'Description']
    # A prefix without a parent row is expected; report it as None explicitly
    # instead of swallowing every possible error with a bare except.
    parent = matches.iloc[0] if not matches.empty else None
    return (code, parent)

In [166]:
# Derive the normalized code and its parent description for every row and
# store them as two new columns.
df_codes[['clean_code', 'parent_description']] = df_codes.apply(
    lambda row: clean_code(row['Code']),
    axis=1,
    result_type='expand',
).copy()

In [167]:
# Keep rows with a normalized code and only the columns needed downstream.
# `.copy()` detaches the result from the original frame so the next cell can
# add/replace columns without SettingWithCopyWarning.
df_codes = df_codes.loc[
    df_codes['clean_code'].notnull(),
    ['clean_code', 'parent_description', 'Description'],
].copy()


In [168]:
# Surrogate key derived from the row index. The index was not reset after the
# filters above, so ids are unique but not contiguous (e.g. index 1 -> id 2).
# clean_code() produced string codes; convert them to integers (presumably to
# match trades.comm_code in the merge below - verify dtype).
# `assign` builds a new frame instead of writing into a possibly-sliced one.
df_codes = df_codes.assign(
    id_code=df_codes.index + 1,
    clean_code=df_codes['clean_code'].astype('int64'),
)

### Clean df_countries

In [169]:
# Keep only the country attributes used by the countries dimension
df_countries = df_countries.loc[:, ['alpha-3', 'country', 'region', 'sub-region']]

In [170]:
# Keep only countries that have an alpha-3 code (the key joined against
# trades.country_code). `.copy()` gives an independent frame so adding
# id_country in the next cell does not raise SettingWithCopyWarning.
df_countries = df_countries[df_countries['alpha-3'].notnull()].copy()

In [171]:
# Surrogate key for the country dimension, derived from the row index.
# `assign` returns a new frame instead of writing into a possibly-sliced one,
# which avoids SettingWithCopyWarning.
df_countries = df_countries.assign(id_country=df_countries.index + 1)

### Merge df_codes and df_countries into df_trades

In [173]:
# Attach the code and country surrogate keys to every trade. Left joins keep
# trades without a match (their id becomes NaN).
df_trades_clean = (
    df_trades
    .merge(df_codes[['clean_code', 'id_code']], how='left',
           left_on='comm_code', right_on='clean_code')
    .merge(df_countries[['alpha-3', 'id_country']], how='left',
           left_on='country_code', right_on='alpha-3')
)

### Clean trades: build the quantity, flow and year dimensions and the fact table

In [174]:
def create_dimension(data, id_name):
    """Build a dimension table that assigns a 1-based integer key to each value.

    Args:
        data: Sized sequence of dimension values (e.g. the output of
            ``Series.unique()``); its order is preserved.
        id_name: Name of the generated key column.

    Returns:
        DataFrame with columns ``[id_name, 'values']`` whose key runs
        1, 2, ..., len(data).
    """
    # range() replaces the hand-rolled counter loop and keeps the key column
    # integer-typed even when `data` is empty.
    return pd.DataFrame({id_name: range(1, len(data) + 1), 'values': data})

In [175]:
# Quantity-name dimension, keyed 1..n in order of first appearance
df_quantity = create_dimension(data=df_trades_clean['quantity_name'].unique(), id_name='id_quantity')

In [176]:
# Flow dimension, keyed 1..n in order of first appearance
df_flow = create_dimension(data=df_trades_clean['flow'].unique(), id_name='id_flow')

In [177]:
# Year dimension, keyed 1..n in order of first appearance
df_year = create_dimension(data=df_trades_clean['year'].unique(), id_name='id_year')

In [178]:
# Look up the quantity, flow and year keys for every trade. Each dimension
# carries a 'values' column, so the intermediate frame ends up with
# suffixed 'values_*' columns; they are dropped when the fact table is selected.
df_trades_clean = (
    df_trades_clean
    .merge(df_quantity, how='left', left_on='quantity_name', right_on='values')
    .merge(df_flow, how='left', left_on='flow', right_on='values')
    .merge(df_year, how='left', left_on='year', right_on='values')
)

In [179]:
# Surrogate key for each trade row, derived from the merged frame's index
df_trades_clean = df_trades_clean.assign(id_trade=df_trades_clean.index + 1)

In [180]:
# Fact table: trade measures plus the foreign keys into each dimension
df_trades_final = df_trades_clean.loc[
    :,
    ['id_trade', 'trade_usd', 'kg', 'quantity',
     'id_code', 'id_country', 'id_quantity', 'id_flow', 'id_year'],
].copy()

In [181]:
# Preview the final fact table (pandas truncates the middle rows)
df_trades_final

Unnamed: 0,id_trade,trade_usd,kg,quantity,id_code,id_country,id_quantity,id_flow,id_year
0,1,1431426.0,0.0,23000.0,6929,155,1,1,1
1,2,31406.0,0.0,2545.0,6931,155,1,1,1
2,3,950.0,0.0,300.0,6931,155,1,2,1
3,4,950.0,0.0,300.0,6931,155,1,3,1
4,5,18251.0,0.0,450.0,6933,155,1,1,1
...,...,...,...,...,...,...,...,...,...
2999995,2999996,5236.0,3312.0,3312.0,2591,9,2,3,28
2999996,2999997,4796469.0,1483222.0,1483222.0,2592,9,2,1,28
2999997,2999998,116965.0,38002.0,38002.0,2592,9,2,2,28
2999998,2999999,16520.0,6960.0,6960.0,2592,9,2,3,28


In [182]:
# Put the surrogate key first in the exported countries table
df_countries = df_countries.loc[:, ['id_country', 'alpha-3', 'country', 'region', 'sub-region']].copy()

In [183]:
# Preview the countries dimension (pandas truncates the middle rows)
df_countries

Unnamed: 0,id_country,alpha-3,country,region,sub-region
0,1,AFG,Afghanistan,Asia,Southern Asia
1,2,ALB,Albania,Europe,Southern Europe
2,3,DZA,Algeria,Africa,Northern Africa
3,4,AND,Andorra,Europe,Southern Europe
4,5,AGO,Angola,Africa,Sub-Saharan Africa
...,...,...,...,...,...
268,269,UMI,United States Minor Outlying Islands,Oceania,Micronesia
269,270,VGB,Virgin Islands (British),Americas,Latin America and the Caribbean
270,271,VIR,Virgin Islands (U.S.),Americas,Latin America and the Caribbean
271,272,WLF,Wallis and Futuna,Oceania,Polynesia


In [184]:
# Put the surrogate key first in the exported codes table
df_codes = df_codes.loc[:, ['id_code', 'clean_code', 'Description', 'parent_description']].copy()

In [185]:
# Preview the codes dimension (pandas truncates the middle rows)
df_codes

Unnamed: 0,id_code,clean_code,Description,parent_description
1,2,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS
2,3,10021,LIVE ANIMALS,LIVE ANIMALS
3,4,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS
5,6,10121,Pure-bred breeding horses,LIVE ANIMALS
6,7,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS
...,...,...,...,...
7432,7433,970200,"Original engravings, prints and lithographs","WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"
7433,7434,970300,"Original sculptures and statuary, in any material","WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"
7434,7435,970400,"Postage or revenue stamps, stamp-postmarks, fi...","WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"
7435,7436,970500,Collections and collector's pieces of zoologic...,"WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"


## Load

### Save targets

In [186]:
# Write each output table as a pipe-delimited CSV with a header row (the
# Redshift COPY further down skips it with IGNOREHEADER 1). Create target/
# first so the notebook also runs from a clean checkout.
os.makedirs('target', exist_ok=True)
targets = {
    'trades.csv': df_trades_final,
    'countries.csv': df_countries,
    'codes.csv': df_codes,
    'quantity.csv': df_quantity,
    'flow.csv': df_flow,
    'years.csv': df_year,
}
for file_name, frame in targets.items():
    frame.to_csv(f'target/{file_name}', index=False, sep='|')

In [187]:
# redshift connection
import os
import boto3
import redshift_connector
from dotenv import load_dotenv

# Pull AWS/Redshift settings from the .env file into the process environment
load_dotenv(".env")

# Connect to the Redshift cluster. The port defaults to 5439 (as before) but
# can now be overridden with AWS_PORT.
conn = redshift_connector.connect(
    host=os.getenv("AWS_HOST"),
    database=os.getenv("AWS_DATABASE"),
    user=os.getenv("AWS_USER"),
    password=os.getenv("AWS_PASSWORD"),
    port=int(os.getenv("AWS_PORT", "5439")),
)

# S3 client used to stage the CSV files for the Redshift COPY
client = boto3.client(
    's3',
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)

cursor = conn.cursor()

In [188]:
def load_file(file_name):
    """Upload ``target/<file_name>`` to S3 and COPY it into a Redshift table.

    The target table is named after the file without its extension
    (``trades.csv`` -> ``trades``). A failed COPY is rolled back and reported
    rather than raised, so one bad file does not stop the remaining loads.

    Uses the module-level ``client`` (boto3 S3), ``conn`` and ``cursor``
    (Redshift) objects.

    NOTE(review): COPY appends rows, so re-running the load duplicates data
    unless the tables are emptied first.
    """
    table_name = os.path.splitext(file_name)[0]
    bucket = os.getenv("AWS_BUCKET_NAME")
    key = f'etl/{file_name}'

    # Stage the file in S3
    client.upload_file(
        Filename=f'target/{file_name}',
        Bucket=bucket,
        Key=key
    )

    # Prefer role-based authorization when AWS_IAM_ROLE is configured (AWS's
    # recommended option for COPY); otherwise fall back to the access keys.
    iam_role = os.getenv("AWS_IAM_ROLE")
    if iam_role:
        authorization = f"IAM_ROLE '{iam_role}'"
    else:
        authorization = (
            f"CREDENTIALS 'aws_access_key_id={os.getenv('AWS_ACCESS_KEY_ID')};"
            f"aws_secret_access_key={os.getenv('AWS_SECRET_ACCESS_KEY')}'"
        )

    # Load the staged file into Redshift, reading from the same bucket/key the
    # file was just uploaded to.
    sentence = f"""
        COPY {table_name}
        FROM 's3://{bucket}/{key}'
        {authorization}
        csv
        DELIMITER '|'
        IGNOREHEADER 1;
    """
    try:
        cursor.execute(sentence)
        conn.commit()
        print(f"File {file_name} loaded successfully")
    except Exception as exc:
        conn.rollback()
        print(f"Error loading file {file_name}: {exc}")
        # For row-level details of a failed load run: SELECT * FROM stl_load_errors

In [189]:
# Load only the CSV targets, in a deterministic order. Any other file that ends
# up in target/ (e.g. OS or editor artifacts) would otherwise be sent to COPY
# as a table of the same name.
for file_name in sorted(os.listdir('target')):
    if file_name.endswith('.csv'):
        load_file(file_name)

File years.csv loaded successfully
File countries.csv loaded successfully
File quantity.csv loaded successfully
File flow.csv loaded successfully
File codes.csv loaded successfully
File trades.csv loaded successfully


In [190]:
# Release the cursor and then the Redshift connection
cursor.close()
conn.close()