In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
import requests
import time
from pathlib import Path
import os
import logging

In [0]:
spark = SparkSession.builder.getOrCreate()

# Unity Catalog target. The catalog name contains a hyphen, so it is stored back-quoted
# and can be interpolated directly into the USE statements and qualified table names.
catalog_dev = "`land_topografisk-gdb_dev`"
schema_dev = "ai2025"
spark.sql(f"USE CATALOG {catalog_dev}")
spark.sql(f"USE SCHEMA {schema_dev}")

# Fully-qualified names of the bronze source, the silver target and the run-log table.
bronze_table, silver_table, log_table = (
    f"{catalog_dev}.{schema_dev}.{table}"
    for table in ("endepunkt_bronze", "endepunkt_silver", "logs_processed_endepunkter")
)

# Half-width of the bbox built around each (x, y) point when silver columns are added
# (same units as x/y — presumably metres; TODO confirm).
buffer = 20

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("silver_pipeline")

In [0]:
# Audit/log table for endepunkt silver runs. CREATE TABLE IF NOT EXISTS makes this cell
# safe to re-run: an existing table (and its schema) is left untouched.
q = f"""
CREATE TABLE IF NOT EXISTS {log_table} (
    kommune_id STRING,
    processed_time TIMESTAMP,
    num_inserted INT,
    num_updated INT,
    num_deleted INT
) USING DELTA
"""
spark.sql(q)

In [0]:
def log_endepunkt_run(kommune_id: str, inserted: int, updated: int, deleted: int = 0) -> None:
    """Append one audit row describing a silver run to ``log_table``.

    The row follows the schema declared for ``log_table``
    (kommune_id, processed_time, num_inserted, num_updated, num_deleted);
    ``processed_time`` is stamped by Spark with ``current_timestamp()``.

    Args:
        kommune_id: Kommune the run processed; stored as a string (None stays NULL).
        inserted: Row count reported as inserted.
        updated: Row count reported as updated.
        deleted: Row count reported as deleted (default 0).
    """
    schema = StructType([
        StructField("kommune_id", StringType(), True),
        StructField("num_inserted", IntegerType(), True),
        StructField("num_updated", IntegerType(), True),
        StructField("num_deleted", IntegerType(), True),
    ])
    data = [(
        None if kommune_id is None else str(kommune_id),
        int(inserted),
        int(updated),
        int(deleted),
    )]

    df_log = (
        spark.createDataFrame(data, schema=schema)
        .withColumn("processed_time", current_timestamp())
        # Same column order as the DDL so the appended row lines up with the table.
        .select("kommune_id", "processed_time", "num_inserted", "num_updated", "num_deleted")
    )

    # The frame already matches the log table's schema exactly, so mergeSchema is not
    # used: any future schema drift fails loudly instead of silently adding columns.
    df_log.write.mode("append").saveAsTable(log_table)

In [0]:
def add_silver_columns(df: DataFrame, buffer: int = 50, kommune_id: str = "") -> DataFrame:
    """Enrich a bronze frame with the silver-layer columns.

    Adds ``bbox`` (``[x - buffer, y - buffer, x + buffer, y + buffer]``), empty
    ``image_path``/``dom_path``, ``PENDING`` image/dom statuses, a ``lastet_tid``
    load timestamp, ``kommune_id`` and a SHA-256 ``row_hash``.

    Args:
        df: Bronze rows; must have ``x`` and ``y`` columns (used in the bbox expression).
        buffer: Half-width added around (x, y) to form the bbox.
        kommune_id: When non-empty, stamped onto every row. When empty (default), an
            existing ``kommune_id`` column is kept as-is; it is only created (as "")
            if the input has none.

    Returns:
        The enriched DataFrame.
    """
    with_bbox = df.withColumn(
        "bbox", expr(f"array(x - {buffer}, y - {buffer}, x + {buffer}, y + {buffer})")
    )
    # Column list captured before the status/metadata columns are added, so row_hash
    # covers only the source attributes plus bbox (not statuses or the load timestamp).
    hash_columns = with_bbox.columns

    enriched = (
        with_bbox
        .withColumn("image_path", lit(None).cast("string"))
        .withColumn("dom_path", lit(None).cast("string"))
        .withColumn("image_status", lit("PENDING"))
        .withColumn("dom_status", lit("PENDING"))
        .withColumn("lastet_tid", current_timestamp())
    )

    if kommune_id:
        enriched = enriched.withColumn("kommune_id", lit(str(kommune_id)))
    elif "kommune_id" not in enriched.columns:
        enriched = enriched.withColumn("kommune_id", lit("").cast("string"))
    # Otherwise keep the bronze kommune_id instead of blanking it with the "" default.

    # NOTE(review): concat_ws skips NULL values, so rows that differ only in which
    # column is NULL can produce the same hash — confirm this is acceptable for a key.
    return enriched.withColumn("row_hash", sha2(concat_ws("||", *hash_columns), 256))

