<a href="https://colab.research.google.com/github/AmandaAntonio/ETL/blob/main/ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Iniciando o Ambiente Spark


In [None]:
#Instalando o Spark
!pip install pyspark



In [None]:
#Instalando o unidecode
!pip install unidecode



In [None]:
#Importando bibliotecas
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.window import Window
from unidecode import unidecode

In [None]:
#Iniciando a sessão Spark
spark = SparkSession.builder.appName("Feature_Engineering").getOrCreate()

#Carregando os Dados em CSV

##Leitura dos dados

In [None]:
#Lendo o arquivo csv
cadastro_csv = spark.read.csv("/content/drive/MyDrive/ETL_Exercicio/Raw_Data/dados_cadastro_2.csv", header=True, sep = "|")

In [None]:
#Exibindo as primeiras linhas
cadastro_csv.show(truncate=False)

+-----------+----------------------+---------------+----+--------+--------------------------+------+-----------------------------+--------------------+------+-------------------+-------------+
|cpf        |nome                  |data_nascimento|sexo|convenio|logradouro                |numero|bairro                       |cidade              |estado|data_cadastro      |pais_cadastro|
+-----------+----------------------+---------------+----+--------+--------------------------+------+-----------------------------+--------------------+------+-------------------+-------------+
|03687145271|Maria Cecília Duarte  |02/11/1989     |Fem |AMIL    |Jardim Rodrigues          |335.0 |São Geraldo                  |Gonçalves           |CE    |1970-07-28 10:05:02|Br           |
|40791586375|Maria Cecília Alves   |10/05/1980     |Fem |AMIL    |Viela de Ribeiro          |651.0 |Goiania                      |Rezende da Praia    |AP    |1992-03-12 07:10:16|Br           |
|52983647056|Laura Cardoso         

In [None]:
#Nomes das colunas e o tipo dos dados
cadastro_csv.printSchema()

root
 |-- cpf: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- data_nascimento: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- convenio: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- data_cadastro: string (nullable = true)
 |-- pais_cadastro: string (nullable = true)



In [None]:
#Número de linhas da tabela
cadastro_csv.count()

952875

##Tratamento dos dados

In [None]:
#Fazendo um cópia da base de dados original
cadastro_csv_clean = cadastro_csv.alias("cadastro_csv_clean")

In [None]:
#Função para remover acentos
def remove_acentos(text):
  return unidecode(text)

#Registrando a função como UDF (User Defined Function)
remove_acentos_udf = F.udf(remove_acentos, StringType())

In [None]:
#Removendo os pronomes de tratamento da coluna nome
cadastro_csv_clean = cadastro_csv_clean.withColumn("nome", F.when(F.col("nome").rlike(r"^(Sr\.? |Srta\.? |Dr\.? |Dra\.? |Sra\.? )")\
                                                      , F.regexp_replace(F.col("nome"), r"^(Sr\.? |Srta\.? |Dr\.? |Dra\.? |Sra\.? )", ""))\
                                                      .otherwise(F.col("nome")))


#Convertendo a coluna data nascimento para o formato de data
cadastro_csv_clean = cadastro_csv_clean.withColumn("data_nascimento", F.to_date("data_nascimento", "dd/MM/yyyy"))

#Convertendo o valor Fem e Masc para F e M
cadastro_csv_clean = cadastro_csv_clean.replace(["Fem", "Masc"], ["F", "M"], "sexo")

#Removendo o valor .0 da coluna numero
cadastro_csv_clean = cadastro_csv_clean.withColumn("numero", F.regexp_replace(F.col("numero"), r"\.0$", ""))

#Padronizando os registros da coluna cidade(inicias com letra maiúscula)
cadastro_csv_clean = cadastro_csv_clean.withColumn("cidade", F.initcap("cidade"))

#Convertendo a coluna data_cadastro de string para timestamp
cadastro_csv_clean = cadastro_csv_clean.withColumn("data_cadastro", F.to_timestamp("data_cadastro", "yyyy-MM-dd HH:mm:ss"))

#Transformando o Br da coluna país pra Brasil
cadastro_csv_clean = cadastro_csv_clean.replace(["Br"], ["Brasil"], "pais_cadastro")

