In [1]:
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession
import io
from config import *
from unidecode import unidecode
import re



Leitura do primeiro Arquivo

In [2]:
try:
    cargos = pd.read_csv('C:/repositorio/desafionst/bronze/BaseCargos.csv', delimiter=';', low_memory=False)
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e}")

Erro na leitura do arquivo: Error tokenizing data. C error: Expected 8 fields in line 10, saw 9



Houve um erro de leitura do arquivo então, verifiquei o erro e ajustei para leitura.

In [10]:
#Como tentei abrir com o Pandas e deu erro, aqui eu utilizei a biblioteca io para ler o conteúdo e substituir o erro que era uma linha contendo ";" duas vezes e substitui para uma só
with open('C:/repositorio/desafionst/bronze/BaseCargos.csv', 'r', encoding='utf-8') as file:
    content = file.read()

content = content.replace(';;', ';')

In [11]:
#Aqui já acessando o arquivo ajustado(Utilizei a io.StringIO pois o conteúdo com o replace estava na memória por causa da substituição) 
try:
    df_cargos = pd.read_csv(io.StringIO(content), sep=';')
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e}")

In [5]:
df_cargos

Unnamed: 0,Cargo,Nível,Área,COD Área,COD Nível,Quadro,Bonus,Contratacao
0,OPV,Diretor,Operações,JAJ,JE,Efetivo,S,Diretoria
1,LOI,Estagiário,Logísitca,EDE,JA,Efetivo,N,Gerente
2,ADI,Estagiário,Administrativo,BAC,JA,Efetivo,N,Gerente
3,ADII,Analista,Administrativo,BAC,DB,Terceiro,N,RH
4,OPII,Analista,Operações,JAJ,DB,Terceiro,N,RH
5,FIV,Diretor,Financeiro,CBB,JE,Efetivo,S,Diretoria
6,FIII,Analista,Financeiro,CBB,DB,Terceiro,N,RH
7,ADIII,Coordenador,Administrativo,BAC,DB,Terceiro,N,RH
8,LOIV,Gerente,Logísitca,EDE,ID,Efetivo,S,Diretoria
9,FII,Estagiário,Financeiro,CBB,JA,Efetivo,N,Gerente


Visualizei que na linha "18", o registro da coluna "Área" não está de acordo, então foi feito o ajuste

In [12]:
df_cargos.loc[df_cargos['Cargo'] == 'OPI', 'Área'] = df_cargos.loc[df_cargos['Cargo'] == 'OPI', 'Área'].str.replace('@@@Operações', 'Operações')
df_cargos

Unnamed: 0,Cargo,Nível,Área,COD Área,COD Nível,Quadro,Bonus,Contratacao
0,OPV,Diretor,Operações,JAJ,JE,Efetivo,S,Diretoria
1,LOI,Estagiário,Logísitca,EDE,JA,Efetivo,N,Gerente
2,ADI,Estagiário,Administrativo,BAC,JA,Efetivo,N,Gerente
3,ADII,Analista,Administrativo,BAC,DB,Terceiro,N,RH
4,OPII,Analista,Operações,JAJ,DB,Terceiro,N,RH
5,FIV,Diretor,Financeiro,CBB,JE,Efetivo,S,Diretoria
6,FIII,Analista,Financeiro,CBB,DB,Terceiro,N,RH
7,ADIII,Coordenador,Administrativo,BAC,DB,Terceiro,N,RH
8,LOIV,Gerente,Logísitca,EDE,ID,Efetivo,S,Diretoria
9,FII,Estagiário,Financeiro,CBB,JA,Efetivo,N,Gerente


