## Importação das Bibiliotecas

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from sqlalchemy import create_engine
import os
from pyspark.sql.functions import col, count, sum
from pyspark.sql.types import StringType, IntegerType, DoubleType, StructType, StructField, LongType
import pymssql
#import pyodbc


## Criando uma SparkSession

In [2]:
#Usando a Imagem Docker
#spark = SparkSession.builder.remote("sc://spark-connect").getOrCreate()

In [3]:
# Crie um SparkSession
spark = SparkSession.builder.appName("LerDadosEnem").getOrCreate()

## Processo de Leitura dos arquivos CSV

In [4]:
#Path dos arquivos usando Imagem do docker

#caminho_arquivo_2022 = '/opt/datasets/ENEM/2022/microdados_enem_2022/DADOS/MICRODADOS_ENEM_2022.csv'
#caminho_arquivo_2021 = '/opt/datasets/ENEM/2021/microdados_enem_2021/DADOS/MICRODADOS_ENEM_2021.csv'
#caminho_arquivo_2020 = '/opt/datasets/ENEM/2020/microdados_enem_2020/DADOS/MICRODADOS_ENEM_2020.csv'
#caminho_arquivo_2019 = '/opt/datasets/ENEM/2019/microdados_enem_2019/DADOS/MICRODADOS_ENEM_2019.csv'

In [5]:
#Path dos arquivos

caminho_arquivo_2022 = r'E:\Estudos\SQL\Datasets\ENEM\2022\microdados_enem_2022\DADOS\MICRODADOS_ENEM_2022.csv'
caminho_arquivo_2021 = r'E:\Estudos\SQL\Datasets\ENEM\2021\microdados_enem_2021\DADOS\MICRODADOS_ENEM_2021.csv'
caminho_arquivo_2020 = r'E:\Estudos\SQL\Datasets\ENEM\2020\microdados_enem_2020\DADOS\MICRODADOS_ENEM_2020.csv'
caminho_arquivo_2019 = r'E:\Estudos\SQL\Datasets\ENEM\2019\microdados_enem_2019\DADOS\MICRODADOS_ENEM_2019.csv'

In [6]:
#Função Paa ler os arquivos CSV
def lerCSV(spark, caminho_arquivo):
    df = spark.read.csv(caminho_arquivo, sep=';', header=True, inferSchema=True, encoding='UTF-8')
    return df


## Criação dos Dataframes e Unificaçao

In [7]:
##Chama função quer ler o arquivo CSV e Transforma para DF.SPARK

dfEscola22 = lerCSV(spark, caminho_arquivo_2022)
dfEscola21 = lerCSV(spark, caminho_arquivo_2021)
dfEscola20 = lerCSV(spark, caminho_arquivo_2020)
dfEscola19 = lerCSV(spark, caminho_arquivo_2019)


In [8]:
#Unir os DF em um só 

dfDadosGerais = dfEscola22.union(dfEscola21).union(dfEscola20).union(dfEscola19)
#dfDadosGerais = dfEscola22


In [9]:
dfPreencher = dfDadosGerais.fillna(-2)  # Preenche com -2(sem ref), por exemplo
#dfPreencher.cache()

## Criação das Dimes

### Dime Municipios

In [10]:
dfDimeMunicipio = dfPreencher.select(
    col("CO_MUNICIPIO_ESC").alias("MUNICIPIO_ID"),
    col("NO_MUNICIPIO_ESC").alias("MUNICIPIO_DESCRICAO"),
    col("CO_UF_ESC").alias("UF_ID")
).distinct()

### Dime UF

In [11]:
dfUF = dfPreencher.select(
    col("CO_UF_ESC").alias("UF_ID"),
    col("SG_UF_ESC").alias("UF_SIGLA")
).distinct()

dfUF.createOrReplaceTempView("TEMP_UF")

### Dicionário de Siglas UF dos Estados Brasileiros e sua descrição

