# Desenvolvendo um processo ETL com Python e Postgres

## O que é ETL? 

ETL é um tipo de data integration em três etapas (extração, transformação, carregamento) usado para combinar dados de diversas fontes. Ele é comumente utilizado para construir um data warehouse.

- E = **Extraction**:
A parte de extração está relacionada com a obtenção dos dados independente da fonte e forma como estes dados podem ser obtidos. Em um processod e ETL a extração pode ocorrer de diferentes fontes, tais como: sites, aplicativos, de um arquivo em execel ou csv, de um banco de dados relacional, entre outras. É comum em processos de ETL que dados venham de um banco de dados como um Data Lake, um Banco NoSQL, ou até mesmo, como menor frequência de um banco de dados relacional.
- T = **Trasform**:
Na etapa de trasnformação os dados obtidos são submetidos a diferentes tratamentos, desde trasnformações de tipos númericos, o que é  bem comum, passa pelo processo de engenharia, onde novas features são criadas ou features disponíveis são limpas e submetidas a testes de validade.
- L = **Load**:
Após todas as trasnformações as novas features serão disponibilizadas em um novo banco de dados, este será mais confiável, com informações relevantes e tratadas.

## Data Warehouse

Um data warehouse nada mais é que um banco de dados que possui informações de uma ou várias fontes. Como exemplo podemos citar uma rede varejista que pode usar um data warehouse para integrar e combinar diversas informações do cliente, tais como cpf ou email do cliente, forma de pagamento, interações em comentários dos produtos, entre outras. A principal vantagem de um data warehouse é seu papel na simplificação de dados para as áreas de buisiness inteligence, data science e outras. Desta forma, podemos compreender como o processo de ETL em data warehouse é um fator de fundamental importância o movimento natural dos dados de uma camada de arquitetura para outra.

## Construindo o ETL para o projeto HISCP

Para o nosso exemplo temos o conjunto de dados disponível em dois arquivos csv's. Desta forma, a parte ```Extract``` do nosso processo será a criação de uma função para leitura destes arquivos.

Na parte de ```Transform``` vamos validar os tipos de cada coluna, verificar a existência de valores faltantes e realizar o feature engineering inicial, nas análises futuras com o analista e o cientista de dados podemos criar novas colunas, porém vamos partir desta base já melhorada, por isso é importante no processo de ETL não simplificar demasiadamente os dados.

No ```Load``` vamos carregar a nova base dados para um Data Warehouse hospedado em banco de dados postgress. Este será utilizado nas consultas pelos times de **data analytics** e **data science**. 

In [24]:
#Bibliotecas
import pandas as pd
from sqlalchemy import create_engine

# Extract

In [2]:
file_path = "../dataset/"
file_name = "train"
file_type = ".csv"

#../dataset/

data = pd.read_csv("../dataset/train.csv")
data.head()

Unnamed: 0,id,gender,age,driving_license,region_code,previously_insured,vehicle_age,vehicle_damage,annual_premium,policy_sales_channel,vintage,response
0,1,Male,44,1,28,0,> 2 Years,Yes,40454,26,217,1
1,2,Male,76,1,3,0,1-2 Year,No,33536,26,183,0
2,3,Male,47,1,28,0,> 2 Years,Yes,38294,26,27,1
3,4,Male,21,1,11,1,< 1 Year,No,28619,152,203,0
4,5,Female,29,1,41,1,< 1 Year,No,27496,152,39,0


In [3]:
# Defined extract function
def extract(file_path, file_name, file_type):
    """ Get all datas from a csv file
    
    Args:
        file_path: path of destination file
        file_name: name of extract file
        file_type: extension of read file   
    """
    
    data = pd.read_csv(f'{file_path}{file_name}{file_type}')
    return data

In [4]:
data_et = extract(file_path, file_name, file_type)
data_et.head()

Unnamed: 0,id,gender,age,driving_license,region_code,previously_insured,vehicle_age,vehicle_damage,annual_premium,policy_sales_channel,vintage,response
0,1,Male,44,1,28,0,> 2 Years,Yes,40454,26,217,1
1,2,Male,76,1,3,0,1-2 Year,No,33536,26,183,0
2,3,Male,47,1,28,0,> 2 Years,Yes,38294,26,27,1
3,4,Male,21,1,11,1,< 1 Year,No,28619,152,203,0
4,5,Female,29,1,41,1,< 1 Year,No,27496,152,39,0


