### Setup and test

In [1]:
# --------------------------------------------------------------------------
# 00_setup_and_test.ipynb
# Spark + Iceberg + MinIO via Polaris REST catalog
# --------------------------------------------------------------------------

from pyspark.sql import SparkSession
from pathlib import Path
import os

# Polaris credentials (are generated into .env by the bootstrap container)
POLARIS_CLIENT_ID = os.getenv("POLARIS_CLIENT_ID")
POLARIS_CLIENT_SECRET = os.getenv("POLARIS_CLIENT_SECRET")

if not POLARIS_CLIENT_ID or not POLARIS_CLIENT_SECRET:
    raise RuntimeError(
        "‚ùå Polaris credentials ontbreken. Zet POLARIS_CLIENT_ID en POLARIS_CLIENT_SECRET in de omgeving."
    )

# Polaris endpoints + scope
POLARIS_URI = os.getenv("POLARIS_URI", "http://polaris:8181/api/catalog")
POLARIS_OAUTH2 = os.getenv(
    "POLARIS_OAUTH2_TOKEN_URL", "http://polaris:8181/api/catalog/v1/oauth/tokens"
)
POLARIS_SCOPE = os.getenv("POLARIS_SCOPE", "PRINCIPAL_ROLE:ALL")
POLARIS_WAREHOUSE = os.getenv("POLARIS_CATALOG_NAME", "polaris")

# === 0Ô∏è‚É£ Oude Spark-sessie stoppen (veilig) ===
if "spark" in locals():
    try:
        spark.stop()
        print("üßπ Oude Spark-sessie gestopt.")
    except Exception as e:
        print(f"‚ö†Ô∏è Kon Spark niet netjes stoppen: {e}")

# === 1Ô∏è‚É£ SparkSession met Iceberg Polaris REST Catalog ===
spark = (
    SparkSession.builder
        .appName("Lakehouse-Unplugged")

        # Iceberg
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
        )

        # Polaris REST catalog
        .config("spark.sql.defaultCatalog", "polaris")
        .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
        .config("spark.sql.catalog.polaris.uri", POLARIS_URI)
        .config("spark.sql.catalog.polaris.warehouse", POLARIS_WAREHOUSE)
        .config(
            "spark.sql.catalog.polaris.credential",
            f"{POLARIS_CLIENT_ID}:{POLARIS_CLIENT_SECRET}"
        )
        .config("spark.sql.catalog.polaris.oauth2-server-uri", POLARIS_OAUTH2)
        .config("spark.sql.catalog.polaris.scope", POLARIS_SCOPE)
        .config(
            "spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation",
            "vended-credentials",
        )
        .config("spark.sql.catalog.polaris.token-refresh-enabled", "true")

        # Performance (licht, PoC)
        .config("spark.sql.shuffle.partitions", "4")
        .config("spark.sql.adaptive.enabled", "true")

        # S3A / MinIO voor de data-files
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
        .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
        .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

        .getOrCreate()
)

# === 2Ô∏è‚É£ Validatie ===
hconf = spark._jsc.hadoopConfiguration()

print("‚úÖ Spark gestart via Polaris")
print(f"üî¢ Spark versie: {spark.version}")
print(f"üì¶ Default catalog: {spark.conf.get('spark.sql.defaultCatalog')}")
print(f"üß≠ Polaris URI: {spark.conf.get('spark.sql.catalog.polaris.uri')}")
print(f"üè≠ Polaris warehouse: {spark.conf.get('spark.sql.catalog.polaris.warehouse')}")
print(f"üéØ Polaris scope: {spark.conf.get('spark.sql.catalog.polaris.scope')}")
print(f"üåê S3A endpoint: {hconf.get('fs.s3a.endpoint')}")



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/05 07:35:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


‚úÖ Spark gestart via Polaris
üî¢ Spark versie: 3.5.1
üì¶ Default catalog: polaris
üß≠ Polaris URI: http://polaris:8181
üè≠ Polaris warehouse: polaris
üéØ Polaris scope: PRINCIPAL_ROLE:ALL
üåê S3A endpoint: http://minio:9000


#### Controleer de setup

In [4]:
from pyspark.sql import SparkSession

print("üîç Controle: Polaris REST + MinIO via S3A...")

