<a href="https://colab.research.google.com/github/GabrielData21/ETL-com-PySpark/blob/main/ETL_com_Python_e_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Pipeline de ETL com Python (PySpark)

**Descrição da atividade**

Esse projeto visa simular uma pipeline contendo importação e transformação de dados do ENEM em uma arquitetura medallion, cujo perfil consiste em 3 camadas de tratamento de dados, sendo uma camada bruta no momento da importação (bronze), uma segunda camada para pré-processamento (silver) e a camada final com os dados prontos para uso (gold).


## Instalando PySpark

Realizando a instalação do PySpark no notebook. Em versões anteriores era necessário instalar o ambiente com Hadoop e outros módulos, a partir da versão 3.4.0 é possível fazer a instação isolada.

In [1]:
# instalando PySpark 3.4.0
!pip install pyspark==3.4.0

Collecting pyspark==3.4.0
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [91m━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.2/310.8 MB[0m [31m56.7 MB/s[0m eta [36m0:00:05[0m
[?25h[31mERROR: Operation cancelled by user[0m[31m
[0m

In [2]:
# criando Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL-enem").getOrCreate()
spark

ModuleNotFoundError: No module named 'pyspark'

# Organizando o ambiente para os arquivos do ETL

Criando as pastas que irão compor o projeto de acordo com a arquitetura medallion.

In [None]:
!mkdir -p enem_2020/1.bronze

In [None]:
!mkdir enem_2020/2.silver/ enem_2020/3.gold/

## Aquisição de dados ENEM

### Download e pré processamento dos dados

In [None]:
# Importando os arquivos
!wget https://download.inep.gov.br/microdados/microdados_enem_2020.zip --no-check-certificate

# verificando os arquivos na pasta após download
!ls -l

--2023-08-31 21:28:52--  https://download.inep.gov.br/microdados/microdados_enem_2020.zip
Resolving download.inep.gov.br (download.inep.gov.br)... 200.130.24.15
Connecting to download.inep.gov.br (download.inep.gov.br)|200.130.24.15|:443... connected.
  Unable to locally verify the issuer's authority.
HTTP request sent, awaiting response... 200 OK
Length: 620776982 (592M) [application/zip]
Saving to: ‘microdados_enem_2020.zip’


2023-08-31 21:50:33 (466 KB/s) - ‘microdados_enem_2020.zip’ saved [620776982/620776982]

total 606240
drwxr-xr-x 4 root root      4096 Aug 31 21:28 enem_2020
-rw-r--r-- 1 root root 620776982 Feb 16  2022 microdados_enem_2020.zip
drwxr-xr-x 1 root root      4096 Aug 30 13:25 sample_data


Os arquivos do ENEM são recebidos em formato compactado, para fazer a descompactação existem duas opções: usando comandos linux shell (através do unzip) ou usando o pacote zipfile do Python. Como este projeto visa uma prática em Python e PySpark, foi usado o pacote zipfile.

In [None]:
import zipfile

In [None]:
%%time

# realizando a descompactação dos arquivos
with zipfile.ZipFile('microdados_enem_2020.zip', 'r') as zip_ref:
    zip_ref.extractall('enem_2020')

CPU times: user 13 s, sys: 5.55 s, total: 18.5 s
Wall time: 22.5 s


Outro passo necessário é organizar os arquivos da descompactação, será usado apenas o arquivo principal para a transformação. Por isso ele será movido para a pasta /bronze.

Mover arquivos entre pastas usando comandos linux shell é bem simples através do comando mv. Novamente, para manter o treinamento voltado para Python, usaremos o pacote shutil para realizar a movimentação.

In [None]:
import shutil

In [4]:
# criando variáveis para movimentação de arquivo
source = "/content/enem_2020/DADOS/MICRODADOS_ENEM_2020.csv"
destination = "/content/enem_2020/raw/MICRODADOS_ENEM_2020.csv"

# movimentando o arquivo
shutil.move(source, destination)

SyntaxError: invalid syntax (<ipython-input-4-305365f6d9cd>, line 1)

In [None]:
%%time

# importando pacote Functions para manipulação de dados
import pyspark.sql.functions

# primeira leitura dos dados descompactados
mida = spark.read.csv('/content/enem_2020/raw/MICRODADOS_ENEM_2020.csv', header= True, sep=';', encoding='ISO-8859-1')
mida.show(10)

+------------+------+---------------+-------+---------------+-----------+----------------+---------------+---------------+---------+---------+------------+----------------+-------------------+---------+---------+----------------------+------------------+---------------+------------------+-------------------+-----------+-----------+--------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+-------------+-------------+-------------+-------------+---------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|NU_INSCRICAO|NU_ANO|TP_FAIXA_ETARIA|TP_SEXO|TP_ESTADO_CIVIL|TP_COR_RACA|TP_NACIONALIDADE|TP_ST_CONCLUSAO|T

In [None]:
"""
Verificando as colunas do DataFrame, avaliando junto à documetação do dicionário dos dados
sobre o conteúdo, formato...
"""
mida.printSchema()

['NU_INSCRICAO',
 'NU_ANO',
 'TP_FAIXA_ETARIA',
 'TP_SEXO',
 'TP_ESTADO_CIVIL',
 'TP_COR_RACA',
 'TP_NACIONALIDADE',
 'TP_ST_CONCLUSAO',
 'TP_ANO_CONCLUIU',
 'TP_ESCOLA',
 'TP_ENSINO',
 'IN_TREINEIRO',
 'CO_MUNICIPIO_ESC',
 'NO_MUNICIPIO_ESC',
 'CO_UF_ESC',
 'SG_UF_ESC',
 'TP_DEPENDENCIA_ADM_ESC',
 'TP_LOCALIZACAO_ESC',
 'TP_SIT_FUNC_ESC',
 'CO_MUNICIPIO_PROVA',
 'NO_MUNICIPIO_PROVA',
 'CO_UF_PROVA',
 'SG_UF_PROVA',
 'TP_PRESENCA_CN',
 'TP_PRESENCA_CH',
 'TP_PRESENCA_LC',
 'TP_PRESENCA_MT',
 'CO_PROVA_CN',
 'CO_PROVA_CH',
 'CO_PROVA_LC',
 'CO_PROVA_MT',
 'NU_NOTA_CN',
 'NU_NOTA_CH',
 'NU_NOTA_LC',
 'NU_NOTA_MT',
 'TX_RESPOSTAS_CN',
 'TX_RESPOSTAS_CH',
 'TX_RESPOSTAS_LC',
 'TX_RESPOSTAS_MT',
 'TP_LINGUA',
 'TX_GABARITO_CN',
 'TX_GABARITO_CH',
 'TX_GABARITO_LC',
 'TX_GABARITO_MT',
 'TP_STATUS_REDACAO',
 'NU_NOTA_COMP1',
 'NU_NOTA_COMP2',
 'NU_NOTA_COMP3',
 'NU_NOTA_COMP4',
 'NU_NOTA_COMP5',
 'NU_NOTA_REDACAO',
 'Q001',
 'Q002',
 'Q003',
 'Q004',
 'Q005',
 'Q006',
 'Q007',
 'Q008',
 'Q009

# Transformação dos dados

In [None]:
"""
Para realizar o tratamento de forma mais eficiente, o arquivo será transformado em Parquet.
É possível atribuir o destino do arquivo em uma variável para usar na função
de salvar o arquivo no formato desejado, por isso ao alterar o formato o arquivo será salvo na pasta /silver.
"""

# Caminho para salvar o arquivo Parquet
parquet_path = "/content/enem_2020/silver/MICRODADOS_ENEM_2020.parquet"

# Salvar o DataFrame como arquivo Parquet
mida.write.mode("overwrite").parquet(parquet_path)

In [None]:
%%time

# Pra ler o arquivo Parquet, pode usar a mesma variável criada para alocar o aquivo na célula anterior
mida_parquet = spark.read.parquet(parquet_path)
mida_parquet.show(10)

+------------+------+---------------+-------+---------------+-----------+----------------+---------------+---------------+---------+---------+------------+----------------+-------------------+---------+---------+----------------------+------------------+---------------+------------------+-------------------+-----------+-----------+--------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+-------------+-------------+-------------+-------------+---------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|NU_INSCRICAO|NU_ANO|TP_FAIXA_ETARIA|TP_SEXO|TP_ESTADO_CIVIL|TP_COR_RACA|TP_NACIONALIDADE|TP_ST_CONCLUSAO|T

Parte do processo de ETL é a seleção das colunas que contém os dados importantes para análises posteriores pela equipe de Analytics.

Em um dataset que contém muitas colunas como esse, para manter o código limpo é interessante colocar as colunas selecionadas para deleção em uma variável antes de colocar na função. Isso é realizado utilizando o documento de dicionário de dados.

In [None]:
# Imprimindo as colunas em uma lista
list(mida_parquet.columns)

['NU_INSCRICAO',
 'NU_ANO',
 'TP_FAIXA_ETARIA',
 'TP_SEXO',
 'TP_ESTADO_CIVIL',
 'TP_COR_RACA',
 'TP_NACIONALIDADE',
 'TP_ST_CONCLUSAO',
 'TP_ANO_CONCLUIU',
 'TP_ESCOLA',
 'TP_ENSINO',
 'IN_TREINEIRO',
 'CO_MUNICIPIO_ESC',
 'NO_MUNICIPIO_ESC',
 'CO_UF_ESC',
 'SG_UF_ESC',
 'TP_DEPENDENCIA_ADM_ESC',
 'TP_LOCALIZACAO_ESC',
 'TP_SIT_FUNC_ESC',
 'CO_MUNICIPIO_PROVA',
 'NO_MUNICIPIO_PROVA',
 'CO_UF_PROVA',
 'SG_UF_PROVA',
 'TP_PRESENCA_CN',
 'TP_PRESENCA_CH',
 'TP_PRESENCA_LC',
 'TP_PRESENCA_MT',
 'CO_PROVA_CN',
 'CO_PROVA_CH',
 'CO_PROVA_LC',
 'CO_PROVA_MT',
 'NU_NOTA_CN',
 'NU_NOTA_CH',
 'NU_NOTA_LC',
 'NU_NOTA_MT',
 'TX_RESPOSTAS_CN',
 'TX_RESPOSTAS_CH',
 'TX_RESPOSTAS_LC',
 'TX_RESPOSTAS_MT',
 'TP_LINGUA',
 'TX_GABARITO_CN',
 'TX_GABARITO_CH',
 'TX_GABARITO_LC',
 'TX_GABARITO_MT',
 'TP_STATUS_REDACAO',
 'NU_NOTA_COMP1',
 'NU_NOTA_COMP2',
 'NU_NOTA_COMP3',
 'NU_NOTA_COMP4',
 'NU_NOTA_COMP5',
 'NU_NOTA_REDACAO',
 'Q001',
 'Q002',
 'Q003',
 'Q004',
 'Q005',
 'Q006',
 'Q007',
 'Q008',
 'Q009

In [None]:
# Selecionando as colunas que não serão utilizadas
dropcols = ('TX_RESPOSTAS_CN',
 'TX_RESPOSTAS_CH',
 'TX_RESPOSTAS_LC',
 'TX_RESPOSTAS_MT',
 'TX_GABARITO_CN',
 'TX_GABARITO_CH',
 'TX_GABARITO_LC',
 'TX_GABARITO_MT',
 'Q001',
 'Q002',
 'Q003',
 'Q004',
 'Q005',
 'Q006',
 'Q007',
 'Q008',
 'Q009',
 'Q010',
 'Q011',
 'Q012',
 'Q013',
 'Q014',
 'Q015',
 'Q016',
 'Q017',
 'Q018',
 'Q019',
 'Q020',
 'Q021',
 'Q022',
 'Q023',
 'Q024',
 'Q025')

In [None]:
mida_parquet = mida_parquet.drop(*dropcols)
mida_parquet.show(10)

+------------+------+---------------+-------+---------------+-----------+----------------+---------------+---------------+---------+---------+------------+----------------+-------------------+---------+---------+----------------------+------------------+---------------+------------------+-------------------+-----------+-----------+--------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+---------+-----------------+-------------+-------------+-------------+-------------+-------------+---------------+
|NU_INSCRICAO|NU_ANO|TP_FAIXA_ETARIA|TP_SEXO|TP_ESTADO_CIVIL|TP_COR_RACA|TP_NACIONALIDADE|TP_ST_CONCLUSAO|TP_ANO_CONCLUIU|TP_ESCOLA|TP_ENSINO|IN_TREINEIRO|CO_MUNICIPIO_ESC|   NO_MUNICIPIO_ESC|CO_UF_ESC|SG_UF_ESC|TP_DEPENDENCIA_ADM_ESC|TP_LOCALIZACAO_ESC|TP_SIT_FUNC_ESC|CO_MUNICIPIO_PROVA| NO_MUNICIPIO_PROVA|CO_UF_PROVA|SG_UF_PROVA|TP_PRESENCA_CN|TP_PRESENCA_CH|TP_PRESENCA_LC|TP_PRESENCA_MT|CO_PROV

In [None]:
mida_parquet.count()

5783109

Até esse momento foi realizado uma transformação simples dos dados alterando o formato do arquivo e removendo colunas não essenciais para o trabalho de analytics. Agora o arquivo se encontra na pasta /silver.

In [None]:
# Listando as colunas
mida_parquet.columns

Para a área de Analytics, as colunas selecionadas serão renomeadas.

In [None]:
enem_gold = mida_parquet.withColumnsRenamed({
 'NU_INSCRICAO':'INSCRICAO',
 'NU_ANO':'ANO',
 'TP_FAIXA_ETARIA':'FAIXA_ETARIA',
 'TP_SEXO':'SEXO',
 'TP_COR_RACA':'COR_RACA',
 'TP_ST_CONCLUSAO':'ST_CONCLUSAO',
 'TP_ANO_CONCLUIU':'ANO_CONCLUIU',
 'TP_ESCOLA':'ESCOLA',
 'TP_ENSINO':'ENSINO',
 'IN_TREINEIRO':'TREINEIRO',
 'SG_UF_PROVA':'UF_PROVA',
 'TP_PRESENCA_CN':'PRE_C_NATURAIS',
 'TP_PRESENCA_CH':'PRE_C_HUMANAS',
 'TP_PRESENCA_LC':'PRE_LINGUAGENS',
 'TP_PRESENCA_MT':'PRE_MATEMATICA',
 'NU_NOTA_CN':'NOTA_C_NATURAIS',
 'NU_NOTA_CH':'NOTA_C_HUMANAS',
 'NU_NOTA_LC':'NOTA_LINGAGENS',
 'NU_NOTA_MT':'NOTA_MATEMATICA',
 'TP_LINGUA':'LINGUA_ESTR',
 'TP_STATUS_REDACAO':'STATUS_REDACAO',
 'NU_NOTA_COMP1':'NOTA_COMP1',
 'NU_NOTA_COMP2':'NOTA_COMP2',
 'NU_NOTA_COMP3':'NOTA_COMP3',
 'NU_NOTA_COMP4':'NOTA_COMP4',
 'NU_NOTA_COMP5':'NOTA_COMP5',
 'NU_NOTA_REDACAO':'NOTA_REDACAO'
 })

In [None]:
# Visualizando os dados com as colunas renomeadas
enem_gold.limit(5)

Analisando os tipos das variáveis do arquivo, estão todas classificadas como string. As variáveis que não condizem com essa modelagem para análise receberão um novo formato.

In [None]:
list(enem_gold.schema)

In [None]:
# Importando o pacote types do PySpark para alterar os dados do dataframe
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [5]:
# alterando o formato dos dados no df
enem_gold = enem_gold.select(
 enem_gold.INSCRICAO.cast(StringType()),
 enem_gold.ANO.cast(StringType()),
 enem_gold.FAIXA_ETARIA.cast(StringType()),
 enem_gold.SEXO.cast(StringType()),
 enem_gold.COR_RACA.cast(StringType()),
 enem_gold.ST_CONCLUSAO.cast(StringType()),
 enem_gold.ANO_CONCLUIU.cast(StringType()),
 enem_gold.ESCOLA.cast(StringType()),
 enem_gold.ENSINO.cast(StringType()),
 enem_gold.TREINEIRO.cast(StringType()),
 enem_gold.UF_PROVA.cast(StringType()),
 enem_gold.PRE_C_NATURAIS.cast(StringType()),
 enem_gold.PRE_C_HUMANAS.cast(StringType()),
 enem_gold.PRE_LINGUAGENS.cast(StringType()),
 enem_gold.PRE_MATEMATICA.cast(StringType()),
 enem_gold.NOTA_C_NATURAIS.cast(IntegerType()),
 enem_gold.NOTA_C_HUMANAS.cast(IntegerType()),
 enem_gold.NOTA_LINGAGENS.cast(IntegerType()),
 enem_gold.NOTA_MATEMATICA.cast(IntegerType()),
 enem_gold.LINGUA_ESTR.cast(StringType()),
 enem_gold.STATUS_REDACAO.cast(StringType()),
 enem_gold.NOTA_COMP1.cast(IntegerType()),
 enem_gold.NOTA_COMP2.cast(IntegerType()),
 enem_gold.NOTA_COMP3.cast(IntegerType()),
 enem_gold.NOTA_COMP4.cast(IntegerType()),
 enem_gold.NOTA_COMP5.cast(IntegerType()),
 enem_gold.NOTA_REDACAO.cast(IntegerType())
)

# visualizando o schema do df após alteração
list(enem_gold.schema)

NameError: name 'enem_refined' is not defined

In [None]:
enem_gold.limit(5)

Finalizando o arquivo agora com os dados tratados e selecionados, resta salvar no diretório do último estágio do processo de ETL: gold.

In [None]:
# Caminho para salvar o arquivo gold
gold_path = "/content/enem_2020/3.gold/ENEM_2020_GOLD.parquet"

# Salvar o DataFrame na pasta gold
dados_enem.write.parquet(gold_path)
#dados_enem.write.mode("overwrite").parquet(parquet_path)