#Colunas para remover acentos
colunas_para_limpar = ['nome', 'convenio', 'logradouro', 'bairro', 'cidade']

# Aplicando a remoção de acentos em cada coluna especificada
for coluna in colunas_para_limpar:
  cadastro_csv_clean = cadastro_csv_clean.withColumn(coluna, remove_acentos_udf(F.col(coluna)))

In [None]:
#Dados após o tratamento
cadastro_csv_clean.show(truncate=False)

+-----------+--------------------+---------------+----+--------+--------------------------+------+-----------------------------+--------------------+------+-------------------+-------------+
|cpf        |nome                |data_nascimento|sexo|convenio|logradouro                |numero|bairro                       |cidade              |estado|data_cadastro      |pais_cadastro|
+-----------+--------------------+---------------+----+--------+--------------------------+------+-----------------------------+--------------------+------+-------------------+-------------+
|03687145271|Maria Cecilia Duarte|1989-11-02     |F   |AMIL    |Jardim Rodrigues          |335   |Sao Geraldo                  |Goncalves           |CE    |1970-07-28 10:05:02|Brasil       |
|40791586375|Maria Cecilia Alves |1980-05-10     |F   |AMIL    |Viela de Ribeiro          |651   |Goiania                      |Rezende Da Praia    |AP    |1992-03-12 07:10:16|Brasil       |
|52983647056|Laura Cardoso       |1997-09-02 

In [None]:
#Tipagem de dados após o tratamento
cadastro_csv_clean.printSchema()

root
 |-- cpf: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- data_nascimento: date (nullable = true)
 |-- sexo: string (nullable = true)
 |-- convenio: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- data_cadastro: timestamp (nullable = true)
 |-- pais_cadastro: string (nullable = true)



##Exploração de dados

In [None]:
#Contagem de registros para coluna cpf
contagem_cpf_csv = cadastro_csv_clean.groupBy("cpf").count().alias("count")

#Verificando os valores de cpf repetidos
contagem_cpf_csv.filter(F.col("count") > 1).show()

+-----------+-----+
|        cpf|count|
+-----------+-----+
|86705324171|    2|
|91045283797|    2|
|06214593806|    2|
|10452867967|    2|
+-----------+-----+



In [None]:
#Definindo a janela para cada grupo 'cpf', ordenada pela data_cadastro em ordem decrescente
windowSpec = Window.partitionBy("cpf").orderBy(F.desc("data_cadastro"))

#Adicionando uma coluna numerada por linha, ordenada pela data_cadastro decrescente
cadastro_csv_clean = cadastro_csv_clean.withColumn("row_number", F.row_number().over(windowSpec))

#Mantendo apenas as linhas com o número 1 (ou seja, a entrada mais recente para cada 'cpf')
cadastro_csv_clean = cadastro_csv_clean.filter(cadastro_csv_clean["row_number"] == 1).drop("row_number")

In [None]:
#Contagem de registros para coluna cpf
contagem_cpf_csv = cadastro_csv_clean.groupBy("cpf").count().alias("count")

#Verificando o resultado
contagem_cpf_csv.filter(F.col("count") > 1).show()

+---+-----+
|cpf|count|
+---+-----+
+---+-----+



In [None]:
#Contagem para valores da coluna convenio
cadastro_csv_clean.groupBy("convenio").count().alias("count").show()

+------------+-----+
|    convenio|count|
+------------+-----+
|Porto Seguro|86731|
|  GoodHealth|86945|
|   SaudeMais|86647|
|       Prata|86405|
|        AMIL|86420|
|        Gold|86266|
|  Sulamerica|86462|
|    Bradesco|86860|
|        Itau|86387|
|      Bronze|86863|
|         SUS|86885|
+------------+-----+



##Salvando os dados

In [None]:
#Criando uma view temporaria
cadastro_csv_clean.createOrReplaceTempView("cadastro_csv_clean")

