In [0]:
# Point the session at the target catalog, then the schema inside it.
for stmt in ("USE CATALOG workspace", "USE SCHEMA default"):
    spark.sql(stmt)

# Confirm where unqualified table names will now resolve.
display(spark.sql("SELECT current_catalog(), current_schema()"))

In [0]:
# Main silver inputs for the gold layer, loaded in a fixed order:
# operational readouts, specifications, labels, train time-to-event.
(
    df_ops_silver,
    df_specs_silver,
    df_labels_silver,
    df_tte_silver,
) = (
    spark.table(table_name)
    for table_name in (
        "silver_operational_readouts",
        "silver_specifications",
        "silver_labels",
        "silver_train_tte",
    )
)




In [0]:
# Inspect each silver input's schema before modelling the gold tables.
for silver_df in (df_ops_silver, df_specs_silver, df_labels_silver, df_tte_silver):
    silver_df.printSchema()

In [0]:
from pyspark.sql.functions import col

# --- Vehicle dimension -------------------------------------------------------
# Base: the unified specifications table without `dataset_split`, one row per vehicle_id.
cols_specs = list(filter(lambda name: name != "dataset_split", df_specs_silver.columns))

# NOTE(review): dropDuplicates keeps an arbitrary row per vehicle_id. If the same
# vehicle_id can appear in more than one split with different specs, some specs are
# silently lost here — confirm vehicle ids are unique across splits.
df_dim_veiculo_base = df_specs_silver.select(*cols_specs).dropDuplicates(["vehicle_id"])

# Attach time-to-event info for train vehicles (train_tte.csv provides vehicle_id,
# length_of_study_time_step, in_study_repair). The left join keeps every vehicle;
# those without a TTE row get null TTE columns.
# NOTE(review): every non-key column of df_tte_silver is carried over — if the silver
# TTE table also has `dataset_split`, that column re-enters the dimension; verify.
spec_side = df_dim_veiculo_base.alias("spec")
tte_side = df_tte_silver.alias("tte")
df_dim_veiculo = spec_side.join(tte_side, on="vehicle_id", how="left")

df_dim_veiculo.printSchema()
display(df_dim_veiculo.limit(10))

In [0]:
# Persist the vehicle dimension as a Delta table, replacing any previous contents.
(
    df_dim_veiculo.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_veiculo")
)




In [0]:
from pyspark.sql.functions import col, when

# --- Time dimension ----------------------------------------------------------
# One row per distinct integer time_step.
# Cast BEFORE deduplicating: if the silver time_step is non-integer, several raw
# values collapse onto the same int after the cast, and deduplicating first would
# leave duplicate keys in the dimension.
# NOTE(review): the fact tables keep time_step in its silver type; joining them to
# this int key may need the same cast — confirm the silver dtype.
df_dim_tempo = (
    df_ops_silver
    .select(col("time_step").cast("int").alias("time_step"))
    .dropDuplicates()
)

# Example 24-step buckets: < 24, [24, 48), [48, 72), >= 72.
# The last branch is an explicit condition rather than .otherwise(), so a null
# time_step stays null instead of being labelled ">72".
# Notes:
#  - The ">72" label also covers exactly 72.
#  - Any negative value falls into "0-24".
df_dim_tempo = df_dim_tempo.withColumn(
    "faixa_time_step",
    when(col("time_step") < 24, "0-24")
    .when(col("time_step") < 48, "24-48")
    .when(col("time_step") < 72, "48-72")
    .when(col("time_step") >= 72, ">72")
)

df_dim_tempo.printSchema()
display(df_dim_tempo.orderBy("time_step").limit(20))


In [0]:
# Persist the time dimension as a Delta table, replacing any previous contents.
(
    df_dim_tempo.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_tempo")
)



In [0]:
from pyspark.sql.functions import col, lit, when

# --- Proximity-class dimension -----------------------------------------------
# Per class_label: (descricao, janela_inicio, janela_fim), in time_steps before failure.
# Adjacent classes share boundaries (class 2 ends at 24 where class 1 starts), so the
# windows are half-open, presumably (janela_inicio, janela_fim].
# Class 0 ("> 48") therefore starts at 48, where class 1 ends. The previous value, 49,
# was inconsistent and left a 48–49 gap.
# 9999 is an open-ended sentinel for class 0.
CLASSES_PROXIMIDADE = {
    0: ("> 48 time_steps antes da falha", 48, 9999),
    1: ("entre 48 e 24 time_steps antes da falha", 24, 48),
    2: ("entre 24 e 12 time_steps antes da falha", 12, 24),
    3: ("entre 12 e 6 time_steps antes da falha", 6, 12),
    4: ("entre 6 e 0 time_steps antes da falha", 0, 6),
}


