# Bootcamp da Xpe

## Engenharia de Dados em Cloud

### Módulo 1: Fundamentos em Arquitetura de Dados e Soluções em Nuvem


### Objetivos:
>> Implementação de um Data Lake; <br>
>> Armazenamento de dados em Storage camada Raw; <br>
>> Armazenamento de dados em Storage camada Bronze; <br>
>> Armazenamento de dados em Storage camada Silver; <br>
>> Implementação de Processamento de Big Data; <br>
>> IaC de toda estrutura com Terraform; <br>
>> Esteiras de Deploy com Github. <br>


### Esse notebook trata dos itens 2 e 3 do desafio
2. Realizar tratamento no dataset da RAIS 2020  <br>
    a. Modifique os nomes das colunas, trocando espaços por “_”; <br>
    b. Retire acentos e caracter especiais das colunas; <br>
    c. Transforme todas as colunas em letras minúsculas; <br>
    d. Crie uma coluna “uf” através da coluna "municipio"; <br>
    e. Realize os ajustes no tipo de dado para as colunas de remuneração.

3. Transformar os dados no formato parquet e escrevê-los na zona staging ou zona silver do seu Data Lake.

In [1]:
import pyspark
# import pyarrow
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, input_file_name, regexp_replace
from pyspark.sql import functions as spkFn

import unicodedata
import re

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1675446198895_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Inicia uma `Session` do Spark

In [2]:
spark = (SparkSession.builder
                     .appName("readFile")
                     .config("spark.sql.repl.eagerEval.enabled", True)
                     .getOrCreate()) 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Path para diretório source e target

In [4]:
productRais = "RAIS2020"

#### Path Cloud AWS
pathRaw     = f"s3://datalake-igti-tf-dev-877854487254/RAW/RAIS-2020/"
pathBronze  = f's3://datalake-igti-tf-dev-877854487254/SILVER/RAIS-2020/'

#### Path Load
#pathRaw    = f"D:/{productRais}/DATA_GZIP/"
#pathBronze = f"D:/{productRais}/bronze/DATA_PARQUET/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Schema DDL

In [5]:
fileNameSchema = "RAIS_VINC_PUB_NORTE.txt.gz"
filePathSchema = f"{pathRaw}/{fileNameSchema}"

fileDfSchema   = (spark.read
                       .format("csv")
                       .option("header","true")
                       .option("sep", ";")
                       .option("encoding", "latin1")
                       .option("inferSchema", "true")
                       .load(filePathSchema)
                       .schema)

schemaJson     = fileDfSchema.json()
schemaDDL      = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schemaJson).toDDL()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Ler arquivos com a API DataFrameRead

In [6]:
## Read
rais2020_csv = (spark.read
                     .format("csv")
                     .option("header","true")
                     .option("sep", ";")
                     .option("encoding", "latin1")
                     .option("inferSchema", "true")
                     .schema(schemaDDL)
                     .load(pathRaw)
                     .withColumn("file_name", lit(input_file_name())))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# print(rais2020_csv.printSchema())

### Funções para normalizar as colunas

> ***normalizar_colunas***
1. Função para normalizar as colunas de um dataframe
    - Retira espaços vazios e incluir um underline (Ex: `Sobre Nome -> Sobre_Nome`)
    - Retira ponto e incluir um underline (Ex: `Sobre.Nome -> Sobre_Nome`)
    - Formata todas as colunas com letras minúsculas (Ex: `Sobre_Nome -> sobre_nome`)

> ***normalizar_acentos***
2. Função para retirar os acentos de todas as colunas de um dataframe
    - Remover espaços nas extremidades (Ex: `"  sobre_nome  " -> "sobre_nome"`)
    - Replace de carácteres especiais por underline (Ex: `sobre@nome -> sobre_nome`)
    - Remover underlines nas extremidades (Ex: `_sobre_nome_ -> sobre_nome`)
    - Remover acentuação (Ex: `média_mês -> media_mes`)

In [7]:
## Função para normalizar as colunas de um dataframe
def normalizar_colunas(df):
  try:
    new_column_spaces_lower = (list(map(lambda x: x.replace(" ", "_")
                                                   .replace(".", "_")
                                                   .lower(),
                                                 df.columns)))
    return df.toDF(*new_column_spaces_lower) 
  except Exception as err:
        error_message = f"Erro ao normalizar nomes das colunas: {str(err)}"
        print(error_message)
        raise ValueError(error_message)

## Função para retirar os acentos de todas as colunas de um dataframe
def normalizar_acentos(str):
  try:
    new_str = str
    # Remover espaços nas extremidades
    new_str = new_str.strip()
    # Replace de carácteres especiais por underline
    new_str = re.sub(r"[^\w]", "_", new_str)
    # Remover underlines nas extremidades
    new_str = new_str.strip("_")
    # Remover 2 underlines juntos e deixar apenas 1
    new_str = new_str.replace("__", "_")
    # Remover acentuação
    new_str = unicodedata.normalize('NFKD', new_str)
    new_str = u"".join([c for c in new_str if not unicodedata.combining(c)])
    return new_str
  except Exception as err:
    error_message = f"Erro ao normalizar nomes das colunas: {str(err)}"
    print(error_message)
    raise ValueError(error_message)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Utiliza as funções no Dataframe

