# *ETL usando MySQL*

Extrair, transformar, carregar (Extract, Transform, Load - ETL) é o principal processo pelo qual as empresas reúnem informações de fontes de dados e as replicam para destinos como data warehouses para uso com ferramentas de business intelligence (BI). 

Aqui, usamos a linguagem Python para construir um pipeline de ETL. Também utilizamos o MySQL, que é um banco de dados de código aberto e um dos melhores SGBDR (sistema de gerenciamento de banco de dados relacional).

O objetivo é projetar um pipeline ETL que seja simples mas eficiente, onde criamos tabelas no MySQL com informações sobre os empregados de uma empresa, extraímos os dados do banco de dados, transformamos ou limpar os dados usando vários métodos do Pandas e carregamos os dados transformados ou limpos novamente no banco de dados MySQL.

## 1. Criando um Banco de Dados no MySQL

Primeiro vamos nos conectar com o MySQL inserindo as credenciais na função connect()

In [2]:
import mysql.connector

db_connection = mysql.connector.connect(
    host="localhost",
    user="username",
    passwd="password"
    )

Agora criamos banco de dados chamado "organization_employee" usando programação de banco de dados em Python.

In [None]:
# cria um cursor de banco de dados para executar operações SQL 
db_cursor = db_connection.cursor()
# executa o cursor com o método 'execute' e passa a consulta SQL 
db_cursor.execute("CREATE DATABASE organization_employee")

Pronto, temos o banco de daods criado.

In [18]:
# fecha a conexão com o MySQL
db_connection.close()

### 1.1 Criando as Tabelas

Vamos agora abrir o banco de dados 'organization_employee' e criar duas tabelas chamadas 'emp' e 'dept'. 

In [6]:
# Abre o MySQL
mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  password="bankai",
  database="organization_employee"
)

# cria o cursor
mycursor = mydb.cursor()

In [11]:
# cria a tabela 'emp' com suas respectivas colunas
mycursor.execute("CREATE TABLE emp (empno INT(5),\
                  ename VARCHAR(255),\
                  job VARCHAR(255),\
                  mgr INT(5), \
                  hiredate DATE, \
                  sal INT(5), \
                  comn INT(5), \
                  deptno INT(3))")

In [14]:
# cria a tabela 'dept' e suas colunas
mycursor.execute("CREATE TABLE dept (deptno INT(3),\
                  dname VARCHAR(255),\
                  loc VARCHAR(255))")

### 1.2 Inserindo Dados nas Tabelas

Na tabela 'emp' serão inseridos os dados referentes aos funcionários da empresa, tais como número do funcionário, nome, função, credencial, data de nascimento, salário, comissão e id do departamento.

In [None]:
# método SQL para inserir dados na tabela 'emp'
sql = "INSERT INTO emp (empno, ename, job, mgr, hiredate, sal, comn, deptno) VALUES (%s, %s, %s, %s, %s, %s,%s, %s)"
# dados dos funcionários a serem inseridos
val = [
  ('7369', 'SMITH', 'CLERK', '7902', '1980-12-17', '800', ' ' , '20'),
  ('7499', 'ALLEN', 'SALESMAN', '7698', '1981-02-20','1600','300', '30'),
  ('7521', 'WARD', 'SALESMAN', '7698', '1982-02-22', '1250','500', '30'),
  ('7566', 'JONES', 'MANAGER', '7839', '1981-04-02', '2975',  ''  ,'20'),
  ('7654', 'MARTIN', 'SALESMAN', '7698', '1981-09-28','1250','1400','30'),
  ('7698', 'BLAKE', 'MANAGER', '7839', '1981-05-01', '2850', ' '  , '30'),
  ('7782', 'CLARK', 'MANAGER', '7839', '1981-06-09', '2450', ' '  , '10'),
  ('7788', 'SCOTT', 'ANALYST', '7566', '1987-04-19', '3000',  ' ' , '20'),
  ('7839', 'KING', 'PRESIDENT',   ' '   , '1981-11-17', '5000', ' '  ,'10'),
  ('7844', 'TURNER', 'SALESMAN', '7698', '1981-09-08','1500', '0', '30')
]

