## Test Bronze -> Silver Pipeline

In [None]:
from rt_databricks.utils.gcs_paths import bronze_path
from pyspark.sql.functions import current_timestamp

# Reseed the bronze Delta dataset from the Kafka topic: batch-read the topic,
# stamp each row with an ingest timestamp, and overwrite the Delta location.
bronze_uri = bronze_path("school_climate_raw")
print("Bronze URI:", bronze_uri)

# NOTE(review): SASL credentials are committed here in plain text. These look
# like placeholders; real values should be pulled from a secret store rather
# than living in the notebook -- confirm before running against a real broker.
kafka_jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    'username="username" '
    'password="password";'
)

# spark.read (not readStream) makes this a one-shot batch read starting at the
# earliest offsets. No endingOffsets is set, so the Kafka source's default end
# offset applies.
kafka_batch_df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "boot-strap-server.kafka.svc.cluster.local:9094")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", kafka_jaas_config)
    .option("subscribe", "school_climate_stream")
    .option("startingOffsets", "earliest")
    .load()
)

# Show the raw schema delivered by the Kafka source before it is persisted.
kafka_batch_df.printSchema()

# Record when each row was ingested into bronze.
bronze_df = kafka_batch_df.withColumn("ingest_ts", current_timestamp())

# Replace the existing bronze data. overwriteSchema is required because the
# location previously held a different (id/msg) schema, which would otherwise
# make the overwrite fail on a schema mismatch.
(
    bronze_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(bronze_uri)
)

# The original message contained a mis-decoded check-mark emoji; restored here.
print("✅ Bronze Delta reseeded with correct Kafka schema.")

# Read the table back to confirm the schema that was actually written.
spark.read.format("delta").load(bronze_uri).printSchema()

In [None]:
# Re-inspect the schema currently stored at the bronze Delta location.
spark.read.load(bronze_uri, format="delta").printSchema()

In [None]:
import importlib
import rt_databricks.silver.silver_school_climate as ssc

# Reload the silver module so edits made since the first import in this
# session take effect without restarting the notebook kernel.
importlib.reload(ssc)

# Arguments for the bronze -> silver run, gathered in one place.
silver_run_options = {
    "bronze_dataset": "school_climate_raw",
    "silver_dataset": "school_climate_clean",
    "snowflake_table": "SCHOOL_CLIMATE_CLEAN",
    "write_mode": "overwrite",
}

ssc.run_silver_pipeline(spark, **silver_run_options)