## Transform

- Informações das variáveis disponíveis:

Variable            |	Definition
--------------------|--------------
id                  | 	Unique ID for the customer
Gender 	            | Gender of the customer
Age 	            | Age of the customer
Driving_License     |0 : Customer does not have DL, 1 : Customer already has DL
Region_Code 	    |Unique code for the region of the customer
Previously_Insured 	|1 : Customer already has Vehicle Insurance, 0 : Customer doesn't have Vehicle Insurance
Vehicle_Age 	    |Age of the Vehicle
Vehicle_Damage 	    |1 : Customer got his/her vehicle damaged in the past. 0 : Customer didn't get his/her vehicle damaged in the past.
Annual_Premium 	    |The amount customer needs to pay as premium in the year
PolicySalesChannel 	|Anonymized Code for the channel of outreaching to the customer ie. Different Agents, Over Mail, Over Phone, In Person, etc.
Vintage 	        |Number of Days, Customer has been associated with the company
Response 	        |1 : Customer is interested, 0 : Customer is not interested

In [5]:
# What are types of our variables? 
data_et.dtypes

id                       int64
gender                  object
age                      int64
driving_license          int64
region_code              int64
previously_insured       int64
vehicle_age             object
vehicle_damage          object
annual_premium           int64
policy_sales_channel     int64
vintage                  int64
response                 int64
dtype: object

In [6]:
#There are some null values?
data_et.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 381109 entries, 0 to 381108
Data columns (total 12 columns):
 #   Column                Non-Null Count   Dtype 
---  ------                --------------   ----- 
 0   id                    381109 non-null  int64 
 1   gender                381109 non-null  object
 2   age                   381109 non-null  int64 
 3   driving_license       381109 non-null  int64 
 4   region_code           381109 non-null  int64 
 5   previously_insured    381109 non-null  int64 
 6   vehicle_age           381109 non-null  object
 7   vehicle_damage        381109 non-null  object
 8   annual_premium        381109 non-null  int64 
 9   policy_sales_channel  381109 non-null  int64 
 10  vintage               381109 non-null  int64 
 11  response              381109 non-null  int64 
dtypes: int64(9), object(3)
memory usage: 34.9+ MB


In [7]:
#There are some outliers values?
data_et.describe()

Unnamed: 0,id,age,driving_license,region_code,previously_insured,annual_premium,policy_sales_channel,vintage,response
count,381109.0,381109.0,381109.0,381109.0,381109.0,381109.0,381109.0,381109.0,381109.0
mean,190555.0,38.822584,0.997869,26.388807,0.45821,30564.389581,112.034295,154.347397,0.122563
std,110016.836208,15.511611,0.04611,13.229888,0.498251,17213.155057,54.203995,83.671304,0.327936
min,1.0,20.0,0.0,0.0,0.0,2630.0,1.0,10.0,0.0
25%,95278.0,25.0,1.0,15.0,0.0,24405.0,29.0,82.0,0.0
50%,190555.0,36.0,1.0,28.0,0.0,31669.0,133.0,154.0,0.0
75%,285832.0,49.0,1.0,35.0,1.0,39400.0,152.0,227.0,0.0
max,381109.0,85.0,1.0,52.0,1.0,540165.0,163.0,299.0,1.0


### Algumas suposições sobre os dados

O banco de dados não possui valores faltantes, apenas três colunas possuem variáveis do tipo ```object```, estão serão codificadas com números inteiros. O banco de dados possui um desbalancemanto significativo para a variável resposta. Em seguida vamos construir a função ```transform```.

- Variáveis impactadas

Variable            |	Definition
--------------------|--------------
Gender 	            |Gender of the customer. 1: Male. 0: Female
Vehicle_Age 	    |Age of the Vehicle. 0: < 1 year. 1: 1-2 Year. 2: > 2 Years.
Vehicle_Damage 	    |1 : Customer got his/her vehicle damaged in the past. 0 : Customer didn't get his/her vehicle 