# executa o método SQL
mycursor.executemany(sql, val)
# commit() faz as alterações, caso contrário, nenhuma alteração será feita na tabela
mydb.commit()
# imprime quantas linhas foram inseridas na tabela
print(mycursor.rowcount, "linhas foram inseridas.")

Na tabela 'dept' serão inseridos os dados referentes aos departamentos da empresa, tais como id do departamento, seção e localização.

In [15]:
# método SQL para inserir dados na tabela 'dept'
sql = "INSERT INTO dept (deptno, dname, loc) VALUES (%s, %s, %s)"
# dados dos departamentos a serem inseridos
val = [
  ('10', 'ACCOUNTING', 'NEW YORK'),
  ('20', 'RESEARCH', 'DALLAS'),
  ('30', 'SALES', 'CHICAGO'),
  
]
# executa o método SQL
mycursor.executemany(sql, val)
# faz as alterações na tabela
mydb.commit()
# imprime quantas linhas foram inseridas na tabela
print(mycursor.rowcount, "linhas foram inseridas.")

3 linhas foram inseridas.


In [19]:
# fecha a conexão com o MySQL
mydb.close()

## 2. Extraindo Dados

Agora que já temos nossas tabelas no MySQL, vamos extrair seus dados.

SQLAlchemy é uma ferramenta Python SQL que nos fornece flexibilidade para fazer conexão com vários bancos de dados relacionais, no caso o MySQL.

create_engine é um método definido em SQLAlchemy que leva argumentos (credenciais de conexão) para fazer a conexão com o banco de dados. 

In [25]:
import pandas as pd
import sqlalchemy

Além da engine precisamos de um drive de conexão, cada banco de dados possui o seu drive específico. 
A seguir vamos instalar o drive do MySQL.

In [27]:
!pip install pymysql



In [28]:
# criando o conexão com MySQL e o banco de dados 'organization_employee'
engine = sqlalchemy.create_engine('mysql+pymysql://root:bankai@localhost:3306/organization_employee')

Pronto, já estamos conectados com o banco de dados. Agora vamos começar a fazer as nossas consultas no banco.

Com o método 'read_sql_query' podemos trazer os dados usando uma query SQL, ele nos permite consultar uma ou diversas tabelas.

In [32]:
# extrai os dados dos funcionários
emp_df=pd.read_sql_query('select * from emp',engine)
emp_df

Unnamed: 0,empno,ename,job,mgr,hiredate,sal,comn,deptno
0,7369,SMITH,CLERK,7902,1980-12-17,800,0,20
1,7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30
2,7521,WARD,SALESMAN,7698,1982-02-22,1250,500,30
3,7566,JONES,MANAGER,7839,1981-04-02,2975,0,20
4,7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30
5,7698,BLAKE,MANAGER,7839,1981-05-01,2850,0,30
6,7782,CLARK,MANAGER,7839,1981-06-09,2450,0,10
7,7788,SCOTT,ANALYST,7566,1987-04-19,3000,0,20
8,7839,KING,PRESIDENT,0,1981-11-17,5000,0,10
9,7844,TURNER,SALESMAN,7698,1981-09-08,1500,0,30


In [33]:
# extrai os dados dos departamentos
dept_df=pd.read_sql_query('select * from dept',engine)
dept_df

Unnamed: 0,deptno,dname,loc
0,10,ACCOUNTING,NEW YORK
1,20,RESEARCH,DALLAS
2,30,SALES,CHICAGO


## 3. Transformação e Limpeza dos Dados

Primeiro vamos criar uma função para calcular o Imposto sobre o Salário dos Funcionários. 
Em seguida, implementamos os valores calculados valores da tabela.

In [35]:
# função impostos
def cal_taxes(sal):
    tax=0
    if sal >500 and sal <=1250:
         tax=sal*.125
    elif sal>1250 and sal<=1700:
        tax=sal*.175
    elif sal>1700 and sal<=2500:
        tax=sal*.225
    elif sal>2500:
        tax=sal*.275
    else:
        tax=0
    return tax

In [37]:
# usamos o método map() para implementar a função definida na série 
emp_df['Tax'] = emp_df['sal'].map(cal_taxes)
emp_df

