<a href="https://colab.research.google.com/github/eder1985/igti-bootcamp-eng-dados-cloud/blob/main/igti_edc_des1_analise_rais_2020.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1><center>Introduction to Google Colab and PySpark</center></h1>

<a id='installing-spark'></a>
### Installing Spark

Install Dependencies:


1.   Java 8
2.   Apache Spark with hadoop and
3.   Findspark (used to locate the spark in the system)


In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

Set Environment Variables:

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

<a id='exploring-the-dataset'></a>
## Exploring the Dataset

<a id='loading-the-dataset'></a>
### Loading the Dataset: Downloading dados da RAIS 2020


In [5]:
!wget ftp://ftp.mtps.gov.br/pdet/microdados/RAIS/2020/RAIS_VINC_PUB_CENTRO_OESTE.7z
!wget ftp://ftp.mtps.gov.br/pdet/microdados/RAIS/2020/RAIS_VINC_PUB_MG_ES_RJ.7z
!wget ftp://ftp.mtps.gov.br/pdet/microdados/RAIS/2020/RAIS_VINC_PUB_NORDESTE.7z#
!wget ftp://ftp.mtps.gov.br/pdet/microdados/RAIS/2020/RAIS_VINC_PUB_NORTE.7z
!wget ftp://ftp.mtps.gov.br/pdet/microdados/RAIS/2020/RAIS_VINC_PUB_SUL.7z
!wget ftp://ftp.mtps.gov.br/pdet/microdados/RAIS/2020/RAIS_VINC_PUB_SP.7z

--2023-08-03 22:12:57--  ftp://ftp.mtps.gov.br/pdet/microdados/RAIS/2020/RAIS_VINC_PUB_CENTRO_OESTE.7z
           => ‘RAIS_VINC_PUB_CENTRO_OESTE.7z’
Resolving ftp.mtps.gov.br (ftp.mtps.gov.br)... 189.9.32.26
Connecting to ftp.mtps.gov.br (ftp.mtps.gov.br)|189.9.32.26|:21... connected.
Logging in as anonymous ... Logged in!
==> SYST ... done.    ==> PWD ... done.
==> TYPE I ... done.  ==> CWD (1) /pdet/microdados/RAIS/2020 ... done.
==> SIZE RAIS_VINC_PUB_CENTRO_OESTE.7z ... 223045956
==> PASV ... done.    ==> RETR RAIS_VINC_PUB_CENTRO_OESTE.7z ... done.
Length: 223045956 (213M) (unauthoritative)


2023-08-03 22:14:03 (3.38 MB/s) - ‘RAIS_VINC_PUB_CENTRO_OESTE.7z’ saved [223045956]

--2023-08-03 22:14:03--  ftp://ftp.mtps.gov.br/pdet/microdados/RAIS/2020/RAIS_VINC_PUB_MG_ES_RJ.7z
           => ‘RAIS_VINC_PUB_MG_ES_RJ.7z’
Resolving ftp.mtps.gov.br (ftp.mtps.gov.br)... 189.9.32.26
Connecting to ftp.mtps.gov.br (ftp.mtps.gov.br)|189.9.32.26|:21... connected.
Logging in as anonymous ... Logg

In [6]:
!apt install p7zip-full

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
p7zip-full is already the newest version (16.02+dfsg-8).
0 upgraded, 0 newly installed, 0 to remove and 15 not upgraded.


### Creating data folders

In [7]:
!mkdir -p work/data/raw/rais/7z
!mkdir -p work/data/raw/rais/txt
!mkdir -p work/data/staging/rais/

In [8]:
!ls -la