In [8]:
renamed_df = normalizar_colunas(rais2020_csv)

rais2020_renamed = renamed_df.select([spkFn.col(col).alias(normalizar_acentos(col)) for col in renamed_df.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# print(rais2020_renamed.printSchema())

#### Normalização das colunas de remuneração e outras
> - As colunas de `remuneração` <br>
    - Utiliza a função `regexp_replace` para fazer um replace de `,` ***vírgula*** para `.` ***ponto*** <br>
    - Converte para o tipo de dado `double`
    
- Cria a coluna `ano` com a informação do ano do dataset
- Cria a coluna `uf` com os dois primeiro caracteres da coluna `municipio` e converte para o tipo de dado `inteiro`
- Converte a coluna `mes_desligamento` para o tipo de dado `inteiro`


In [9]:
rais2020_fim = (
                 rais2020_renamed
                        .withColumn("ano", lit("2020").cast('int'))
                        .withColumn("uf", col("municipio").cast('string').substr(1,2).cast('int'))
                        .withColumn("mes_desligamento", col('mes_desligamento').cast('int'))
                        .withColumn("vl_remun_dezembro_nom", regexp_replace("vl_remun_dezembro_nom", ',', '.').cast('double'))
                        .withColumn("vl_remun_dezembro_sm", regexp_replace("vl_remun_dezembro_sm", ',', '.').cast('double'))
                        .withColumn("vl_remun_media_nom", regexp_replace("vl_remun_media_nom", ',', '.').cast('double'))
                        .withColumn("vl_remun_media_sm", regexp_replace("vl_remun_media_sm", ',', '.').cast('double'))
                        .withColumn("vl_rem_janeiro_sc", regexp_replace("vl_rem_janeiro_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_fevereiro_sc", regexp_replace("vl_rem_fevereiro_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_marco_sc", regexp_replace("vl_rem_marco_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_abril_sc", regexp_replace("vl_rem_abril_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_maio_sc", regexp_replace("vl_rem_maio_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_junho_sc", regexp_replace("vl_rem_junho_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_julho_sc", regexp_replace("vl_rem_julho_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_agosto_sc", regexp_replace("vl_rem_agosto_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_setembro_sc", regexp_replace("vl_rem_setembro_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_outubro_sc", regexp_replace("vl_rem_outubro_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_novembro_sc", regexp_replace("vl_rem_novembro_sc", ',', '.').cast('double'))
                        .drop("vl_remun_dezembro__sm", "vl_remun_media__sm")
                )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
def func_display(df, rows):
    return df.pandas_api().head(rows)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
rais2020_fim.createOrReplaceTempView("vw_df")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
##### 01 - Qual é o SEGUNDO motivo de desligamento mais frequente?
spark.sql("""
                    WITH seg_desl_mais_freq
                    AS (
                        SELECT ROW_NUMBER() OVER(ORDER BY COUNT(motivo_desligamento) DESC) AS Row,
                            motivo_desligamento,
                            count(*) AS total
                        FROM vw_df
                        GROUP BY motivo_desligamento
                        ORDER BY total DESC
                        LIMIT 2
                    )
                    SELECT *
                    FROM seg_desl_mais_freq
                    WHERE Row = '2'
                """)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+-------------------+-------+
|Row|motivo_desligamento|  total|
+---+-------------------+-------+
|  2|                 11|9819986|
+---+-------------------+-------+

In [14]:
(
    rais2020_fim.write
                .format('parquet')
                .mode('overwrite')
                .partitionBy('ano', 'uf')
                .save(pathBronze)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
rais2020_parquet = (
                      spark.read
                           .format('parquet')
                           .load(pathBronze)
                   )   

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
rais2020_parquet.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

74117924

In [17]:
rais2020_parquet.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- bairros_sp: string (nullable = true)
 |-- bairros_fortaleza: string (nullable = true)
 |-- bairros_rj: string (nullable = true)
 |-- causa_afastamento_1: integer (nullable = true)
 |-- causa_afastamento_2: integer (nullable = true)
 |-- causa_afastamento_3: integer (nullable = true)
 |-- motivo_desligamento: integer (nullable = true)
 |-- cbo_ocupacao_2002: string (nullable = true)
 |-- cnae_2_0_classe: integer (nullable = true)
 |-- cnae_95_classe: integer (nullable = true)
 |-- distritos_sp: string (nullable = true)
 |-- vinculo_ativo_31_12: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)
 |-- faixa_hora_contrat: integer (nullable = true)
 |-- faixa_remun_dezem_sm: integer (nullable = true)
 |-- faixa_remun_media_sm: integer (nullable = true)
 |-- faixa_tempo_emprego: integer (nullable = true)
 |-- escolaridade_apos_2005: double (nullable = true)
 |-- qtd_hora_contr: double (nullable = true)
 |-- idade: double (nullable = true)
 |-- ind_cei_vinculado:

In [18]:
spark.stop()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…