In [12]:
dfDimeUF = spark.sql("""
        SELECT T.*,A.UF_DESCRICAO FROM(
        SELECT 'RO' AS UF_ID, 'Rondônia' AS UF_DESCRICAO
        UNION ALL
        SELECT 'AC', 'Acre'
        UNION ALL
        SELECT 'AM', 'Amazonas'
        UNION ALL
        SELECT 'RR', 'Roraima'
        UNION ALL
        SELECT 'PA', 'Pará'
        UNION ALL
        SELECT 'AP', 'Amapá'
        UNION ALL
        SELECT 'TO', 'Tocantins'
        UNION ALL
        SELECT 'MA', 'Maranhão'
        UNION ALL
        SELECT 'PI', 'Piauí'
        UNION ALL
        SELECT 'CE', 'Ceará'
        UNION ALL
        SELECT 'RN', 'Rio Grande do Norte'
        UNION ALL
        SELECT 'PB', 'Paraíba'
        UNION ALL
        SELECT 'PE', 'Pernambuco'
        UNION ALL
        SELECT 'AL', 'Alagoas'
        UNION ALL
        SELECT 'SE', 'Sergipe'
        UNION ALL
        SELECT 'BA', 'Bahia'
        UNION ALL
        SELECT 'MG', 'Minas Gerais'
        UNION ALL
        SELECT 'ES', 'Espírito Santo'
        UNION ALL
        SELECT 'RJ', 'Rio de Janeiro'
        UNION ALL
        SELECT 'SP', 'São Paulo'
        UNION ALL
        SELECT 'PR', 'Paraná'
        UNION ALL
        SELECT 'SC', 'Santa Catarina'
        UNION ALL
        SELECT 'RS', 'Rio Grande do Sul'
        UNION ALL
        SELECT 'MS', 'Mato Grosso do Sul'
        UNION ALL
        SELECT 'MT', 'Mato Grosso'
        UNION ALL
        SELECT 'GO', 'Goiás'
        UNION ALL
        SELECT 'DF', 'Distrito Federal'
        UNION ALL
        SELECT '-2', 'Sem Referência') A
        INNER JOIN TEMP_UF T
        ON A.UF_ID = T.UF_SIGLA
""")

In [13]:
dimeManualCorRacao = r'C:\Users\leoba\Documents\Enem\Dataset\Dimes\Dime_Cor_Raca.csv'
dimeManualEscola = r'C:\Users\leoba\Documents\Enem\Dataset\Dimes\Dime_Escola.csv'
dimeManualEstadoCivil = r'C:\Users\leoba\Documents\Enem\Dataset\Dimes\Dime_Estado_Civil.csv'
dimeManualFaixaEtaria = r'C:\Users\leoba\Documents\Enem\Dataset\Dimes\Dime_Faixa_Etaria.csv'
dimeManualFaixaRendaMensal = r'C:\Users\leoba\Documents\Enem\Dataset\Dimes\Dime_Faixa_Renda_Mensal.csv'
dimeManualNacionalidade = r'C:\Users\leoba\Documents\Enem\Dataset\Dimes\Dime_Nacionalidade.csv'
dimeManualSexo = r'C:\Users\leoba\Documents\Enem\Dataset\Dimes\Dime_Sexo.csv'
dimeManualSituacaoEscolaridade = r'C:\Users\leoba\Documents\Enem\Dataset\Dimes\Dime_Situacao_Escolaridade.csv'


In [14]:
##Chama função quer ler o arquivo CSV e Transforma para DF.SPARK
dfDimeCorRaca = lerCSV(spark, dimeManualCorRacao)
dfDimeEscola = lerCSV(spark, dimeManualEscola)
dfDimeEstadoCivil = lerCSV(spark,dimeManualEstadoCivil)
dfDimeFaixaEtaria = lerCSV(spark,dimeManualFaixaEtaria)
dfDimeFaixaRendaMensal = lerCSV(spark,dimeManualFaixaRendaMensal)
dfDimeNacionalidade = lerCSV(spark,dimeManualNacionalidade)
dfDimeSexo = lerCSV(spark,dimeManualSexo)
dfDimeSituacaoEscolaridade = lerCSV(spark,dimeManualSituacaoEscolaridade)


## Criação da Fato Enem

