# Notebook for ingesting ENEM data

This notebook ingests the ENEM microdata published on the [INEP](http://portal.inep.gov.br/web/guest/microdados) website. It consists of the following steps:

1. Prepare the environment (install the unrar binary, create folders, define variables and import Python libraries)
2. Create Python functions that extract the data from the downloaded packages, convert it and upload it to the S3 bucket
3. Download the files from the INEP website according to the list defined in **step 1**
4. Apply the transformation functions defined in **step 2** to the downloaded .zip files
5. Run the AWS Glue crawler that registers the tables in the data catalog, then build and create the Glue job
6. Run the AWS Step Functions state machine

After completing this notebook, you can proceed with the data transformation, exploration and consumption processes in Athena.

*Note: before running this notebook, create the CloudFormation stack as specified in the aws-samples repository.*

---

### 1.1- *Download and install the unrar binary (one of the downloaded packages contains a .rar file)*

In [None]:
%%bash

wget https://www.rarlab.com/rar/rarlinux-x64-5.9.1.tar.gz  
tar -xvzf rarlinux-x64-5.9.1.tar.gz
rm rarlinux-x64-5.9.1.tar.gz
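
Before moving on, it can help to confirm that the archive unpacked to an executable `unrar` binary. The following is a minimal sketch, assuming the tarball extracts into a `rar/` folder next to the notebook (as the `unrarexec` path in step 1.4 suggests). The helper name `is_executable` is illustrative and not part of the original notebook.

```python
import os

def is_executable(path):
    """Return True if path is an existing regular file with execute permission."""
    return os.path.isfile(path) and os.access(path, os.X_OK)

# Assumed location: the rarlinux tarball unpacks into ./rar/
unrar_path = os.path.join("rar", "unrar")
print(unrar_path, "executable:", is_executable(unrar_path))
```

If this prints `False`, the later `.rar` extraction step would fail, so it is cheaper to catch the problem here.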

### 1.2- *Install the wget library for Python*

In [None]:
!pip install wget

### 1.3- *Import the Python libraries used by the code in the following cells*

In [None]:
import zipfile
import wget
import fnmatch
import os
import gzip
import boto3
import botocore
import sys
import shutil
from zipfile import ZipFile
from pprint import pprint

### 1.4- *Define variables and create the folders that store the downloaded zip packages and the microdata files in CSV format*

In [None]:
# define variables and create working directories
zipdir = 'zips'
outdir = 'microdados'
unrarexec = "/home/ec2-user/SageMaker/aws-edu-exam-analytics/notebooks/rar/unrar"
list_arq=[
          'microdados_enem2012'
          ,'microdados_enem2013'
          ,'microdados_enem2014'
          ,'microdados_enem2015'
          ,'microdados_enem2016'
          ,'microdados_enem2017'
          ,'microdados_enem2018'
          ,'microdados_enem_2019'
]

# replace the bucket names with the ones created by the CloudFormation stack
region='us-east-1'
bucket='martinig-inep-data'
folder='data'
glue_bucket='martinig-glue-job-scripts'
state_machine='arn:aws:states:us-east-1:856860420093:stateMachine:EnemEtlStateMachine'

print(list_arq)
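# exist_ok=True lets this cell be re-run without failing on existing folders
os.makedirs(zipdir, exist_ok=True)
os.makedirs(outdir, exist_ok=True)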

### 2- *Function blocks to provide wget download status, uncompress, transform, and upload data to S3 bucket*

In [None]:
# function block
# wget download status
def bar_custom(current, total, width=80):
    progress_message = "Downloading: %d%% [%d / %d] bytes" % (current / total * 100, current, total)
    sys.stdout.write("\r" + progress_message)

# convert to utf-8 and compress
def convert_compress_file(csvfile):
    year=csvfile[-8:-4]
    filenameout="microdados/MICRODADOS_ENEM_"+year+".csv.gz"
    print("Converting "+csvfile+" to utf-8 and compressing to "+filenameout+"...")

    # convert, compress and strip double quotes, if any
    with open(csvfile,encoding='cp1252') as filein,gzip.open(filenameout,'wt',encoding='utf8') as fileout:
        for line in filein:
            fileout.write(line.replace('"', ''))
    os.remove(csvfile)
    return filenameout

# upload data to the bucket
def upload_s3(upfile,bucket,folder):
    year=upfile[-11:-7]
    s3 = boto3.resource('s3')
    key = folder + '/enem_microdados_' + year + '/' + os.path.basename(upfile)
    print("Uploading "+key+" to bucket "+bucket)
    # the context manager closes the file once the upload finishes
    with open(upfile, "rb") as data:
        s3.Bucket(bucket).put_object(Key=key, Body=data)

            
# extract the microdata csv file from a downloaded package
def microdados_transform(microfile):
    pattern1 = "*/DADOS_*.csv"
    pattern2 = "*/[Mm][Ii][Cc][Rr][Oo]*.csv"

    with ZipFile(microfile, "r") as zipObj:
        listOfiles = zipObj.namelist()
        # If the package contains a rar file
        if fnmatch.filter(listOfiles, "*.rar"):
            rarfile = fnmatch.filter(listOfiles, "*.rar")[0]
            print("Rar file " + rarfile)
            zipObj.extractall()
            unrarlb = unrarexec + " lb " + rarfile + " | grep MICRO | grep csv"
            extractfile = os.popen(unrarlb).readline().rstrip("\r\n")
            print("Extracting file " + extractfile)
            print(unrarexec + " e " + rarfile + " " + extractfile)
            os.system(unrarexec + " e " + rarfile + " " + extractfile)
            print("Moving file to the microdados folder")
            finalfile = "microdados/" + os.path.basename(extractfile)
            os.rename(os.path.basename(extractfile), finalfile)
            os.remove(rarfile)
        else:
            for extractfile in fnmatch.filter(listOfiles, "*.csv"):
                if fnmatch.fnmatch(extractfile, pattern1) or fnmatch.fnmatch(
                    extractfile, pattern2
                ):
                    print("Zip file " + microfile)
                    print("Extracting file " + extractfile)
                    zipObj.extract(extractfile)
                    print("Moving file to the microdados folder")
                    finalfile = "microdados/" + os.path.basename(extractfile)
                    os.rename(extractfile, finalfile)
                    basepath = extractfile.split("/")[0]
                    print("Removing " + basepath)
                    shutil.rmtree(basepath)
    return finalfile

# return the name of the wheel file for the job
def find_wheel():
    for file in os.listdir("../jobs"):
        if file.endswith(".whl"):
            return file
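
The conversion step above reads each CSV as cp1252, removes double quotes and writes a gzip-compressed UTF-8 file. The following is a minimal, self-contained sketch of that same idea on a tiny made-up sample (not real ENEM data). The function name `recode_to_gzip` and the sample file names are illustrative assumptions, not part of the original notebook.

```python
import gzip
import os
import tempfile

def recode_to_gzip(src_path, dst_path, src_encoding="cp1252"):
    """Re-encode a text file to UTF-8, strip double quotes and gzip the result."""
    with open(src_path, encoding=src_encoding) as fin, \
         gzip.open(dst_path, "wt", encoding="utf8") as fout:
        for line in fin:
            fout.write(line.replace('"', ""))

# Round trip on a made-up two-line sample stored in a temporary folder
workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "sample.csv")
dst = os.path.join(workdir, "sample.csv.gz")
with open(src, "w", encoding="cp1252") as f:
    f.write('"ID";"CIDADE"\n"1";"São Paulo"\n')

recode_to_gzip(src, dst)

# Read the compressed file back as UTF-8 to check accents and quotes
with gzip.open(dst, "rt", encoding="utf8") as f:
    content = f.read()
print(content)
```

Processing line by line keeps memory use flat, which matters because the real microdata files are large.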

### 3- *Download of microdata file packages from INEP site according to file list in variable previously defined*

In [None]:
# Download the files (based on list_arq)

for item in list_arq:
    year=item[-4:]
    if os.path.isfile('zips/'+item+'.zip'):
        print("file "+item+".zip already exists")
    else:
        print("downloading file "+item+"...")
        url='http://download.inep.gov.br/microdados/'+item+'.zip'
        wget.download(url,bar=bar_custom, out='zips')
        print(" ok")

print("downloads finished")
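
The download loop skips any package that is already present in the `zips` folder. The skip logic can be sketched on its own, without network access, as a function that returns only the packages still missing. The helper `pending_downloads` and the temporary demo folder are illustrative assumptions; the base URL is the one used in the cell above.

```python
import os
import tempfile

BASE_URL = "http://download.inep.gov.br/microdados/"  # same base URL as the download cell

def pending_downloads(items, zipdir):
    """Return (item, url) pairs for packages not yet present in zipdir."""
    pending = []
    for item in items:
        target = os.path.join(zipdir, item + ".zip")
        if not os.path.isfile(target):
            pending.append((item, BASE_URL + item + ".zip"))
    return pending

# Demo with a temporary folder in which one package already exists
demo_dir = tempfile.mkdtemp()
open(os.path.join(demo_dir, "microdados_enem2018.zip"), "wb").close()

plan = pending_downloads(["microdados_enem2018", "microdados_enem_2019"], demo_dir)
print(plan)
```

Note that this check only looks at file existence, so a partially downloaded .zip would also be skipped; deleting the incomplete file forces a fresh download.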

### 4- *For each downloaded .zip file, run the extraction, conversion and upload functions in a loop*

In [None]:
# full loop
for filename in sorted(os.listdir('zips')):
    print(">>Processing zips/"+filename)
    # extract the csv file
    result_tr=microdados_transform('zips/'+filename)
    
    # convert and compress
    result_conv=convert_compress_file(result_tr)

    # upload to the s3 bucket
    upload_s3(result_conv,bucket,folder)
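
The functions from step 2 read the year from fixed slice positions in the file name (`csvfile[-8:-4]` and `upfile[-11:-7]`), which works as long as the name ends with the four-digit year right before the extension. As a hedged alternative, the sketch below derives the same S3 key layout with a regular expression instead. The helper name `s3_key_for` is hypothetical and not part of the original notebook.

```python
import os
import re

def s3_key_for(path, folder):
    """Build the object key from the first 4-digit year found in the file name."""
    name = os.path.basename(path)
    match = re.search(r"(\d{4})", name)
    if match is None:
        raise ValueError("no 4-digit year in file name: " + name)
    return folder + "/enem_microdados_" + match.group(1) + "/" + name

key = s3_key_for("microdados/MICRODADOS_ENEM_2019.csv.gz", "data")
print(key)
```

Each year ends up under its own prefix, so every year's data sits in a separate folder of the bucket.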


### 5.1- *Initialize boto3 object to execute AWS Glue related tasks*

In [None]:
glue_client = boto3.client('glue', region_name=region)

### 5.2- *Run crawler*

In [None]:
response = glue_client.start_crawler(Name='enem-crawler')
pprint(response)
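
`start_crawler` returns immediately while the crawler keeps running in the background. The following is a minimal sketch of a polling loop that waits until the crawler is idle again, assuming the `State` field returned by the Glue `get_crawler` call. The function name `wait_for_crawler`, the polling interval and the timeout are illustrative choices, not part of the original notebook.

```python
import time

def wait_for_crawler(client, name, poll_seconds=30, timeout_seconds=3600):
    """Poll get_crawler until the crawler reports the READY state."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = client.get_crawler(Name=name)["Crawler"]["State"]
        if state == "READY":
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError("crawler " + name + " still in state " + state)
        time.sleep(poll_seconds)
```

In this notebook it could be called as `wait_for_crawler(glue_client, 'enem-crawler')` right after the cell above, so that later steps only start once the tables have been registered.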

### 5.3- *Build Glue job dependencies and upload to S3*

In [None]:
os.system("python ../jobs/SetupPythonShell.py bdist_wheel --dist-dir ../jobs")
pprint("wheel created in folder ../jobs/")

In [None]:
wheelfile = find_wheel()
jobfile = "GenerateSummaryData.py" # in the script, replace the S3 bucket name and the Glue database name

In [None]:
# s3 resource
s3 = boto3.resource("s3")

# upload dependencies (the context manager closes the file after the upload)
key = os.path.basename(wheelfile)
with open("../jobs/" + wheelfile, "rb") as data:
    s3.Bucket(glue_bucket).put_object(Key=key, Body=data)

# upload job script
key = os.path.basename(jobfile)
with open("../jobs/" + jobfile, "rb") as data:
    s3.Bucket(glue_bucket).put_object(Key=key, Body=data)

### 5.4- *Create Glue job*

In [None]:
glue_client.create_job(
    Name="GenerateSummaryData",
    Role="SummaryJobRole",
    MaxCapacity=1,
    Command={
        'Name': 'pythonshell',
        'ScriptLocation': f's3://{glue_bucket}/{jobfile}',
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--extra-py-files': f's3://{glue_bucket}/{os.path.basename(wheelfile)}'
    },
)

### 6.1- *Initialize boto3 object to execute AWS Step Functions state machine*

In [None]:
stepfunctions = boto3.client("stepfunctions")

### 6.2- *Run state machine*

In [None]:
stepfunctions.start_execution(stateMachineArn=state_machine)
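
`start_execution` also returns as soon as the execution has been accepted. To follow it to completion, one option is to poll `describe_execution` until the status is terminal. The following is a minimal sketch, assuming the `executionArn` returned by `start_execution` is kept in a variable. The function name `wait_for_execution`, the polling interval and the timeout are illustrative assumptions.

```python
import time

# Statuses after which a Step Functions execution no longer changes
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}

def wait_for_execution(client, execution_arn, poll_seconds=15, timeout_seconds=7200):
    """Poll describe_execution until the execution reaches a terminal status."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.describe_execution(executionArn=execution_arn)["status"]
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("execution still in status " + status)
        time.sleep(poll_seconds)
```

A possible usage is `execution = stepfunctions.start_execution(stateMachineArn=state_machine)` followed by `wait_for_execution(stepfunctions, execution["executionArn"])`, checking that the returned status is `SUCCEEDED` before querying the results.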

---
### After executing the steps above, the microdata tables are available in the Glue Data Catalog and can be used in Amazon Athena and other tools and services.