In [None]:
#Padronizando os nomes e as sequência das colunas do dataframe
cadastro_csv_clean = spark.sql("""
               SELECT
                cpf as CPF,
                nome as NOME,
                data_nascimento as DATA_NASCIMENTO,
                sexo as SEXO,
                convenio CONVENIO,
                logradouro as LOGRADOURO,
                numero NUMERO,
                bairro as BAIRRO,
                cidade as CIDADE,
                estado as ESTADO,
                data_cadastro as DATA_CADASTRO,
                pais_cadastro as PAIS_CADASTRO
              FROM
                cadastro_csv_clean
""")

#Exibindo o dataframe limpo
cadastro_csv_clean.show(5, truncate=False)

+-----------+---------------------+---------------+----+------------+---------------------+------+--------------------+-------------------+------+-------------------+-------------+
|CPF        |NOME                 |DATA_NASCIMENTO|SEXO|CONVENIO    |LOGRADOURO           |NUMERO|BAIRRO              |CIDADE             |ESTADO|DATA_CADASTRO      |PAIS_CADASTRO|
+-----------+---------------------+---------------+----+------------+---------------------+------+--------------------+-------------------+------+-------------------+-------------+
|01234569752|Luiz Miguel Rodrigues|2017-06-04     |M   |Porto Seguro|Loteamento de Mendes |373   |Conjunto Serra Verde|Santos             |SP    |2014-04-02 04:22:42|Brasil       |
|01234578905|Luiz Henrique Gomes  |2020-12-13     |M   |Gold        |Setor de da Conceicao|346   |Vila Sumare         |Ferreira           |PA    |2008-04-03 23:31:27|Brasil       |
|01234579634|Heitor Vieira        |1978-05-28     |M   |Gold        |Patio Vieira         |554 

In [None]:
#Caminho onde salva o arquivo no Google Drive
caminho_no_drive = "/content/drive/MyDrive/ETL_Exercicio/Clean_Data/cadastro_csv_clean_parquet"

#Salvando o DataFrame como arquivo parquet no Google Drive
cadastro_csv_clean.write.parquet(caminho_no_drive, mode="overwrite")

#Carregando os Dados em Parquet

##Leitura dos dados

In [None]:
#Lendo o arquivo parquet
cadastro_parquet = spark.read.parquet("/content/drive/MyDrive/ETL_Exercicio/Raw_Data/dados_cadastro_1.parquet",header = True)

In [None]:
#Exibindo as primeiras linhas
cadastro_parquet.show(truncate=False)

+--------------+------------------------+---------------+----+------------+------------------------+------+-----------------------+---------------------+------+-------------------+---+-----------------+
|documento_cpf |nome_completo           |data_nascimento|sexo|convenio    |logradouro              |numero|bairro                 |cidade               |pais  |data_cadastro      |uf |__index_level_0__|
+--------------+------------------------+---------------+----+------------+------------------------+------+-----------------------+---------------------+------+-------------------+---+-----------------+
|390.628.415-89|Caroline Rocha          |1982-02-28     |Fem |GoodHealth  |Trevo Santos            |773   |Vila Antena            |Cardoso              |Brasil|1979-10-19T08:09:16|AC |207979           |
|360.847.952-00|Dr. Luiz Otávio Monteiro|2020-09-04     |Masc|Porto Seguro|Trevo Mariana Alves     |184   |Marmiteiros            |Gonçalves            |Brasil|2012-01-22T00:21:53|AC |5181

In [None]:
#Nomes das colunas e o tipo dos dados
cadastro_parquet.printSchema()

root
 |-- documento_cpf: string (nullable = true)
 |-- nome_completo: string (nullable = true)
 |-- data_nascimento: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- convenio: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- data_cadastro: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- __index_level_0__: long (nullable = true)



In [None]:
#Número de linhas da tabela
cadastro_parquet.count()

952613

##Tratamento do dados

In [None]:
#Fazendo um cópia da base de dados original
cadastro_parquet_clean = cadastro_parquet.alias("cadastro_csv_clean")

In [None]:
#Removendo os pontos da coluna cpf
cadastro_parquet_clean = cadastro_parquet_clean.withColumn('documento_cpf', F.regexp_replace('documento_cpf', '[^\w\s]', ''))

