# Xpe Bootcamp

## Cloud Data Engineering

### Module 1: Fundamentals of Data Architecture and Cloud Solutions


### Objectives:
>> Implement a Data Lake; <br>
>> Store data in the Raw storage layer; <br>
>> Store data in the Bronze storage layer; <br>
>> Store data in the Silver storage layer; <br>
>> Implement Big Data processing; <br>
>> IaC for the entire infrastructure with Terraform; <br>
>> Deployment pipelines with GitHub. <br>


### This notebook covers items 2 and 3 of the challenge
2. Clean the RAIS 2020 dataset  <br>
    a. Rename the columns, replacing spaces with "_"; <br>
    b. Remove accents and special characters from the column names; <br>
    c. Convert all column names to lowercase; <br>
    d. Create a "uf" column from the "municipio" column; <br>
    e. Adjust the data types of the remuneration columns.

3. Convert the data to Parquet format and write it to the staging or silver zone of your Data Lake.

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, input_file_name, regexp_replace
from pyspark.sql import functions as spkFn
import unicodedata
import re

In [50]:
print(spark.version)

3.3.1


#### Start a Spark `Session`

In [2]:
spark = (SparkSession.builder
                     .appName("readFile")
                     .getOrCreate())  

#### Paths for the source and target directories

In [4]:
productRais = "RAIS2020"
pathRaw     = f"C:/aws/bucket_dow_up/data_up/{productRais}/"
pathBronze  = f'C:/aws/dados/dados_bronze/{productRais}/parquet/'

#### Schema DDL

In [5]:
fileNameSchema = "RAIS_VINC_PUB_NORTE.txt.bz2"
filePathSchema = f"{pathRaw}{fileNameSchema}"  # pathRaw already ends with "/"

fileDfSchema   = (spark.read
                       .format("csv")
                       .option("header","true")
                       .option("sep", ";")
                       .option("encoding", "latin1")
                       .option("inferSchema", "true")
                       .load(filePathSchema)
                       .schema)

schemaJson     = fileDfSchema.json()
schemaDDL      = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schemaJson).toDDL()

#### Read the files with the DataFrameReader API

In [14]:
## Read every file under pathRaw using the schema built above
rais2020_csv = (spark.read
                     .format("csv")
                     .option("header","true")
                     .option("sep", ";")
                     .option("encoding", "latin1")
                     .schema(schemaDDL)  # explicit schema, so inferSchema is not needed here
                     .load(pathRaw)
                     .withColumn("file_name", input_file_name()))  # source file of each row

In [15]:
rais2020_csv.printSchema()

root
 |-- Bairros SP: string (nullable = true)
 |-- Bairros Fortaleza: string (nullable = true)
 |-- Bairros RJ: string (nullable = true)
 |-- Causa Afastamento 1: integer (nullable = true)
 |-- Causa Afastamento 2: integer (nullable = true)
 |-- Causa Afastamento 3: integer (nullable = true)
 |-- Motivo Desligamento: integer (nullable = true)
 |-- CBO Ocupação 2002: string (nullable = true)
 |-- CNAE 2.0 Classe: integer (nullable = true)
 |-- CNAE 95 Classe: integer (nullable = true)
 |-- Distritos SP: string (nullable = true)
 |-- Vínculo Ativo 31/12: integer (nullable = true)
 |-- Faixa Etária: integer (nullable = true)
 |-- Faixa Hora Contrat: integer (nullable = true)
 |-- Faixa Remun Dezem (SM): integer (nullable = true)
 |-- Faixa Remun Média (SM): integer (nullable = true)
 |-- Faixa Tempo Emprego: integer (nullable = true)
 |-- Escolaridade após 2005: double (nullable = true)
 |-- Qtd Hora Contr: double (nullable = true)
 |-- Idade: double (nullable = true)
 |-- Ind CEI Vincul

### Functions to normalize the columns

> ***normalizar_colunas***
1. Function that normalizes the column names of a DataFrame
    - Replaces spaces with an underscore (e.g. `Sobre Nome -> Sobre_Nome`)
    - Replaces periods with an underscore (e.g. `Sobre.Nome -> Sobre_Nome`)
    - Converts all column names to lowercase (e.g. `Sobre_Nome -> sobre_nome`)

> ***normalizar_acentos***
2. Function that removes accents and special characters from a column name; it is applied to every column of a DataFrame
    - Strips whitespace at both ends (e.g. `"  sobre_nome  " -> "sobre_nome"`)
    - Replaces special characters with an underscore (e.g. `sobre@nome -> sobre_nome`)
    - Strips underscores at both ends (e.g. `_sobre_nome_ -> sobre_nome`)
    - Collapses a double underscore into a single one (e.g. `sobre__nome -> sobre_nome`)
    - Removes accents (e.g. `média_mês -> media_mes`)
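
The steps above can be traced on plain strings, without a Spark session. The following is a minimal sketch rather than the notebook's own implementation: `normalize_name` is a hypothetical helper that chains both sets of rules in order, and the sample headers are column names taken from the RAIS schema printed earlier.

```python
import re
import unicodedata

def normalize_name(name):
    """Hypothetical helper: apply both sets of rules to one column name."""
    # Rules of normalizar_colunas: spaces and periods become "_", then lowercase
    s = name.replace(" ", "_").replace(".", "_").lower()
    # Rules of normalizar_acentos
    s = s.strip()                      # strip whitespace at both ends
    s = re.sub(r"[^\w]", "_", s)       # special characters become "_"
    s = s.strip("_")                   # strip underscores at both ends
    s = s.replace("__", "_")           # collapse a double underscore
    s = unicodedata.normalize("NFKD", s)  # split letters from their accents
    return "".join(c for c in s if not unicodedata.combining(c))

headers = [
    "Faixa Remun Dezem (SM)",
    "Vínculo Ativo 31/12",
    "CBO Ocupação 2002",
    "CNAE 2.0 Classe",
]
normalized = [normalize_name(h) for h in headers]
print(normalized)
# ['faixa_remun_dezem_sm', 'vinculo_ativo_31_12', 'cbo_ocupacao_2002', 'cnae_2_0_classe']
```

Accent removal runs last, so accented letters still count as word characters when the special-character regex is applied and are not turned into underscores.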

In [42]:
## Normalize the column names of a DataFrame
def normalizar_colunas(df):
    try:
        # Replace spaces and periods with underscores, then lowercase
        new_columns = [c.replace(" ", "_").replace(".", "_").lower()
                       for c in df.columns]
        return df.toDF(*new_columns)
    except Exception as err:
        error_message = f"Error normalizing column names: {err}"
        print(error_message)
        raise ValueError(error_message) from err

## Remove accents and special characters from a single column name
## (the parameter is called `name` so it does not shadow the built-in `str`)
def normalizar_acentos(name):
    try:
        # Strip whitespace at both ends
        new_str = name.strip()
        # Replace special characters with an underscore
        new_str = re.sub(r"[^\w]", "_", new_str)
        # Strip underscores at both ends
        new_str = new_str.strip("_")
        # Collapse a double underscore into a single one
        new_str = new_str.replace("__", "_")
        # Remove accents: decompose the characters, then drop the combining marks
        new_str = unicodedata.normalize('NFKD', new_str)
        new_str = "".join(c for c in new_str if not unicodedata.combining(c))
        return new_str
    except Exception as err:
        error_message = f"Error normalizing column name: {err}"
        print(error_message)
        raise ValueError(error_message) from err

#### Apply the functions to the DataFrame

In [43]:
renamed_df = normalizar_colunas(rais2020_csv)

# Loop variable named `c` so it does not shadow the imported `col` function
rais2020_renamed = renamed_df.select([spkFn.col(c).alias(normalizar_acentos(c)) for c in renamed_df.columns])

In [44]:
rais2020_renamed.printSchema()

root
 |-- bairros_sp: string (nullable = true)
 |-- bairros_fortaleza: string (nullable = true)
 |-- bairros_rj: string (nullable = true)
 |-- causa_afastamento_1: integer (nullable = true)
 |-- causa_afastamento_2: integer (nullable = true)
 |-- causa_afastamento_3: integer (nullable = true)
 |-- motivo_desligamento: integer (nullable = true)
 |-- cbo_ocupacao_2002: string (nullable = true)
 |-- cnae_2_0_classe: integer (nullable = true)
 |-- cnae_95_classe: integer (nullable = true)
 |-- distritos_sp: string (nullable = true)
 |-- vinculo_ativo_31_12: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)
 |-- faixa_hora_contrat: integer (nullable = true)
 |-- faixa_remun_dezem_sm: integer (nullable = true)
 |-- faixa_remun_media_sm: integer (nullable = true)
 |-- faixa_tempo_emprego: integer (nullable = true)
 |-- escolaridade_apos_2005: double (nullable = true)
 |-- qtd_hora_contr: double (nullable = true)
 |-- idade: double (nullable = true)
 |-- ind_cei_vinculado:

#### Normalizing the remuneration columns and others
> - The remuneration columns (`vl_remun_*` and `vl_rem_*`) <br>
    - Use the `regexp_replace` function to replace `,` ***comma*** with `.` ***period*** <br>
    - Cast to the `double` data type
    
- Create the `ano` column with the year of the dataset
- Create the `uf` column from the first two characters of the `municipio` column and cast it to `int`
- Cast the `mes_desligamento` column to `int`
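
The two conversions described above can be checked on single values in plain Python before applying them to the whole DataFrame. This is only an illustrative sketch: `parse_decimal_comma` and `uf_from_municipio` are hypothetical helpers, and the sample values are made up for the example (it assumes a six-digit municipality code whose first two digits identify the state).

```python
def parse_decimal_comma(text):
    """Convert a decimal-comma string such as '1234,56' to a float."""
    # Same idea as regexp_replace(..., ',', '.') followed by a cast to double
    return float(text.replace(",", "."))

def uf_from_municipio(municipio):
    """Return the first two characters of a municipality code as an int."""
    # Same idea as cast('string').substr(1, 2).cast('int')
    return int(str(municipio)[:2])

print(parse_decimal_comma("1234,56"))  # 1234.56
print(uf_from_municipio(355030))       # 35
```

Note that Spark's `substr` is 1-based, so `substr(1, 2)` corresponds to the Python slice `[:2]`.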


In [45]:
rais2020_fim = (
                 rais2020_renamed
                        .withColumn("ano", lit("2020").cast('int'))
                        .withColumn("uf", col("municipio").cast('string').substr(1,2).cast('int'))
                        .withColumn("mes_desligamento", col('mes_desligamento').cast('int'))
                        .withColumn("vl_remun_dezembro_nom", regexp_replace("vl_remun_dezembro_nom", ',', '.').cast('double'))
                        .withColumn("vl_remun_dezembro_sm", regexp_replace("vl_remun_dezembro_sm", ',', '.').cast('double'))
                        .withColumn("vl_remun_media_nom", regexp_replace("vl_remun_media_nom", ',', '.').cast('double'))
                        .withColumn("vl_remun_media_sm", regexp_replace("vl_remun_media_sm", ',', '.').cast('double'))
                        .withColumn("vl_rem_janeiro_sc", regexp_replace("vl_rem_janeiro_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_fevereiro_sc", regexp_replace("vl_rem_fevereiro_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_marco_sc", regexp_replace("vl_rem_marco_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_abril_sc", regexp_replace("vl_rem_abril_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_maio_sc", regexp_replace("vl_rem_maio_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_junho_sc", regexp_replace("vl_rem_junho_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_julho_sc", regexp_replace("vl_rem_julho_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_agosto_sc", regexp_replace("vl_rem_agosto_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_setembro_sc", regexp_replace("vl_rem_setembro_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_outubro_sc", regexp_replace("vl_rem_outubro_sc", ',', '.').cast('double'))
                        .withColumn("vl_rem_novembro_sc", regexp_replace("vl_rem_novembro_sc", ',', '.').cast('double'))
                        .drop("vl_remun_dezembro__sm", "vl_remun_media__sm")
                )

In [52]:
(
    rais2020_fim.write
                .format('parquet')
                .mode('overwrite')
                .partitionBy('ano', 'uf')
                .save(pathBronze)
)

Py4JJavaError: An error occurred while calling o1066.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 10) (DELL-GAMER executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.lang.StringCoding.safeTrim(StringCoding.java:79)
	at java.lang.StringCoding.encode(StringCoding.java:365)
	at java.lang.String.getBytes(String.java:941)
	at org.apache.spark.unsafe.types.UTF8String.fromString(UTF8String.java:139)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$26(UnivocityParser.scala:224)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$$Lambda$3247/807350816.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:259)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$25(UnivocityParser.scala:224)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$$Lambda$3203/810818832.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:312)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:275)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$$Lambda$3227/2002008724.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:417)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$$$Lambda$3241/883053958.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:421)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$$$Lambda$3244/1160499787.apply(Unknown Source)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.sql.execution.SortExec$$Lambda$3071/2077647346.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.RDD$$Lambda$2553/1302281177.apply(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:245)
	... 41 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.lang.StringCoding.safeTrim(StringCoding.java:79)
	at java.lang.StringCoding.encode(StringCoding.java:365)
	at java.lang.String.getBytes(String.java:941)
	at org.apache.spark.unsafe.types.UTF8String.fromString(UTF8String.java:139)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$26(UnivocityParser.scala:224)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$$Lambda$3247/807350816.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:259)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$25(UnivocityParser.scala:224)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$$Lambda$3203/810818832.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:312)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:275)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$$Lambda$3227/2002008724.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:417)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$$$Lambda$3241/883053958.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:421)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$$$Lambda$3244/1160499787.apply(Unknown Source)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.sql.execution.SortExec$$Lambda$3071/2077647346.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.RDD$$Lambda$2553/1302281177.apply(Unknown Source)


In [None]:
rais2020_parquet = (
                      spark.read
                           .format('parquet')
                           .load(pathBronze)
                   )   

In [None]:
rais2020_parquet.count()

74117924

In [None]:
rais2020_parquet.printSchema()

root
 |-- bairros_sp: string (nullable = true)
 |-- bairros_fortaleza: string (nullable = true)
 |-- bairros_rj: string (nullable = true)
 |-- causa_afastamento_1: integer (nullable = true)
 |-- causa_afastamento_2: integer (nullable = true)
 |-- causa_afastamento_3: integer (nullable = true)
 |-- motivo_desligamento: integer (nullable = true)
 |-- cbo_ocupacao_2002: string (nullable = true)
 |-- cnae_2_0_classe: integer (nullable = true)
 |-- cnae_95_classe: integer (nullable = true)
 |-- distritos_sp: string (nullable = true)
 |-- vinculo_ativo_31_12: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)
 |-- faixa_hora_contrat: integer (nullable = true)
 |-- faixa_remun_dezem__sm: integer (nullable = true)
 |-- faixa_remun_media__sm: integer (nullable = true)
 |-- faixa_tempo_emprego: integer (nullable = true)
 |-- escolaridade_apos_2005: double (nullable = true)
 |-- qtd_hora_contr: double (nullable = true)
 |-- idade: double (nullable = true)
 |-- ind_cei_vinculad