# Limpeza e tratamento dos dados [PySpark]

#### **Deletar** antes de subir no Github

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 1 Importando csv como dataframe

Carregando bibliotecas necessárias

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 55.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=200eaf7fd2ae41dda3c79899a6d3ba2c11d1f4ea9c1857ccc4f5ceeb4e484336
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, lit, col
from pyspark.sql.types import StringType, BooleanType, IntegerType, FloatType, DateType, StructField, StructType
from pyspark.sql.functions import *

In [4]:
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [5]:
import pandas as pd
import numpy as np

# caminho para salvamento dos arquivos
caminho = '/content/drive/MyDrive/Colab Notebooks/dados/'

Carregando as bases brutas extraídas

In [6]:
df_reclamações = spark.read.csv(caminho + '/df_reclamações.csv', encoding='latin1', header=True)
df_instituicoes = spark.read.csv(caminho + '/df_reclamações.csv', encoding='latin1', header=True)

df_tarifas = spark.read.csv(caminho + '/df_tarifas.csv', encoding='latin1', header=True)
df_servicos = spark.read.csv(caminho + '/df_tarifas.csv', encoding='latin1', header=True)

## 2 Data Wrangling

#### Limpando o ``` df_consolidado```



In [7]:
# Listando nomes atuais
df_reclamações.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Ano: string (nullable = true)
 |-- Trimestre: string (nullable = true)
 |-- Categoria: string (nullable = true)
 |-- Tipo: string (nullable = true)
 |-- CNPJ IF: string (nullable = true)
 |-- Instituição financeira: string (nullable = true)
 |-- Índice: string (nullable = true)
 |-- Quantidade de reclamações reguladas procedentes: string (nullable = true)
 |-- Quantidade de reclamações reguladas - outras: string (nullable = true)
 |-- Quantidade de reclamações não reguladas: string (nullable = true)
 |-- Quantidade total de reclamações: string (nullable = true)
 |-- Quantidade total de clientes  CCS e SCR: string (nullable = true)
 |-- Quantidade de clientes  CCS: string (nullable = true)
 |-- Quantidade de clientes  SCR: string (nullable = true)
 |-- Unnamed: 14: string (nullable = true)



In [8]:
# Limpando a coluna Trimestre
df_reclamações = df_reclamações.withColumn('Trimestre', regexp_replace('Trimestre', 'º', ''))


In [9]:
# deletando Conglomerados
df_reclamações = df_reclamações.where(df_reclamações.Tipo != 'Conglomerado')

In [10]:
# criando chave calendario
df_reclamações = df_reclamações.select("*", concat(col("Ano"), col("Trimestre")).alias("ChaveCalendario"))

In [11]:
# renomeando colunas

df_reclamações = (
    df_reclamações
    .withColumnRenamed('CNPJ IF', 'CNPJ')
    .withColumnRenamed('Instituição financeira', 'InstituicaoFinanceira')
    .withColumnRenamed('Quantidade de reclamações reguladas procedentes', 'QuantidadeReclamacoesReguladasProcedentes')
    .withColumnRenamed('Quantidade de reclamações reguladas - outras', 'QuantidadeReclamacoesReguladasOutras')
    .withColumnRenamed('Quantidade de reclamações não reguladas', 'QuantidadeReclamacoesNaoReguladas')
    .withColumnRenamed('Quantidade total de reclamações', 'QuantidadeTotalReclamacoes')
    .withColumnRenamed('Quantidade total de clientes  CCS e SCR', 'QuantidadeTotalClientesCCSSCR')
    .withColumnRenamed('Quantidade de clientes  CCS', 'QuantidadeClientesCCS')
    .withColumnRenamed('Quantidade de clientes  SCR', 'QuantidadeClientesSCR')
)

In [12]:
# deletando colunas desnecessárias
df_reclamações = df_reclamações.drop(*['Unnamed: 14', '_c0', 'Índice', 'Ano', 'Trimestre', 'Categoria', 'Tipo'])

