# eEDB-011-2024-3

## Activity 3 – Ingestion and ETL with a programming language (Python + Spark)

- Use Python for data ingestion and processing. The entire process must run on Spark.
    - Additional packages may be used
    - Data processing must not be done via SQL
- Ingest all datasets into an open-source relational database. Any database may be used; some suggestions:
    - MySQL
    - PostgreSQL
    - ClickHouse
- Generate a final table with the processed and joined data.
    - Data processing must be done with Python + Spark
- Add the following processing layers, either inside the database itself or on local disk. The Delivery layer must also be available as a final table in the relational database:
    - RAW – free data format
    - Trusted – data in Parquet, ORC, or Avro (Parquet recommended)
    - Delivery – data in Parquet, ORC, or Avro (Parquet recommended)

- **Group 02**:
    - Aline Bini
    - Ana Lívia Franco
    - Ana Priss
    - João Squinelato
    - Marcelo Pena
    - Thais Siqueira

- [GitHub](https://github.com/Squinelato/eEDB-011-2024-3 "eEDB-011-2024-3")

```Data Ingestion | August 2024```

## To Do

- raw 
- trusted
- delivery

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lpad, col, concat, sha1, regexp_replace, udf, lower, lit, when
from pyspark.sql.types import StringType, FloatType, IntegerType, StructType, StructField, ArrayType
from unidecode import unidecode

import findspark
import os

In [3]:
findspark.init()

In [4]:
spark = SparkSession.builder \
    .master('local') \
    .appName('Basic ETL') \
    .config('spark.executor.timeout', "1200s") \
    .config('spark.sql.broadcastTimeout', '1200s') \
    .config('spark.rpc.askTimeout', '600s') \
    .config('spark.executor.heartbeatInterval', '120s') \
    .config('spark.network.timeout', '1200s') \
    .getOrCreate()

In [5]:
spark

---
## **Raw**

### **Banks file**

In [46]:
banks_csv_path = '../Fonte de Dados/Bancos/EnquadramentoInicia_v2.tsv'
rwzd_bank = spark.read.csv(banks_csv_path, sep='\t', encoding='utf8', header=True)
rwzd_bank.show(100, truncate=False)

+--------+--------+--------------------------------------------------------------------------------------------------------------------------+
|Segmento|CNPJ    |Nome                                                                                                                      |
+--------+--------+--------------------------------------------------------------------------------------------------------------------------+
|S1      |0       |BANCO DO BRASIL - PRUDENCIAL                                                                                              |
|S1      |60746948|BRADESCO - PRUDENCIAL                                                                                                     |
|S1      |30306294|BTG PACTUAL - PRUDENCIAL                                                                                                  |
|S1      |360305  |CAIXA ECONOMICA FEDERAL - PRUDENCIAL                                                                                      |

Inspecting the data schema

In [47]:
rwzd_bank.printSchema()

root
 |-- Segmento: string (nullable = true)
 |-- CNPJ: string (nullable = true)
 |-- Nome: string (nullable = true)



Counting the rows

In [48]:
rwzd_bank.count()

1474

Saving the data to the _raw_ layer in Parquet format

In [49]:
# raw_bank_path = './raw/bank/'
# rwzd_bank.write.parquet(raw_bank_path, mode="append")

### **Employees file**

Listing the files that contain employee data and selecting the one to read

In [50]:
employee_dir = '../Fonte de Dados/Empregados/'
employee_files = os.listdir(employee_dir)
# os.listdir returns entries in arbitrary order, so selecting by index [1] is fragile
employee_paths = list(map(lambda file: os.path.join(employee_dir, file), employee_files))[1]
employee_paths

'../Fonte de Dados/Empregados/glassdoor_consolidado_join_match_v2.csv'

Reading the selected employee file as a single dataset

In [51]:
rwzd_employee = spark.read.csv(employee_paths, sep='|', encoding='utf8', header=True)

Removing duplicates based on bank name and segment

In [52]:
rwzd_employee = rwzd_employee.dropDuplicates(['Nome', 'Segmento'])

Inspecting the data schema

In [53]:
rwzd_employee.printSchema()

root
 |-- employer_name: string (nullable = true)
 |-- reviews_count: string (nullable = true)
 |-- culture_count: string (nullable = true)
 |-- salaries_count: string (nullable = true)
 |-- benefits_count: string (nullable = true)
 |-- employer-website: string (nullable = true)
 |-- employer-headquarters: string (nullable = true)
 |-- employer-founded: string (nullable = true)
 |-- employer-industry: string (nullable = true)
 |-- employer-revenue: string (nullable = true)
 |-- url: string (nullable = true)
 |-- Geral: string (nullable = true)
 |-- Cultura e valores: string (nullable = true)
 |-- Diversidade e inclusão: string (nullable = true)
 |-- Qualidade de vida: string (nullable = true)
 |-- Alta liderança: string (nullable = true)
 |-- Remuneração e benefícios: string (nullable = true)
 |-- Oportunidades de carreira: string (nullable = true)
 |-- Recomendam para outras pessoas(%): string (nullable = true)
 |-- Perspectiva positiva da empresa(%): string (nullable = true)
 |-- Seg

Counting the rows

In [54]:
rwzd_employee.count()

32

Saving the data to the _raw_ layer in Parquet format

In [55]:
# raw_employee_path = './raw/employee/'
# rwzd_employee.write.parquet(raw_employee_path, mode="append")

### **Claims**

Locating all files that contain claims data

In [56]:
claim_dir = '../Fonte de Dados/Reclamações/'
claim_files = os.listdir(claim_dir)
claim_paths = list(map(lambda file: os.path.join(claim_dir, file), claim_files))
claim_paths

['../Fonte de Dados/Reclamações/2021_tri_01.csv',
 '../Fonte de Dados/Reclamações/2021_tri_02.csv',
 '../Fonte de Dados/Reclamações/2021_tri_03.csv',
 '../Fonte de Dados/Reclamações/2021_tri_04.csv',
 '../Fonte de Dados/Reclamações/2022_tri_01.csv',
 '../Fonte de Dados/Reclamações/2022_tri_02_nao_ha_dados.csv',
 '../Fonte de Dados/Reclamações/2022_tri_03.csv',
 '../Fonte de Dados/Reclamações/2022_tri_04.csv']

In [57]:
rwzd_claim = spark.read.csv(claim_paths, sep=';', encoding='latin1', header=True)
rwzd_claim.show()

+----+---------+--------------------+----------------+--------+----------------------+------+-----------------------------------------------+--------------------------------------------+---------------------------------------+-------------------------------+----------------------------------------+----------------------------+----------------------------+----+
| Ano|Trimestre|           Categoria|            Tipo| CNPJ IF|Instituição financeira|Índice|Quantidade de reclamações reguladas procedentes|Quantidade de reclamações reguladas - outras|Quantidade de reclamações não reguladas|Quantidade total de reclamações|Quantidade total de clientes  CCS e SCR|Quantidade de clientes  CCS|Quantidade de clientes  SCR|_c14|
+----+---------+--------------------+----------------+--------+----------------------+------+-----------------------------------------------+--------------------------------------------+---------------------------------------+-------------------------------+----------------

Inspecting the data schema

In [58]:
rwzd_claim.printSchema()

root
 |-- Ano: string (nullable = true)
 |-- Trimestre: string (nullable = true)
 |-- Categoria: string (nullable = true)
 |-- Tipo: string (nullable = true)
 |-- CNPJ IF: string (nullable = true)
 |-- Instituição financeira: string (nullable = true)
 |-- Índice: string (nullable = true)
 |-- Quantidade de reclamações reguladas procedentes: string (nullable = true)
 |-- Quantidade de reclamações reguladas - outras: string (nullable = true)
 |-- Quantidade de reclamações não reguladas: string (nullable = true)
 |-- Quantidade total de reclamações: string (nullable = true)
 |-- Quantidade total de clientes  CCS e SCR: string (nullable = true)
 |-- Quantidade de clientes  CCS: string (nullable = true)
 |-- Quantidade de clientes  SCR: string (nullable = true)
 |-- _c14: string (nullable = true)



Dropping the unnecessary column

In [59]:
rwzd_claim = rwzd_claim.drop('_c14')

Counting the rows

In [60]:
rwzd_claim.count()

918

Saving the data to the _raw_ layer in Parquet format

In [61]:
# raw_claim_path = './raw/claim/'
# rwzd_claim.write.parquet(raw_claim_path, mode="append")

---
## **Trusted**

In [62]:
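def unicode_normalizer(word: str) -> str:
    # Spark passes None for null cells; unidecode expects a string, so pass nulls through
    if word is None:
        return None
    return unidecode(word)

# UDF that transliterates accented characters to their closest ASCII form
unaccent = udf(unicode_normalizer, StringType())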

### **Banks**

In [63]:
trzd_bank = rwzd_bank

Applying some transformations to improve data quality:

1 - Renaming the _dataframe_ columns to English, in _snake case_

In [64]:
trzd_bank = trzd_bank.withColumnsRenamed({
    'Segmento': 'segment',
    'CNPJ': 'cnpj',
    'Nome': 'financial_institution_name'
})

2 - To make the _cnpj_ column conform to its standard format, incomplete values are left-padded with zeros until they reach 8 digits

In [65]:
trzd_bank = trzd_bank \
    .withColumn('cnpj', when(col('cnpj') == '', lit(None).cast(StringType())) \
    .otherwise(lpad(col('cnpj'), 8, '0')))
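
The padding rule above can be checked outside Spark with a plain-Python mirror. This is only a sketch: `pad_cnpj` is a hypothetical helper that is not part of the notebook, and it assumes the same rule as the cell above (empty string becomes null, everything else is left-padded to 8 characters).

```python
from typing import Optional


def pad_cnpj(raw: Optional[str], width: int = 8) -> Optional[str]:
    """Plain-Python mirror of when(col == '', None).otherwise(lpad(col, 8, '0'))."""
    # Nulls stay null, and empty strings are turned into null
    if raw is None or raw == "":
        return None
    # Spark's lpad also shortens values longer than `width`, keeping the left-most characters
    return raw.rjust(width, "0")[:width]


print(pad_cnpj("0"))       # 00000000
print(pad_cnpj("360305"))  # 00360305
```

Note that a cell holding a single space is not caught by the empty-string check; the claims dataset handles that case with its own comparison.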

In [66]:
trzd_bank = trzd_bank.withColumn('sk_cnpj_segment', sha1(concat(col('cnpj'), col('segment'))))

In [67]:
trzd_bank.show(100, truncate=False)

+-------+--------+--------------------------------------------------------------------------------------------------------------------------+----------------------------------------+
|segment|cnpj    |financial_institution_name                                                                                                |sk_cnpj_segment                         |
+-------+--------+--------------------------------------------------------------------------------------------------------------------------+----------------------------------------+
|S1     |00000000|BANCO DO BRASIL - PRUDENCIAL                                                                                              |d9be4941c63af670d63086dcb0849a997e062a64|
|S1     |60746948|BRADESCO - PRUDENCIAL                                                                                                     |2f59bebc86f6b4dd5628043cac240c6e6a6f651d|
|S1     |30306294|BTG PACTUAL - PRUDENCIAL                                           

In [31]:
## save as parquet

### **Employees**

In [68]:
trzd_employee = rwzd_employee

In [69]:
trzd_employee = trzd_employee.withColumnsRenamed({
    'employer-website': 'employer_website',
    'employer-headquarters': 'employer_headquarters',
    'employer-founded': 'employer_founded',
    'employer-industry': 'employer_industry',
    'employer-revenue': 'employer_revenue',
    'Geral': 'general_score',
    'Cultura e valores': 'culture_values_score',
    'Diversidade e inclusão': 'diversity_inclusion_score',
    'Qualidade de vida': 'life_quality_score',
    'Alta liderança': 'senior_leadership_score',
    'Remuneração e benefícios': 'compensation_benefits_score',
    'Oportunidades de carreira': 'career_opportunities_score',
    'Recomendam para outras pessoas(%)': 'recommendation_score',
    'Perspectiva positiva da empresa(%)': 'company_positive_score',
    'Segmento': 'segment',
    'Nome': 'financial_institution_name'
})

In [70]:
trzd_employee = trzd_employee.withColumn('employer_founded', regexp_replace(col('employer_founded'), r'\..*', ''))

In [71]:
trzd_employee = trzd_employee \
    .withColumn('sk_financial_institution_name', unaccent(col('financial_institution_name'))) \
    .withColumn('sk_financial_institution_name', lower(col('sk_financial_institution_name'))) \
    .withColumn('sk_financial_institution_name', regexp_replace(col('sk_financial_institution_name'), r' ', '')) \
    .withColumn('sk_financial_institution_name', sha1(col('sk_financial_institution_name')))
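
The cell above builds a surrogate key by removing accents, lowercasing, dropping spaces, and hashing the result with SHA-1. A minimal plain-Python sketch of the same idea follows. The helper names `strip_accents` and `name_key` are hypothetical, and the standard library's `unicodedata` is used here as a stand-in for `unidecode`; the two agree on accented Latin letters but may differ on other characters.

```python
import hashlib
import unicodedata


def strip_accents(text: str) -> str:
    # Decompose characters, then drop the combining marks (stdlib stand-in for unidecode)
    decomposed = unicodedata.normalize("NFKD", text)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))


def name_key(name: str) -> str:
    # Same order as the Spark cell: unaccent, lowercase, remove spaces, SHA-1 hex digest
    normalized = strip_accents(name).lower().replace(" ", "")
    return hashlib.sha1(normalized.encode("utf-8")).hexdigest()


# Spelling variants of the same name collapse to one key
print(name_key("Itaú Unibanco") == name_key("ITAU UNIBANCO"))  # True
```

Because the key depends only on the normalized name, two datasets that spell an institution differently (accents, case, spacing) can still be joined on it.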

In [72]:
trzd_employee = trzd_employee \
    .withColumn('reviews_count', col('reviews_count').cast(IntegerType())) \
    .withColumn('culture_count', col('culture_count').cast(IntegerType())) \
    .withColumn('salaries_count', col('salaries_count').cast(IntegerType())) \
    .withColumn('benefits_count', col('benefits_count').cast(IntegerType())) \
    .withColumn('general_score', col('general_score').cast(FloatType())) \
    .withColumn('culture_values_score', col('culture_values_score').cast(FloatType())) \
    .withColumn('diversity_inclusion_score', col('diversity_inclusion_score').cast(FloatType())) \
    .withColumn('life_quality_score', col('life_quality_score').cast(FloatType())) \
    .withColumn('senior_leadership_score', col('senior_leadership_score').cast(FloatType())) \
    .withColumn('compensation_benefits_score', col('compensation_benefits_score').cast(FloatType())) \
    .withColumn('career_opportunities_score', col('career_opportunities_score').cast(FloatType())) \
    .withColumn('recommendation_score', col('recommendation_score').cast(FloatType())) \
    .withColumn('company_positive_score', col('company_positive_score').cast(FloatType())) \
    .withColumn('match_percent', col('match_percent').cast(IntegerType()))

In [73]:
trzd_employee.show(100, truncate=False)

+-------------------------------+-------------+-------------+--------------+--------------+---------------------------------------------------+---------------------------+----------------+-----------------------------------------------------------------------------------+---------------------------+-----------------------------------------------------------------------------------------------------------------------+-------------+--------------------+-------------------------+------------------+-----------------------+---------------------------+--------------------------+--------------------+----------------------+-------+--------------------------------------------------+-------------+----------------------------------------+
|employer_name                  |reviews_count|culture_count|salaries_count|benefits_count|employer_website                                   |employer_headquarters      |employer_founded|employer_industry                                                         

### **Claims**

In [94]:
trzd_claim = rwzd_claim

In [95]:
trzd_claim = trzd_claim.withColumnsRenamed({
    'Ano': 'year_claim',
    'Trimestre': 'quarter_claim',
    'Categoria': 'category',
    'Tipo': 'bank_type',
    'CNPJ IF': 'cnpj',
    'Instituição financeira': 'financial_institution_name',
    'Índice': 'bank_index',
    'Quantidade de reclamações reguladas procedentes': 'number_of_regulated_proceeding_complaints',
    'Quantidade de reclamações reguladas - outras': 'number_of_regulated_other_complaints',
    'Quantidade de reclamações não reguladas': 'number_of_unregulated_complaints',
    'Quantidade total de reclamações': 'total_number_of_complaints',
    'Quantidade total de clientes  CCS e SCR': 'total_number_of_ccs_and_scr_customers',
    'Quantidade de clientes  CCS': 'number_of_ccs_customers',
    'Quantidade de clientes  SCR': 'number_of_scr_customers'
})

In [96]:
trzd_claim = trzd_claim \
    .withColumn('bank_index', regexp_replace(col('bank_index'), r'\.', '')) \
    .withColumn('bank_index', regexp_replace(col('bank_index'), r',', '.')) \
    .withColumn('bank_index', regexp_replace(col('bank_index'), r' ', '')) \
    .withColumn("bank_index", col("bank_index").cast(FloatType()))
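
The `bank_index` values use Brazilian number formatting (`.` as thousands separator, `,` as decimal separator), which is why the cell above strips dots, swaps commas for dots, and removes spaces before casting. A plain-Python sketch of the same steps is shown below; `parse_br_decimal` is a hypothetical helper, and returning `None` on unparsable input is an assumption that imitates Spark's default (non-ANSI) cast behaviour.

```python
from typing import Optional


def parse_br_decimal(raw: Optional[str]) -> Optional[float]:
    """Convert a string such as '1.234,56' into the float 1234.56."""
    if raw is None:
        return None
    # Same order as the Spark cell: drop thousands dots, turn the comma into a dot, drop spaces
    cleaned = raw.replace(".", "").replace(",", ".").replace(" ", "")
    try:
        return float(cleaned)
    except ValueError:
        # Assumption: unparsable values become null, as in a non-ANSI Spark cast
        return None


print(parse_br_decimal("1.234,56"))  # 1234.56
print(parse_br_decimal(" 12,5"))     # 12.5
```

The order of the replacements matters: swapping the comma first would turn `1.234,56` into `1.234.56`, and the dot-removal step would then destroy the decimal point.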

In [97]:
trzd_claim = trzd_claim.withColumn('quarter_claim', regexp_replace(col('quarter_claim'), 'º', ''))

In [98]:
trzd_claim = trzd_claim \
    .withColumn('cnpj', when(col('cnpj') == ' ', lit(None).cast(StringType())) \
    .otherwise(lpad(col('cnpj'), 8, '0')))

In [99]:
trzd_claim = trzd_claim \
    .withColumn('total_number_of_ccs_and_scr_customers', regexp_replace(col('total_number_of_ccs_and_scr_customers'), r' ', '')) \
    .withColumn("total_number_of_ccs_and_scr_customers", col("total_number_of_ccs_and_scr_customers").cast(IntegerType()))

trzd_claim = trzd_claim \
    .withColumn('number_of_ccs_customers', regexp_replace(col('number_of_ccs_customers'), r' ', '')) \
    .withColumn("number_of_ccs_customers", col("number_of_ccs_customers").cast(IntegerType()))

trzd_claim = trzd_claim \
    .withColumn('number_of_scr_customers', regexp_replace(col('number_of_scr_customers'), r' ', '')) \
    .withColumn("number_of_scr_customers", col("number_of_scr_customers").cast(IntegerType()))

In [100]:
trzd_claim = trzd_claim \
    .withColumn('financial_institution_name', regexp_replace(col('financial_institution_name'), r' \(conglomerado\)', ''))

In [101]:
bank_names = {
    'BB': 'BANCO DO BRASIL',
    'DAYCOVAL': 'BANCO DAYCOVAL S.A',
    'DEUTSCHE BANK S.A. - BANCO ALEMAO': 'DEUTSCHE',
    'BANCO SUMITOMO MITSUI BRASILEIRO S.A.': 'BANCO SUMITOMO MITSUI BRASIL S.A.'
}

trzd_claim = trzd_claim.na.replace(to_replace=bank_names, subset='financial_institution_name')
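
The two cells above harmonize institution names in two steps: a regular expression removes the ` (conglomerado)` suffix, and a dictionary then replaces whole values that match a known alias. The sketch below reproduces that logic in plain Python. `harmonize_name` is a hypothetical helper, and the sample inputs are made up for illustration rather than taken from the dataset.

```python
import re

# Alias table copied from the notebook cell above
BANK_NAMES = {
    "BB": "BANCO DO BRASIL",
    "DAYCOVAL": "BANCO DAYCOVAL S.A",
    "DEUTSCHE BANK S.A. - BANCO ALEMAO": "DEUTSCHE",
    "BANCO SUMITOMO MITSUI BRASILEIRO S.A.": "BANCO SUMITOMO MITSUI BRASIL S.A.",
}


def harmonize_name(name: str) -> str:
    # Step 1: remove the ' (conglomerado)' suffix
    stripped = re.sub(r" \(conglomerado\)", "", name)
    # Step 2: replace only when the whole value equals a key, as DataFrame.na.replace does
    return BANK_NAMES.get(stripped, stripped)


print(harmonize_name("BB (conglomerado)"))  # BANCO DO BRASIL
print(harmonize_name("BB SEGUROS"))         # BB SEGUROS (no partial replacement)
```

The suffix has to be removed first, otherwise a value like `BB (conglomerado)` would never match the `BB` key.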

In [102]:
trzd_claim = trzd_claim \
    .withColumn('sk_financial_institution_name', unaccent(col('financial_institution_name'))) \
    .withColumn('sk_financial_institution_name', lower(col('sk_financial_institution_name'))) \
    .withColumn('sk_financial_institution_name', regexp_replace(col('sk_financial_institution_name'), r' ', '')) \
    .withColumn('sk_financial_institution_name', sha1(col('sk_financial_institution_name')))

In [103]:
trzd_claim = trzd_claim \
    .withColumn('quarter_claim', col('quarter_claim').cast(IntegerType())) \
    .withColumn('number_of_regulated_proceeding_complaints', col('number_of_regulated_proceeding_complaints').cast(IntegerType())) \
    .withColumn('number_of_regulated_other_complaints', col('number_of_regulated_other_complaints').cast(IntegerType())) \
    .withColumn('number_of_unregulated_complaints', col('number_of_unregulated_complaints').cast(IntegerType())) \
    .withColumn('total_number_of_complaints', col('total_number_of_complaints').cast(IntegerType()))
    

In [104]:
trzd_claim.printSchema()

root
 |-- year_claim: string (nullable = true)
 |-- quarter_claim: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- bank_type: string (nullable = true)
 |-- cnpj: string (nullable = true)
 |-- financial_institution_name: string (nullable = true)
 |-- bank_index: float (nullable = true)
 |-- number_of_regulated_proceeding_complaints: integer (nullable = true)
 |-- number_of_regulated_other_complaints: integer (nullable = true)
 |-- number_of_unregulated_complaints: integer (nullable = true)
 |-- total_number_of_complaints: integer (nullable = true)
 |-- total_number_of_ccs_and_scr_customers: integer (nullable = true)
 |-- number_of_ccs_customers: integer (nullable = true)
 |-- number_of_scr_customers: integer (nullable = true)
 |-- sk_financial_institution_name: string (nullable = true)



In [105]:
trzd_claim.show(100, truncate=False)

+----------+-------------+--------------------------------------------------------+----------------+--------+-----------------------------------------------------------------+----------+-----------------------------------------+------------------------------------+--------------------------------+--------------------------+-------------------------------------+-----------------------+-----------------------+----------------------------------------+
|year_claim|quarter_claim|category                                                |bank_type       |cnpj    |financial_institution_name                                       |bank_index|number_of_regulated_proceeding_complaints|number_of_regulated_other_complaints|number_of_unregulated_complaints|total_number_of_complaints|total_number_of_ccs_and_scr_customers|number_of_ccs_customers|number_of_scr_customers|sk_financial_institution_name           |
+----------+-------------+--------------------------------------------------------+-----------

In [None]:
## save as parquet

---
## **Delivery**

In [None]:
# trzd_bank_path = './trusted/bank'
# banks = spark.read.parquet(trzd_bank_path)

# trzd_employee_path = './trusted/employee'
# employee = spark.read.parquet(trzd_employee_path)

# trzd_claim_path = './trusted/claim'
# claims = spark.read.parquet(trzd_claim_path)

In [114]:
banks = trzd_bank

employee = trzd_employee

claims = trzd_claim

In [115]:
banks = banks.drop('financial_institution_name')

banks_claims = claims.join(banks, on='cnpj', how='inner')

banks_claims.show(2, truncate=False)

+--------+----------+-------------+---------------------------+----------------+--------------------------------+----------+-----------------------------------------+------------------------------------+--------------------------------+--------------------------+-------------------------------------+-----------------------+-----------------------+----------------------------------------+-------+----------------------------------------+
|cnpj    |year_claim|quarter_claim|category                   |bank_type       |financial_institution_name      |bank_index|number_of_regulated_proceeding_complaints|number_of_regulated_other_complaints|number_of_unregulated_complaints|total_number_of_complaints|total_number_of_ccs_and_scr_customers|number_of_ccs_customers|number_of_scr_customers|sk_financial_institution_name           |segment|sk_cnpj_segment                         |
+--------+----------+-------------+---------------------------+----------------+--------------------------------+-------

In [116]:
employee = employee.drop('financial_institution_name')

employee_claims = claims.join(employee, on='sk_financial_institution_name', how='inner')

employee_claims.show(2)

+-----------------------------+----------+-------------+--------------------+----------------+--------+--------------------------+----------+-----------------------------------------+------------------------------------+--------------------------------+--------------------------+-------------------------------------+-----------------------+-----------------------+--------------------+-------------+-------------+--------------+--------------+--------------------+---------------------+----------------+--------------------+--------------------+--------------------+-------------+--------------------+-------------------------+------------------+-----------------------+---------------------------+--------------------------+--------------------+----------------------+-------+-------------+
|sk_financial_institution_name|year_claim|quarter_claim|            category|       bank_type|    cnpj|financial_institution_name|bank_index|number_of_regulated_proceeding_complaints|number_of_regulated_oth

In [117]:
employee_claims = employee_claims.withColumn('sk_cnpj_segment', sha1(concat(col('cnpj'), col('segment'))))

In [123]:
columns_to_add_employee_claims = list(set(employee_claims.columns) - set(banks_claims.columns))
columns_to_add_employee_claims.append('sk_cnpj_segment')
columns_to_drop = list(set(employee_claims.columns) - set(columns_to_add_employee_claims))
columns_to_drop

['sk_financial_institution_name',
 'number_of_unregulated_complaints',
 'number_of_regulated_proceeding_complaints',
 'quarter_claim',
 'category',
 'segment',
 'year_claim',
 'total_number_of_complaints',
 'financial_institution_name',
 'bank_index',
 'number_of_regulated_other_complaints',
 'number_of_ccs_customers',
 'total_number_of_ccs_and_scr_customers',
 'cnpj',
 'number_of_scr_customers',
 'bank_type']
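
The cell above keeps only the columns that exist in `employee_claims` but not in `banks_claims`, plus the join key, and drops the rest so the final join does not produce duplicated columns. Because it goes through `set`, the resulting order is not stable between runs. A sketch of an order-preserving variant is given below; `shared_columns_to_drop` is a hypothetical helper and the column lists in the example are shortened for illustration.

```python
from typing import List


def shared_columns_to_drop(left: List[str], right: List[str], join_key: str) -> List[str]:
    """Columns of `left` that also exist in `right`, excluding the join key, in `left` order."""
    right_set = set(right)
    return [c for c in left if c in right_set and c != join_key]


# Shortened, illustrative column lists
employee_claims_cols = ["sk_financial_institution_name", "cnpj", "general_score", "sk_cnpj_segment"]
banks_claims_cols = ["cnpj", "sk_financial_institution_name", "segment", "sk_cnpj_segment"]

print(shared_columns_to_drop(employee_claims_cols, banks_claims_cols, "sk_cnpj_segment"))
# ['sk_financial_institution_name', 'cnpj']
```

The returned list can be passed to `DataFrame.drop(*columns)` exactly like `columns_to_drop` in the notebook.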

In [125]:
employee_claims = employee_claims.drop(*columns_to_drop)

dlzd_bank_employment_satisfaction = banks_claims.join(
    employee_claims, 
    on='sk_cnpj_segment', 
    how='inner'
)

In [128]:
dlzd_bank_employment_satisfaction = dlzd_bank_employment_satisfaction.drop_duplicates()

In [129]:
dlzd_bank_employment_satisfaction.count()

16
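
The assignment requires the Delivery layer to exist both as a file (Parquet recommended) and as a final table in the relational database, a step this notebook does not show yet. The sketch below is one possible way to do it with Spark's JDBC writer, assuming PostgreSQL. The host, port, database, table name, credentials, and output path are all hypothetical, and the PostgreSQL JDBC driver jar must be available to Spark for the write to work.

```python
from typing import Any, Dict


def jdbc_options(host: str, port: int, database: str, table: str,
                 user: str, password: str) -> Dict[str, str]:
    """Build the option map for Spark's JDBC data source (PostgreSQL assumed)."""
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }


def write_delivery(df: Any, parquet_path: str, options: Dict[str, str]) -> None:
    """Persist a Spark DataFrame to Parquet and to a relational table."""
    # Delivery layer on disk; overwrite keeps reruns from duplicating rows
    df.write.parquet(parquet_path, mode="overwrite")
    # Delivery layer as a database table through the JDBC data source
    df.write.format("jdbc").options(**options).mode("overwrite").save()


# Hypothetical connection values, for illustration only
options = jdbc_options("localhost", 5432, "etl", "bank_employment_satisfaction",
                       "etl_user", "etl_password")
print(options["url"])  # jdbc:postgresql://localhost:5432/etl
```

With a running database, the call would look like `write_delivery(dlzd_bank_employment_satisfaction, './delivery/bank_employment_satisfaction', options)`.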

In [130]:
dlzd_bank_employment_satisfaction.show()

+--------------------+--------+----------+-------------+--------------------+----------------+--------------------------+----------+-----------------------------------------+------------------------------------+--------------------------------+--------------------------+-------------------------------------+-----------------------+-----------------------+-----------------------------+-------+--------------------+-------------+-------------+--------------+--------------+--------------------+---------------------+----------------+--------------------+--------------------+--------------------+-------------+--------------------+-------------------------+------------------+-----------------------+---------------------------+--------------------------+--------------------+----------------------+-------------+
|     sk_cnpj_segment|    cnpj|year_claim|quarter_claim|            category|       bank_type|financial_institution_name|bank_index|number_of_regulated_proceeding_complaints|number_of_r