In [0]:




# Databricks Notebook: Komplett ETL-pipeline for parkeringsdata (uten Unity Catalog)

# ------------------------------
# 1. Hent data fra ekstern URL
# ------------------------------
import requests
import json
from pyspark.sql import SparkSession
from tests.etl_pipeline import sjekk_duplikater, valider_manglende, valider_gyldige_verdier, konverter_timestamp
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import when, col, trim, lower



# Hent JSON-data fra en offentlig URL
url = "https://opencom.no/dataset/36ceda99-bbc3-4909-bc52-b05a6d634b3f/resource/d1bdc6eb-9b49-4f24-89c2-ab9f5ce2acce/download/parking.json"

# Hent data med robust feilhåndtering
try:
    response = requests.get(url)
    response.raise_for_status()
    data = response.json()
except Exception as e:
    # Avslutt notebooken med feilmelding hvis noe går galt
    dbutils.notebook.exit(f"Feil ved henting av data: {str(e)}")

# Konverter JSON-data til Spark DataFrame
# (Datatypene blir automatisk inferert, men kan tilpasses manuelt ved behov)
df_raw = spark.createDataFrame(data)






In [0]:
# 2. Lagre rådata (bronse)
# ----------------------------------
# Append for å bevare historiske data (bronse = ubehandlet rådata)

from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp


bronze_path = "/mnt/bronze/staging_parking"


df_raw.write.format("delta").mode("append").option("mergeSchema", "true").save(bronze_path)
spark.sql(f"CREATE TABLE IF NOT EXISTS default.staging_parking USING DELTA LOCATION '{bronze_path}'")





In [0]:
# --------------------------------------------------------------
# 3. Kvalitetssikre data og konverter dato/klokkeslett (silver)
# --------------------------------------------------------------

from pyspark.sql.functions import when, col, trim, lower
from pyspark.sql.types import IntegerType


# Les inn staging-parking-tabellen (bronse) på nytt, slik at koden er uavhengig av df_raw i minne
df_raw = spark.table("default.staging_parking")


# Konverter "Fullt" til 0 og cast antall_ledige_plasser til Integer
df_raw = df_raw.withColumn(
    "Antall_ledige_plasser",
    when(lower(trim(col("Antall_ledige_plasser"))) == "fullt", 0)
    .otherwise(col("Antall_ledige_plasser"))
    .cast(IntegerType())
)


valider_manglende(df_raw) # sjekk at det ikke finnes manglende verdier i rådataene 
valider_gyldige_verdier(df_raw) # sjekk at verdiene er gyldige 
df_deduped = sjekk_duplikater(df_raw) # sjekk for duplikater
df_cleaned = konverter_timestamp(df_deduped) # rens og konverter dato og klokkelett




# df_cleaned er nå klar til å bli lagret i silver

silver_path = "/mnt/silver/parking_cleaned"


(
    df_cleaned
    .write
    .format("delta")
    .mode("append")  # append = keep history, overwrite = replace batch
    .option("mergeSchema", "true")
    .save(silver_path)
)



# Register as a Hive Metastore table for easy SQL queries
spark.sql(f"""
CREATE TABLE IF NOT EXISTS default.parking_cleaned
USING DELTA
LOCATION '{silver_path}'
""")




In [0]:
# ----------------------------------
# 4. Dimensjonstabell: Parkering
# ----------------------------------
from pyspark.sql.functions import col


# Les inn parking_cleaned (silver) på nytt, slik at koden er uavhengig av df_cleaned i minne
df_cleaned = spark.table("default.parking_cleaned")

# Hent eksisterende tabell hvis den finnes, ellers None
existing_dim_parkering = spark.table("default.dim_parkering") if "default.dim_parkering" in [t.name for t in spark.catalog.listTables("default")] else None

# Finn nye unike parkeringsplasser og gi kolonnene mer beskrivende navn
new_dim_parkering = df_cleaned.select("Sted", "Latitude", "Longitude").distinct()
new_dim_parkering = new_dim_parkering.withColumnRenamed("Sted", "Parkering_navn")