In [8]:
def transfom(data):
    """ Transform data columns to int type
    
    Args: 
        data: dataframe
    
    """
    data["gender"] = data["gender"].map({"Male": 1, "Female": 0}).astype(int)
    data["vehicle_age"] = data["vehicle_age"].map({"< 1 Year": 0, "1-2 Year": 1, "> 2 Years": 2}).astype(int)
    data["vehicle_damage"] = data["vehicle_damage"].map({"Yes": 1, "No": 0}).astype(int)
    
    return data

In [9]:
data_tf = transfom(data_et)
data_tf.head()

Unnamed: 0,id,gender,age,driving_license,region_code,previously_insured,vehicle_age,vehicle_damage,annual_premium,policy_sales_channel,vintage,response
0,1,1,44,1,28,0,2,1,40454,26,217,1
1,2,1,76,1,3,0,1,0,33536,26,183,0
2,3,1,47,1,28,0,2,1,38294,26,27,1
3,4,1,21,1,11,1,0,0,28619,152,203,0
4,5,0,29,1,41,1,0,0,27496,152,39,0


## Load

Antes de usar a função ```load```, é preciso criar o banco de dados ``` DataWarehouse``` no postgres, em seguida criar a tabela com as respectivas colunas. 

OBS: Nosso trabalho foi todo desenvolvido em container Docker, para instaciar todos os serviços siga os passos desta [documentação](https://github.com/WesleyJw/HICSP/tree/main/DockerEnvironment). 

Podemos criar o banco de dados diretamente no container com o serviço ```postgres```, para acessar o terminal do container em execução utilize: 

```console
 docker exec -it <id_container> /bin/bash

```

Em seguida acesse o banco postgres,

```console
  psql -U postgres
```

Com o acesso  ao banco, crie o repositório chamado DataWarehouse.

```sql

    CREATE DATABASE DataWarehouse

```

Se você preferir também é possível criar o novo banco de dados via interface do ```PgAdmin```, por meio da porta ```localhost:5050```, faça login no pgadmin, acesse o servidor com a sua senha de admin, em seguida clique com o botão direito do mouse em *CREATE*, coloque o nome do banco de dados e clique em save. 


Para  criar a tabela é possível tanto utilizar o próprio postgres, quando o serviço do ```pg_admin``` acessando no banco de dados a função powerQuery, depois adicione o comando abaixo:

```sql
CREATE TABLE train (id SERIAL PRIMARY KEY,
						gender INTEGER NOT NULL,
						age INTEGER, 
					    driving_license INTEGER, 
					    region_code INTEGER,
					    previously_insured INTEGER, 
					    vehicle_age INTEGER, 
					    vehicle_damage INTEGER, 
					    annual_premium INTEGER,
       				 policy_sales_channel INTEGER, 
					    vintage INTEGER, 
					    response INTEGER NOT NULL);

```

Criando o conjunto de teste.

```sql
CREATE TABLE test (id SERIAL PRIMARY KEY,
						gender INTEGER NOT NULL,
						age INTEGER, 
					    driving_license INTEGER, 
					    region_code INTEGER,
					    previously_insured INTEGER, 
					    vehicle_age INTEGER, 
					    vehicle_damage INTEGER, 
					    annual_premium INTEGER,
       				 policy_sales_channel INTEGER, 
					    vintage INTEGER);

```

In [46]:
#Postgres config
settings = {
    'host': 'pg_container', #or: 172.20.0.2 IPAddress
    'user': 'postgres',
    'dbname': 'DataWarehouse',
    'psw': 'admin'
}

def load(data, settings, table_name):
    """Put data to Data warehouse in postgres
    
    Args:
        data: a data frame pandas
        settings: credentials to connect with postgres
        table_name: table name to imput data into data warehouse
    """
    print(f"Uploading {data.shape[0]} to Data Warehouse in postgres.")
    
    try:
        #postgres connect
        engine = create_engine(f"postgresql://{settings['user']}:{settings['psw']}@{settings['host']}/{settings['dbname']}")
        con = engine.connect()
        data.to_sql(name=table_name,con=con,if_exists='append', index = False)
        con.close()
    except Exception as e:
        print("Data load error: " + str(e))

In [47]:
load(data_tf, settings, 'train')

Uploading 381109 to Data Warehouse in postgres.