try:
    # Polaris-config uit Spark
    catalog_uri = spark.conf.get("spark.sql.catalog.polaris.uri")
    warehouse = spark.conf.get("spark.sql.catalog.polaris.warehouse")
    scope = spark.conf.get("spark.sql.catalog.polaris.scope")

    print(f"üß≠ Polaris URI: {catalog_uri}")
    print(f"üè≠ Polaris warehouse: {warehouse}")
    print(f"üéØ Polaris scope: {scope}")

    # Hadoop-config uit Spark voor MinIO I/O
    conf = spark.sparkContext._jsc.hadoopConfiguration()
    endpoint = conf.get("fs.s3a.endpoint")
    impl = conf.get("fs.s3a.impl")
    access_key = conf.get("fs.s3a.access.key")

    print(f"üåê S3A endpoint: {endpoint}")
    print(f"‚öôÔ∏è  Implementation: {impl}")
    print(f"üîë Access key: {access_key}")

    print("üì° Probe: probeer S3A pad te controleren...")

    # Test pad: je daadwerkelijke warehouse in MinIO
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
        spark._jvm.java.net.URI("s3a://warehouse/"), conf
    )
    path = spark._jvm.org.apache.hadoop.fs.Path("s3a://warehouse/")

    if fs.exists(path):
        print("‚úÖ Verbinding OK ‚Äî 'warehouse' bucket is bereikbaar via S3A.")
    else:
        print("‚ö†Ô∏è Verbinding OK, maar 'warehouse' bucket bestaat nog niet.")
        print("   ‚ûú Spark maakt deze automatisch aan bij de eerste write.")

except Exception as e:
    print("‚ùå Fout bij verbinding met Polaris/MinIO:")
    print(e)



üîç Controle: Polaris REST + MinIO via S3A...
üß≠ Polaris URI: http://polaris:8181
üè≠ Polaris warehouse: polaris
üéØ Polaris scope: PRINCIPAL_ROLE:ALL
üåê S3A endpoint: http://minio:9000
‚öôÔ∏è  Implementation: org.apache.hadoop.fs.s3a.S3AFileSystem
üîë Access key: minioadmin
üì° Probe: probeer S3A pad te controleren...


26/01/05 07:36:28 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


‚úÖ Verbinding OK ‚Äî 'warehouse' bucket is bereikbaar via S3A.


#### Parkeer bestande in de landingzone

In [5]:
import boto3
from pathlib import Path

# ======================================================================
# 0Ô∏è‚É£ Helper: zoek automatisch lokaal data-bestand
# ======================================================================
def find_data_file(filename: str) -> Path:
    p = Path.cwd()
    for _ in range(4):
        candidate = p / "data" / filename
        if candidate.exists():
            return candidate
        p = p.parent
    raise FileNotFoundError(f"‚ùå Kon '{filename}' niet vinden in een 'data' map.")

# ======================================================================
# 1Ô∏è‚É£ Config
# ======================================================================
local_file = find_data_file("gekentekendevoertuigen_sample.json")
bucket = "warehouse"
prefix = "landing"

object_key = f"{prefix}/{local_file.name}"
s3_uri = f"s3a://{bucket}/{object_key}"

print(f"üìÑ Lokaal bestand: {local_file}")
print(f"‚¨ÜÔ∏è Upload naar:  {s3_uri}")


# ======================================================================
# 2Ô∏è‚É£ MinIO client via boto3
# ======================================================================
s3 = boto3.client(
    "s3",
    endpoint_url="http://minio:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    region_name="us-east-1",
)

# Upload bestand
s3.upload_file(str(local_file), bucket, object_key)

print("‚úÖ Upload gelukt.")


# ======================================================================
# 3Ô∏è‚É£ Verify: lijst objecten in prefix
# ======================================================================
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
print("üì¶ Objecten in MinIO:")
for item in response.get("Contents", []):
    print(" -", item["Key"])


# ======================================================================
# 4Ô∏è‚É£ Spark read via S3A (data-files), metadata via Polaris
# ======================================================================
print(f"üì• Inlezen via Spark: {s3_uri}")

df = spark.read.option("multiline", "true").json(s3_uri)

print(f"üìä Aantal records: {df.count():,}")
df.printSchema()



üìÑ Lokaal bestand: /workspace/data/gekentekendevoertuigen_sample.json
‚¨ÜÔ∏è Upload naar:  s3a://warehouse/landing/gekentekendevoertuigen_sample.json
‚úÖ Upload gelukt.
üì¶ Objecten in MinIO:
 - landing/gekentekendevoertuigen_sample.json
üì• Inlezen via Spark: s3a://warehouse/landing/gekentekendevoertuigen_sample.json


                                                                                

Py4JJavaError: An error occurred while calling o69.count.
: org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'polaris': org.apache.polaris.spark.SparkCatalog.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.catalogPluginClassNotFoundForCatalogError(QueryExecutionErrors.scala:1925)
	at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:70)
	at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:53)
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:53)
	at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:122)
	at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:93)
	at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:143)
	at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:140)
	at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:295)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:295)
	at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:275)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
	at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
	at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
	at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:152)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:144)
	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:162)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:182)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:238)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:284)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:252)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:117)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3613)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: org.apache.polaris.spark.SparkCatalog
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
	... 58 more


#### Ingest into bronze table

In [6]:
# ======================================================================
# üîÑ Ingest van Landingzone ‚Üí Bronze (Iceberg via Polaris)
# ======================================================================

# Input configuratie (aangeleverd vanuit eerdere cel)
bucket = "warehouse"
prefix = "landing"
local_file = find_data_file("gekentekendevoertuigen_sample.json")   # naam komt uit je workflow
object_key = f"{prefix}/{local_file.name}"
s3_uri = f"s3a://{bucket}/{object_key}"

bronze_table = "polaris.bronze.gekentekendevoertuigen"