#Removendo os pronomes de tratamento da coluna nome completo
cadastro_parquet_clean = cadastro_parquet_clean.withColumn("nome_completo", F.when(F.col("nome_completo").rlike(r"^(Sr\.? |Srta\.? |Dr\.? |Dra\.? |Sra\.? )")\
                                              ,F.regexp_replace(F.col("nome_completo"), r"^(Sr\.? |Srta\.? |Dr\.? |Dra\.? |Sra\.? )", ""))\
                                              .otherwise(F.col("nome_completo")))

#Convertendo a coluna data nascimento para data
cadastro_parquet_clean = cadastro_parquet_clean.withColumn("data_nascimento", F.to_date("data_nascimento", "yyyy-MM-dd"))

#Convertendo o valor Fem e Masc para F e M
cadastro_parquet_clean = cadastro_parquet_clean.replace(["Fem", "Masc"], ["F", "M"], "sexo")

#Convertendo a coluna data nascimento para timestamp
cadastro_parquet_clean = cadastro_parquet_clean.withColumn("data_cadastro", F.to_timestamp("data_cadastro", "yyyy-MM-dd'T'HH:mm:ss"))

#Padronizando os registros da coluna cidade(inicias com letra maiúscula)
cadastro_parquet_clean = cadastro_parquet_clean.withColumn("cidade", F.initcap("cidade"))

#Excluindo a coluna __inde__level__0
cadastro_parquet_clean = cadastro_parquet_clean.drop("__index_level_0__")

#Colunas para remover acentos
colunas_para_limpar = ['nome_completo', 'convenio', 'logradouro', 'bairro', 'cidade']

# Aplicando a remoção de acentos em cada coluna especificada
for coluna in colunas_para_limpar:
  cadastro_parquet_clean = cadastro_parquet_clean.withColumn(coluna, remove_acentos_udf(F.col(coluna)))


In [None]:
#Dados após o tratamento
cadastro_parquet_clean.show(truncate=False)

+-------------+---------------------+---------------+----+------------+------------------------+------+-----------------------+---------------------+------+-------------------+---+
|documento_cpf|nome_completo        |data_nascimento|sexo|convenio    |logradouro              |numero|bairro                 |cidade               |pais  |data_cadastro      |uf |
+-------------+---------------------+---------------+----+------------+------------------------+------+-----------------------+---------------------+------+-------------------+---+
|39062841589  |Caroline Rocha       |1982-02-28     |F   |GoodHealth  |Trevo Santos            |773   |Vila Antena            |Cardoso              |Brasil|1979-10-19 08:09:16|AC |
|36084795200  |Luiz Otavio Monteiro |2020-09-04     |M   |Porto Seguro|Trevo Mariana Alves     |184   |Marmiteiros            |Goncalves            |Brasil|2012-01-22 00:21:53|AC |
|41593207670  |Emanuelly Caldeira   |2011-10-28     |F   |Prata       |Residencial Mendes      

In [None]:
#Tipagem de dados após o tratamento
cadastro_parquet_clean.printSchema()

root
 |-- documento_cpf: string (nullable = true)
 |-- nome_completo: string (nullable = true)
 |-- data_nascimento: date (nullable = true)
 |-- sexo: string (nullable = true)
 |-- convenio: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- data_cadastro: timestamp (nullable = true)
 |-- uf: string (nullable = true)



##Exploração dos dados

In [None]:
#Contagem de registros para coluna cpf
contagem_cpf_parket = cadastro_parquet_clean.groupBy("documento_cpf").count().alias("count")

#Verificando os valores de cpf repetidos
contagem_cpf_parket.filter(F.col("count") > 1).show()

+-------------+-----+
|documento_cpf|count|
+-------------+-----+
+-------------+-----+



In [None]:
#Contagem para valores da coluna convenio
cadastro_parquet_clean.groupBy("convenio").count().alias("count").show()

+------------+-----+
|    convenio|count|
+------------+-----+
|Porto Seguro|86796|
|  GoodHealth|86916|
|   SaudeMais|87047|
|       Prata|86351|
|        AMIL|86094|
|        Gold|86705|
|  Sulamerica|86712|
|    Bradesco|86073|
|        Itau|86346|
|      Bronze|86380|
|         SUS|87193|
+------------+-----+