In [15]:
dfPreencherDistinct = dfPreencher.select(
    col("NU_INSCRICAO").alias("NU_INSCRICAO"),
    col("NU_ANO").alias("ANO_EDICAO"),
    col("TP_ANO_CONCLUIU").alias("ANO_CONCLUSAO"),
    col("TP_SEXO").alias("SEXO_ID"),
    col("CO_MUNICIPIO_ESC").alias("MUNICIPIO_ID"),
    col("TP_ESTADO_CIVIL").alias("ESTADO_CIVIL_ID"),
    col("TP_COR_RACA").alias("COR_RACA_ID"),
    col("TP_NACIONALIDADE").alias("NACIONALIDADE_ID"),
    col("TP_ST_CONCLUSAO").alias("SITUACAO_ESCOLARIDADE_ID"),
    col("TP_ESCOLA").alias("ESCOLA_ID"),
    col("Q006").alias("FAIXA_RENDA_MENSAL_ID"),
    col("TP_FAIXA_ETARIA").alias("FAIXA_ETARIA_ID"),
    col("NU_NOTA_CN").alias("NOTA_CIENCIA_DA_NATUREZA"),
    col("NU_NOTA_CH").alias("NOTA_CIENCIA_DA_HUMANA"),
    col("NU_NOTA_LC").alias("NOTA_LINGUAGEM_CODIGO"),
    col("NU_NOTA_MT").alias("NOTA_MATEMATICA")
).distinct()
#dfPreencherDistinct.cache()

In [16]:
dfAgregate = dfPreencherDistinct.select(
    "ANO_EDICAO",
    "ANO_CONCLUSAO",
    "SEXO_ID",
    "MUNICIPIO_ID",
    "ESTADO_CIVIL_ID",
    "COR_RACA_ID",
    "NACIONALIDADE_ID",
    "SITUACAO_ESCOLARIDADE_ID",
    "ESCOLA_ID",
    "FAIXA_RENDA_MENSAL_ID",
    "FAIXA_ETARIA_ID",
    "NU_INSCRICAO",  # Adicionando essa coluna para o count
    "NOTA_CIENCIA_DA_NATUREZA",
    "NOTA_CIENCIA_DA_HUMANA",
    "NOTA_LINGUAGEM_CODIGO",
    "NOTA_MATEMATICA"
)

# Agregação após o agrupamento
dfAgregate = dfAgregate.groupBy(
    "ANO_EDICAO",
    "ANO_CONCLUSAO",
    "SEXO_ID",
    "MUNICIPIO_ID",
    "ESTADO_CIVIL_ID",
    "COR_RACA_ID",
    "NACIONALIDADE_ID",
    "SITUACAO_ESCOLARIDADE_ID",
    "ESCOLA_ID",
    "FAIXA_RENDA_MENSAL_ID",
    "FAIXA_ETARIA_ID"
).agg(
    count("NU_INSCRICAO").alias("QTD"),
    sum("NOTA_CIENCIA_DA_NATUREZA").alias("NOTA_CIENCIA_DA_NATUREZA"),
    sum("NOTA_CIENCIA_DA_HUMANA").alias("NOTA_CIENCIA_DA_HUMANA"),
    sum("NOTA_LINGUAGEM_CODIGO").alias("NOTA_LINGUAGEM_CODIGO"),
    sum("NOTA_MATEMATICA").alias("NOTA_MATEMATICA")
)

dfAgregate.cache()

DataFrame[ANO_EDICAO: int, ANO_CONCLUSAO: int, SEXO_ID: string, MUNICIPIO_ID: int, ESTADO_CIVIL_ID: int, COR_RACA_ID: int, NACIONALIDADE_ID: int, SITUACAO_ESCOLARIDADE_ID: int, ESCOLA_ID: int, FAIXA_RENDA_MENSAL_ID: string, FAIXA_ETARIA_ID: int, QTD: bigint, NOTA_CIENCIA_DA_NATUREZA: double, NOTA_CIENCIA_DA_HUMANA: double, NOTA_LINGUAGEM_CODIGO: double, NOTA_MATEMATICA: double]

