In [1]:
#Todos os imports necessários para aplicação.
import os
import sagemaker_pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField,StringType, FloatType
from pyspark import SparkContext, SparkConf
from sagemaker_pyspark import classpath_jars
from pyspark.sql.functions import create_map, struct
from pyspark.sql.functions import exp
from pyspark.sql.functions import regexp_replace
from pyspark.sql import SQLContext
from functools import reduce 
from pyspark.sql import DataFrame

In [2]:
classpath = ":".join(sagemaker_pyspark.classpath_jars())

builder = SparkSession.builder.appName("Dados Enem")
builder.config(
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
builder.config("spark.speculation", "false")
builder.config("spark.sql.parquet.compression.codec", "gzip")
builder.config("spark.debug.maxToStringFields", "100")
builder.config("spark.driver.extraClassPath", classpath)
builder.config("spark.driver.memory", "1g")
builder.config("spark.driver.cores", "1")
builder.config("spark.executor-memory", "20g")
builder.config("spark.executor.cores", "4")


builder.master("local[*]")

spark = builder.getOrCreate()
spark

In [3]:
LANDED = 'C:\\BigData\\Fontes\\microdados_enem\\Landed\\'
RAW = 'C:\\BigData\\Fontes\\microdados_enem\\Raw\\'
MODELED = 'C:\\BigData\\Fontes\\microdados_enem\\Modeled\\'
SELF = 'C:\\BigData\\Fontes\\microdados_enem\\Self\\'

In [4]:
ENEM_2018 = 'MICRODADOS_ENEM_2018.csv'
ENEM_2017 = 'MICRODADOS_ENEM_2017.csv'
ENEM_2016 = 'MICRODADOS_ENEM_2016.csv'

In [5]:
sqlContext = SQLContext(spark.sparkContext)

df_2016 = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', sep=';').load(LANDED + ENEM_2016)    
df_2016.printSchema()                                   

root
 |-- NU_INSCRICAO: long (nullable = true)
 |-- NU_ANO: integer (nullable = true)
 |-- CO_MUNICIPIO_RESIDENCIA: integer (nullable = true)
 |-- NO_MUNICIPIO_RESIDENCIA: string (nullable = true)
 |-- CO_UF_RESIDENCIA: integer (nullable = true)
 |-- SG_UF_RESIDENCIA: string (nullable = true)
 |-- NU_IDADE: integer (nullable = true)
 |-- TP_SEXO: string (nullable = true)
 |-- TP_ESTADO_CIVIL: integer (nullable = true)
 |-- TP_COR_RACA: integer (nullable = true)
 |-- TP_NACIONALIDADE: integer (nullable = true)
 |-- CO_MUNICIPIO_NASCIMENTO: integer (nullable = true)
 |-- NO_MUNICIPIO_NASCIMENTO: string (nullable = true)
 |-- CO_UF_NASCIMENTO: integer (nullable = true)
 |-- SG_UF_NASCIMENTO: string (nullable = true)
 |-- TP_ST_CONCLUSAO: integer (nullable = true)
 |-- TP_ANO_CONCLUIU: integer (nullable = true)
 |-- TP_ESCOLA: integer (nullable = true)
 |-- TP_ENSINO: integer (nullable = true)
 |-- IN_TREINEIRO: integer (nullable = true)
 |-- CO_ESCOLA: integer (nullable = true)
 |-- CO_MU

In [6]:
sqlContext = SQLContext(spark.sparkContext)

df_2017 = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', sep=';').load(LANDED + ENEM_2017)    
df_2017.printSchema()                   

root
 |-- NU_INSCRICAO: long (nullable = true)
 |-- NU_ANO: integer (nullable = true)
 |-- CO_MUNICIPIO_RESIDENCIA: integer (nullable = true)
 |-- NO_MUNICIPIO_RESIDENCIA: string (nullable = true)
 |-- CO_UF_RESIDENCIA: integer (nullable = true)
 |-- SG_UF_RESIDENCIA: string (nullable = true)
 |-- NU_IDADE: integer (nullable = true)
 |-- TP_SEXO: string (nullable = true)
 |-- TP_ESTADO_CIVIL: integer (nullable = true)
 |-- TP_COR_RACA: integer (nullable = true)
 |-- TP_NACIONALIDADE: integer (nullable = true)
 |-- CO_MUNICIPIO_NASCIMENTO: integer (nullable = true)
 |-- NO_MUNICIPIO_NASCIMENTO: string (nullable = true)
 |-- CO_UF_NASCIMENTO: integer (nullable = true)
 |-- SG_UF_NASCIMENTO: string (nullable = true)
 |-- TP_ST_CONCLUSAO: integer (nullable = true)
 |-- TP_ANO_CONCLUIU: integer (nullable = true)
 |-- TP_ESCOLA: integer (nullable = true)
 |-- TP_ENSINO: integer (nullable = true)
 |-- IN_TREINEIRO: integer (nullable = true)
 |-- CO_ESCOLA: integer (nullable = true)
 |-- CO_MU

In [7]:
sqlContext = SQLContext(spark.sparkContext)

df_2018 = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', sep=';').load(LANDED + ENEM_2018)    
df_2018.printSchema()                   

root
 |-- NU_INSCRICAO: long (nullable = true)
 |-- NU_ANO: integer (nullable = true)
 |-- CO_MUNICIPIO_RESIDENCIA: integer (nullable = true)
 |-- NO_MUNICIPIO_RESIDENCIA: string (nullable = true)
 |-- CO_UF_RESIDENCIA: integer (nullable = true)
 |-- SG_UF_RESIDENCIA: string (nullable = true)
 |-- NU_IDADE: integer (nullable = true)
 |-- TP_SEXO: string (nullable = true)
 |-- TP_ESTADO_CIVIL: integer (nullable = true)
 |-- TP_COR_RACA: integer (nullable = true)
 |-- TP_NACIONALIDADE: integer (nullable = true)
 |-- CO_MUNICIPIO_NASCIMENTO: integer (nullable = true)
 |-- NO_MUNICIPIO_NASCIMENTO: string (nullable = true)
 |-- CO_UF_NASCIMENTO: integer (nullable = true)
 |-- SG_UF_NASCIMENTO: string (nullable = true)
 |-- TP_ST_CONCLUSAO: integer (nullable = true)
 |-- TP_ANO_CONCLUIU: integer (nullable = true)
 |-- TP_ESCOLA: integer (nullable = true)
 |-- TP_ENSINO: integer (nullable = true)
 |-- IN_TREINEIRO: integer (nullable = true)
 |-- CO_ESCOLA: integer (nullable = true)
 |-- CO_MU

In [9]:
df_2016_reduzido = df_2016.selectExpr("NU_ANO", "NU_NOTA_CN", "NU_NOTA_CH", "NU_NOTA_LC", "NU_NOTA_MT", "NU_NOTA_REDACAO", "Q001", "Q002", "Q005", "Q006 as RENDA_FAMILIAR")
df_2016_reduzido.write.mode("overwrite").parquet(RAW + 'MICRODADOS_ENEM_2016_REDUZIDO.parquet')

In [10]:
df_2017_reduzido = df_2017.selectExpr("NU_ANO", "NU_NOTA_CN", "NU_NOTA_CH", "NU_NOTA_LC", "NU_NOTA_MT", "NU_NOTA_REDACAO", "Q001", "Q002", "Q005", "Q006 as RENDA_FAMILIAR")
df_2017_reduzido.write.mode("overwrite").parquet(RAW + 'MICRODADOS_ENEM_2017_REDUZIDO.parquet')

In [11]:
df_2018_reduzido = df_2018.selectExpr("NU_ANO", "NU_NOTA_CN", "NU_NOTA_CH", "NU_NOTA_LC", "NU_NOTA_MT", "NU_NOTA_REDACAO", "Q001", "Q002", "Q005", "Q006 as RENDA_FAMILIAR")
df_2018_reduzido.write.mode("overwrite").parquet(RAW + 'MICRODADOS_ENEM_2018_REDUZIDO.parquet')