### Processing raw data

This stage prepares the relevant data for export to the database.

#### Processing the data from ENEM csv file

For this part I chose Dask, a library built on the very popular Pandas that also allows for parallel computing. Dask evaluates lazily: it records the operations requested and does not run any calculation until a result is actually needed.

In [1]:
# Importing Dask

import dask.dataframe as dd

In [2]:
# Reading the csv file stored in an S3 bucket
# Encoding had to be adjusted to latin1 and delimiter to ';'

df = dd.read_csv('s3://sagemaker-studio-g94r3spx4fg/MICRODADOS_ENEM_2017.csv', delimiter=';', 
                 encoding='latin1', assume_missing=True)

In [4]:
# Check if file was read correctly

df.head(5)

Unnamed: 0,NU_INSCRICAO,NU_ANO,CO_MUNICIPIO_RESIDENCIA,NO_MUNICIPIO_RESIDENCIA,CO_UF_RESIDENCIA,SG_UF_RESIDENCIA,NU_IDADE,TP_SEXO,TP_ESTADO_CIVIL,TP_COR_RACA,...,Q018,Q019,Q020,Q021,Q022,Q023,Q024,Q025,Q026,Q027
0,170003300000.0,2017.0,3503208.0,Araraquara,35.0,SP,29.0,F,0.0,1.0,...,A,C,B,B,C,B,B,B,A,A
1,170003300000.0,2017.0,5002902.0,Cassilândia,50.0,MS,22.0,F,0.0,1.0,...,A,B,A,A,C,B,A,A,A,A
2,170001700000.0,2017.0,3550308.0,São Paulo,35.0,SP,38.0,F,0.0,1.0,...,A,B,A,A,C,A,B,B,A,A
3,170001700000.0,2017.0,4209300.0,Lages,42.0,SC,35.0,F,0.0,1.0,...,B,C,A,B,D,A,B,B,A,A
4,170001700000.0,2017.0,2704302.0,Maceió,27.0,AL,40.0,M,0.0,3.0,...,A,B,B,A,C,A,C,B,A,A


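Numeric columns that should be integers are represented as floats. This is a side effect of `assume_missing=True`, which makes Dask read integer columns as floats so that missing values can be represented. It will be addressed when we export the data to the database.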
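If integer types were wanted inside the dataframe itself, one option is pandas' nullable integer dtype. This is not the approach taken in this notebook, only a sketch on made-up values; the frame `sample` is hypothetical:

```python
import numpy as np
import pandas as pd

# Toy frame mimicking two of the float columns (values are illustrative).
sample = pd.DataFrame({
    "NU_ANO": [2017.0, 2017.0, 2017.0],
    "NU_IDADE": [29.0, np.nan, 38.0],  # a missing value forces float64 in NumPy
})

# 'Int64' (capital I) is the nullable integer dtype: whole numbers plus <NA>.
converted = sample.astype({"NU_ANO": "Int64", "NU_IDADE": "Int64"})

print(converted.dtypes)
print(converted)
```

The conversion only succeeds when every non-missing value is a whole number, which is what we expect for codes, years and ages.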

Next, we check the dtype of every column to see whether it matches the reference dictionary.

In [9]:
# Check each column dtype

for c in df.columns:
    print(c + ' ' + str(df[c].dtype))

NU_INSCRICAO float64
NU_ANO float64
CO_MUNICIPIO_RESIDENCIA float64
NO_MUNICIPIO_RESIDENCIA object
CO_UF_RESIDENCIA float64
SG_UF_RESIDENCIA object
NU_IDADE float64
TP_SEXO object
TP_ESTADO_CIVIL float64
TP_COR_RACA float64
TP_NACIONALIDADE float64
CO_MUNICIPIO_NASCIMENTO float64
NO_MUNICIPIO_NASCIMENTO object
CO_UF_NASCIMENTO float64
SG_UF_NASCIMENTO object
TP_ST_CONCLUSAO float64
TP_ANO_CONCLUIU float64
TP_ESCOLA float64
TP_ENSINO float64
IN_TREINEIRO float64
CO_ESCOLA float64
CO_MUNICIPIO_ESC float64
NO_MUNICIPIO_ESC object
CO_UF_ESC float64
SG_UF_ESC object
TP_DEPENDENCIA_ADM_ESC float64
TP_LOCALIZACAO_ESC float64
TP_SIT_FUNC_ESC float64
IN_BAIXA_VISAO float64
IN_CEGUEIRA float64
IN_SURDEZ float64
IN_DEFICIENCIA_AUDITIVA float64
IN_SURDO_CEGUEIRA float64
IN_DEFICIENCIA_FISICA float64
IN_DEFICIENCIA_MENTAL float64
IN_DEFICIT_ATENCAO float64
IN_DISLEXIA float64
IN_DISCALCULIA float64
IN_AUTISMO float64
IN_VISAO_MONOCULAR float64
IN_OUTRA_DEF float64
IN_GESTANTE float64
IN_LACTANTE fl

There are some differences between the dtypes inferred by Dask and those listed in the dictionary. The dictionary contains some errors:
- It states that column Q004 is numeric, when it actually holds a string (A to F);
- CO_UF_PROVA is described as a string, but it holds only two-digit integers. Although calculations on these values are unlikely, we will keep the numeric type for consistency with similar columns in the same dataset.

In [16]:
# Next we check if there are any null values on the column 'NU_INSCRICAO'

df['NU_INSCRICAO'].isnull().sum().compute()

0

There are no rows with a null value in the column 'NU_INSCRICAO', meaning that every row has a value for this field. Next we will check whether this column contains any duplicate values.

In [20]:
# Check if there is a disparity between total number of records and unique values

(df['NU_INSCRICAO'].count() - df['NU_INSCRICAO'].nunique()).compute()

0

Now we know that the number of unique values equals the total number of rows for this column, which confirms that every registration number in the dataset is unique and non-null.
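The two checks on 'NU_INSCRICAO' can be folded into one reusable test. The sketch below uses plain Pandas on made-up registration numbers; the helper name `is_valid_key` is hypothetical:

```python
import pandas as pd

def is_valid_key(series: pd.Series) -> bool:
    """Return True when the column has no nulls and no repeated values."""
    return bool(series.notna().all() and series.is_unique)

# Illustrative data: one clean column, one with a duplicate, one with a gap.
good = pd.Series([170003300001, 170003300002, 170003300003])
duplicated = pd.Series([170003300001, 170003300001, 170003300003])
missing = pd.Series([170003300001, None, 170003300003])

print(is_valid_key(good))
print(is_valid_key(duplicated))
print(is_valid_key(missing))
```

A column that passes both conditions is a reasonable candidate for a primary key in the database table.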

We will proceed with data processing using SQL, accessing the files directly from S3. This is the better option because the SageMaker instance has very little storage space, and we use it only to execute Python code.

However, the queries will be run from this notebook later on.

In [23]:
# Clear memory to improve notebook performance as we won't use these variables any longer

del df

#### Cities table

To create the cities table we have to read two files and merge them into a single dataset: cities_data.csv and BR_Localidades_2010_v1.shp (the latter also depends on its companion files to be complete).

Because these files are relatively small, I will use Pandas to import the csv and GeoPandas to read the shapefile into a GeoDataFrame, which lets us work with it like any other dataframe.

In [None]:
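# Import Pandas and GeoPandas

import pandas as pd
import geopandas as gpd

# Read the csv with the city indicators and the shapefile with the localities

df_cities = pd.read_csv('s3://sagemaker-studio-g94r3spx4fg/cities_data.csv')
cities_shp = gpd.read_file('s3://sagemaker-studio-g94r3spx4fg/BR_Localidades_2010_v1.shp')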

In [47]:
df_cities.head()

Unnamed: 0,Código,Espacialidades,Índice de Gini 2010,População total 2010,IDHM 2010,IDHM Renda 2010,IDHM Longevidade 2010,IDHM Educação 2010
0,76,Brasil,0.6,190755799,0.727,0.739,0.816,0.637
1,5200050,Abadia de Goiás,0.42,6876,0.708,0.687,0.83,0.622
2,3100104,Abadia dos Dourados,0.47,6704,0.689,0.693,0.839,0.563
3,5200100,Abadiânia,0.43,15757,0.689,0.671,0.841,0.579
4,3100203,Abaeté,0.54,22690,0.698,0.72,0.848,0.556


In [48]:
cities_shp.head()

Unnamed: 0,ID,CD_GEOCODI,TIPO,CD_GEOCODB,NM_BAIRRO,CD_GEOCODS,NM_SUBDIST,CD_GEOCODD,NM_DISTRIT,CD_GEOCODM,...,NM_UF,CD_NIVEL,CD_CATEGOR,NM_CATEGOR,NM_LOCALID,LONG,LAT,ALT,GMRotation,geometry
0,1,110001505000001,URBANO,110001505006.0,Redondo,11000150500,,110001505,ALTA FLORESTA D'OESTE,1100015,...,RONDÔNIA,1,5,CIDADE,ALTA FLORESTA D'OESTE,-61.999824,-11.93554,337.735719,0.0,POINT (-61.99982 -11.93554)
1,2,110001515000001,URBANO,,,11000151500,,110001515,FILADÉLFIA D'OESTE,1100015,...,RONDÔNIA,2,15,VILA,FILADÉLFIA D'OESTE,-62.043898,-12.437239,215.244429,0.0,POINT (-62.04390 -12.43724)
2,3,110001520000001,URBANO,,,11000152000,,110001520,IZIDOLÂNDIA,1100015,...,RONDÔNIA,2,20,VILA,IZIDOLÂNDIA,-62.175549,-12.601415,181.044807,0.0,POINT (-62.17555 -12.60142)
3,4,110001525000001,URBANO,,,11000152500,,110001525,NOVA GEASE D'OESTE,1100015,...,RONDÔNIA,2,25,VILA,NOVA GEASE D'OESTE,-62.31865,-11.919792,191.576571,0.0,POINT (-62.31865 -11.91979)
4,5,110001530000001,URBANO,,,11000153000,,110001530,ROLIM DE MOURA DO GUAPORÉ,1100015,...,RONDÔNIA,2,30,VILA,ROLIM DE MOURA DO GUAPORÉ,-62.276812,-13.079806,157.285277,0.0,POINT (-62.27681 -13.07981)


In [53]:
# Checking total number of entries for each dataset

print(df_cities.shape)
print(cities_shp.shape)

(5566, 8)
(21886, 23)


As we can see, the datasets have different numbers of rows. This happens because the cities_data file contains only cities, while the shapefile also includes other kinds of localities, such as villages (vilas) and districts. We have to filter the shapefile to keep only the rows that refer to cities.

In [57]:
# Filter cities_shp to keep only cities
# .copy() avoids a possible SettingWithCopyWarning when 'CD_GEOCODM' is reassigned later

cities_shp = cities_shp[cities_shp['NM_CATEGOR'] == 'CIDADE'].copy()

# Delete the first row of df_cities because we don't need the aggregate line for the entire country

df_cities.drop(0, axis=0, inplace=True)

In [59]:
# Confirming equal number of rows for both datasets

print(df_cities.shape)
print(cities_shp.shape)

(5565, 8)
(5565, 23)


In [60]:
# Because we will merge the datasets using 'Código' from df_cities and 'CD_GEOCODM' from cities_shp,
# we have to be sure they have the same dtype

print(df_cities['Código'].dtype)
print(cities_shp['CD_GEOCODM'].dtype)

int64
object


In [65]:
# Modifying dtype for 'CD_GEOCODM'

cities_shp['CD_GEOCODM'] = cities_shp['CD_GEOCODM'].astype('int64')


# Merging tables

df_cities_merge = df_cities.merge(cities_shp, left_on='Código', right_on='CD_GEOCODM')

In [67]:
df_cities_merge.head()

Unnamed: 0,Código,Espacialidades,Índice de Gini 2010,População total 2010,IDHM 2010,IDHM Renda 2010,IDHM Longevidade 2010,IDHM Educação 2010,ID,CD_GEOCODI,...,NM_UF,CD_NIVEL,CD_CATEGOR,NM_CATEGOR,NM_LOCALID,LONG,LAT,ALT,GMRotation,geometry
0,5200050,Abadia de Goiás,0.42,6876,0.708,0.687,0.83,0.622,21355,520005005000001,...,GOIÁS,1,5,CIDADE,ABADIA DE GOIÁS,-49.440548,-16.758812,893.601466,0.0,POINT (-49.44055 -16.75881)
1,3100104,Abadia dos Dourados,0.47,6704,0.689,0.693,0.839,0.563,11988,310010405000001,...,MINAS GERAIS,1,5,CIDADE,ABADIA DOS DOURADOS,-47.396832,-18.487565,753.124911,0.0,POINT (-47.39683 -18.48756)
2,5200100,Abadiânia,0.43,15757,0.689,0.671,0.841,0.579,21357,520010005000001,...,GOIÁS,1,5,CIDADE,ABADIÂNIA,-48.718812,-16.182672,1017.550634,0.0,POINT (-48.71881 -16.18267)
3,3100203,Abaeté,0.54,22690,0.698,0.72,0.848,0.556,11989,310020305000001,...,MINAS GERAIS,1,5,CIDADE,ABAETÉ,-45.446191,-19.155848,644.739909,0.0,POINT (-45.44619 -19.15585)
4,1500107,Abaetetuba,0.53,141100,0.628,0.579,0.798,0.537,1051,150010705000001,...,PARÁ,1,5,CIDADE,ABAETETUBA,-48.884404,-1.72347,10.120343,0.0,POINT (-48.88440 -1.72347)


In [68]:
# Check that the number of rows is unchanged after the merge

df_cities_merge.shape

(5565, 31)
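The default merge is an inner join, so rows without a match would be dropped silently and only the shape check above would reveal it. As a hedged alternative, Pandas can enforce the one-to-one relationship and flag unmatched rows during the merge itself. The two-row frames `left` and `right` below are toy stand-ins for `df_cities` and `cities_shp`:

```python
import pandas as pd

# Toy stand-ins: the key is int64 on one side and text on the other.
left = pd.DataFrame({
    "Código": [5200050, 3100104],
    "Espacialidades": ["Abadia de Goiás", "Abadia dos Dourados"],
})
right = pd.DataFrame({
    "CD_GEOCODM": ["5200050", "3100104"],
    "NM_UF": ["GOIÁS", "MINAS GERAIS"],
})

# Align the key dtypes before merging.
right["CD_GEOCODM"] = right["CD_GEOCODM"].astype("int64")

merged = left.merge(
    right,
    left_on="Código",
    right_on="CD_GEOCODM",
    how="outer",              # keep unmatched rows from both sides
    validate="one_to_one",    # raise MergeError if a key repeats on either side
    indicator=True,           # add a '_merge' column: both / left_only / right_only
)

# Rows that exist on only one side would need investigating.
unmatched = merged[merged["_merge"] != "both"]
print(len(merged), len(unmatched))
```

With an empty `unmatched` frame, the outer join returns the same rows as the inner join used in this notebook.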

In [69]:
# Export dataframe to a csv file

df_cities_merge.to_csv('s3://sagemaker-studio-g94r3spx4fg/df_cities.csv')

#### Loading data into the database

The final step of this phase is to load the datasets into the Redshift tables that were created earlier. To do so we use the psycopg2 package to connect to Redshift, after which we can run the queries from Python as usual.

In [71]:
# Import library

import psycopg2

# Establish connection

conn = psycopg2.connect(dbname='dbenem', host='redcluster.cmbnxn61hwrs.us-east-2.redshift.amazonaws.com', 
port='5439', user='eduardo', password='Eduardo123')

# Generate cursor

cur = conn.cursor()

# Every SQL command goes inside a cur.execute method like:
# cur.execute("query")

To keep this notebook organized, the full list of queries is kept in a separate file, querycommands.sql.
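A possible variation on the connection cell, sketched under assumptions: the settings are read from environment variables rather than written in the notebook, and context managers handle commit, rollback and cursor cleanup. The variable names (`REDSHIFT_DB`, `REDSHIFT_HOST`, and so on) and both helper functions are hypothetical, not part of this project:

```python
import os

def redshift_params(env=os.environ):
    """Collect connection settings from environment variables (hypothetical names)."""
    return {
        "dbname": env["REDSHIFT_DB"],
        "host": env["REDSHIFT_HOST"],
        "port": env.get("REDSHIFT_PORT", "5439"),  # default port used in this notebook
        "user": env["REDSHIFT_USER"],
        "password": env["REDSHIFT_PASSWORD"],
    }

def run_statements(statements, params):
    """Run each SQL statement in a single transaction, then close the connection."""
    import psycopg2  # imported here so redshift_params works without the driver

    conn = psycopg2.connect(**params)
    try:
        with conn:                      # commits on success, rolls back on error
            with conn.cursor() as cur:  # closes the cursor on exit
                for sql in statements:
                    cur.execute(sql)
    finally:
        conn.close()                    # 'with conn' does not close the connection
```

Usage would look like `run_statements(["SELECT 1"], redshift_params())`, with the statements taken from querycommands.sql.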

In [None]:
# After the queries have run, it is important to close the connection to the database.

cur.close()
conn.close()