In [0]:
%sql
-- Make nttdataeducacao.bronze the default catalog and schema for this session.
USE CATALOG nttdataeducacao;
USE SCHEMA bronze;

In [0]:
# Directory holding the raw CSV files that the cells below read.
base_path = "/Volumes/nttdataeducacao/bronze/data_bronze"

# **Tabela de Resultados**

In [0]:
from pyspark.sql.functions import current_timestamp, col

# Reader settings for the results file: header row present, column types
# inferred, ISO-8859-1 text, semicolon-separated fields.
leitor_resultados = spark.read.options(
    header="true",
    inferSchema="true",
    encoding="ISO-8859-1",
    delimiter=";",
)
resultados_csv = leitor_resultados.csv(f"{base_path}/RESULTADOS_2024.csv")

# Keep every source column and append lineage columns: the path of the file
# each row came from and the timestamp of this load.
df_resultados = (
    resultados_csv
    .select("*", col("_metadata.file_path").alias("source_file"))
    .withColumn("ingestion_at", current_timestamp())
)

# Preview only a handful of rows.
display(df_resultados.limit(5))


In [0]:
# Replace the contents of the bronze results table with this load.
escritor_resultados = df_resultados.write.format("delta").mode("overwrite")
escritor_resultados.saveAsTable("nttdataeducacao.bronze.resultados_2024")

# **Tabela de Escolas**

In [0]:
# Read the schools file as raw text columns (_c0, _c1, ...): no header
# parsing and no type inference, because the column names are extracted from
# the data rows in the next cell. A double quote is both the quote and the
# escape character.
df = (
    spark.read
    .option("sep", ",")
    .option("header", "false")
    .option("inferSchema", "false")
    .option("quote", "\"")
    .option("escape", "\"")
    .csv(f"{base_path}/HAD_ESCOLAS_2024.csv")
)

In [0]:
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.window import Window

# Number every raw line so header and data rows can be addressed by position.
# NOTE(review): ordering by monotonically_increasing_id() is presumed to follow
# the file's line order for this single CSV — confirm. A window without
# partitionBy also moves all rows into one partition.
w = Window.orderBy(monotonically_increasing_id())

df_num = df.withColumn("rn", row_number().over(w))

# Rows 1-8 are discarded and row 9 carries the column names.
# NOTE(review): presumably the first 8 lines are a report preamble — confirm
# against the source file.
header_rn = 9

# Header row plus all data rows, kept under its original name.
df_clean = df_num.filter(f"rn >= {header_rn}").drop("rn")

# Pick the header by its row number rather than an unordered limit(1), so the
# same row is always used.
header_row = df_num.filter(f"rn = {header_rn}").drop("rn").first()
if header_row is None:
    raise ValueError(
        f"HAD_ESCOLAS_2024.csv has no row {header_rn}; cannot extract the header"
    )

# str() turns an empty header cell (None) into the column name "None".
header = [str(x) for x in header_row]

# Select the body by row number. The previous `subtract` was a set difference:
# it removed duplicate data rows and any data row identical to the header.
df_body = df_num.filter(f"rn > {header_rn}").drop("rn")

# Replace the positional names (_c0, _c1, ...) with the extracted header names.
for i, name in enumerate(header):
    df_body = df_body.withColumnRenamed(f"_c{i}", name)

display(df_body)

In [0]:
# Drop the column named "none", if present.
# NOTE(review): presumably this is the column produced by an empty header
# cell, which was stringified to "None" — confirm that Spark's
# case-insensitive column resolution is what makes "none" match it.
coluna_vazia = "none"
df_body = df_body.drop(coluna_vazia)

In [0]:
# Show the schools frame again, after the "none" column was dropped.
display(df_body)

In [0]:
# Persist the schools data as a bronze table. The Delta format is stated
# explicitly, as in the other bronze writes in this notebook, instead of
# relying on the session's default data source.
# NOTE(review): the table name "escolas_bronz" looks like a typo for
# "escolas_bronze"; it is left unchanged because downstream readers may
# depend on it — confirm before renaming.
(
    df_body.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("nttdataeducacao.bronze.escolas_bronz")
)

# **Tabela de ENADE**

In [0]:
from pyspark.sql.functions import current_timestamp, col

# Reader settings for the ENADE file: header row present, column types
# inferred, ISO-8859-1 text, semicolon-separated fields.
opcoes_enade = {
    "header": "true",
    "inferSchema": "true",
    "encoding": "ISO-8859-1",
    "delimiter": ";",
}
enade_csv = (
    spark.read
    .options(**opcoes_enade)
    .csv(f"{base_path}/conceito_enade_2023(PLANILHA_ENADE).csv")
)

# Keep every source column and append lineage columns: the path of the file
# each row came from and the timestamp of this load.
origem = col("_metadata.file_path").alias("source_file")
df_enade = enade_csv.select("*", origem).withColumn(
    "ingestion_at", current_timestamp()
)

# Preview only a handful of rows.
display(df_enade.limit(5))

In [0]:
import re
from pyspark.sql.functions import col

def normalizar_nome_coluna(nome_coluna):
    """Return a lowercase, snake_case version of a column name.

    The transformation is applied in this order:

    1. Lowercase the name.
    2. Replace the accented letters á ã à â é ê í ó õ ô ú ç with their
       unaccented form and delete the ordinal indicators º and ª.
    3. Replace spaces, hyphens, slashes and parentheses with underscores.
    4. Delete asterisks, dots and commas.
    5. Collapse runs of underscores and trim underscores from both ends.

    Args:
        nome_coluna: Original column name (str).

    Returns:
        The normalized column name (str). It may be empty if the input
        consists only of removed characters.
    """
    nome = nome_coluna.lower()

    # One translation pass: accented letter -> plain letter; º and ª removed.
    tabela_acentos = str.maketrans("áãàâéêíóõôúç", "aaaaeeiooouc", "ºª")
    nome = nome.translate(tabela_acentos)

    # The hyphen must be escaped. Unescaped, ' -/' is a character range from
    # space to slash, which also matches '*', '.' and ',' (among others) and
    # turns them into underscores before the removal step below can run.
    nome = re.sub(r'[ \-/()]', '_', nome)

    # Footnote markers and punctuation are removed, not converted.
    nome = re.sub(r'[*.,]', '', nome)

    # Collapse repeated underscores and trim them from both ends.
    nome = re.sub(r'_{2,}', '_', nome).strip('_')

    return nome


# Original column name -> normalized column name, for every ENADE column.
mapeamento_colunas = dict(
    zip(df_enade.columns, map(normalizar_nome_coluna, df_enade.columns))
)

# Apply the renames one column at a time, starting from the untouched frame
# so that df_enade itself is left as read.
df_enade_limpo = df_enade
for nome_antigo in mapeamento_colunas:
    nome_novo = mapeamento_colunas[nome_antigo]
    df_enade_limpo = df_enade_limpo.withColumnRenamed(nome_antigo, nome_novo)


In [0]:
# Replace the contents of the bronze ENADE table with the renamed frame.
# NOTE(review): the source file is named conceito_enade_2023 while the table
# is enade_2024 — confirm which year is intended.
escritor_enade = df_enade_limpo.write.format("delta").mode("overwrite")
escritor_enade.saveAsTable("nttdataeducacao.bronze.enade_2024")

# Show the frame that was just written.
display(df_enade_limpo)