In [None]:
# Example bronze partition this notebook reads:
#   homelab/dev/bronze/sensors/topic=sensors.temperature/year=2025/month=8/day=26
s3_endpoint = "http://192.168.1.196:32000"
env = "dev"
bucket = "homelab"

# The topic name is the single source of truth: the bronze input prefix and the
# checkpoint sub-path are derived from it so they cannot drift apart.
topic = "sensors.temperature"

input_bronze_prefix = f"bronze/sensors/topic={topic}"
output_silver_prefix = "silver/sensors"

s3_topic_path = f"s3a://{bucket}/{env}/{input_bronze_prefix}"
# "sensors.temperature" -> "sensors/temperature"
checkpoint_dir = (
    f"s3a://{bucket}/{env}/{output_silver_prefix}/_checkpoints/{topic.replace('.', '/')}"
)
warehouse = f"s3a://{bucket}/{env}/{output_silver_prefix}"

logLevel = "INFO"

In [None]:
# Echo the resolved S3 locations (input, checkpoint, warehouse) so a config typo
# is visible before Spark starts.
for resolved_path in (s3_topic_path, checkpoint_dir, warehouse):
    print(resolved_path)

In [None]:
import os
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp, to_date
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    TimestampType,
    LongType,
    IntegerType,
)

In [None]:
# Local Spark session configured for:
#   * S3A access to the S3-compatible endpoint in `s3_endpoint` (HTTP, path-style),
#   * hadoop-aws + PostgreSQL JDBC driver (Maven) and the Iceberg runtime (local jar),
#   * an Iceberg catalog named "sensors" backed by the PostgreSQL JDBC catalog.
#
# Secrets are read from the environment instead of being committed with the
# notebook: export AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY before starting
# the kernel (a KeyError here means they are missing).
# NOTE(review): the keys previously hard-coded in this cell were committed and
# should be treated as compromised and rotated.
s3_access_key = os.environ["AWS_ACCESS_KEY_ID"]
s3_secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]
# Dev defaults preserved; override via environment when needed.
iceberg_jdbc_password = os.environ.get("ICEBERG_JDBC_PASSWORD", "iceberg")
iceberg_runtime_jar = os.environ.get(
    "ICEBERG_RUNTIME_JAR",
    "/home/hchaibi/projects/homelab/smartstar/note-books/"
    "iceberg-spark-runtime-4.0_2.13-1.10.0-20250822.002003-111.jar",
)

spark = (
    SparkSession.builder.appName("BronzeExplorationS3")
    .master("local[*]")
    # -----------------------------
    # S3 configuration
    # All Hadoop/S3A keys use the `spark.hadoop.` prefix: only prefixed keys are
    # copied into the SparkContext's Hadoop configuration.
    # -----------------------------
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider",
    )
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint)
    .config("spark.hadoop.fs.s3a.region", "us-east-1")
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")  # For HTTP endpoints
    .config("spark.sql.adaptive.enabled", "false")
    # -----------------------------
    # Required jars
    # -----------------------------
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.4.1,"
        "org.postgresql:postgresql:42.7.7",
    )
    .config("spark.jars", iceberg_runtime_jar)
    # -----------------------------
    # Iceberg JDBC catalog
    # -----------------------------
    .config("spark.sql.catalog.sensors", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.sensors.type", "jdbc")
    .config("spark.sql.catalog.sensors.uri", "jdbc:postgresql://localhost:5432/iceberg")
    .config("spark.sql.catalog.sensors.jdbc.user", "iceberg")
    .config("spark.sql.catalog.sensors.jdbc.password", iceberg_jdbc_password)
    .config("spark.sql.catalog.sensors.warehouse", warehouse)
    # -----------------------------
    # Iceberg Spark extensions
    # -----------------------------
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.defaultCatalog", "sensors")
    .getOrCreate()
)

spark.sparkContext.setLogLevel(logLevel)


In [None]:
# Schema of each bronze JSON record: message metadata (the names suggest Kafka
# offset/partition/timestamps) plus the sensor message as a JSON string in `value`.
raw_schema = (
    StructType()
    .add("event_time", StringType(), True)
    .add("ingestion_ts", StringType(), True)
    .add("kafka_timestamp", StringType(), True)
    .add("offset", LongType(), True)
    .add("partition", LongType(), True)
    .add("timestamp", StringType(), True)
    .add("value", StringType(), True)
)