##Salvando os dados

In [None]:
#Criando uma view temporaria
cadastro_parquet_clean.createOrReplaceTempView("cadastro_parquet_clean")

In [None]:
#Padronizando os nomes e as sequência das colunas do dataframe
cadastro_parquet_clean = spark.sql("""
               SELECT
                documento_cpf as CPF,
                nome_completo as NOME,
                data_nascimento as DATA_NASCIMENTO,
                sexo as SEXO,
                convenio as CONVENIO,
                logradouro as LOGRADOURO,
                numero as NUMERO,
                bairro as BAIRRO,
                cidade as CIDADE,
                uf as ESTADO,
                data_cadastro as DATA_CADASTRO,
                pais as PAIS_CADASTRO
             FROM
                cadastro_parquet_clean
""")

#Exibindo o dataframe limpo
cadastro_parquet_clean.show(5, truncate=False)

+-----------+--------------------+---------------+----+------------+-------------------+------+-------------+-------------+------+-------------------+-------------+
|CPF        |NOME                |DATA_NASCIMENTO|SEXO|CONVENIO    |LOGRADOURO         |NUMERO|BAIRRO       |CIDADE       |ESTADO|DATA_CADASTRO      |PAIS_CADASTRO|
+-----------+--------------------+---------------+----+------------+-------------------+------+-------------+-------------+------+-------------------+-------------+
|39062841589|Caroline Rocha      |1982-02-28     |F   |GoodHealth  |Trevo Santos       |773   |Vila Antena  |Cardoso      |AC    |1979-10-19 08:09:16|Brasil       |
|36084795200|Luiz Otavio Monteiro|2020-09-04     |M   |Porto Seguro|Trevo Mariana Alves|184   |Marmiteiros  |Goncalves    |AC    |2012-01-22 00:21:53|Brasil       |
|41593207670|Emanuelly Caldeira  |2011-10-28     |F   |Prata       |Residencial Mendes |449   |Vila Puc     |Pinto        |AC    |2022-03-16 08:37:34|Brasil       |
|024653789

In [None]:
#Caminho onde salva o arquivo no Google Drive
caminho_no_drive = "/content/drive/MyDrive/ETL_Exercicio/Clean_Data/cadastro_parquet_clean_parquet"

#Salvando o DataFrame como arquivo parquet no Google Drive
cadastro_parquet_clean.write.parquet(caminho_no_drive, mode="overwrite")

# Carregando os Dados em Json

##Leitura dos dados

In [None]:
#Lendo o arquivo json
cadastro_json = spark.read.json("/content/drive/MyDrive/ETL_Exercicio/Raw_Data/dados_cadastro_3.json")

In [None]:
#Exibindo as primeiras linhas
cadastro_json.show(truncate=False)

+-----------------------------+-------------------+--------+--------------+-------------+------------------+------------------+-------------------------------+-----------------------+------+----+----+
|bairro                       |cidade             |convenio|cpf           |data_cadastro|data_nascimento   |estado            |logradouro                     |nome                   |numero|pais|sexo|
+-----------------------------+-------------------+--------+--------------+-------------+------------------+------------------+-------------------------------+-----------------------+------+----+----+
|Vila Madre Gertrudes 3ª Seção|Ribeiro Grande     |AMIL    |573.691.208-21|817032248000 |August 26, 2017   |Amapá             |Largo de Viana                 |Ryan Santos            |886   |    |M   |
|Havaí                        |Caldeira das Flores|AMIL    |520.936.741-06|1009317437000|January 30, 2023  |Mato Grosso do Sul|Rodovia Nina Alves             |Juliana Correia        |545   |    |F

In [None]:
#Nomes das colunas e o tipo dos dados
cadastro_json.printSchema()

root
 |-- bairro: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- convenio: string (nullable = true)
 |-- cpf: string (nullable = true)
 |-- data_cadastro: long (nullable = true)
 |-- data_nascimento: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- numero: long (nullable = true)
 |-- pais: string (nullable = true)
 |-- sexo: string (nullable = true)



