In [0]:
from pyspark.sql.functions import col, rand, expr, window, avg, row_number, count
from pyspark.sql.window import Window

In [0]:
#generating synthetic sensor telemetry
#fixed seeds make the random columns repeatable across runs (for the same
#partitioning of spark.range); each column gets its own seed so the columns
#are not all derived from the same random stream
SEED_SENSOR_ID = 42
SEED_TEMPERATURA = 43
SEED_STATUS = 44

df_sensores_bruto = (
    spark.range(0, 1000000)
    #rand() is in [0, 1): scaled and truncated to an integer sensor id in 1..100
    .withColumn("sensor_id", (rand(SEED_SENSOR_ID) * 100 + 1).cast("int"))
    #rand() scaled to the 10-50 range and stored as decimal(10,2)
    .withColumn("temperatura", (rand(SEED_TEMPERATURA) * 40 + 10).cast("decimal(10,2)"))
    #one second further in the past per id; still relative to the run time,
    #so this column is NOT reproducible between runs
    .withColumn("timestamp", expr("current_timestamp() - cast(id as interval second)"))
    #picks one of the three status labels from a seeded random index (1..3)
    .withColumn("status", expr(f"elt(floor(rand({SEED_STATUS}) * 3    ) + 1, 'OK', 'ALERTA', 'ERRO')"))
)

#creating the database
spark.sql("create database if not exists projeto_iot")

#saving as a Delta table partitioned by status (replaces any previous contents)
(
    df_sensores_bruto.write.format("delta")
    .mode("overwrite")
    .partitionBy("status")
    .saveAsTable("projeto_iot.bronze_telemetria")
)

print("Camada Bronze criada com 1 milhão de registros e particionada por status!")

In [0]:
%sql
-- preview: the 10 bronze rows with the highest id
SELECT *
FROM projeto_iot.bronze_telemetria
ORDER BY id DESC
LIMIT 10


In [0]:
%sql
-- counts bronze rows with status ERRO and temperatura above 45
-- the string literal uses single quotes: a double-quoted token is parsed as an
-- identifier (column name) when ANSI double-quoted identifiers are enabled
select count(*)
from projeto_iot.bronze_telemetria
where status = 'ERRO'
and temperatura > 45

In [0]:
#loading the bronze layer from the metastore
df_bronze = spark.table("projeto_iot.bronze_telemetria")

#silver transformation:
#keep only rows whose status is ERRO, then drop rows that repeat the same
#(sensor_id, timestamp) pair
df_silver = (
    df_bronze
    .where(col("status") == "ERRO")
    .dropDuplicates(["sensor_id", "timestamp"])
)

#persisting the silver layer as a Delta table (replaces any previous contents)
(
    df_silver.write.format("delta")
    .mode("overwrite")
    .saveAsTable("projeto_iot.silver_telemetria_critica")
)

print("Camada Silver criada com sucesso!")


In [0]:
#number of rows currently stored in the silver table
count_silver = (
    spark.read
    .table("projeto_iot.silver_telemetria_critica")
    .count()
)
display(count_silver)

In [0]:
#reading the silver layer back from the metastore
df_silver = spark.table("projeto_iot.silver_telemetria_critica")

#gold transformation: the 10 sensors with the most rows above 45 degrees
df_gold = (
    df_silver
    .filter(col("temperatura") > 45)
    .groupBy("sensor_id")
    .agg(count("*").alias("total_erros"))
    #Column.desc() keeps the sort dependent only on col; sensor_id breaks ties
    #so the top 10 is the same every time the plan is evaluated (the write
    #below and display() each evaluate it)
    .orderBy(col("total_erros").desc(), col("sensor_id").asc())
    .limit(10)
)

#writing the gold layer as a Delta table (replaces any previous contents)
(
    df_gold.write.format("delta")
    .mode("overwrite")
    .saveAsTable("projeto_iot.gold_ranking_erros")
)

display(df_gold)

Databricks visualization. Run in Databricks to view.