In [13]:
# Garantindo tipo correto de coluna
df_reclamações = (df_reclamações
                    .withColumn("CNPJ", df_reclamações["CNPJ"].cast('int'))
                    .withColumn("InstituicaoFinanceira", df_reclamações["InstituicaoFinanceira"].cast('int'))
                    .withColumn("QuantidadeReclamacoesReguladasProcedentes", df_reclamações["QuantidadeReclamacoesReguladasProcedentes"].cast('int'))
                    .withColumn("QuantidadeReclamacoesReguladasOutras", df_reclamações["QuantidadeReclamacoesReguladasOutras"].cast('int'))
                    .withColumn("QuantidadeReclamacoesNaoReguladas", df_reclamações["QuantidadeReclamacoesNaoReguladas"].cast('int'))
                    .withColumn("QuantidadeTotalReclamacoes", df_reclamações["QuantidadeTotalReclamacoes"].cast('int'))
                    .withColumn("QuantidadeTotalClientesCCSSCR", df_reclamações["QuantidadeTotalClientesCCSSCR"].cast('int'))
                    .withColumn("QuantidadeClientesCCS", df_reclamações["QuantidadeClientesCCS"].cast('int'))
                    .withColumn("QuantidadeClientesSCR", df_reclamações["QuantidadeClientesSCR"].cast('int'))
                    .withColumn("ChaveCalendario", df_reclamações["ChaveCalendario"].cast('int'))
                  )

In [14]:
df_reclamações.printSchema()

root
 |-- CNPJ: integer (nullable = true)
 |-- InstituicaoFinanceira: integer (nullable = true)
 |-- QuantidadeReclamacoesReguladasProcedentes: integer (nullable = true)
 |-- QuantidadeReclamacoesReguladasOutras: integer (nullable = true)
 |-- QuantidadeReclamacoesNaoReguladas: integer (nullable = true)
 |-- QuantidadeTotalReclamacoes: integer (nullable = true)
 |-- QuantidadeTotalClientesCCSSCR: integer (nullable = true)
 |-- QuantidadeClientesCCS: integer (nullable = true)
 |-- QuantidadeClientesSCR: integer (nullable = true)
 |-- ChaveCalendario: integer (nullable = true)



#### Limpando o ``` df_tarifas```

In [15]:
df_tarifas.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- CodigoServico: string (nullable = true)
 |-- Servico: string (nullable = true)
 |-- Unidade: string (nullable = true)
 |-- DataVigencia: string (nullable = true)
 |-- ValorMaximo: string (nullable = true)
 |-- TipoValor: string (nullable = true)
 |-- Periodicidade: string (nullable = true)
 |-- CNPJ: string (nullable = true)



In [16]:
# transformando DataVigencia em date
df_tarifas = (df_tarifas.withColumn("DataVigencia", df_tarifas["DataVigencia"].cast('date')))

In [17]:
# criando chave calendario
df_tarifas = df_tarifas.select("*", concat(year(col("DataVigencia")), year(col("DataVigencia"))).alias("ChaveCalendario"))

In [18]:
# deletando colunas desnecessárias
df_tarifas = df_tarifas.drop(*['_c0', 'Servico', 'Unidade', 'TipoValor', 'Periodicidade', 'DataVigencia'])

In [19]:
# Garantindo tipo correto de coluna
df_tarifas = (df_tarifas
                    .withColumn("CNPJ", df_tarifas["CNPJ"].cast('int'))
                    .withColumn("CodigoServico", df_tarifas["CodigoServico"].cast('int'))
                    .withColumn("ValorMaximo", df_tarifas["ValorMaximo"].cast('float'))
                    .withColumn("ChaveCalendario", df_tarifas["ChaveCalendario"].cast('int'))
                  )

In [20]:
df_tarifas.printSchema()

root
 |-- CodigoServico: integer (nullable = true)
 |-- ValorMaximo: float (nullable = true)
 |-- CNPJ: integer (nullable = true)
 |-- ChaveCalendario: integer (nullable = true)



Dim Calendario

In [21]:
# Criando Calendario

data2 = [
    (12020, 1, "1º Trimestre", 2020),
    (22020, 2, "2º Trimestre", 2020),
    (32020, 3, "3º Trimestre", 2020),
    (42020, 4, "4º Trimestre", 2020),
    (12021, 1, "1º Trimestre", 2021),
    (22021, 2, "2º Trimestre", 2021),
    (32021, 3, "3º Trimestre", 2021),
    (42021, 4, "4º Trimestre", 2021),
    (12022, 1, "1º Trimestre", 2022),
    (22022, 2, "2º Trimestre", 2022),
    (32022, 3, "3º Trimestre", 2022),
    (42022, 4, "4º Trimestre", 2022)
  ]