# Schema of the JSON document carried in `value`; decoded with from_json.
# `timestamp` stays a string here and is parsed with to_timestamp when writing.
payload_schema = (
    StructType()
    .add("device_id", StringType(), True)
    .add("timestamp", StringType(), True)
    .add("sensor_type", StringType(), True)
    .add(
        "location",
        StructType()
        .add("latitude", DoubleType(), True)
        .add("longitude", DoubleType(), True)
        .add("city", StringType(), True),
        True,
    )
    .add("temperature", DoubleType(), True)
    .add("humidity", DoubleType(), True)
    .add("unit", StringType(), True)
)


In [None]:
# Ensure the Iceberg sink table sensors.temperature exists (create if missing).
#
# CREATE TABLE IF NOT EXISTS is idempotent, so re-running this cell is safe.
# Previously the table was dropped unconditionally on every run while the
# streaming checkpoint at `checkpoint_dir` was kept, so a restarted query would
# resume from that checkpoint instead of re-reading bronze files it had already
# committed, and their rows would be lost from the recreated table.
# Dropping is now opt-in.
reset_table = False  # set True only when checkpoint_dir is also being cleared

if reset_table:
    spark.sql("""
    DROP TABLE IF EXISTS sensors.temperature
    """)

spark.sql("""
CREATE TABLE IF NOT EXISTS sensors.temperature (
  device_id STRING,
  device_ts TIMESTAMP,
  sensor_type STRING,
  latitude DOUBLE,
  longitude DOUBLE,
  city STRING,
  temperature DOUBLE,
  humidity DOUBLE,
  unit STRING,
  event_time TIMESTAMP,
  year INT,
  month INT,
  day INT
)
USING iceberg
PARTITIONED BY (year, month, day)
""")

print("✅ Iceberg catalog and table bootstrap complete")

In [None]:
# Stream JSON from the s3 topic path.
# year/month/day are not in raw_schema: they come from the year=/month=/day=
# directories below s3_topic_path (partition discovery).
#
# `columnNameOfCorruptRecord` only has an effect when the schema contains that
# column. Without it, a malformed line's fields come back null and its text is
# lost. The column is therefore added to a local read schema; raw_schema itself
# is left untouched.
raw_read_schema = StructType(
    raw_schema.fields + [StructField("_corrupt_record", StringType(), True)]
)

df_raw = (
    spark.readStream
    .format("json")
    .schema(raw_read_schema)
    .option("maxFilesPerTrigger", 10)  # Reduced for easier debugging
    .option("pathGlobFilter", "*.json")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("mode", "PERMISSIVE")  # keep malformed lines; their text lands in _corrupt_record
    .load(s3_topic_path)
)

# Decode the `value` payload and flatten it next to the envelope metadata.
# _corrupt_record is intentionally not selected; inspect it on df_raw.
df_decoded = (
    df_raw.withColumn("data", from_json(col("value"), payload_schema))
          .select(
              "partition",
              "offset",
              "timestamp",
              "event_time",
              col("data.device_id"),
              col("data.timestamp").alias("device_ts"),
              col("data.sensor_type"),
              col("data.location.latitude"),
              col("data.location.longitude"),
              col("data.location.city"),
              col("data.temperature"),
              col("data.humidity"),
              col("data.unit"),
              col("year"),
              col("month"),
              col("day")
          )
)

In [None]:
# Sanity-check the flattened streaming schema (decoded payload fields plus the year/month/day columns).
df_decoded.printSchema()

In [None]:
# Append the decoded stream to the Iceberg table sensors.temperature.
# Only the table's columns are kept; the two timestamp strings are parsed in
# place so the column order matches the table definition.
# toTable() starts the query; awaitTermination() then blocks this cell for as
# long as the stream runs.
query = (
    df_decoded
    .select(
        "device_id",
        to_timestamp(col("device_ts")).alias("device_ts"),
        "sensor_type",
        "latitude",
        "longitude",
        "city",
        "temperature",
        "humidity",
        "unit",
        to_timestamp(col("event_time")).alias("event_time"),
        "year",
        "month",
        "day",
    )
    .writeStream
    .toTable(
        "sensors.temperature",
        format="iceberg",
        outputMode="append",
        checkpointLocation=checkpoint_dir,
    )
)
query.awaitTermination()

In [None]:
# --- Debug alternative: print each micro-batch of df_decoded to the console ---
# truncate=False shows full cell values; awaitTermination() blocks this cell.
query = df_decoded.writeStream.start(
    format="console",
    outputMode="append",  # "append" is usually what you want for streams
    truncate=False,
)
query.awaitTermination()