In [13]:
df_cargos.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 25 entries, 0 to 24
Data columns (total 8 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   Cargo        25 non-null     object
 1   Nível        25 non-null     object
 2   Área         25 non-null     object
 3   COD Área     25 non-null     object
 4   COD Nível    25 non-null     object
 5   Quadro       25 non-null     object
 6   Bonus        25 non-null     object
 7   Contratacao  25 non-null     object
dtypes: object(8)
memory usage: 1.7+ KB


Nessa parte os nomes das colunas são ajsutados

In [14]:
def remover_caracteres_especiais(texto):
    texto_sem_acentos = unidecode(texto)
    return re.sub(r'[^a-zA-Z0-9\s]', '', texto_sem_acentos)

def formatar_nome_coluna(nome):
    nome_sem_especiais = remover_caracteres_especiais(nome)
    return nome_sem_especiais.lower().replace(' ', '_')

def formatar_nomes_colunas(df):
    novos_nomes = [formatar_nome_coluna(col) for col in df.columns]
    df.columns = novos_nomes

formatar_nomes_colunas(df_cargos)

In [15]:
df_cargos.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 25 entries, 0 to 24
Data columns (total 8 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   cargo        25 non-null     object
 1   nivel        25 non-null     object
 2   area         25 non-null     object
 3   cod_area     25 non-null     object
 4   cod_nivel    25 non-null     object
 5   quadro       25 non-null     object
 6   bonus        25 non-null     object
 7   contratacao  25 non-null     object
dtypes: object(8)
memory usage: 1.7+ KB


<P>Se houvesse mais algum tipo de limpeza, seria feita nesse processo anterior.<p>

<p> Para os outros arquivos irá ser feito da mesma forma, verificando se irá ter erro e fazendo suas correções e data clean. Abaixo eu vou deixar uma leitura e tratamento de cada arquivo por célula para a visualização ficar melhor. <p>

In [17]:
# Base CEP: Leitura e tratamento
try:
    # Tentar ler diretamente do arquivo
    cargos = pd.read_csv('bronze\BaseCEP.csv', delimiter='|', low_memory=False)
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e} Iniciando Tratamento de dados")

# Em caso de erro, tentar outra abordagem
with open('bronze\BaseCEP.csv', 'r', encoding='utf-8') as file:
    content = file.read()
# Realizar algum tratamento em 'content' se necessário    
content = content.replace('||', '|')

# Ler usando io.StringIO
try:
    df_cep = pd.read_csv(io.StringIO(content), sep='|')
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e}")

formatar_nomes_colunas(df_cep)
df_cep


Erro na leitura do arquivo: Error tokenizing data. C error: Expected 3 fields in line 10, saw 4
 Iniciando Tratamento de dados


Unnamed: 0,cep,estado,regiao
0,20125535,Mato Grosso,Centro - Oeste
1,25995770,Rio Grande do Norte,Nordeste
2,37278465,Sergipe,Nordeste
3,49897703,Mato Grosso,Centro - Oeste
4,30149335,Mato Grosso,Centro - Oeste
...,...,...,...
855,801388803,Bahia,Nordeste
856,131504555,Acre,Norte
857,560973100,Roraima,Norte
858,233130568,Alagoas,Nordeste


In [20]:
# Base Clientes: Leitura e tratamento
try:
    clients = pd.read_csv('bronze\BaseClientes.csv', delimiter=';', low_memory=False)
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e} Iniciando Tratamento de dados")

with open('bronze\BaseClientes.csv', 'r', encoding='utf-8') as file:
    content = file.read()
content = content.replace(';;', ';')

try:
    df_clients = pd.read_csv(io.StringIO(content), sep=';')
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e}")

formatar_nomes_colunas(df_clients)

df_clients

Erro na leitura do arquivo: Error tokenizing data. C error: Expected 12 fields in line 5, saw 13
 Iniciando Tratamento de dados