In [17]:
dfAgregate = dfAgregate.withColumn("NOTA_CIENCIA_DA_NATUREZA", col("NOTA_CIENCIA_DA_NATUREZA").cast(DoubleType())) \
                       .withColumn("NOTA_CIENCIA_DA_HUMANA", col("NOTA_CIENCIA_DA_HUMANA").cast(DoubleType())) \
                       .withColumn("NOTA_LINGUAGEM_CODIGO", col("NOTA_LINGUAGEM_CODIGO").cast(DoubleType())) \
                       .withColumn("NOTA_MATEMATICA", col("NOTA_MATEMATICA").cast(DoubleType()))

In [18]:
dfAgregate.schema

StructType([StructField('ANO_EDICAO', IntegerType(), True), StructField('ANO_CONCLUSAO', IntegerType(), True), StructField('SEXO_ID', StringType(), True), StructField('MUNICIPIO_ID', IntegerType(), True), StructField('ESTADO_CIVIL_ID', IntegerType(), True), StructField('COR_RACA_ID', IntegerType(), True), StructField('NACIONALIDADE_ID', IntegerType(), True), StructField('SITUACAO_ESCOLARIDADE_ID', IntegerType(), True), StructField('ESCOLA_ID', IntegerType(), True), StructField('FAIXA_RENDA_MENSAL_ID', StringType(), True), StructField('FAIXA_ETARIA_ID', IntegerType(), True), StructField('QTD', LongType(), False), StructField('NOTA_CIENCIA_DA_NATUREZA', DoubleType(), True), StructField('NOTA_CIENCIA_DA_HUMANA', DoubleType(), True), StructField('NOTA_LINGUAGEM_CODIGO', DoubleType(), True), StructField('NOTA_MATEMATICA', DoubleType(), True)])

# Salvar em SQL 
Salvar os dados em uma tabela SQL


## Parametrizando a conexão do Banco SQL SERVER Usando JDBC

In [19]:
server_name = "localhost"
database_name = "DockerEnem"
username = "sa"
password = "bi@123456"

In [21]:
# Conectando ao banco de dados
conn = pymssql.connect(server=server_name, database=database_name, user=username, password=password,port='1434')
# Criando um cursor
cursor = conn.cursor()

NameError: name 'pymssql' is not defined

In [None]:
dicionarioDataFrames = {
    "FATO_ENEM": dfAgregate,
    "DIME_SEXO": dfDimeSexo,
    "DIME_MUNICIPIO": dfDimeMunicipio,
    "DIME_UF": dfDimeUF,
    "DIME_ESTADO_CIVIL": dfDimeEstadoCivil,
    "DIME_COR_RACA": dfDimeCorRaca,
    "DIME_NACIONALIDADE": dfDimeNacionalidade,
    "DIME_SITUACAO_ESCOLARIDADE": dfDimeSituacaoEscolaridade,
    "DIME_ESCOLA": dfDimeEscola,
    "DIME_FAIXA_RENDA_MENSAL": dfDimeFaixaRendaMensal,
    "DIME_FAIXA_ETARIA": dfDimeFaixaEtaria
}

In [None]:
autocommit = conn.autocommit
if not autocommit:
    cursor.execute("BEGIN TRANSACTION")
try:
    for nomeTabela,dataFrame in dicionarioDataFrames.items():
        # Dropando a tabela se ela já existir
        drop_table_query = f"""
        IF OBJECT_ID(N'{nomeTabela}', 'U') IS NOT NULL
            DROP TABLE {nomeTabela}
        """
    # print(f"DROP TABLE {nomeTabela}")
        try:
            cursor.execute(drop_table_query)
        except Exception as e:
            print(f"Erro ao realizar o comando DROP TABLE {nomeTabela}: {e}")

    # Comitar após tentar dropar todas as tabelas
    try:
        conn.commit()
        print("Commit realizado com sucesso.")
    except Exception as e:
        print(f"Erro ao realizar o commit: {e}")
        conn.rollback()
        print("Rollback realizado devido a erro no commit.")