#### Discover the distinct topics and infer a schema for each one
```python
# Discover topics from the bronze layout: .../bronze/sensors/topic=<name>/year=.../...
# df_raw cannot be used here for two reasons. It is a streaming DataFrame, so
# collect() is not allowed on it. It is also loaded from a single topic
# directory, so it has no `topic` column.
bronze_sensors = (
    spark.read
    .option("pathGlobFilter", "*.json")
    .json(f"s3a://{bucket}/{env}/bronze/sensors/")
)
topics = [row.topic for row in bronze_sensors.select("topic").distinct().collect()]

# The schemas are later used with from_json() on the `value` string, so infer
# the schema of the payload inside `value`, not of the bronze envelope files.
schemas = {}
for t in topics:
    payload_json = (
        bronze_sensors.filter((col("topic") == t) & col("value").isNotNull())
        .select("value")
        .rdd.map(lambda row: row.value)
    )
    schemas[t] = spark.read.json(payload_json).schema


```python
# Start one streaming query per topic: decode the payload with that topic's
# schema, drop rows missing key fields, add partition_date, append to Silver.
# NOTE(review): df_raw as loaded above covers a single topic directory and has no
# `topic` column; this loop needs a stream over the whole bronze/sensors root
# (where topic=<name> becomes a column) — TODO confirm before enabling.
silver_queries = []
for topic_name, schema in schemas.items():
    df_topic = (
        df_raw.filter(col("topic") == topic_name)
           .withColumn("data", from_json(col("value"), schema))
           .select("topic", "year", "month", "day", "data.*")
    )

    # cleansing: drop rows without the identifying fields
    df_topic_clean = df_topic.na.drop(subset=["device_id", "timestamp"])

    # enrichment: add partition_date column
    df_topic_enriched = df_topic_clean.withColumn(
        "partition_date", to_date(col("timestamp"))
    )

    # Write to the Silver layer as parquet. Only start() here: awaiting inside
    # the loop would block on the first topic and never start the others.
    # (numRows was dropped: it is a console-sink option.)
    silver_queries.append(
        df_topic_enriched.writeStream
            .format("parquet")
            .option("path", f"s3a://{bucket}/{env}/silver/{topic_name}/")
            .option("checkpointLocation", f"s3a://{bucket}/{env}/silver/_checkpoints/{topic_name}/")
            .partitionBy("year", "month", "day")
            .outputMode("append")
            .trigger(processingTime='10 seconds')
            .start()
    )

# Block until any one of the per-topic queries stops or fails.
spark.streams.awaitAnyTermination()


```python
# One-off batch read of every topic under bronze/sensors to eyeball the envelope
# schema. The path is built from the config cell instead of being hard-coded.
sample = (
    spark.read
         .format("json")
         .option("pathGlobFilter", "*.json")
         .load(f"s3a://{bucket}/{env}/bronze/sensors/")
)

sample.printSchema()
schema = sample.schema

In [None]:
# Use Nessie as Iceberg catalog (reference settings; not wired into the session).
# Written as a Python dict so this cell runs. The original properties-file lines
# were a SyntaxError. Apply each entry via SparkSession.builder.config(k, v)
# before getOrCreate().
# The Spark catalog entry itself must be Iceberg's SparkCatalog; the Nessie
# implementation is selected through `catalog-impl`.
nessie_catalog_conf = {
    "spark.sql.catalog.nessie": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.nessie.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
    "spark.sql.catalog.nessie.uri": "http://<nessie-server>:19120/api/v1",  # TODO: real host
    "spark.sql.catalog.nessie.ref": "main",
    "spark.sql.catalog.nessie.authentication.type": "NONE",  # or BEARER if auth enabled
    "spark.sql.catalog.nessie.warehouse": "s3a://homelab/dev/silver/",
}


```python
# Start a local Spark session for exploration (alternative setup kept for
# reference): Iceberg's SparkSessionCatalog wraps the built-in spark_catalog.
# Credentials come from the environment; keys must never be committed here.
# NOTE(review): getOrCreate() returns an already-running session if there is one,
# so static settings such as spark.jars only take effect on a fresh kernel.
# NOTE(review): spark_catalog has no `type`/`warehouse` configured — confirm which
# Iceberg catalog backend this is meant to use before relying on it.
spark = (
    SparkSession.builder.appName("BronzeExplorationS3")
    .master("local[*]")
    # needed for s3a:// access; Hadoop keys need the `spark.hadoop.` prefix
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider",
    )
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint)
    .config("spark.hadoop.fs.s3a.region", "us-east-1")
    .config("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # Required packages for S3 and Iceberg
    .config(
        "spark.jars",
        "/home/hchaibi/projects/homelab/smartstar/note-books/iceberg-spark-runtime-4.0_2.13-1.10.0-20250822.002003-111.jar",
    )
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.4.1")
    # Iceberg catalog config (example for S3)
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.iceberg.spark.SparkSessionCatalog",
    )
    .getOrCreate()
)
spark.sparkContext.setLogLevel(logLevel)