In [0]:
def write_delta_table(sdf: DataFrame, mode: str = "merge", merge_keys: tuple = ("row_hash",)) -> None:
    """Write ``sdf`` to ``silver_table`` by overwrite or by Delta MERGE (upsert).

    Args:
        sdf: Rows to write. For merge it must contain every column in ``merge_keys``
            and a ``hentet_tid`` column (used to decide whether a matched row is newer).
        mode: ``"overwrite"`` replaces the table contents, with mergeSchema enabled;
            ``"merge"`` (default) upserts into the existing table.
        merge_keys: Columns that identify a row in the MERGE join condition. The
            default ``("row_hash",)`` keeps the original content-hash matching.

    Raises:
        ValueError: if ``mode`` is unknown or ``merge_keys`` is empty.
    """
    if mode == "overwrite":
        (sdf.write.format("delta")
            .option("mergeSchema", "true")
            .mode("overwrite")
            .saveAsTable(silver_table))
        return
    if mode != "merge":
        raise ValueError(f"Unsupported write mode {mode!r}; expected 'merge' or 'overwrite'.")

    keys = list(merge_keys)
    if not keys:
        raise ValueError("merge_keys must contain at least one column name.")

    # Delta MERGE fails when several source rows match the same target row, so collapse
    # duplicate keys in the source first. With the default row_hash key the duplicates
    # have identical hashed content; with other keys the surviving row is arbitrary.
    source_df = sdf.dropDuplicates(keys)
    join_condition = " AND ".join(f"target.{k} = source.{k}" for k in keys)
    assignments = {c: f"source.{c}" for c in sdf.columns}

    # NOTE(review): with the default row_hash key a match means identical hashed content;
    # if hentet_tid is one of the hashed bronze columns this update branch can never
    # fire and changed rows are inserted as new rows — consider a business key.
    (DeltaTable.forName(spark, silver_table).alias("target")
        .merge(source_df.alias("source"), condition=join_condition)
        .whenMatchedUpdate(condition="target.hentet_tid < source.hentet_tid", set=assignments)
        .whenNotMatchedInsert(values=assignments)
        .execute())


In [0]:
def process_silver_for_kommune(kommune_id: str) -> None:
    """Build silver rows for one kommune from bronze and write them to ``silver_table``.

    On first use the silver table is created via an overwrite. Otherwise the new rows
    are projected onto the existing silver schema (names, order, types) and merged in.
    Finally one audit row is logged and an info message is emitted.

    Args:
        kommune_id: Kommune to process; coerced to ``str``.
    """
    kommune_id = str(kommune_id)  # defensive: callers may pass a non-string id
    bronze_df = spark.read.table(bronze_table).filter(col("kommune_id") == lit(kommune_id))

    # Pass the id through so add_silver_columns does not stamp its "" default onto
    # the kommune_id column.
    silver_df = add_silver_columns(bronze_df, buffer=buffer, kommune_id=kommune_id)

    if not spark.catalog.tableExists(silver_table):
        write_delta_table(silver_df, mode="overwrite")
    else:
        expected_schema = spark.table(silver_table).schema
        # Align to the target schema; void-typed target columns are filled with "".
        silver_df = silver_df.select([
            lit("").cast("string").alias(field.name) if field.dataType.typeName() == "void"
            else col(field.name).cast(field.dataType)
            for field in expected_schema
        ])
        write_delta_table(silver_df)

    # Count once: each count() re-executes the plan, including the bronze read.
    num_rows = silver_df.count()
    # NOTE(review): num_rows is the number of source rows processed for this kommune,
    # not what MERGE actually inserted/updated; Delta history operationMetrics would
    # give exact figures.
    log_endepunkt_run(kommune_id, inserted=num_rows, updated=0)
    logger.info(f"✅ Endepunkt silver opprettet eller oppdatert med {num_rows} rader.")

In [0]:
# Distinct, non-null kommune ids present in bronze, sorted so every run processes
# them in the same order.
kommune_id_rows = [
    row.asDict()
    for row in (
        spark.read.table(bronze_table)
        .select("kommune_id")
        .where(col("kommune_id").isNotNull())
        .distinct()
        .orderBy("kommune_id")
        .collect()
    )
]
logger.info(f"Found {len(kommune_id_rows)} kommune_id values in bronze.")

for row in kommune_id_rows:
    kommune_id = str(row["kommune_id"])  # process_silver_for_kommune works on string ids
    logger.debug(f"Processing kommune_id={kommune_id!r}")
    process_silver_for_kommune(kommune_id)


In [0]:
# Reset (destructive): uncomment to drop the silver and log tables. On the next run the
# log-table cell recreates the log table, and the first kommune processed recreates the
# silver table via an overwrite.
# spark.sql(f"DROP TABLE IF EXISTS {silver_table}")
# spark.sql(f"DROP TABLE IF EXISTS {log_table}")