In [0]:
import dlt
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

### Kafka

In [0]:
# Kafka connection settings used by the bronze ingestion table.
kafka_topic = "test"
kafka_bootstrap_servers = "localhost:9092"  # Kafka broker running in local Docker

In [0]:
# Expected layout of the JSON payload carried in each Kafka message.
# All three fields are nullable.
schema = (
  StructType()
  .add("id", StringType(), True)
  .add("name", StringType(), True)
  .add("value", IntegerType(), True)
)

### DLT Tables

In [0]:
@dlt.table(
    name="bronze_table",
    comment="Bronze table",
    table_properties={"quality": "bronze"}
)
def bronze_table():
  """Bronze layer: ingest raw messages from the configured Kafka topic.

  Returns:
    A streaming DataFrame with a single string column, ``json_data``,
    holding the Kafka message payload cast to a string.
  """
  return (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    # The option key must be spelled "subscribe"; the Kafka source requires
    # one of subscribe / subscribePattern / assign to know what to read.
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")  # read the topic from the beginning
    .load()
    .selectExpr("CAST(value AS STRING) as json_data")
  )

@dlt.table(
  name="silver_table",
  comment="Silver table",
  table_properties={"quality": "silver"}
)
def silver_table():
  """Silver layer: turn the raw JSON strings from bronze into structured columns.

  Returns:
    A streaming DataFrame whose columns are the fields of ``schema``,
    parsed from the ``json_data`` column of ``bronze_table``.
  """
  raw_messages = dlt.read_stream("bronze_table")
  # Parse each JSON string into a struct column named "data".
  parsed_payload = from_json(col("json_data"), schema).alias("data")
  structured = raw_messages.select(parsed_payload)
  # Flatten the struct so each field becomes a top-level column.
  return structured.select("data.*")

@dlt.table(
  name="gold_table",
  comment="Gold table",
  table_properties={"quality": "gold"}
)
def gold_table():
  """Gold layer: aggregate the silver data for analytics.

  Returns:
    A streaming DataFrame with one row per ``name`` and a ``total_value``
    column holding the sum of ``value`` for that name.
  """
  return (
    dlt.read_stream("silver_table")
    .groupBy("name")
    .agg({"value": "sum"})
    # The dict-style aggregation names its output "sum(value)"; parentheses
    # are not valid in Delta column names unless column mapping is enabled,
    # so give the column a plain identifier before it is written.
    .withColumnRenamed("sum(value)", "total_value")
  )