In [None]:
#Número de linhas da tabela
cadastro_json.count()

952210

##Tratamento dos dados

In [None]:
#Fazendo um cópia da base de dados original
cadastro_json_clean = cadastro_json.alias("cadastro_json_clean")

In [None]:
#Função para converter nomes dos estados em siglas
def converter_estado(nome_estado):
    return mapeamento_estados.get(nome_estado, nome_estado)

#Registrando a função como UDF (User Defined Function)
converter_estado_udf = F.udf(converter_estado)


In [None]:
#Padronizando os registros da coluna cidade(inicias com letra maiúscula)
cadastro_json_clean = cadastro_json_clean.withColumn("cidade", F.initcap("cidade"))

#Removendo os pontos da coluna cpf
cadastro_json_clean = cadastro_json_clean.withColumn('cpf', F.regexp_replace('cpf', '[^\w\s]', ''))

#Convertendo a coluna data_cadastro de long para um timestamp
cadastro_json_clean = cadastro_json_clean.withColumn("data_cadastro", F.from_unixtime(cadastro_json["data_cadastro"] / 1000, "yyyy-MM-dd HH:mm:ss").cast(TimestampType()))

#Convertendo a coluna data_nascimento para data
cadastro_json_clean = cadastro_json_clean.withColumn("data_nascimento", F.to_date("data_nascimento", "MMMM dd, yyyy"))

#Removendo os pronomes de tratamento da coluna nome completo
cadastro_json_clean = cadastro_json_clean.withColumn("nome", F.when(F.col("nome").rlike(r"^(Sr\.? |Srta\.? |Dr\.? |Dra\.? |Sra\.? )")\
                                          ,F.regexp_replace(F.col("nome"), r"^(Sr\.? |Srta\.? |Dr\.? |Dra\.? |Sra\.? )", ""))\
                                          .otherwise(F.col("nome")))

#Preenchendo com o valor Brasil a coluna pais
cadastro_json_clean = cadastro_json_clean.withColumn("pais", F.lit("Brasil"))

#Mapeando os nomes dos estados para siglas
mapeamento_estados = {
    "Acre": "AC", "Alagoas": "AL", "Amapá": "AP", "Amazonas": "AM", "Bahia": "BA", "Ceará": "CE", "Distrito Federal": "DF",
    "Espírito Santo": "ES", "Goiás": "GO", "Maranhão": "MA", "Mato Grosso": "MT", "Mato Grosso do Sul": "MS", "Minas Gerais": "MG",
    "Pará": "PA",  "Paraíba": "PB", "Paraná": "PR", "Pernambuco": "PE", "Piauí": "PI", "Rio de Janeiro": "RJ", "Rio Grande do Norte": "RN",
    "Rio Grande do Sul": "RS", "Rondônia": "RO", "Roraima": "RR", "Santa Catarina": "SC", "São Paulo": "SP", "Sergipe": "SE",
    "Tocantins": "TO"
}

#Aplicando a conversão na coluna estado
cadastro_json_clean = cadastro_json_clean.withColumn("estado", converter_estado_udf(F.col("estado")))

#Colunas para remover acentos
colunas_para_limpar = ["nome", "convenio", "logradouro", "bairro", "cidade"]

# Aplicando a remoção de acentos em cada coluna especificada
for coluna in colunas_para_limpar:
  cadastro_json_clean = cadastro_json_clean.withColumn(coluna, remove_acentos_udf(F.col(coluna)))

In [None]:
#Dados após o tratamento
cadastro_json_clean.show(truncate=False)

+-----------------------------+-------------------+--------+-----------+-------------------+---------------+------+-------------------------------+--------------------+------+------+----+
|bairro                       |cidade             |convenio|cpf        |data_cadastro      |data_nascimento|estado|logradouro                     |nome                |numero|pais  |sexo|
+-----------------------------+-------------------+--------+-----------+-------------------+---------------+------+-------------------------------+--------------------+------+------+----+
|Vila Madre Gertrudes 3a Secao|Ribeiro Grande     |AMIL    |57369120821|1995-11-22 09:24:08|2017-08-26     |AP    |Largo de Viana                 |Ryan Santos         |886   |Brasil|M   |
|Havai                        |Caldeira Das Flores|AMIL    |52093674106|2001-12-25 21:57:17|2023-01-30     |MS    |Rodovia Nina Alves             |Juliana Correia     |545   |Brasil|F   |
|Caicaras                     |Barbosa            |AMIL    |