Unnamed: 0,cliente,valor_contrato_anual,quantidade_de_servicos,cargo_responsavel,cep,data_inicio_contrato,nivel_de_importancia,unnamed_7,unnamed_8,unnamed_9
0,Teixeira Gonçalves,54000,37,ADIII,743419711.0,15/03/2019,3.0,,,
1,Souza Santos,126000,12,FIIV,882467283.0,08/04/2019,1.0,,,
2,Emídio Alves,319500,23,ADII,295150983.0,09/02/2019,3.0,,,
3,Santos Costa,252000,64,FIIV,430169311.0,10/03/2019,2.0,,,
4,Do Monteiro,351000,19,OPV,839424894.0,31/01/2019,3.0,,,
...,...,...,...,...,...,...,...,...,...,...
320,Manoel Costa,369000,53,LOIV,801388803.0,26/11/2018,4.0,,,
321,Gomes Machado,238500,45,OPV,131504555.0,04/12/2018,2.0,,,
322,Alkindar Cardozo,351000,16,FIII,560973100.0,07/12/2018,1.0,,,
323,Pereira Fazenda,418500,28,FIII,233130568.0,02/02/2019,1.0,,,


In [21]:
# Base Funcionários: Leitura e tratamento
try:
    employers = pd.read_csv('bronze\BaseFuncionarios.csv', delimiter='|', low_memory=False)
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e} Iniciando Tratamento de dados")

with open('bronze\BaseFuncionarios.csv', 'r', encoding='utf-8') as file:
    content = file.read()
content = content.replace('||', '|')

try:
    df_employers = pd.read_csv(io.StringIO(content), sep='|')
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e}")


formatar_nomes_colunas(df_employers)
df_employers

Unnamed: 0,id_rh,rg,cpf,ramal,estado_civil,nome_completo,login,data_de_nascimento,cep,data_de_contratacao,...,cargo,bandeira,codigos,quantidade_de_acessos,ferias_acumuladas,ferias_remuneradas,horas_extras,valores_adicionais,id_de_pessoal,id_da_area
0,1,455550390,57507179051,6482,C,Gabriel Mesquita,gabriel.mesquita,25569,20125535,38792,...,OPV,,UAK1729-MCG,140,47,,97,,,10
1,2,732355385,50388536767,6177,C,João Haddad,joão.haddad,27145,25995770,39605,...,LOI,,QDI6697-POC,143,31,,166,,,545
2,3,943205737,59035293914,7736,C,Amanda Marques Ribeiro,amanda.ribeiro,32880,37278465,39450,...,ADI,,AJB6796-ZAR,110,60,,188,,,213
3,4,968298499,51397043200,5238,C,Guilherme Nunez,guilherme.nunez,34096,49897703,40666,...,ADII,,UXE5742-IZD,101,41,,176,,,213
4,5,332164465,1296878897,5182,C,Adelino Gomes,adelino.gomes,27118,30149335,36601,...,ADII,,OKI6642-TSV,35,9,,121,,,213
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
534,535,195532101,2439810843,5239,C,Bernardo da cunha,bernardo.cunha,29034,45499870,37330,...,LOII,,CKP3287-UEK,55,57,,88,,,545
535,536,486535229,4764050200,3572,C,Julia Novaes Silva,julia.silva,33916,21517794,41578,...,COII,,XWA4609-XHL,139,41,,44,,,11
536,537,981279157,54115282873,1376,S,Arthur Bispo,arthur.bispo,34170,34689811,40740,...,COII,,SEA1242-VQT,60,35,,4,,,11
537,538,464550454,16126495226,6269,S,Raianne Brum,raianne.brum,32295,31097240,41496,...,FIV,,QLF8561-NLL,56,44,,114,,,322


In [22]:
# Base Nível: Leitura e tratamento
try:
    
    employers = pd.read_csv('bronze\BaseNível.csv', delimiter='%', low_memory=False)
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e} Iniciando Tratamento de dados")


with open('bronze\BaseNível.csv', 'r', encoding='utf-8') as file:
    content = file.read()


content = content.replace('%%', '%')


try:
    df_baselevels = pd.read_csv(io.StringIO(content), sep='%')
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e}")