print(f"üì• Lezen vanuit landingzone: {s3_uri}")

# 1Ô∏è‚É£ Data inlezen uit landingzone
df = spark.read.json(s3_uri)

print(f"üì¶ Aantal records geladen: {df.count()}")

# 2Ô∏è‚É£ Namespace garanderen
spark.sql("CREATE NAMESPACE IF NOT EXISTS polaris.bronze")

# 3Ô∏è‚É£ Wegschrijven naar Iceberg Bronze
print(f"üßä Schrijven naar Bronze tabel: {bronze_table}")

(
    df.writeTo(bronze_table)
      .using("iceberg")
      .option("format-version", "2")
      .option("overwrite-mode", "dynamic")
      .createOrReplace()
)

print(f"‚úÖ Bronze tabel bijgewerkt: {bronze_table}")

# 4Ô∏è‚É£ Tabellen tonen
print("
üìã Tabellen in polaris.bronze:")
spark.sql("SHOW TABLES IN polaris.bronze").show(truncate=False)

# 5Ô∏è‚É£ Bronze teruglezen ter controle
bronze_df = spark.read.table(bronze_table)

print(f"
üîÅ Records in Bronze: {bronze_df.count():,}")
bronze_df.show(5, truncate=False)



üì• Lezen vanuit landingzone: s3a://warehouse/landing/gekentekendevoertuigen_sample.json


                                                                                

üì¶ Aantal records geladen: 10002
üßä Schrijven naar Bronze tabel: lakehouse.bronze.gekentekendevoertuigen


25/11/22 10:28:52 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/11/22 10:28:55 WARN HadoopTableOperations: Error reading version hint file s3a://warehouse/bronze/gekentekendevoertuigen/metadata/version-hint.text
java.io.FileNotFoundException: No such file or directory: s3a://warehouse/bronze/gekentekendevoertuigen/metadata/version-hint.text
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.extractOrFetchSimpleFileStatus(S3AFileSystem.java:5401)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1465)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1441)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
	at org.apache.iceberg.hadoop.HadoopTableOperations

‚úÖ Bronze tabel bijgewerkt: lakehouse.bronze.gekentekendevoertuigen

üìã Tabellen in lakehouse.bronze:
+---------+----------------------+-----------+
|namespace|tableName             |isTemporary|
+---------+----------------------+-----------+
|bronze   |gekentekendevoertuigen|false      |
+---------+----------------------+-----------+


üîÅ Records in Bronze: 10,002
+---------------+----------------------------+----------------------------+----------------+-------------+-----------------------+--------------------+-------------+------------------+-----------------------------------------------+---------------------------------------------+---------------------------+-----------------------------------------------+-----------------------------------------------+-----------------------------------------------+-------------------------------------------------+-----------------------------------------------+-------+------------------------+------------------------+---------+-----------

                                                                                

#### Query de bronze table

In [22]:
from pyspark.sql.functions import col

df = spark.read.table("polaris.bronze.gekentekendevoertuigen")

print("üöó Top 5 voertuigsoorten:")
(
    df.groupBy("voertuigsoort")
      .count()
      .orderBy(col("count").desc())
      .show(5, truncate=False)
)

print("
üè∑Ô∏è Top 5 merken:")
(
    df.groupBy("merk")
      .count()
      .orderBy(col("count").desc())
      .show(5, truncate=False)
)

print("
üî§ Top 5 handelsbenamingen:")
(
    df.groupBy("handelsbenaming")
      .count()
      .orderBy(col("count").desc())
      .show(5, truncate=False)
)

print("
‚ö° Top 5 voertuigen op vermogen (massarijklaar):")
(
    df.select("merk", "handelsbenaming", "vermogen_massarijklaar")
      .orderBy(col("vermogen_massarijklaar").desc_nulls_last())
      .show(5, truncate=False)
)



üöó Top 5 voertuigsoorten:
+--------------------+-----+
|voertuigsoort       |count|
+--------------------+-----+
|Personenauto        |7078 |
|Bedrijfsauto        |1237 |
|Bromfiets           |782  |
|Motorfiets          |258  |
|Middenasaanhangwagen|136  |
+--------------------+-----+
only showing top 5 rows


üè∑Ô∏è Top 5 merken:
+-------------+-----+
|merk         |count|
+-------------+-----+
|VOLKSWAGEN   |1076 |
|PEUGEOT      |615  |
|RENAULT      |606  |
|MERCEDES-BENZ|565  |
|FORD         |553  |
+-------------+-----+
only showing top 5 rows


üî§ Top 5 handelsbenamingen:
+---------------+-----+
|handelsbenaming|count|
+---------------+-----+
|POLO           |219  |
|GOLF           |202  |
|FOCUS          |138  |
|N/A            |135  |
|CLIO           |125  |
+---------------+-----+
only showing top 5 rows


‚ö° Top 5 voertuigen op vermogen (massarijklaar):
+-------+--------------------+----------------------+
|merk   |handelsbenaming     |vermogen_massarijklaar|
+-------+