total 2693236
drwxr-xr-x  1 root root      4096 Aug  3 22:32 .
drwxr-xr-x  1 root root      4096 Aug  3 22:08 ..
drwxr-xr-x  4 root root      4096 Aug  2 13:34 .config
-rw-r--r--  1 root root 223045956 Aug  3 22:14 RAIS_VINC_PUB_CENTRO_OESTE.7z
-rw-r--r--  1 root root 516815111 Aug  3 22:17 RAIS_VINC_PUB_MG_ES_RJ.7z
-rw-r--r--  1 root root 386159232 Aug  3 22:17 RAIS_VINC_PUB_NORDESTE.7z
-rw-r--r--  1 root root 125495152 Aug  3 22:18 RAIS_VINC_PUB_NORTE.7z
-rw-r--r--  1 root root 782803328 Aug  3 22:31 RAIS_VINC_PUB_SP.7z
-rw-r--r--  1 root root 494772628 Aug  3 22:21 RAIS_VINC_PUB_SUL.7z
drwxr-xr-x  1 root root      4096 Aug  2 13:34 sample_data
drwxr-xr-x 13 1000 1000      4096 Feb 22  2021 spark-3.1.1-bin-hadoop3.2
-rw-r--r--  1 root root 228721937 Feb 22  2021 spark-3.1.1-bin-hadoop3.2.tgz
drwxr-xr-x  3 root root      4096 Aug  3 22:32 work


In [9]:
!7z x RAIS_VINC_PUB_CENTRO_OESTE.7z -o/content/work/data/raw/rais/txt
!7z x RAIS_VINC_PUB_MG_ES_RJ.7z -o/content/work/data/raw/rais/txt
!7z x RAIS_VINC_PUB_NORDESTE.7z -o/content/work/data/raw/rais/txt
!7z x RAIS_VINC_PUB_NORTE.7z -o/content/work/data/raw/rais/txt
!7z x RAIS_VINC_PUB_SP.7z -o/content/work/data/raw/rais/txt
!7z x RAIS_VINC_PUB_SUL.7z -o/content/work/data/raw/rais/txt


7-Zip [64] 16.02 : Copyright (c) 1999-2016 Igor Pavlov : 2016-05-21
p7zip Version 16.02 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,64 bits,2 CPUs Intel(R) Xeon(R) CPU @ 2.20GHz (406F0),ASM,AES-NI)

Scanning the drive for archives:
  0M Scan         1 file, 223045956 bytes (213 MiB)

Extracting archive: RAIS_VINC_PUB_CENTRO_OESTE.7z
--
Path = RAIS_VINC_PUB_CENTRO_OESTE.7z
Type = 7z
Physical Size = 223045956
Headers Size = 170
Method = LZMA2:24
Solid = -
Blocks = 1

  0%      0% - RAIS_VINC_PUB_CENTRO_OESTE.txt                                       1% - RAIS_VINC_PUB_CENTRO_OESTE.txt                                       2% - RAIS_VINC_PUB_CENTRO_OESTE.txt                                       3% - RAIS_VINC_PUB_CENTRO_OESTE.txt

In [10]:
!mv *.7z /content/work/data/raw/rais/7z/

In [11]:
from pyspark.sql.types import StructType,StructField, StringType, StringType