except Exception as e:
    print(f"Erro ao realizar as operações: {e}")
    if not autocommit:
        conn.rollback()
        print("Rollback realizado devido a erro nas operações.")

Erro ao realizar o commit: Cannot commit transaction: (3902, b'The COMMIT TRANSACTION request has no corresponding BEGIN TRANSACTION.DB-Lib error message 20018, severity 16:\nGeneral SQL Server error: Check messages from the SQL Server\n')
Rollback realizado devido a erro no commit.


In [None]:
def ColunasImportar(df):
    dfSchema = df.schema
    definicaoColuna = [f"field.name] {field.dataType}" for  field in dfSchema.fields]
    
    #Mapeamento de dados de acordo com os tipos de dados do SQL SERVER
    dataMapping = {
    IntegerType().simpleString(): "INT",
    StringType().simpleString(): "NVARCHAR(255)",  # Você pode ajustar o tamanho conforme necessário
    DoubleType().simpleString(): "FLOAT",
    LongType().simpleString(): "BIGINT"
    }
    # Função para obter o tipo de dados SQL Server correspondente
    def sqlTipo(data_type):
        return dataMapping.get(data_type.simpleString(), "NVARCHAR(255)")  # Default para NVARCHAR(255)

    #Definindo o tipo de dados das colunas de acordo com o utilizado no SQL SERVER
    colunasParaCreate = [f"{field.name} {sqlTipo(field.dataType)}" for field in dfSchema.fields]
    colunasParaInsert = [f"{field.name}" for field in dfSchema.fields]

    #Convertendo para String
    colunasParaCreateStr = ",\n".join(colunasParaCreate)
    colunasParaInsertStr =  ",\n".join(colunasParaInsert)
 
    return colunasParaCreateStr,colunasParaInsertStr
    


In [None]:
#autocommit = conn.autocommit
if not autocommit:
    cursor.execute("BEGIN TRANSACTION")

try:   
    for nomeTabela,dataFrame in dicionarioDataFrames.items():
        colunaCreate, colunaInsert_ = ColunasImportar(dataFrame)
        create_table_query = f"""
        CREATE TABLE {nomeTabela} (
        {colunaCreate}
        )
        """
        #print(create_table_query)
        # Executando o comando SQL para criar a tabela
        try:
            cursor.execute(create_table_query)
            print(f"Tabela {nomeTabela} criada com sucesso.")
        except Exception as e:
            print(f"Erro ao criar a tabela {nomeTabela}: {e}")
    # Comitr após tentar criar todas as tabelas
    #if  autocommit:
    conn.commit()
    print("Commit realizado com sucesso.")

except Exception as e:
    print(f"Erro ao realizar as operações: {e}")
    if not autocommit:
        conn.rollback()
        print("Rollback realizado devido a erro nas operações.")
 

Tabela FATO_ENEM criada com sucesso.
Tabela DIME_SEXO criada com sucesso.
Tabela DIME_MUNICIPIO criada com sucesso.
Tabela DIME_UF criada com sucesso.
Tabela DIME_ESTADO_CIVIL criada com sucesso.
Tabela DIME_COR_RACA criada com sucesso.
Tabela DIME_NACIONALIDADE criada com sucesso.
Tabela DIME_SITUACAO_ESCOLARIDADE criada com sucesso.
Tabela DIME_ESCOLA criada com sucesso.
Tabela DIME_FAIXA_RENDA_MENSAL criada com sucesso.
Tabela DIME_FAIXA_ETARIA criada com sucesso.
Commit realizado com sucesso.


In [None]:
def chunks(data, chunk_size):
    """Divide a data list em pedaços de tamanho chunk_size."""
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

def insert(query, data, chunk=999):
    query = query.lower()
    insert_q, values_q = query.split('values')
    insert_q += 'values'  # Adiciona 'values' para manter a query SQL correta após o split

    try:
        for chunk_data in chunks(data, chunk):
            flat_list = [item for sublist in chunk_data for item in sublist]
            chunk_query = insert_q + ','.join([values_q] * len(chunk_data))
            cursor.execute(chunk_query, tuple(flat_list))
            conn.commit()  # Comitar após cada chunk
    except pymssql.OperationalError as e:
        print(f"OperationalError: {e}")
        conn.rollback()
    except Exception as e:
        print(f"An error occurred: {e}")
        conn.rollback()