In [None]:
#Tipagem de dados após o tratamento
cadastro_json_clean.printSchema()

root
 |-- bairro: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- convenio: string (nullable = true)
 |-- cpf: string (nullable = true)
 |-- data_cadastro: timestamp (nullable = true)
 |-- data_nascimento: date (nullable = true)
 |-- estado: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- numero: long (nullable = true)
 |-- pais: string (nullable = false)
 |-- sexo: string (nullable = true)



##Exploração dos dados

In [None]:
#Contagem de registros para coluna cpf
contagem_cpf_json = cadastro_json_clean.groupBy("cpf").count().alias("count")

#Verificando os valores de cpf repetidos
contagem_cpf_json.filter(F.col("count") > 1).show()

+---+-----+
|cpf|count|
+---+-----+
+---+-----+



In [None]:
#Contagem para valores da coluna convenio
cadastro_json_clean.groupBy("convenio").count().alias("count").show()

+------------+-----+
|    convenio|count|
+------------+-----+
|  GoodHealth|86969|
|        AMIL|86512|
|        Gold|86435|
|    Bradesco|86582|
|        Itau|86854|
|      Bronze|86427|
|Porto Seguro|86529|
|   SaudeMais|86461|
|       Prata|86300|
|  Sulamerica|87023|
|         SUS|86118|
+------------+-----+



##Salvando os dados

In [None]:
#Criando uma view temporaria
cadastro_json_clean.createOrReplaceTempView("cadastro_json_clean")

In [None]:
#Padronizando os nomes e as sequência das colunas do dataframe
cadastro_json_clean = spark.sql("""
              SELECT
                cpf as CPF,
                nome as NOME,
                data_nascimento as DATA_NASCIMENTO,
                sexo as SEXO,
                convenio as CONVENIO,
                logradouro as LOGRADOURO,
                numero as NUMERO,
                bairro as BAIRRO,
                cidade as CIDADE,
                estado as ESTADO,
                data_cadastro as DATA_CADASTRO,
                pais as PAIS_CADASTRO
             FROM
                cadastro_json_clean
""")

#Exibindo o dataframe limpo
cadastro_json_clean.show(5, truncate=False)

+-----------+-----------------+---------------+----+--------+----------------------+------+-----------------------------+-------------------+------+-------------------+-------------+
|CPF        |NOME             |DATA_NASCIMENTO|SEXO|CONVENIO|LOGRADOURO            |NUMERO|BAIRRO                       |CIDADE             |ESTADO|DATA_CADASTRO      |PAIS_CADASTRO|
+-----------+-----------------+---------------+----+--------+----------------------+------+-----------------------------+-------------------+------+-------------------+-------------+
|57369120821|Ryan Santos      |2017-08-26     |M   |AMIL    |Largo de Viana        |886   |Vila Madre Gertrudes 3a Secao|Ribeiro Grande     |AP    |1995-11-22 09:24:08|Brasil       |
|52093674106|Juliana Correia  |2023-01-30     |F   |AMIL    |Rodovia Nina Alves    |545   |Havai                        |Caldeira Das Flores|MS    |2001-12-25 21:57:17|Brasil       |
|01269843524|Juliana Rodrigues|1979-01-06     |F   |AMIL    |Rua Peixoto           |7

In [None]:
#Caminho onde salva o arquivo no Google Drive
caminho_no_drive = "/content/drive/MyDrive/ETL_Exercicio/Clean_Data/cadastro_json_clean_parquet"

#Salvando o DataFrame como arquivo parquet no Google Drive
cadastro_json_clean.write.parquet(caminho_no_drive, mode="overwrite")

#Unindo os Arquivos