# Bruk MERGE for å oppdatere dimensjonstabellen uten duplikater

# gir lik funksjonalitet som å bruke DataFrame-basert delta merge med DeltaTable.forName(...).alias(...).merge(...)
# kun forskjell i syntaks

spark.sql("""
MERGE INTO default.dim_parkering AS target
USING (SELECT DISTINCT Sted AS Parkering_navn, Latitude, Longitude FROM default.staging_parking) AS source
ON target.Parkering_navn = source.Parkering_navn
WHEN NOT MATCHED THEN INSERT *
""")


In [0]:
# ----------------------------------
# 5. Dimensjonstabell: Tid
# ----------------------------------



from pyspark.sql.functions import to_date, hour, minute
from pyspark.sql import Row


# Les inn parking_cleaned (silver) på nytt, slik at koden er uavhengig av df_cleaned i minne
df_cleaned = spark.table("default.parking_cleaned")

# Bygg batch kun basert på nyeste data
dim_tid_batch = df_cleaned.select(
    to_date("timestamp").alias("dato"),
    hour("timestamp").alias("time"),
    minute("timestamp").alias("minutt")
).distinct()


# gjør tabellen tilgjengelig for Spark SQL
dim_tid_batch.createOrReplaceTempView("new_dim_tid")


# MERGE for å legge til nye tidspunkter uten duplikater

# # gir lik funksjonalitet som å bruke DataFrame-basert delta merge med DeltaTable.forName(...).alias(...).merge(...)
# kun forskjell i syntaks
# alle delta tables blir lest inn i Spark som DataFrames når man gjør spørringer på dem


spark.sql("""
MERGE INTO default.dim_tid AS target
USING new_dim_tid AS source
ON target.dato = source.dato AND target.time = source.time AND target.minutt = source.minutt
WHEN NOT MATCHED THEN INSERT *
""")



In [0]:
# ------------------------------------------
# 6. Faktatabell: Parkeringskapasitet (gold)

# gold-layer

# --------------------------------------------
from pyspark.sql.functions import to_date, hour, minute, date_format, to_timestamp, when, lit
from pyspark.sql.functions import col, max as max_
from pyspark.sql.types import IntegerType


# Hent 9 observasjoner per kjøring (én per lokasjon)

# spark.sql("TRUNCATE TABLE fakt_parkering")

# Prepare base DataFrame



# Les inn parking_cleaned (silver) på nytt, slik at koden er uavhengig av df_cleaned i minne
df_cleaned = spark.table("default.parking_cleaned")

new_fakt = df_cleaned.select(
    to_date("timestamp").alias("dato"),
    date_format(to_timestamp("timestamp"), "HH:mm").alias("klokkeslett"),
    col("Sted"),
    col("Antall_ledige_plasser").cast(IntegerType()).alias("Antall_ledige_plasser"),
    col("timestamp")
)


gold_path = "/mnt/gold/fakt_parkering"

(
    new_fakt
    .write
    .format("delta")
    .mode("append")  # append for incremental loads
    .option("mergeSchema", "true")
    .save(gold_path)
)


# Register as Gold table in Hive Metastore
spark.sql(f"""
CREATE TABLE IF NOT EXISTS default.fakt_parkering
USING DELTA
LOCATION '{gold_path}'
""")





# OPTIMIZE forbedrer lagring og spørringsytelse

# ZORDER BY (timestamp) organiserer dataene fysisk på disken slik at rader med lignende verdier for 'timestamp' lagres nær hverandre. 
# dette gjør det mye raskere å hente ut data basert på tid, spesielt ved filtrering på dato/tid

# spark.sql("OPTIMIZE default.fakt_parkering ZORDER BY (timestamp)")


new_fakt.write.format("delta").mode("overwrite").saveAsTable("default.fakt_parkering_siste9")


