# Star schema creation

## Imports and silver-layer inputs

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, monotonically_increasing_id
from pyspark.sql.window import Window

In [0]:
# Silver-layer source: cleaned aircraft records.
df_aircraft = spark.table(
    "anac_aeronautical_occurrences_in_brazilian_civil_aviation.silver_layer.gov_aircraft_clean"
)

In [0]:
# Silver-layer source: cleaned significant (contributing) factor records.
df_factor = spark.table(
    "anac_aeronautical_occurrences_in_brazilian_civil_aviation.silver_layer.gov_significant_factor_clean"
)

In [0]:
# Silver-layer source: cleaned occurrence records (basis of the fact table).
df_occurence = spark.table(
    "anac_aeronautical_occurrences_in_brazilian_civil_aviation.silver_layer.gov_occurrence_clean"
)

In [0]:
# Silver-layer source: cleaned safety recommendation records.
df_recommendation = spark.table(
    "anac_aeronautical_occurrences_in_brazilian_civil_aviation.silver_layer.gov_recommendation_clean"
)

In [0]:
# Silver-layer source: cleaned occurrence-type records.
df_type = spark.table(
    "anac_aeronautical_occurrences_in_brazilian_civil_aviation.silver_layer.gov_type_occurrence_clean"
)

## Aircraft dimension

In [0]:
#display(df_aircraft.groupBy(table).agg(count("*").alias("qtd")))

In [0]:
# Aircraft dimension: project the descriptive aircraft attributes and keep a
# single row per codigo_ocorrencia2 (the key the fact table joins on).
# NOTE(review): dropDuplicates keeps an arbitrary row per key; if one key can
# map to several aircraft rows (the fact keeps total_aeronaves_envolvidas),
# all but one are discarded here — confirm this is intended.
df_aircraft_dim = df_aircraft.select(
    "codigo_ocorrencia2",
    "aeronave_matricula",
    "aeronave_fabricante",
    "aeronave_modelo",
    "aeronave_tipo_veiculo",
    "aeronave_motor_tipo",
    "aeronave_motor_quantidade",
    "aeronave_assentos",
    "aeronave_ano_fabricacao",
    "aeronave_voo_origem",
    "aeronave_voo_destino",
    "aeronave_fase_operacao",
    "aeronave_nivel_dano",
    "aeronave_fatalidades_total",
).dropDuplicates(subset=["codigo_ocorrencia2"])


In [0]:
# Surrogate key for the aircraft dimension, derived from its unique natural key
# (codigo_ocorrencia2 is deduplicated above).
# monotonically_increasing_id() is non-deterministic: its values depend on the
# partition layout. This DataFrame is not cached and is recomputed separately
# for the dimension write and for the fact-table join, so the two could end up
# with different ids. Numbering rows in key order is reproducible.
# The cast keeps the column LongType, as before, so the Delta schema is unchanged.
# The un-partitioned window moves all rows to one partition, which assumes the
# dimension is small — revisit if it grows.
df_aircraft_dim = df_aircraft_dim.withColumn(
    "id_dim_aeronave",
    F.row_number().over(Window.orderBy("codigo_ocorrencia2")).cast("long"),
)

## Significant factor dimension

In [0]:
# Factor dimension, step 1: keep the join key plus the four factor attributes.
df_factor_dim = df_factor.select(
    "codigo_ocorrencia3",
    "fator_nome",
    "fator_aspecto",
    "fator_condicionante",
    "fator_area",
)

In [0]:
# Factor dimension, step 2: collapse the factor rows to one row per
# codigo_ocorrencia3. Each attribute is joined into a ", "-separated string.
# collect_list has no defined element order after a shuffle. The rows are
# therefore gathered as structs and sorted before concatenating. The resulting
# strings are reproducible across runs, and all four columns follow the same
# row order. concat_ws still skips null entries, exactly as before.
df_factor_dim = (
    df_factor_dim
    .groupBy("codigo_ocorrencia3")
    .agg(
        F.array_sort(
            F.collect_list(
                F.struct("fator_nome", "fator_aspecto", "fator_condicionante", "fator_area")
            )
        ).alias("_fatores")
    )
    .select(
        "codigo_ocorrencia3",
        # "_fatores.<field>" on an array of structs yields the array of that field.
        F.concat_ws(", ", F.col("_fatores.fator_nome")).alias("fatores_concatenados"),
        F.concat_ws(", ", F.col("_fatores.fator_aspecto")).alias("aspectos_concatenados"),
        F.concat_ws(", ", F.col("_fatores.fator_condicionante")).alias("condicionantes_concatenados"),
        F.concat_ws(", ", F.col("_fatores.fator_area")).alias("areas_concatenadas"),
    )
)

In [0]:
# Surrogate key for the factor dimension, derived from its unique natural key
# (the groupBy above leaves one row per codigo_ocorrencia3).
# monotonically_increasing_id() is non-deterministic: its values depend on the
# partition layout. This DataFrame is not cached and is recomputed separately
# for the dimension write and for the fact-table join, so the two could end up
# with different ids. Numbering rows in key order is reproducible.
# The cast keeps the column LongType, as before, so the Delta schema is unchanged.
# The un-partitioned window moves all rows to one partition, which assumes the
# dimension is small — revisit if it grows.
df_factor_dim = df_factor_dim.withColumn(
    "id_dim_fator",
    F.row_number().over(Window.orderBy("codigo_ocorrencia3")).cast("long"),
)