In [None]:
autocommit = conn.autocommit
if not autocommit:
    cursor.execute("BEGIN TRANSACTION")
try:
    for nomeTabela,dataFrame in dicionarioDataFrames.items():
        colunaCreate, colunaInsert_ = ColunasImportar(dataFrame)
        #Converter em lista para a função LEN retornar a quantidade de objetos
        colunaInsertList = colunaInsert_.split(',')
        rows = dataFrame.collect()
        #converte a lista rows em uma tupla para utilizar no insert
        dadosInsert = tuple(map(tuple, rows))

        query = f"""
            INSERT INTO {nomeTabela} ({colunaInsert_})
            VALUES ({', '.join(['%s'] * len(colunaInsertList))})
        
        """
        #função que faz o insert no sql em bloco
        insert(query, dadosInsert)
except Exception as e:
    print(f"Erro ao realizar as operações: {e}")
    if not autocommit:
        conn.rollback()
        print("Rollback realizado devido a erro nas operações.")
 

## CRIAR VIEW

In [None]:
viewFatoEnem = ("""
                                       
    CREATE VIEW vwFatoEnem AS(
        SELECT 
        F.ANO_EDICAO,
        F.ANO_CONCLUSAO,
        E.ESCOLA_DESCRICAO,
        EC.ESTADO_CIVIL_DESCRICAO,
        FE.FAIXA_ETARIA_DESCRICAO,
        FRM.FAIXA_RENDA_MENSAL_DESCRICAO,
        M.MUNICIPIO_DESCRICAO,
        U.UF_SIGLA,                
        U.UF_DESCRICAO,
        N.NACIONALIDADE_DESCRICAO,
        S.SEXO_DESCRICAO,
        SE.SITUACAO_ESCOLARIDADE_DESCRICAO,
        QTD AS QTD_INSCRITOS,
        NOTA_CIENCIA_DA_NATUREZA,
        NOTA_CIENCIA_DA_HUMANA,
        NOTA_LINGUAGEM_CODIGO,
        NOTA_MATEMATICA

        FROM FATO_ENEM F
        INNER JOIN DIME_COR_RACA CR
        ON F.COR_RACA_ID = CR.COR_RACA_ID
        INNER JOIN DIME_ESCOLA E
        ON F.ESCOLA_ID= E.ESCOLA_ID
        INNER JOIN DIME_ESTADO_CIVIL EC
        ON F.ESTADO_CIVIL_ID = EC.ESTADO_CIVIL_ID
        INNER JOIN DIME_FAIXA_ETARIA FE
        ON F.FAIXA_ETARIA_ID = FE.FAIXA_ETARIA_ID
        INNER JOIN DIME_FAIXA_RENDA_MENSAL FRM
        ON F.FAIXA_RENDA_MENSAL_ID = FRM.FAIXA_RENDA_MENSAL_ID
        INNER JOIN DIME_MUNICIPIO M 
        ON F.MUNICIPIO_ID = M.MUNICIPIO_ID
        INNER JOIN DIME_UF U
        ON M.UF_ID = U.UF_ID
        INNER JOIN DIME_NACIONALIDADE N
        ON F.NACIONALIDADE_ID = N.NACIONALIDADE_ID
        INNER JOIN DIME_SEXO S
        ON F.SEXO_ID = S.SEXO_ID
        INNER JOIN DIME_SITUACAO_ESCOLARIDADE SE
        ON F.SITUACAO_ESCOLARIDADE_ID = SE.SITUACAO_ESCOLARIDADE_ID
        )
""")

In [None]:
try:
     cursor.execute("""DROP VIEW vwFatoEnem
            """)
     try: 
         cursor.execute(viewFatoEnem)

     except Exception as e:
          print(f"Erro ao realizar as operações: {e}")

except Exception as e:
    print(f"Erro ao realizar as operações: {e}")

In [None]:
# Fechando cursor e conexão
cursor.close()
conn.close()