# ETL de datos de importación de productos

## Instalación de librerías base

In [None]:
import Conn_postgresql as connPOSGRE
import pandas as pd
from sqlalchemy import create_engine
import uuid

## Extraction: Se crean todos los DF para realziar las operaciones.

In [None]:
#CONSULTA A POSTGRE SQL
engine = create_engine('postgresql+psycopg2://postgres:mysecretpass@localhost/postgres') #Ruta para conectarse a SQL desde el sqlalchemy
df_trades = pd.read_sql("select * from trades", engine) #Crea un df, con el resultado de la consulta a la BD a traves de la conexion engine (42 seg.)

conn = connPOSGRE.ConnPOSGRESQL() #Tambien se puede usar esta conexion, pero arrojara un warning dado que pandas es compatible con conexiones a traves de sqlalchemy (34 seg.)
df_trades2 = pd.read_sql("select * from trades", engine) #Crea un df, con el resultado de la consulta a la BD a traves de la conexion engine
conn.close()

In [None]:
#Permite leer los primeros 5 registros del df
df_trades.head()

In [None]:
#CONSULTA A ARCHIVO JSON

#Crea el df_countries, con los registros del json
df_countries = pd.read_json('C:/Users/marco/OneDrive/Escritorio/Platzi/Fundamentos de ETL con Python y Pentaho/Notebooks/country_data.json')

In [None]:
#Permite leer los primeros 5 registros del df
df_countries.head()

In [None]:
#CONSULTA A ARCHIVO CSV

#Crea el df_codes, con los registros del csv
df_codes = pd.read_csv('C:/Users/marco/OneDrive/Escritorio/Platzi/Fundamentos de ETL con Python y Pentaho/Notebooks/hs_codes.csv')

In [None]:
#Permite leer los primeros 5 registros del df
df_codes.head()

In [None]:
#Crea el df_parents, como una copia con los registros del df df_codes, filtrando solo por los que tengan el valor Level=2. Importante no olvidar los (), para que el copy retorne un df
df_parents = df_codes[df_codes['Level']==2].copy()

In [None]:
#Permite leer los registros del df
df_parents

## Transform

#### Clean codes

In [None]:
df_codes = df_codes[df_codes['Code_comm'].notnull()] #Elmina los registros donde Code_comm is not null

In [None]:
#Funcion de limpieza
#Recibe una cadena
#Si el largo del campo code del df df_codes es igual a 11, toma los primeros 5 como el code, y el primero como el parent_code
#Si el largo del campo code del df df_codes es distinto de 11, toma los primeros 5 como el code, y los 2 primeros como el parent_code
#Luego almacena las descripciones en la variable parent, siempre que Code_comm = parent_code
#Finalmente retorna una tupla con los valores code y parent

def clean_code(text):
    text = str(text) #Convierte el texto recibido en un string
    parent_code = None
    if len(text) == 11: #Si el lardo del string es igual a 11
        code = text[:5] #code sera igual a los 5 primeros digitos del string
        parent_code = text[:1] #parent_code sera igual al primer digito del string
    else:
        code = text[:6] #code sera igual a los 6 primeros digitos del string
        parent_code = text[:2] #parent_code sera igual a los 2 primeros digitos del string

    #Funcion try
    try:
        #Se crea la variable parent a partir del df_parents, que retorna una tupla [code,parent] siempre que Code_comm sea igual al parent_code de la funcion clean_code, y retorna solo los valores de la columna Description, como string
        parent = df_parents[df_parents['Code_comm'] == parent_code]['Description'].values[0]
    except:
        parent = None

    return(code,parent) #Retorna una tupla con los valores code y parent

In [None]:
#Inserta las columas clean_code y parent_description, dentro del df df_codes
#Estas seran el resultado de, aplciar la funcion clean_code, sobre la columna Code, la cual retorna una tupla con los valores (code,parent)
    #apply, permite aplicar funciones sobre las columnas
    #lambda, sintaxis de escritura para solicitar una acción
    #axis, por donde hara el recorrido, el valor 1 indica que lo realizara sobre cada fila, el valor 0 indica que lo hara sobre cada columna
    #result_type, para indicar de que manera retornara el resultado, en este caso expand, indica que sera una tupla de 2 valores, que conformaran los valores de las 2 columnas a agregar

df_codes[['clean_code','parent_description']] = df_codes.apply(lambda x : clean_code(x['Code']), axis=1, result_type='expand')

set(df_codes['Level']) #Obtiene los valores unicos de la columna Level
df_codes[df_codes['Level']==2] #Obtiene los registros siempre que Levell sea igual a 2

In [268]:
#Recrea el df df_codes, filtrando solo los registros donde el clean_code no sea null, y retornando solo los campos 'clean_code','Description','parent_description'
df_codes = df_codes[df_codes['clean_code'].notnull()][['clean_code','Description','parent_description']]

df_codes['id_code'] = df_codes.index + 1 #Inserta una columna con el valor del propio indice del df mas 1
df_codes['clean_code'] = df_codes['clean_code'].astype('int64') #Actuliza el tipo de dato clean_code de str a int64

#Reordena las columnas del df df_codes
df_codes = df_codes[['id_code','clean_code','Description','parent_description']]

In [269]:
df_codes

Unnamed: 0,id_code,clean_code,Description,parent_description
1,2,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS
2,3,10021,LIVE ANIMALS,LIVE ANIMALS
3,4,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS
5,6,10121,Pure-bred breeding horses,LIVE ANIMALS
6,7,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS
...,...,...,...,...
7432,7433,970200,"Original engravings, prints and lithographs","WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"
7433,7434,970300,"Original sculptures and statuary, in any material","WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"
7434,7435,970400,"Postage or revenue stamps, stamp-postmarks, fi...","WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"
7435,7436,970500,Collections and collector's pieces of zoologic...,"WORKS OF ART, COLLECTORS' PIECES AND ANTIQUES"