def _class_attr_expr(field_idx):
    """Build a CASE WHEN on `class_label` returning field `field_idx` of CLASSES_PROXIMIDADE.

    Args:
        field_idx: 0 = descricao, 1 = janela_inicio, 2 = janela_fim.

    Returns:
        A Column expression; labels absent from CLASSES_PROXIMIDADE map to null.
    """
    expr = None
    for label, attrs in CLASSES_PROXIMIDADE.items():
        cond = col("class_label") == label
        value = lit(attrs[field_idx])
        expr = when(cond, value) if expr is None else expr.when(cond, value)
    return expr


# Distinct labels actually present in silver_labels, enriched with their metadata.
df_classes = (
    df_labels_silver
    .select("class_label")
    .dropDuplicates()
    .orderBy("class_label")
    .withColumn("descricao", _class_attr_expr(0))
    .withColumn("janela_inicio", _class_attr_expr(1))
    .withColumn("janela_fim", _class_attr_expr(2))
)

df_classes.printSchema()
display(df_classes)


In [0]:
# Persist the proximity-class dimension as a Delta table, replacing previous contents.
(
    df_classes.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_classe_proximidade")
)



In [0]:
# Fact base: every operational readout with its class label attached.
# Keys: vehicle_id + dataset_split. Labels exist only for val/test, so the left
# join keeps train readouts with null label columns.
join_cols = ["vehicle_id", "dataset_split"]

df_fato = df_ops_silver.alias("ops").join(
    other=df_labels_silver.alias("lab"),
    on=join_cols,
    how="left",
)

df_fato.printSchema()
display(df_fato.limit(10))


In [0]:
from pyspark.sql.functions import col

# Key columns of the snapshot fact.
key_cols = ["vehicle_id", "time_step", "dataset_split", "class_label"]

# Operational features = every column of df_ops_silver except its own key columns.
_ops_key_cols = {"vehicle_id", "time_step", "dataset_split"}
feature_cols = [c for c in df_ops_silver.columns if c not in _ops_key_cols]

print("Qtd de features operacionais:", len(feature_cols))

# Enforce what the original only printed. The snapshot must have features, every
# selected column must exist in the joined frame, and no feature may also come from
# the label side (col(c) would then be ambiguous).
assert feature_cols, "df_ops_silver has no operational feature columns"
missing_cols = [c for c in key_cols + feature_cols if c not in df_fato.columns]
assert not missing_cols, f"Columns missing from df_fato: {missing_cols}"
ambiguous_cols = [c for c in feature_cols if df_fato.columns.count(c) > 1]
assert not ambiguous_cols, f"Feature columns duplicated by the label join: {ambiguous_cols}"

df_fato_snapshot = df_fato.select(
    *key_cols,
    *[col(c) for c in feature_cols]
)

df_fato_snapshot.printSchema()
display(df_fato_snapshot.limit(10))

In [0]:
# Persist the snapshot fact as a Delta table, replacing any previous contents.
(
    df_fato_snapshot.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_fato_snapshot")
)

Colunas de `gold_fato_tempo_ate_evento` (origem: `silver_train_tte`):

vehicle_id

length_of_study_time_step (quanto tempo o veículo foi observado)

in_study_repair (1 se houve reparo, 0 se não)

In [0]:
# Time-to-event fact: per-vehicle observation length and in-study repair flag.
tte_fact_cols = ["vehicle_id", "length_of_study_time_step", "in_study_repair"]
df_fato_tempo_ate_evento = df_tte_silver.select(*tte_fact_cols)

df_fato_tempo_ate_evento.printSchema()
display(df_fato_tempo_ate_evento.limit(10))


In [0]:
# Persist the time-to-event fact as a Delta table, replacing any previous contents.
(
    df_fato_tempo_ate_evento.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_fato_tempo_ate_evento")
)

Conferência das tabelas criadas

In [0]:
# Verification: list the tables in workspace.default.
# truncate=False because show() cuts values to 20 characters by default, which would
# hide long names such as gold_fato_tempo_ate_evento.
spark.sql("SHOW TABLES IN workspace.default").show(200, truncate=False)