# Configuração Inicial
Imports Necessários 

In [0]:
import sys
import os
import importlib
import scripts.ingestion
import scripts.pipeline
# Re-execute the project modules so that edits made to them after their first
# import are picked up without restarting the Python session.
# NOTE(review): scripts.transform and scripts.constants are imported below but
# are not reloaded here — confirm that is intentional.
importlib.reload(scripts.ingestion)
importlib.reload(scripts.pipeline)



# Add the project's config directory to the module search path.
# NOTE(review): presumably this is where `myconfig` (imported below) lives — confirm.
sys.path.append('/Workspace/Users/santos.gabriela04@edu.pucrs.br/projeto-educadata/config')

from pyspark.sql.functions import isnull, when, count, col, lit, countDistinct
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

# Storage account / container names used to build the output blob paths.
from myconfig import STORAGE_ACCOUNT, CONTAINER, FINAL_CONTAINER

# Project helpers; these imports run after the reloads above, so the names
# bound here come from the freshly reloaded ingestion/pipeline modules.
from scripts.ingestion import read_csv, connect_blob
from scripts.transform import filter_active, value_transform
from scripts.pipeline import process_df, process_year
from scripts.constants import years, col_dim_esc, col_dim_infra, col_fato


In [0]:

# Reuse the active Spark session when there is one; otherwise create a session
# whose application name is "Censo".
spark = (
    SparkSession.builder
    .appName("Censo")
    .getOrCreate()
)

# Databricks utilities handle bound to the session above.
dbutils = DBUtils(spark)

# Project helper from scripts.ingestion.
# NOTE(review): presumably configures access to the blob storage — confirm.
connect_blob(spark, dbutils)

# Processamento e Unificação dos Dados

In [0]:

# Run the per-year pipeline once for every entry in `years`.
dfs = [process_year(year, spark, dbutils) for year in years]
if not dfs:
    # Fail with an explicit message instead of an opaque IndexError on dfs[0].
    raise ValueError("`years` is empty: there is no census data to process")

# Stack the yearly DataFrames into one, matching columns by name rather than
# by position.
df_unified = dfs[0]
for df in dfs[1:]:
    df_unified = df_unified.unionByName(df)

# Keep only the entities (CO_ENTIDADE) that appear in every processed year.
# The required number of distinct NU_ANO_CENSO values is derived from how many
# years were processed instead of a hard-coded literal, so the filter stays
# correct when `years` grows or shrinks.
# NOTE(review): assumes each entry of `years` yields a distinct NU_ANO_CENSO —
# confirm against process_year.
expected_year_count = len(dfs)
df_codes = df_unified.select("CO_ENTIDADE", "NU_ANO_CENSO").dropDuplicates()
select_all_years = (
    df_codes.groupBy("CO_ENTIDADE")
    .agg(countDistinct("NU_ANO_CENSO").alias("anos_distintos"))
    .filter(f"anos_distintos = {expected_year_count}")
)

# The inner join drops every row whose entity is not present in all years.
# NOTE(review): it also carries the helper column `anos_distintos` into
# df_final (and therefore into the "censo_por_ano" output) — confirm that
# downstream consumers expect it.
df_final = df_unified.join(select_all_years, on="CO_ENTIDADE", how="inner")

In [0]:
# Project df_final onto each column list from scripts.constants and drop
# duplicate rows, producing the two dimension tables and the fact table.
# The generator yields in the same order as the names on the left-hand side.
dimensao_escolas, dimensao_infra, fato = (
    df_final.select(*column_names).dropDuplicates()
    for column_names in (col_dim_esc, col_dim_infra, col_fato)
)


# Envio dos Dados para o Blob

In [0]:

# Destination folders inside FINAL_CONTAINER of the configured storage account.
output_path = f"wasbs://{FINAL_CONTAINER}@{STORAGE_ACCOUNT}.blob.core.windows.net/censo_por_ano"
output_path_escola = f"wasbs://{FINAL_CONTAINER}@{STORAGE_ACCOUNT}.blob.core.windows.net/dimensions/dimensao_escolas"
output_path_infra = f"wasbs://{FINAL_CONTAINER}@{STORAGE_ACCOUNT}.blob.core.windows.net/dimensions/dimensao_infra"
output_path_fato = f"wasbs://{FINAL_CONTAINER}@{STORAGE_ACCOUNT}.blob.core.windows.net/fact/fato"

# Write each table as Parquet, replacing whatever is already at the target.
# coalesce(1) collapses every DataFrame to a single partition before writing;
# the tuple preserves the original write order.
for frame, destination in (
    (df_final, output_path),
    (dimensao_escolas, output_path_escola),
    (dimensao_infra, output_path_infra),
    (fato, output_path_fato),
):
    frame.coalesce(1).write.mode("overwrite").parquet(destination)