formatar_nomes_colunas(df_baselevels)
df_baselevels

Unnamed: 0,nivel,descricao_nivel,tempo_no_nivel,plano_de_saude,plano_odontologico,setor_responsavel,plano_de_carreira
0,JA,Estagiário,1,,,,
1,,Sim,Não,RH Universidade,Sim,,
2,DB,Analista,4,,,,
3,Sim,Não,RH Empresas,Sim,,,
4,GC,Coordenador,5,,,,
5,Sim,Sim,RH Empresas,Sim,,,
6,ID,Gerente,10,,,,
7,Sim,Sim,RH Empresas,Não,,,
8,JE,Diretor,10,,,,
9,Sim,Sim,RH Headhunter,Não,,,


In [23]:
# Base PQ: Leitura e tratamento
try:
    employers = pd.read_csv('bronze\BasePQ.csv', delimiter=';', low_memory=False)
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e} Iniciando Tratamento de dados")

with open('bronze\BasePQ.csv', 'r', encoding='utf-8') as file:
    content = file.read()
content = content.replace(';', ';')

try:
    df_basepq = pd.read_csv(io.StringIO(content), sep=';')
except pd.errors.ParserError as e:
    print(f"Erro na leitura do arquivo: {e}")

formatar_nomes_colunas(df_basepq)
df_basepq

Unnamed: 0,id_rh,rg,cpf,ramal,estado_civil,nome_completo,login,data_de_nascimento,cep,data_de_contratacao,...,bonus,contratacao,unnamed_50,nivel1,descricao_nivel,tempo_no_nivel,plano_de_saude,plano_odontologico,setor_responsavel,plano_de_carreira
0,1,455550390,57507179051,6482,C,Gabriel Mesquita,gabriel.mesquita,25569,20125535,38792,...,S,Diretoria,,JA,Estagiário,1.0,Sim,Não,RH Universidade,Sim
1,2,732355385,50388536767,6177,C,João Haddad,joão.haddad,27145,25995770,39605,...,N,Gerente,,DB,Analista,4.0,Sim,Não,RH Empresas,Sim
2,3,943205737,59035293914,7736,C,Amanda Marques Ribeiro,amanda.ribeiro,32880,37278465,39450,...,N,Gerente,,GC,Coordenador,5.0,Sim,Sim,RH Empresas,Sim
3,4,968298499,51397043200,5238,C,Guilherme Nunez,guilherme.nunez,34096,49897703,40666,...,N,RH,,ID,Gerente,10.0,Sim,Sim,RH Empresas,Não
4,5,332164465,1296878897,5182,C,Adelino Gomes,adelino.gomes,27118,30149335,36601,...,N,RH,,JE,Diretor,10.0,Sim,Sim,RH Headhunter,Não
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
534,535,195532101,2439810843,5239,C,Bernardo da cunha,bernardo.cunha,29034,45499870,37330,...,,,,,,,,,,
535,536,486535229,4764050200,3572,C,Julia Novaes Silva,julia.silva,33916,21517794,41578,...,,,,,,,,,,
536,537,981279157,54115282873,1376,S,Arthur Bispo,arthur.bispo,34170,34689811,40740,...,,,,,,,,,,
537,538,464550454,16126495226,6269,S,Raianne Brum,raianne.brum,32295,31097240,41496,...,,,,,,,,,,


Todos os arquivos lidos, e feito alguns tratamentos agora irão ser colocados na próxima camada Silver e transferidos para Parquet. Fazendo isso a performance e o custo de armazenamento irão ser otimizados. Depois irão ser feitos outros tipos de tratamentos como verificação de tipos de dados, null values e os ajustes necessários

In [14]:
# Salva os dados na camada silver como Parquet
try:
    df_cargos.to_parquet(r'silver\cargos.parquet', index=False)
    print("Dados transferidos para a camada silver com sucesso.")
except Exception as e:
    print(f"Erro ao salvar dados na camada silver: {e}")