Unnamed: 0,empno,ename,job,mgr,hiredate,sal,comn,deptno,Tax
0,7369,SMITH,CLERK,7902,1980-12-17,800,0,20,100.0
1,7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30,280.0
2,7521,WARD,SALESMAN,7698,1982-02-22,1250,500,30,156.25
3,7566,JONES,MANAGER,7839,1981-04-02,2975,0,20,818.125
4,7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30,156.25
5,7698,BLAKE,MANAGER,7839,1981-05-01,2850,0,30,783.75
6,7782,CLARK,MANAGER,7839,1981-06-09,2450,0,10,551.25
7,7788,SCOTT,ANALYST,7566,1987-04-19,3000,0,20,825.0
8,7839,KING,PRESIDENT,0,1981-11-17,5000,0,10,1375.0
9,7844,TURNER,SALESMAN,7698,1981-09-08,1500,0,30,262.5


Podemos calcular o porcentual comissão/salário. Antes disso precisamos limpar os dados. Como temos zeros na coluna de 'comn', esses dados indesejados podem afetar o cálculo.

Vamos criar uma estratégia, onde existir zero em 'comn' iremos substituir pelo respectivo valor da coluna 'sal'.

In [39]:
# substituir os zeros pelo valor da coluna 'sal'
emp_df['comn'] = emp_df[['sal','comn']].apply(lambda x: x[0] if int(x[1])==0 else x[1], axis=1)
emp_df

Unnamed: 0,empno,ename,job,mgr,hiredate,sal,comn,deptno,Tax
0,7369,SMITH,CLERK,7902,1980-12-17,800,800,20,100.0
1,7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30,280.0
2,7521,WARD,SALESMAN,7698,1982-02-22,1250,500,30,156.25
3,7566,JONES,MANAGER,7839,1981-04-02,2975,2975,20,818.125
4,7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30,156.25
5,7698,BLAKE,MANAGER,7839,1981-05-01,2850,2850,30,783.75
6,7782,CLARK,MANAGER,7839,1981-06-09,2450,2450,10,551.25
7,7788,SCOTT,ANALYST,7566,1987-04-19,3000,3000,20,825.0
8,7839,KING,PRESIDENT,0,1981-11-17,5000,5000,10,1375.0
9,7844,TURNER,SALESMAN,7698,1981-09-08,1500,1500,30,262.5


Agora, com os dados limpos calculamos a porcentagem

In [41]:
emp_df['comn_%'] = (emp_df['comn']/emp_df['sal'])*100
emp_df

Unnamed: 0,empno,ename,job,mgr,hiredate,sal,comn,deptno,Tax,comn_%
0,7369,SMITH,CLERK,7902,1980-12-17,800,800,20,100.0,100.0
1,7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30,280.0,18.75
2,7521,WARD,SALESMAN,7698,1982-02-22,1250,500,30,156.25,40.0
3,7566,JONES,MANAGER,7839,1981-04-02,2975,2975,20,818.125,100.0
4,7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30,156.25,112.0
5,7698,BLAKE,MANAGER,7839,1981-05-01,2850,2850,30,783.75,100.0
6,7782,CLARK,MANAGER,7839,1981-06-09,2450,2450,10,551.25,100.0
7,7788,SCOTT,ANALYST,7566,1987-04-19,3000,3000,20,825.0,100.0
8,7839,KING,PRESIDENT,0,1981-11-17,5000,5000,10,1375.0,100.0
9,7844,TURNER,SALESMAN,7698,1981-09-08,1500,1500,30,262.5,100.0


Vamos combinar os conjuntos de dados de funcionários e seus respectivos departamentos. 

In [43]:
final = pd.merge(emp_df,dept_df,on='deptno',how='inner')
final

Unnamed: 0,empno,ename,job,mgr,hiredate,sal,comn,deptno,Tax,comn_%,dname,loc
0,7369,SMITH,CLERK,7902,1980-12-17,800,800,20,100.0,100.0,RESEARCH,DALLAS
1,7566,JONES,MANAGER,7839,1981-04-02,2975,2975,20,818.125,100.0,RESEARCH,DALLAS
2,7788,SCOTT,ANALYST,7566,1987-04-19,3000,3000,20,825.0,100.0,RESEARCH,DALLAS
3,7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30,280.0,18.75,SALES,CHICAGO
4,7521,WARD,SALESMAN,7698,1982-02-22,1250,500,30,156.25,40.0,SALES,CHICAGO
5,7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30,156.25,112.0,SALES,CHICAGO
6,7698,BLAKE,MANAGER,7839,1981-05-01,2850,2850,30,783.75,100.0,SALES,CHICAGO
7,7844,TURNER,SALESMAN,7698,1981-09-08,1500,1500,30,262.5,100.0,SALES,CHICAGO
8,7782,CLARK,MANAGER,7839,1981-06-09,2450,2450,10,551.25,100.0,ACCOUNTING,NEW YORK
9,7839,KING,PRESIDENT,0,1981-11-17,5000,5000,10,1375.0,100.0,ACCOUNTING,NEW YORK