### Clean Countries

In [266]:
#Recrea el df df_countries, filtrando solo los registros donde el clean_code no sea null, y retornando solo los campos 'alpha-3','country','region','sub-region'
df_countries = df_countries[df_countries['alpha-3'].notnull()][['alpha-3','country','region','sub-region']]
df_countries['id_country'] = df_countries.index + 1 #Inserta una columna con el valor del propio indice del df mas 1

#Reordena las columnas del df df_countries
df_countries = df_countries[['id_country','alpha-3','country','region','sub-region']]

In [267]:
df_countries

Unnamed: 0,id_country,alpha-3,country,region,sub-region
0,1,AFG,Afghanistan,Asia,Southern Asia
1,2,ALB,Albania,Europe,Southern Europe
2,3,DZA,Algeria,Africa,Northern Africa
3,4,AND,Andorra,Europe,Southern Europe
4,5,AGO,Angola,Africa,Sub-Saharan Africa
...,...,...,...,...,...
268,269,UMI,United States Minor Outlying Islands,Oceania,Micronesia
269,270,VGB,Virgin Islands (British),Americas,Latin America and the Caribbean
270,271,VIR,Virgin Islands (U.S.),Americas,Latin America and the Caribbean
271,272,WLF,Wallis and Futuna,Oceania,Polynesia


### Merge

In [None]:
#Crea el df df_trades_clean, haciendo un join de todos los campos del df df_trades left join solo los campos 'clean_code','id_code' del df df_codes, on df_trades.comm_code = df_codes.clean_code
df_trades_clean = df_trades.merge(df_codes[['clean_code','id_code']], how='left',left_on='comm_code',right_on='clean_code')

#Recrea el df df_trades_clean, haciendo un join de todos los campos del df df_trades_clean left join solo los campos 'alpha-3','id_country' del df df_countries, on df_trades_clean.country_code = df_countries.alpha-3
df_trades_clean = df_trades_clean.merge(df_countries[['alpha-3','id_country']], how='left',left_on='country_code',right_on='alpha-3')

In [None]:
df_trades_clean

### Clean trades

In [None]:
#Funcion que permite crear tablas de dimension a partir de un df existente
#Recibe un df, en este caso se usaran los valores unicos de una columnas, y el nombre que recibira el campo
#Retorna un df con los campos: id_name(Parametro que se pasa a la funcion) y values(Valores entregados en el primer parametro)

def create_dimension (data, id_name):
    list_keys = []
    value = 1
    for _ in data:
        list_keys.append(value) #Inserta los valores de ID, iniciando en 1
        value += 1
    
    return pd.DataFrame({id_name:list_keys, 'values':data})

In [None]:
#Crea las tablas de dimentsion, a partir de la funcion create_dimension

#Dimension para quantity_name
df_quantity = create_dimension(df_trades_clean['quantity_name'].unique(),'id_quantity')

#Dimension para flow
df_flow = create_dimension(df_trades_clean['flow'].unique(),'id_flow')

#Dimension para year
df_year = create_dimension(df_trades_clean['year'].unique(),'id_year')

In [None]:
df_quantity

In [None]:
df_flow

In [None]:
df_year

In [None]:
#Recrea el df df_trades_clean, haciendo un join de todos los campos del df df_trades_clean left join con todo el df df_quantity, on df_trades_clean.quantity_name = df_quantity.values
df_trades_clean = df_trades_clean.merge(df_quantity, how='left', left_on='quantity_name', right_on='values')

#Recrea el df df_trades_clean, haciendo un join de todos los campos del df df_trades_clean left join con todo el df df_flow, on df_trades_clean.flow = df_flow.values
df_trades_clean = df_trades_clean.merge(df_flow, how='left', left_on='flow', right_on='values')

#Recrea el df df_trades_clean, haciendo un join de todos los campos del df df_trades_clean left join con todo el df df_year, on df_trades_clean.year = df_year.values
df_trades_clean = df_trades_clean.merge(df_year, how='left', left_on='year', right_on='values')

In [None]:
#Inserta el campo id_trades, con el valor index+1
df_trades_clean['id_trades'] = df_trades_clean.index + 1

In [None]:
#Crea el df df_trades_final, a partir del df df_trades_clean, tomando solo las columnas 'id_trades','trade_usd','kg','quantity','id_code','id_country','id_quantity','id_flow','id_year'
df_trades_final = df_trades_clean[['id_trades','trade_usd','kg','quantity','id_code','id_country','id_quantity','id_flow','id_year']].copy()

In [264]:
df_trades_final

Unnamed: 0,id_trades,trade_usd,kg,quantity,id_code,id_country,id_quantity,id_flow,id_year
0,1,312258.0,18777.0,18777.0,6934,155,1,1,1
1,2,53508.0,4000.0,4000.0,6934,155,1,2,1
2,3,53508.0,4000.0,4000.0,6934,155,1,3,1
3,4,409683.0,14790.0,14790.0,6935,155,1,1,1
4,5,65066.0,1000.0,1000.0,6935,155,1,2,1
...,...,...,...,...,...,...,...,...,...
6216348,6216349,42100.0,2479.0,2479.0,6925,155,1,1,1
6216349,6216350,33558.0,339.0,339.0,6928,155,1,1,1
6216350,6216351,327946.0,16000.0,16000.0,6929,155,1,1,1
6216351,6216352,28378.0,956.0,956.0,6931,155,1,1,1


## Load

In [None]:
import os
import boto3

