# Modelagem Camada Gold

In [0]:
%sql
-- Create the volume workspace.default.gold_accident if it does not already exist.
-- NOTE(review): no cell in the visible part of this notebook reads from or writes to
-- this volume (the Gold tables below use the gold_dim_* / gold_fato_* volumes) — confirm it is still needed.
CREATE VOLUME IF NOT EXISTS workspace.default.gold_accident

In [0]:
%sql
-- Create one volume per Gold table (three dimensions plus the fact table).
-- Each volume is used further down as the Delta save path
-- /Volumes/workspace/default/<volume_name>. IF NOT EXISTS keeps the cell re-runnable.
CREATE VOLUME IF NOT EXISTS workspace.default.gold_dim_tempo;
CREATE VOLUME IF NOT EXISTS workspace.default.gold_dim_localizacao;
CREATE VOLUME IF NOT EXISTS workspace.default.gold_dim_acidente;
CREATE VOLUME IF NOT EXISTS workspace.default.gold_fato_acidente;

In [0]:
# Load the Silver accident table (Delta format) and report how many rows it holds.
silver_path = "/Volumes/workspace/default/silver_accident"
silver_reader = spark.read.format("delta")
df_silver = silver_reader.load(silver_path)
silver_row_count = df_silver.count()
print("Silver rows:", silver_row_count)

### Dim_Tempo

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Time dimension: one row per distinct combination of the date attributes found
# in Silver, tagged with a generated surrogate key `id_tempo`.
tempo_cols = ["data_inversa", "ano", "mes", "dia", "dia_semana"]
tempo_distinct = df_silver.select(*tempo_cols).dropDuplicates()
# monotonically_increasing_id() yields unique but non-consecutive 64-bit ids.
dim_tempo = tempo_distinct.withColumn("id_tempo", monotonically_increasing_id())

# Preview a small sample only.
display(dim_tempo.limit(10))

In [0]:
# Persist the time dimension as a Delta table, replacing any previous contents.
dim_tempo_path = "/Volumes/workspace/default/gold_dim_tempo"
dim_tempo_writer = dim_tempo.write.format("delta").mode("overwrite")
dim_tempo_writer.save(dim_tempo_path)

In [0]:
# Sanity check: row count of the time dimension as stored in Delta.
dim_tempo_persisted = spark.read.format("delta").load(dim_tempo_path)
dim_tempo_persisted.count()

### Dim_Localizacao

In [0]:
# Location dimension: one row per distinct combination of the location
# attributes found in Silver, tagged with a generated surrogate key
# `id_localizacao` (unique, not consecutive).
localizacao_cols = [
    "uf",
    "br",
    "km",
    "municipio",
    "latitude",
    "longitude",
    "regional",
    "delegacia",
]
localizacao_distinct = df_silver.select(*localizacao_cols).dropDuplicates()
dim_localizacao = localizacao_distinct.withColumn(
    "id_localizacao", monotonically_increasing_id()
)

# Preview a small sample only.
display(dim_localizacao.limit(10))

In [0]:
# Persist the location dimension as a Delta table, replacing any previous contents.
dim_localizacao_path = "/Volumes/workspace/default/gold_dim_localizacao"
dim_localizacao_writer = dim_localizacao.write.format("delta").mode("overwrite")
dim_localizacao_writer.save(dim_localizacao_path)

### Dim_Acidente

In [0]:
# Accident-characteristics dimension: one row per distinct combination of the
# descriptive accident attributes found in Silver, tagged with a generated
# surrogate key `id_caracteristica` (unique, not consecutive).
acidente_cols = [
    "causa_acidente",
    "tipo_acidente",
    "classificacao_acidente",
    "fase_dia",
    "sentido_via",
    "condicao_metereologica",
    "tipo_pista",
    "tracado_via",
]
acidente_distinct = df_silver.select(*acidente_cols).dropDuplicates()
dim_acidente = acidente_distinct.withColumn(
    "id_caracteristica", monotonically_increasing_id()
)