Podemos também manipular nomes de departamentos, apenas para obter mais limpeza.

In [44]:
dname_map={'RESEARCH':'R&D','SALES':'SALES','ACCOUNTING':'ACCT'}
final=final.replace({'dname':dname_map})
final

Unnamed: 0,empno,ename,job,mgr,hiredate,sal,comn,deptno,Tax,comn_%,dname,loc
0,7369,SMITH,CLERK,7902,1980-12-17,800,800,20,100.0,100.0,R&D,DALLAS
1,7566,JONES,MANAGER,7839,1981-04-02,2975,2975,20,818.125,100.0,R&D,DALLAS
2,7788,SCOTT,ANALYST,7566,1987-04-19,3000,3000,20,825.0,100.0,R&D,DALLAS
3,7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30,280.0,18.75,SALES,CHICAGO
4,7521,WARD,SALESMAN,7698,1982-02-22,1250,500,30,156.25,40.0,SALES,CHICAGO
5,7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30,156.25,112.0,SALES,CHICAGO
6,7698,BLAKE,MANAGER,7839,1981-05-01,2850,2850,30,783.75,100.0,SALES,CHICAGO
7,7844,TURNER,SALESMAN,7698,1981-09-08,1500,1500,30,262.5,100.0,SALES,CHICAGO
8,7782,CLARK,MANAGER,7839,1981-06-09,2450,2450,10,551.25,100.0,ACCT,NEW YORK
9,7839,KING,PRESIDENT,0,1981-11-17,5000,5000,10,1375.0,100.0,ACCT,NEW YORK


Criamos um último conjunto de dados limpo apagando as colunas indesejadas.

In [45]:
cleaned_df = final.drop(['mgr', 'comn', 'deptno', 'comn_%'],axis=1)
cleaned_df

Unnamed: 0,empno,ename,job,hiredate,sal,Tax,dname,loc
0,7369,SMITH,CLERK,1980-12-17,800,100.0,R&D,DALLAS
1,7566,JONES,MANAGER,1981-04-02,2975,818.125,R&D,DALLAS
2,7788,SCOTT,ANALYST,1987-04-19,3000,825.0,R&D,DALLAS
3,7499,ALLEN,SALESMAN,1981-02-20,1600,280.0,SALES,CHICAGO
4,7521,WARD,SALESMAN,1982-02-22,1250,156.25,SALES,CHICAGO
5,7654,MARTIN,SALESMAN,1981-09-28,1250,156.25,SALES,CHICAGO
6,7698,BLAKE,MANAGER,1981-05-01,2850,783.75,SALES,CHICAGO
7,7844,TURNER,SALESMAN,1981-09-08,1500,262.5,SALES,CHICAGO
8,7782,CLARK,MANAGER,1981-06-09,2450,551.25,ACCT,NEW YORK
9,7839,KING,PRESIDENT,1981-11-17,5000,1375.0,ACCT,NEW YORK


## 4. Carregando os Dados 

Por fim, agora que já temos uma tabela com dados limpos, realizamos o processo inverso. Podemos salvar a tabela carregando-a no banco e dados MySQL.

In [46]:
# criando a tabela no banco de bados a partir do DataFrame 'cleaned_df'
cleaned_df.to_sql('emp_dept_new',con=engine,if_exists='replace',index=False)

## 5. Conclusão

Criamos um simples pepiline ETL para uma empresa usando Python e MySQL.
Começamos criando tabelas com os dados dos funcionários e departamentos da empresa no MySQL. Depois extraímos os dados do banco de dados e realizamos a limpeza e transformação dos dados usando o Pandas. Por último carregamos a tabela com os dados limpos no MySQL para serem salvas no banco de dados.