## Occurrence type dimension

In [0]:
# Occurrence-type dimension: project the type/category/ICAO taxonomy columns
# and keep a single row per codigo_ocorrencia1.
# NOTE(review): dropDuplicates keeps an arbitrary row per key; if an occurrence
# can carry several types, only one survives — confirm this is intended.
df_type_dim = df_type.select(
    "codigo_ocorrencia1",
    "ocorrencia_tipo",
    "ocorrencia_tipo_categoria",
    "taxonomia_tipo_icao",
).dropDuplicates(subset=["codigo_ocorrencia1"])

In [0]:
# Surrogate key for the occurrence-type dimension, derived from its unique
# natural key (codigo_ocorrencia1 is deduplicated above).
# monotonically_increasing_id() is non-deterministic: its values depend on the
# partition layout. This DataFrame is not cached and is recomputed separately
# for the dimension write and for the fact-table join, so the two could end up
# with different ids. Numbering rows in key order is reproducible.
# The cast keeps the column LongType, as before, so the Delta schema is unchanged.
# The un-partitioned window moves all rows to one partition, which assumes the
# dimension is small — revisit if it grows.
df_type_dim = df_type_dim.withColumn(
    "id_dim_ocorrencia_tipo",
    F.row_number().over(Window.orderBy("codigo_ocorrencia1")).cast("long"),
)

## Recommendation dimension

In [0]:
# Recommendation dimension: project content/status/recipient columns and keep
# a single row per codigo_ocorrencia4.
# NOTE(review): dropDuplicates keeps an arbitrary row per key; if an occurrence
# can have several recommendations, all but one are discarded — confirm.
df_recommendation_dim = df_recommendation.select(
    "codigo_ocorrencia4",
    "recomendacao_conteudo",
    "recomendacao_status",
    "recomendacao_destinatario_sigla",
).dropDuplicates(subset=["codigo_ocorrencia4"])

In [0]:
# Surrogate key for the recommendation dimension, derived from its unique
# natural key (codigo_ocorrencia4 is deduplicated above).
# monotonically_increasing_id() is non-deterministic: its values depend on the
# partition layout, so re-running the notebook can assign different ids to the
# same recommendation. Numbering rows in key order is reproducible.
# The cast keeps the column LongType, as before, so the Delta schema is unchanged.
# The un-partitioned window moves all rows to one partition, which assumes the
# dimension is small — revisit if it grows.
df_recommendation_dim = df_recommendation_dim.withColumn(
    "id_dim_recomendacao",
    F.row_number().over(Window.orderBy("codigo_ocorrencia4")).cast("long"),
)

## Occurrence fact table

In [0]:
# Occurrence fact table: start from the occurrence records, attach each
# dimension's surrogate key via a left join on the matching occurrence code
# (occurrences without a dimension row get a null key), then keep the keys
# plus the occurrence-level measures/attributes.
# NOTE(review): df_recommendation_dim (id_dim_recomendacao) is never joined
# here, so the fact table has no link to the recommendation dimension —
# confirm whether that is intended.
df_occurence_fact = (
    df_occurence
    .join(
        df_aircraft_dim.select("codigo_ocorrencia2", "id_dim_aeronave"),
        on="codigo_ocorrencia2",
        how="left",
    )
    .join(
        df_factor_dim.select("codigo_ocorrencia3", "id_dim_fator"),
        on="codigo_ocorrencia3",
        how="left",
    )
    .join(
        df_type_dim.select("codigo_ocorrencia1", "id_dim_ocorrencia_tipo"),
        on="codigo_ocorrencia1",
        how="left",
    )
    .select(
        *[
            "codigo_ocorrencia",
            "id_dim_aeronave",
            "id_dim_fator",
            "id_dim_ocorrencia_tipo",
            "ocorrencia_classificacao",
            "ocorrencia_cidade",
            "ocorrencia_uf",
            "ocorrencia_pais",
            "ocorrencia_dia",
            "ocorrencia_hora",
            "total_aeronaves_envolvidas",
        ]
    )
)

# Saving dimension and fact tables

In [0]:
# Gold-layer table name -> DataFrame to persist (four dimensions plus the fact).
dataframes = {
    "gov_recommendation_dim": df_recommendation_dim,
    "gov_significant_factor_dim": df_factor_dim,
    "gov_type_occurrence_dim": df_type_dim,
    "gov_occurrence_fact": df_occurence_fact,
    "gov_aircraft_dim": df_aircraft_dim
}

# Write each one as a Delta table in the gold schema, replacing any previous
# contents; tables are written in the dict's insertion order.
gold_schema = "anac_aeronautical_occurrences_in_brazilian_civil_aviation.gold_layer"
for table_name, dataframe in dataframes.items():
    writer = dataframe.write.format("delta").mode("overwrite")
    writer.saveAsTable(gold_schema + "." + table_name)

print("All tables saved in gold layer.")