In [None]:
from datetime import datetime
import matplotlib.pyplot as plt
import os
import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, when, isnull
from pyspark.sql.types import StringType, IntegerType

#### Setting variables

In [None]:
# Working directory of the notebook; every data folder is resolved relative to it.
path_wd = os.getcwd()
# The previous `path_wd.rsplit("\\", 0)[0]` used maxsplit=0, which never splits,
# so the project root has always been the working directory itself.
# NOTE(review): if the parent folder was intended, use os.path.dirname(path_wd).
path_root = path_wd

# The empty last component makes os.path.join append a trailing separator.
# Later cells build file names by plain string concatenation and rely on it.
path_raw = os.path.join(path_root, 'raw', '')
path_stage = os.path.join(path_root, 'stage', '')

path_raw_bancos = os.path.join(path_raw, 'bancos', '')
path_raw_empregados = os.path.join(path_raw, 'empregados', '')
path_raw_reclamacoes = os.path.join(path_raw, 'reclamacoes', '')

# os.listdir returns entries in arbitrary order. The extraction cells pick files
# by position, so sort the names to make that selection deterministic.
flist_bancos = sorted(os.listdir(path_raw_bancos))
flist_empregados = sorted(os.listdir(path_raw_empregados))
flist_reclamacoes = sorted(os.listdir(path_raw_reclamacoes))

#### Initialize SparkSession

In [None]:
# Obtain the Spark session used by every cell below, created under the
# application name "DataExtraction".
spark = (
    SparkSession.builder
    .appName("DataExtraction")
    .getOrCreate()
)

#### 1. Data Extraction

1.1. Bancos

In [None]:
# Full path of the first file listed in the raw "bancos" folder.
fname_bancos = path_raw_bancos + flist_bancos[0]

# Tab-delimited file with a header row, decoded as latin1.
df_bancos = (
    spark.read
    .options(delimiter="\t", header=True, charset="latin1")
    .csv(fname_bancos)
)

1.2. Empregados

In [None]:
# Full paths of the first two files listed in the raw "empregados" folder.
fname_empregados_01 = path_raw_empregados + flist_empregados[0]
fname_empregados_02 = path_raw_empregados + flist_empregados[1]

# Both files are pipe-delimited and carry a header row.
df_empregados_01 = (
    spark.read
    .options(delimiter="|", header=True)
    .csv(fname_empregados_01)
)
df_empregados_02 = (
    spark.read
    .options(delimiter="|", header=True)
    .csv(fname_empregados_02)
)

1.3. Reclamações

In [None]:
def _read_reclamacoes(fname):
    """Read one semicolon-delimited CSV file that has a header row."""
    return spark.read.option("delimiter", ";").option("header", True).csv(fname)


# Full paths of the first eight entries of the raw "reclamacoes" listing.
# Each variable name carries the period its position is expected to hold.
fname_reclamacoes_202101 = path_raw_reclamacoes + flist_reclamacoes[0]
fname_reclamacoes_202102 = path_raw_reclamacoes + flist_reclamacoes[1]
fname_reclamacoes_202103 = path_raw_reclamacoes + flist_reclamacoes[2]
fname_reclamacoes_202104 = path_raw_reclamacoes + flist_reclamacoes[3]
fname_reclamacoes_202201 = path_raw_reclamacoes + flist_reclamacoes[4]
fname_reclamacoes_202202 = path_raw_reclamacoes + flist_reclamacoes[5]
fname_reclamacoes_202203 = path_raw_reclamacoes + flist_reclamacoes[6]
fname_reclamacoes_202204 = path_raw_reclamacoes + flist_reclamacoes[7]

# One DataFrame per file, all read with the same options.
df_reclamacoes_202101 = _read_reclamacoes(fname_reclamacoes_202101)
df_reclamacoes_202102 = _read_reclamacoes(fname_reclamacoes_202102)
df_reclamacoes_202103 = _read_reclamacoes(fname_reclamacoes_202103)
df_reclamacoes_202104 = _read_reclamacoes(fname_reclamacoes_202104)
df_reclamacoes_202201 = _read_reclamacoes(fname_reclamacoes_202201)
df_reclamacoes_202202 = _read_reclamacoes(fname_reclamacoes_202202)
df_reclamacoes_202203 = _read_reclamacoes(fname_reclamacoes_202203)
df_reclamacoes_202204 = _read_reclamacoes(fname_reclamacoes_202204)

#### 2. Data Cleansing

In [None]:
# Strip special characters
def clean_special_chars(df, column_name, pattern=r'[^a-zA-Z0-9\s]'):
    """Add a ``<column_name>_cleaned`` column with unwanted characters removed.

    Args:
        df: DataFrame that contains ``column_name``.
        column_name: Name of the column to clean.
        pattern: Regular expression matching the characters to delete. The
            default deletes everything that is not an ASCII letter, a digit
            or whitespace, so accented letters are removed as well.

    Returns:
        A new DataFrame with the extra ``<column_name>_cleaned`` column; the
        source column is left untouched.
    """
    # The pattern is a raw string so that ``\s`` reaches the regex engine
    # instead of being parsed as an (invalid) Python escape sequence.
    return df.withColumn(
        column_name + "_cleaned",
        regexp_replace(col(column_name), pattern, ''),
    )

# Normalize nulls and empty strings
def handle_nulls_and_empty(df, column_name):
    """Add a ``<column_name>_handled`` column where empty strings become null.

    Args:
        df: DataFrame that contains ``column_name``.
        column_name: Name of the column to normalize.

    Returns:
        A new DataFrame with the extra ``<column_name>_handled`` column. It
        holds null where the source value is null or ``""`` and the source
        value everywhere else.
    """
    source = col(column_name)
    is_missing = isnull(source) | (source == "")
    handled = when(is_missing, None).otherwise(source)
    return df.withColumn(column_name + "_handled", handled)

2.1. Bancos

In [None]:
# Strip special characters from "Nome", then turn empty results into nulls.
df_bancos_cleaned = clean_special_chars(df_bancos, "Nome")
df_bancos_handled = handle_nulls_and_empty(df_bancos_cleaned, "Nome_cleaned")

# Keep the three output columns, exposing the cleaned name under its original label.
df_bancos_final = df_bancos_handled.select(
    "Segmento",
    "CNPJ",
    col("Nome_cleaned_handled").alias("Nome"),
)
df_bancos_final.show()

In [None]:
# Output
# Build the output name by swapping the input file's extension for "_cleaned.csv".
# os.path.splitext only touches the final extension, so the result always differs
# from the input path. A plain str.replace('.tsv', ...) would leave the path
# unchanged for any other extension, and mode="overwrite" would then write over
# the raw file.
# NOTE(review): the output lands next to the raw input; path_stage is defined but
# never used in the visible cells. Confirm whether it should be the target.
fname_bancos_output = os.path.splitext(fname_bancos)[0] + '_cleaned.csv'
# Normalize Windows backslashes to forward slashes before handing the path to Spark.
fname_bancos_output_01 = fname_bancos_output.replace('\\', '/')

df_bancos_final.write.csv(fname_bancos_output_01, header=True, sep=";", mode="overwrite")

2.2. Empregados

2.3. Reclamações

#### 3. Data Transformation

In [None]:
# Standardize bank names
# TODO: not implemented yet; this cell is a placeholder.

In [None]:
# Left disabled: uncomment to stop the Spark session once the notebook is finished.
# spark.stop()