raw_rais_schema = StructType([
    StructField('bairros_sp',StringType(),True),
    StructField('bairros_fortaleza',StringType(),True),
    StructField('bairros_rj',StringType(),True),
    StructField('causa_afastamento_1',StringType(),True),
    StructField('causa_afastamento_2',StringType(),True),
    StructField('causa_afastamento_3',StringType(),True),
    StructField('motivo_desligamento',StringType(),True),
    StructField('cbo_ocupacao_2002',StringType(),True),
    StructField('cnae_2.0_classe',StringType(),True),
    StructField('cnae_95_classe',StringType(),True),
    StructField('distritos_sp',StringType(),True),
    StructField('vinculo_ativo_31_12',StringType(),True),
    StructField('faixa_etaria',StringType(),True),
    StructField('faixa_hora_contrat',StringType(),True),
    StructField('faixa_remun_dezem_sm',StringType(),True),
    StructField('faixa_remun_media_sm',StringType(),True),
    StructField('faixa_tempo_emprego',StringType(),True),
    StructField('escolaridade_apos_2005',StringType(),True),
    StructField('qtd_hora_contr',StringType(),True),
    StructField('idade',StringType(),True),
    StructField('ind_cei_vinculado',StringType(),True),
    StructField('ind_simples',StringType(),True),
    StructField('mes_admissao',StringType(),True),
    StructField('mes_desligamento',StringType(),True),
    StructField('mun_trab',StringType(),True),
    StructField('municipio',StringType(),True),
    StructField('nacionalidade',StringType(),True),
    StructField('natureza_juridica',StringType(),True),
    StructField('ind_portador_defic',StringType(),True),
    StructField('qtd_dias_afastamento',StringType(),True),
    StructField('raca_cor',StringType(),True),
    StructField('regioes_adm_df',StringType(),True),
    StructField('vl_remun_dezembro_nom',StringType(),True),
    StructField('vl_remun_dezembro_sm',StringType(),True),
    StructField('vl_remun_media_nom',StringType(),True),
    StructField('vl_remun_media_sm',StringType(),True),
    StructField('cnae_2.0_subclasse',StringType(),True),
    StructField('sexo_trabalhador',StringType(),True),
    StructField('tamanho_estabelecimento',StringType(),True),
    StructField('tempo_emprego',StringType(),True),
    StructField('tipo_admissao',StringType(),True),
    StructField('tipo_estab41',StringType(),True),
    StructField('tipo_estab42',StringType(),True),
    StructField('tipo_defic',StringType(),True),
    StructField('tipo_vinculo',StringType(),True),
    StructField('ibge_subsetor',StringType(),True),
    StructField('vl_rem_janeiro_sc',StringType(),True),
    StructField('vl_rem_fevereiro_sc',StringType(),True),
    StructField('vl_rem_marco_sc',StringType(),True),
    StructField('vl_rem_abril_sc',StringType(),True),
    StructField('vl_rem_maio_sc',StringType(),True),
    StructField('vl_rem_junho_sc',StringType(),True),
    StructField('vl_rem_julho_sc',StringType(),True),
    StructField('vl_rem_agosto_sc',StringType(),True),
    StructField('vl_rem_setembro_sc',StringType(),True),
    StructField('vl_rem_outubro_sc',StringType(),True),
    StructField('vl_rem_novembro_sc',StringType(),True),
    StructField('ano_chegada_brasil',StringType(),True),
    StructField('ind_trab_intermitente',StringType(),True),
    StructField('ind_trab_parcial',StringType(),True)
])

In [12]:
# Load data from csv to a dataframe.
# header=True means the first row is a header
# sep=';' means the column are seperated using ''
raw_df = spark.read.schema(raw_rais_schema).csv('/content/work/data/raw/rais/txt', header=True, sep=";", encoding="latin1")
raw_df.show(5)

+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-----------------+---------------+--------------+--------------------+-------------------+------------+------------------+--------------------+--------------------+-------------------+----------------------+--------------+-----+-----------------+-----------+------------+----------------+--------+---------+-------------+-----------------+------------------+--------------------+--------+--------------+---------------------+--------------------+------------------+-----------------+------------------+--------------------+-----------------------+-------------+-------------+------------+--------------------+----------+------------+-------------+-----------------+-------------------+---------------+---------------+--------------+---------------+---------------+----------------+------------------+-----------------+------------------+------------------+

In [None]:
raw_df.count()

65921194

<a id='dataframe-schema'></a>
### Dataframe Schema

In [13]:
raw_df.printSchema()