In [None]:
#Lendo os arquivos parquet limpos
cadastro_parquet_csv= spark.read.parquet("/content/drive/MyDrive/ETL_Exercicio/Clean_Data/cadastro_csv_clean_parquet")
cadastro_parquet_parquet = spark.read.parquet("/content/drive/MyDrive/ETL_Exercicio/Clean_Data/cadastro_parquet_clean_parquet")
cadastro_parquet_json = spark.read.parquet("/content/drive/MyDrive/ETL_Exercicio/Clean_Data/cadastro_json_clean_parquet")

In [None]:
#Criando views temporárias
cadastro_parquet_csv.createOrReplaceTempView("cadastro_parquet_csv")
cadastro_parquet_parquet.createOrReplaceTempView("cadastro_parquet_parquet")
cadastro_parquet_json.createOrReplaceTempView("cadastro_parquet_json")

In [None]:
cadastro_final_parquet = spark.sql("""
              SELECT
                  CPF,
                  NOME,
                  DATA_NASCIMENTO,
                  SEXO,
                  CONVENIO,
                  LOGRADOURO,
                  NUMERO,
                  BAIRRO,
                  CIDADE,
                  ESTADO,
                  DATA_CADASTRO,
                  PAIS_CADASTRO
              FROM
                cadastro_parquet_csv

              UNION ALL

              SELECT
                  CPF,
                  NOME,
                  DATA_NASCIMENTO,
                  SEXO,
                  CONVENIO,
                  LOGRADOURO,
                  NUMERO,
                  BAIRRO,
                  CIDADE,
                  ESTADO,
                  DATA_CADASTRO,
                  PAIS_CADASTRO
              FROM
                cadastro_parquet_parquet

             UNION ALL

             SELECT
                  CPF,
                  NOME,
                  DATA_NASCIMENTO,
                  SEXO,
                  CONVENIO,
                  LOGRADOURO,
                  NUMERO,
                  BAIRRO,
                  CIDADE,
                  ESTADO,
                  DATA_CADASTRO,
                  PAIS_CADASTRO
             FROM
                cadastro_parquet_json
""")

#Exibindo o dataframe final
cadastro_final_parquet.show(8, truncate=False)

+-----------+---------------------+---------------+----+----------+---------------------+------+----------------------------+-----------+------+-------------------+-------------+
|CPF        |NOME                 |DATA_NASCIMENTO|SEXO|CONVENIO  |LOGRADOURO           |NUMERO|BAIRRO                      |CIDADE     |ESTADO|DATA_CADASTRO      |PAIS_CADASTRO|
+-----------+---------------------+---------------+----+----------+---------------------+------+----------------------------+-----------+------+-------------------+-------------+
|01234586924|Alicia Fernandes     |2021-04-30     |F   |Sulamerica|Recanto Oliveira     |584   |Vila Tirol                  |Nogueira   |GO    |1972-05-23 01:40:13|Brasil       |
|01234596725|Maria Sophia Ferreira|1979-08-06     |F   |Sulamerica|Rodovia de Vieira    |279   |Coqueiros                   |Rocha      |AM    |2019-12-30 18:18:00|Brasil       |
|01234597888|Ana Carolina Ramos   |2007-03-22     |F   |Prata     |Sitio de Freitas     |165   |Barao Hom

In [None]:
#Criando uma view temporária
cadastro_final_parquet.createOrReplaceTempView("cadastro_final")

In [None]:
#Verificando se os dados não se repetem na base de dados final
spark.sql("""
      SELECT DISTINCT
        COUNT(CPF) as TOTAL_REGISTROS_UNICOS,
        COUNT(*) as TOTAL_REGISTROS
      FROM
        cadastro_final
""").show()

+----------------------+---------------+
|TOTAL_REGISTROS_UNICOS|TOTAL_REGISTROS|
+----------------------+---------------+
|               2857694|        2857694|
+----------------------+---------------+



In [None]:
#Caminho onde salva o arquivo no Google Drive
caminho_no_drive = "/content/drive/MyDrive/ETL_Exercicio/Ready_Data/cadastro_final_parquet"

#Salvando o DataFrame como arquivo parquet no Google Drive
cadastro_final_parquet.write.parquet(caminho_no_drive, mode="overwrite")