# Pipeline de Tratamento de Dados – Bronze → Silver → Gold

## Objetivo

Este notebook tem como objetivo consolidar o pipeline de tratamento de dados do MVP, orquestrando as etapas de transformação entre as camadas Bronze, Silver e Gold.

O pipeline foi estruturado de forma manual e reexecutável, uma vez que o escopo deste MVP é educacional. Em um cenário produtivo, este pipeline poderia ser facilmente automatizado utilizando Databricks Jobs ou ferramentas de orquestração como Apache Airflow.

A proposta deste pipeline é demonstrar, de forma estruturada e reexecutável, o fluxo completo de dados desde a ingestão bruta até a modelagem analítica no Data Warehouse, utilizando Delta Lake no Databricks.

Este notebook não substitui os notebooks individuais de cada etapa, mas atua como um **orquestrador lógico**, facilitando a compreensão do fluxo de dados e a reexecução do pipeline.


In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id, when, sum


In [0]:
# Leitura da camada Bronze
df_bronze = spark.table("maintenance_bronze")

print("Bronze carregada com sucesso")
print(f"Total de registros: {df_bronze.count()}")

df_bronze.show(5)


Bronze carregada com sucesso
Total de registros: 10000
+---+----------+----+-----------------+---------------------+--------------------+---------+-------------+---------------+---+---+---+---+---+
|UDI|Product_ID|Type|Air_temperature_K|Process_temperature_K|Rotational_speed_rpm|Torque_Nm|Tool_wear_min|Machine_failure|TWF|HDF|PWF|OSF|RNF|
+---+----------+----+-----------------+---------------------+--------------------+---------+-------------+---------------+---+---+---+---+---+
|  1|    M14860|   M|            298.1|                308.6|                1551|     42.8|            0|              0|  0|  0|  0|  0|  0|
|  2|    L47181|   L|            298.2|                308.7|                1408|     46.3|            3|              0|  0|  0|  0|  0|  0|
|  3|    L47182|   L|            298.1|                308.5|                1498|     49.4|            5|              0|  0|  0|  0|  0|  0|
|  4|    L47183|   L|            298.2|                308.6|                1433|     

In [0]:
df_silver = df_bronze.select(
    col("UDI").cast("long").alias("udi"),
    col("Product_ID").cast("string").alias("product_id"),
    col("Type").cast("string").alias("type"),
    col("Air_temperature_K").cast("double").alias("air_temperature_k"),
    col("Process_temperature_K").cast("double").alias("process_temperature_k"),
    col("Rotational_speed_rpm").cast("long").alias("rotational_speed_rpm"),
    col("Torque_Nm").cast("double").alias("torque_nm"),
    col("Tool_wear_min").cast("long").alias("tool_wear_min"),
    col("Machine_failure").cast("long").alias("machine_failure"),
    col("TWF").cast("long").alias("twf"),
    col("HDF").cast("long").alias("hdf"),
    col("PWF").cast("long").alias("pwf"),
    col("OSF").cast("long").alias("osf"),
    col("RNF").cast("long").alias("rnf")
)


In [0]:
df_silver.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df_silver.columns
]).show()


+---+----------+----+-----------------+---------------------+--------------------+---------+-------------+---------------+---+---+---+---+---+
|udi|product_id|type|air_temperature_k|process_temperature_k|rotational_speed_rpm|torque_nm|tool_wear_min|machine_failure|twf|hdf|pwf|osf|rnf|
+---+----------+----+-----------------+---------------------+--------------------+---------+-------------+---------------+---+---+---+---+---+
|  0|         0|   0|                0|                    0|                   0|        0|            0|              0|  0|  0|  0|  0|  0|
+---+----------+----+-----------------+---------------------+--------------------+---------+-------------+---------------+---+---+---+---+---+



In [0]:
df_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("maintenance_silver")


### Tratamento Preventivo de Valores Nulos

Embora o conjunto de dados utilizado neste MVP não apresente valores nulos, foi incluída no pipeline uma etapa de verificação e tratamento preventivo com o objetivo de tornar o fluxo de dados mais robusto e preparado para cenários futuros.

Em ambientes produtivos, a presença de valores ausentes pode ocorrer devido a falhas de coleta, problemas de sensores ou integrações externas. Dessa forma, o pipeline foi estruturado para identificar possíveis valores nulos e aplicar estratégias adequadas de tratamento, como descarte de registros inválidos ou imputação controlada, conforme a natureza do atributo.

Para o dataset atual, não foi necessária a aplicação de correções, uma vez que a análise de completude confirmou a ausência de valores nulos.



In [0]:
from pyspark.sql.functions import col

null_counts = df_silver.select([
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df_silver.columns
])

null_counts.show()


In [0]:
dim_machine = df_silver.select(
    "product_id",
    "type"
).dropDuplicates() \
 .withColumn("machine_sk", monotonically_increasing_id())

dim_machine.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("dim_machine")


In [0]:
fact_maintenance = df_silver.join(
    dim_machine,
    on=["product_id", "type"],
    how="left"
).select(
    "machine_sk",
    "udi",
    "air_temperature_k",
    "process_temperature_k",
    "rotational_speed_rpm",
    "torque_nm",
    "tool_wear_min",
    "machine_failure",
    "twf",
    "hdf",
    "pwf",
    "osf",
    "rnf"
)

fact_maintenance.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("fact_maintenance")


In [0]:
%sql
SELECT COUNT(*) AS total_fato FROM fact_maintenance;


total_fato
10000


In [0]:
%sql
SELECT COUNT(*) AS total_dimensao FROM dim_machine;


total_dimensao
10000


In [0]:
%sql
SELECT COUNT(*) AS registros_orfaos
FROM fact_maintenance f
LEFT JOIN dim_machine d
  ON f.machine_sk = d.machine_sk
WHERE d.machine_sk IS NULL;


registros_orfaos
0


## Discussão do Pipeline

O pipeline executado confirmou a correta orquestração das etapas entre as camadas Bronze, Silver e Gold. A separação em camadas assegurou rastreabilidade, qualidade e organização dos dados, enquanto o uso de tabelas Delta gerenciadas garantiu persistência e governança no Databricks.

Essa estrutura consolida a arquitetura proposta para o MVP, permitindo reexecução consistente do fluxo e futuras extensões.