root
 |-- bairros_sp: string (nullable = true)
 |-- bairros_fortaleza: string (nullable = true)
 |-- bairros_rj: string (nullable = true)
 |-- causa_afastamento_1: string (nullable = true)
 |-- causa_afastamento_2: string (nullable = true)
 |-- causa_afastamento_3: string (nullable = true)
 |-- motivo_desligamento: string (nullable = true)
 |-- cbo_ocupacao_2002: string (nullable = true)
 |-- cnae_2.0_classe: string (nullable = true)
 |-- cnae_95_classe: string (nullable = true)
 |-- distritos_sp: string (nullable = true)
 |-- vinculo_ativo_31_12: string (nullable = true)
 |-- faixa_etaria: string (nullable = true)
 |-- faixa_hora_contrat: string (nullable = true)
 |-- faixa_remun_dezem_sm: string (nullable = true)
 |-- faixa_remun_media_sm: string (nullable = true)
 |-- faixa_tempo_emprego: string (nullable = true)
 |-- escolaridade_apos_2005: string (nullable = true)
 |-- qtd_hora_contr: string (nullable = true)
 |-- idade: string (nullable = true)
 |-- ind_cei_vinculado: string (nul

<a id='dataframe-operations-on-columns'></a>
## DataFrame Operations on Columns

In [14]:
from pyspark.sql import functions as f

In [17]:
raw_df = raw_df.withColumn("uf", f.col("municipio").cast('string').substr(1,2).cast('int'))
raw_df.select("uf", "municipio").show(5)

+---+---------+
| uf|municipio|
+---+---------+
| 31|   312230|
| 31|   310500|
| 31|   313460|
| 31|   312660|
| 31|   313065|
+---+---------+
only showing top 5 rows



In [18]:
raw_df = (
    raw_df
    .withColumn("mes_desligamento", f.col('mes_desligamento').cast('int'))
    .withColumn("vl_remun_dezembro_nom", f.regexp_replace("vl_remun_dezembro_nom", ',', '.').cast('double'))
    .withColumn("vl_remun_dezembro_sm", f.regexp_replace("vl_remun_dezembro_sm", ',', '.').cast('double'))
    .withColumn("vl_remun_media_nom", f.regexp_replace("vl_remun_media_nom", ',', '.').cast('double'))
    .withColumn("vl_remun_media_sm", f.regexp_replace("vl_remun_media_sm", ',', '.').cast('double'))
    .withColumn("vl_rem_janeiro_sc", f.regexp_replace("vl_rem_janeiro_sc", ',', '.').cast('double'))
    .withColumn("vl_rem_fevereiro_sc", f.regexp_replace("vl_rem_fevereiro_sc", ',', '.').cast('double'))
    .withColumn("vl_rem_marco_sc", f.regexp_replace("vl_rem_marco_sc", ',', '.').cast('double'))
    .withColumn("vl_rem_abril_sc", f.regexp_replace("vl_rem_abril_sc", ',', '.').cast('double'))
    .withColumn("vl_rem_maio_sc", f.regexp_replace("vl_rem_maio_sc", ',', '.').cast('double'))
    .withColumn("vl_rem_junho_sc", f.regexp_replace("vl_rem_junho_sc", ',', '.').cast('double'))
    .withColumn("vl_rem_julho_sc", f.regexp_replace("vl_rem_julho_sc", ',', '.').cast('double'))
    .withColumn("vl_rem_agosto_sc", f.regexp_replace("vl_rem_agosto_sc", ',', '.').cast('double'))
    .withColumn("vl_rem_setembro_sc", f.regexp_replace("vl_rem_setembro_sc", ',', '.').cast('double'))
    .withColumn("vl_rem_outubro_sc", f.regexp_replace("vl_rem_outubro_sc", ',', '.').cast('double'))
    .withColumn("vl_rem_novembro_sc", f.regexp_replace("vl_rem_novembro_sc", ',', '.').cast('double'))
)

## Writing txt raw data to parquet staging data

In [None]:
(
    raw_df
    .coalesce(50)
    .write.mode('overwrite')
    .partitionBy('uf')
    .format('parquet')
    .save('/content/work/data/staging/rais/')
)