try:
    df_cep.to_parquet(r'silver\cep.parquet', index=False)
    print("Dados transferidos para a camada silver com sucesso.")
except Exception as e:
    print(f"Erro ao salvar dados na camada silver: {e}")

try:
    df_clients.to_parquet(r'silver\clients.parquet', index=False)
    print("Dados transferidos para a camada silver com sucesso.")
except Exception as e:
    print(f"Erro ao salvar dados na camada silver: {e}")

try:
    df_employers.to_parquet(r'silver\employers.parquet', index=False)
    print("Dados transferidos para a camada silver com sucesso.")
except Exception as e:
    print(f"Erro ao salvar dados na camada silver: {e}")

try:
    df_baselevels.to_parquet(r'silver\baselevels.parquet', index=False)
    print("Dados transferidos para a camada silver com sucesso.")
except Exception as e:
    print(f"Erro ao salvar dados na camada silver: {e}")

try:
    df_basepq.to_parquet(r'silver\basepq.parquet', index=False)
    print("Dados transferidos para a camada silver com sucesso.")
except Exception as e:
    print(f"Erro ao salvar dados na camada silver: {e}")

Dados transferidos para a camada silver com sucesso.
Dados transferidos para a camada silver com sucesso.
Dados transferidos para a camada silver com sucesso.
Dados transferidos para a camada silver com sucesso.
Dados transferidos para a camada silver com sucesso.
Dados transferidos para a camada silver com sucesso.


Agora utilizando o PySpark vamos fazer mais alguns testes no dados e melhorá-los ainda mais

In [15]:
# Crie uma SparkSession
spark = SparkSession.builder.getOrCreate()

In [22]:
df_cargos_parquet = spark.read.parquet('silver\cargos.parquet')

df_cargos_parquet.show()

+-----+-----------+--------------+--------+---------+--------+-----+-----------+
|Cargo|      Nível|          Área|COD Área|COD Nível|  Quadro|Bonus|Contratacao|
+-----+-----------+--------------+--------+---------+--------+-----+-----------+
|  OPV|    Diretor|     Operações|     JAJ|       JE| Efetivo|    S|  Diretoria|
|  LOI| Estagiário|     Logísitca|     EDE|       JA| Efetivo|    N|    Gerente|
|  ADI| Estagiário|Administrativo|     BAC|       JA| Efetivo|    N|    Gerente|
| ADII|   Analista|Administrativo|     BAC|       DB|Terceiro|    N|         RH|
| OPII|   Analista|     Operações|     JAJ|       DB|Terceiro|    N|         RH|
|  FIV|    Diretor|    Financeiro|     CBB|       JE| Efetivo|    S|  Diretoria|
| FIII|   Analista|    Financeiro|     CBB|       DB|Terceiro|    N|         RH|
|ADIII|Coordenador|Administrativo|     BAC|       DB|Terceiro|    N|         RH|
| LOIV|    Gerente|     Logísitca|     EDE|       ID| Efetivo|    S|  Diretoria|
|  FII| Estagiário|    Finan

In [30]:
df_cargos_parquet.printSchema()

root
 |-- Cargo: string (nullable = true)
 |-- Nível: string (nullable = true)
 |-- Área: string (nullable = true)
 |-- COD Área: string (nullable = true)
 |-- COD Nível: string (nullable = true)
 |-- Quadro: string (nullable = true)
 |-- Bonus: string (nullable = true)
 |-- Contratacao: string (nullable = true)



In [32]:
from pyspark.sql.functions import col

In [28]:
# Verifique se há duplicatas
num_lines_before = df_cargos_parquet.count()
print(num_lines_before)

# Remova as duplicatas
df_cargos_sem_duplicatas = df_cargos_parquet.dropDuplicates()

# Verifique o número de linhas após a remoção de duplicatas
num_lines_after = df_cargos_sem_duplicatas.count()
print(num_lines_after)


25
25