# Preview a small sample only.
display(dim_acidente.limit(10))

In [0]:
# Persist the accident-characteristics dimension as a Delta table, replacing any previous contents.
dim_acidente_path = "/Volumes/workspace/default/gold_dim_acidente"
dim_acidente_writer = dim_acidente.write.format("delta").mode("overwrite")
dim_acidente_writer.save(dim_acidente_path)

### Fato_Acidente (JOIN)

In [0]:
from functools import reduce

from pyspark.sql.functions import col


def _attach_surrogate_key(facts, dim, key_cols, id_col):
    """Left-join ``dim`` onto ``facts`` and keep only the dimension's surrogate key.

    The match is null-safe (SQL ``<=>``). ``dropDuplicates()`` keeps attribute
    combinations containing NULLs as dimension rows, but a plain equality join
    never matches NULL to NULL, which would leave those fact rows with a NULL
    foreign key.

    Args:
        facts: DataFrame that contains the natural-key columns in ``key_cols``.
        dim: Dimension DataFrame that contains ``key_cols`` plus ``id_col``.
        key_cols: Names of the columns that identify a dimension row.
        id_col: Name of the surrogate-key column to bring into ``facts``.

    Returns:
        ``facts`` with one additional column, ``id_col`` (NULL when no
        dimension row matches).
    """
    # Prefix the dimension's copies of the key columns so that the join
    # condition and every later select stay unambiguous.
    lookup = dim.select(id_col, *[col(c).alias(f"_dim_{c}") for c in key_cols])
    matches = reduce(
        lambda acc, cond: acc & cond,
        [col(c).eqNullSafe(col(f"_dim_{c}")) for c in key_cols],
    )
    return facts.join(lookup, matches, "left").drop(*[f"_dim_{c}" for c in key_cols])


# (Delta path of the persisted dimension, natural-key columns, surrogate-key column)
fato_dimensoes = [
    (
        dim_tempo_path,
        ["data_inversa", "ano", "mes", "dia", "dia_semana"],
        "id_tempo",
    ),
    (
        dim_localizacao_path,
        ["uf", "br", "km", "municipio", "latitude", "longitude", "regional", "delegacia"],
        "id_localizacao",
    ),
    (
        dim_acidente_path,
        [
            "causa_acidente",
            "tipo_acidente",
            "classificacao_acidente",
            "fase_dia",
            "sentido_via",
            "condicao_metereologica",
            "tipo_pista",
            "tracado_via",
        ],
        "id_caracteristica",
    ),
]

# Join against the dimensions as persisted in Delta rather than the lazy
# in-memory DataFrames: their ids come from monotonically_increasing_id(),
# which is non-deterministic (it depends on partitioning), so re-evaluating
# the plan here could assign ids different from the ones already written.
fato_com_chaves = df_silver
for dim_path, key_cols, id_col in fato_dimensoes:
    dim_persistida = spark.read.format("delta").load(dim_path)
    fato_com_chaves = _attach_surrogate_key(fato_com_chaves, dim_persistida, key_cols, id_col)

# Keep only the foreign keys and the numeric measures in the fact table.
fato_acidente = fato_com_chaves.select(
    "id_tempo",
    "id_localizacao",
    "id_caracteristica",
    "pessoas",
    "mortos",
    "feridos",
    "feridos_graves",
    "feridos_leves",
    "ilesos",
    "veiculos",
)
display(fato_acidente.limit(10))

In [0]:
# Persist the fact table as a Delta table, replacing any previous contents.
fato_acidente_path = "/Volumes/workspace/default/gold_fato_acidente"
fato_acidente_writer = fato_acidente.write.format("delta").mode("overwrite")
fato_acidente_writer.save(fato_acidente_path)


In [0]:
# Sanity check: row count of the fact table as stored in Delta.
fato_persistida = spark.read.format("delta").load(fato_acidente_path)
print("Fato rows:", fato_persistida.count())