schema = StructType([ \
    StructField("ChaveCalendario",IntegerType(),True), \
    StructField("Trimestre",IntegerType(),True), \
    StructField("Trimestre Descrição",StringType(),True), \
    StructField("Ano", IntegerType(), True), \
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- ChaveCalendario: integer (nullable = true)
 |-- Trimestre: integer (nullable = true)
 |-- Trimestre Descrição: string (nullable = true)
 |-- Ano: integer (nullable = true)

+---------------+---------+-------------------+----+
|ChaveCalendario|Trimestre|Trimestre Descrição|Ano |
+---------------+---------+-------------------+----+
|12020          |1        |1º Trimestre       |2020|
|22020          |2        |2º Trimestre       |2020|
|32020          |3        |3º Trimestre       |2020|
|42020          |4        |4º Trimestre       |2020|
|12021          |1        |1º Trimestre       |2021|
|22021          |2        |2º Trimestre       |2021|
|32021          |3        |3º Trimestre       |2021|
|42021          |4        |4º Trimestre       |2021|
|12022          |1        |1º Trimestre       |2022|
|22022          |2        |2º Trimestre       |2022|
|32022          |3        |3º Trimestre       |2022|
|42022          |4        |4º Trimestre       |2022|
+---------------+-----

Dim Servicos

In [22]:
# deletando colunas desnecessárias
df_servicos = df_servicos.drop(*['_c0', 'DataVigencia', 'ValorMaximo', 'CNPJ'])

In [23]:
# ajustando texto
df_servicos = df_servicos.withColumn("Unidade", initcap(df_servicos["Unidade"])).withColumn("Periodicidade", initcap(df_servicos["Periodicidade"]))

In [24]:
# removendo códigos de serviços duplicados
df_servicos = df_servicos.dropDuplicates(["CodigoServico"])

In [25]:
# Garantindo tipo correto de coluna
df_servicos = (df_servicos
                    .withColumn("CodigoServico", df_servicos["CodigoServico"].cast('int'))
                    .withColumn("Servico", df_servicos["Servico"].cast('string'))
                    .withColumn("Unidade", df_servicos["Unidade"].cast('string'))
                    .withColumn("TipoValor", df_servicos["TipoValor"].cast('string'))
                    .withColumn("Periodicidade", df_servicos["Periodicidade"].cast('string'))
                  )

In [26]:
df_servicos.printSchema()

root
 |-- CodigoServico: integer (nullable = true)
 |-- Servico: string (nullable = true)
 |-- Unidade: string (nullable = true)
 |-- TipoValor: string (nullable = true)
 |-- Periodicidade: string (nullable = true)



Dim Instituições

In [27]:
# removendo Conglomerados
df_instituicoes = df_instituicoes.where(df_instituicoes.Tipo != 'Conglomerado')

In [30]:
# deletando colunas desnecessárias
df_instituicoes = df_instituicoes.select(*["Categoria", "Tipo", "CNPJ IF", "Instituição financeira"])

In [32]:
# renomeando colunas

df_instituicoes = (
    df_instituicoes
    .withColumnRenamed('CNPJ IF', 'CNPJ')
    .withColumnRenamed('Instituição financeira', 'InstituicaoFinanceira')
)

In [35]:
# removendo códigos de serviços duplicados
df_instituicoes = df_instituicoes.dropDuplicates(["CNPJ"])

In [41]:
# Garantindo tipo correto de coluna
df_instituicoes = (df_instituicoes
                    .withColumn("CNPJ", df_instituicoes["CNPJ"].cast('int'))
                    .withColumn("Tipo", df_instituicoes["Tipo"].cast('string'))
                    .withColumn("Categoria", df_instituicoes["Categoria"].cast('string'))
                    .withColumn("InstituicaoFinanceira", df_instituicoes["InstituicaoFinanceira"].cast('string'))
                  )

## 3 Salvando dados tratados

In [None]:
df_tarifas.to_csv(caminho + 'normalizadas/df_tarifas.csv', encoding='latin1', index=False)
df_lista_inst.to_csv(caminho + 'normalizadas/df_lista_inst.csv', encoding='latin1', index=False)
df_consolidado.to_csv(caminho + 'normalizadas/df_consolidado.csv', encoding='latin1', index=False)
df_reclamações.to_csv(caminho + 'normalizadas/df_reclamações.csv', encoding='latin1', index=False)