# On-Prem Iceberg Warehouse with Spark

This notebook demonstrates how to create and query Iceberg tables stored in MinIO, using Spark connected to an Apache Polaris REST catalog.
All services run on-prem with no cloud connectivity.

## 1. Initialize Spark Session

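The cell below builds the Spark session explicitly: it registers the Polaris REST catalog under the name `polaris` and points the S3A filesystem at MinIO, reading credentials from environment variables. If a session was already started with its own settings (for example via `PYSPARK_SUBMIT_ARGS`), `getOrCreate()` reuses it, and settings that only apply at startup may not take effect.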

In [2]:
from pyspark.sql import SparkSession
import os

AWS_REGION = os.getenv("AWS_REGION")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")

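# Polaris principal credentials; Iceberg's REST client sends them as an OAuth2 client_id:client_secret pair
POLARIS_USERNAME = os.getenv("POLARIS_USERNAME")
POLARIS_PASSWORD = os.getenv("POLARIS_PASSWORD")
# Polaris catalog name, passed to Spark as the Iceberg "warehouse" property
POLARIS_CATALOG_NAME = os.getenv("POLARIS_CATALOG_NAME", "demo_catalog")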

packages = ",".join(
    [
        "org.apache.hadoop:hadoop-aws:3.3.4",
        "com.amazonaws:aws-java-sdk-bundle:1.12.262",
        "org.apache.iceberg:iceberg-aws-bundle:1.4.3",
        "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.3",
    ]
)

# Polaris + Iceberg with MinIO
spark = SparkSession.builder \
    .appName("Polaris-Iceberg-MinIO") \
    .config("spark.jars.packages", packages) \
    .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.polaris.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") \
    .config("spark.sql.catalog.polaris.uri", "http://polaris:8181/api/catalog") \
    .config("spark.sql.catalog.polaris.credential", f"{POLARIS_USERNAME}:{POLARIS_PASSWORD}") \
    .config("spark.sql.catalog.polaris.warehouse", POLARIS_CATALOG_NAME) \
    .config("spark.sql.catalog.polaris.scope", "PRINCIPAL_ROLE:ALL") \
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID) \
    .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()

print("✓ Spark session created!")

✓ Spark session created!
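The builder chain above passes `None` through when an environment variable is unset, which tends to surface later as a confusing authentication error. A minimal sketch, assuming the same variable names as the cell above, that checks the required variables up front and collects the Section 1 settings into a plain dict. `require_env` and `polaris_conf` are hypothetical helpers, not part of PySpark or Polaris.

```python
import os


def require_env(*names: str) -> dict:
    """Return {name: value} for each variable; raise if any is unset or empty."""
    missing = [name for name in names if not os.environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: os.environ[name] for name in names}


def polaris_conf(env: dict, catalog: str = "polaris",
                 warehouse: str = "demo_catalog",
                 uri: str = "http://polaris:8181/api/catalog",
                 s3_endpoint: str = "http://minio:9000") -> dict:
    """Collect the Section 1 catalog and S3A settings into a dict (hypothetical helper)."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.rest.RESTCatalog",
        f"{prefix}.uri": uri,
        # Sent by Iceberg's REST client as an OAuth2 "client_id:client_secret" pair
        f"{prefix}.credential": f"{env['POLARIS_USERNAME']}:{env['POLARIS_PASSWORD']}",
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.scope": "PRINCIPAL_ROLE:ALL",
        "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.endpoint": s3_endpoint,
        "spark.hadoop.fs.s3a.access.key": env["AWS_ACCESS_KEY_ID"],
        "spark.hadoop.fs.s3a.secret.key": env["AWS_SECRET_ACCESS_KEY"],
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
        "spark.hadoop.fs.s3a.aws.credentials.provider":
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    }
```

Each key/value pair can then be applied with `builder.config(key, value)` in a loop before `getOrCreate()`. Keeping the settings in a dict also makes them easy to compare against the live session later.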


## 2. Verify Polaris Catalog Connection

In [3]:
# List catalogs. Spark loads catalogs lazily, so polaris only shows up here after it has been referenced
spark.sql("SHOW CATALOGS").show()

+-------------+
|      catalog|
+-------------+
|spark_catalog|
+-------------+



## 3. Create a Namespace (Schema) in Polaris

In [3]:
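# Create namespaces for your tables; the examples below use my_warehouse
spark.sql("CREATE NAMESPACE IF NOT EXISTS polaris.test_db")
spark.sql("CREATE NAMESPACE IF NOT EXISTS polaris.my_warehouse")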
spark.sql("SHOW NAMESPACES IN polaris").show()

+------------+
|   namespace|
+------------+
|     test_db|
|my_warehouse|
+------------+



## 4. Create Sample Iceberg Table

In [20]:
# Create a sample table
spark.sql("""
    CREATE TABLE IF NOT EXISTS polaris.my_warehouse.users (
        id INT,
        name STRING,
        email STRING,
        created_date DATE
    )
    USING ICEBERG
    PARTITIONED BY (created_date)
""")

print("Table created successfully!")

Table created successfully!


In [16]:
# Drop the users table to reset the demo (run before Section 4 to start fresh)
spark.sql("DROP TABLE IF EXISTS polaris.my_warehouse.users")

print("Table dropped successfully!")

Table dropped successfully!


## 5. Insert Sample Data

In [21]:
# Insert sample data. INSERT INTO appends, so re-running this cell duplicates rows.
spark.sql("""
    INSERT INTO polaris.my_warehouse.users VALUES
    (1, 'Alice', 'alice@example.com', DATE '2025-01-01'),
    (2, 'Bob', 'bob@example.com', DATE '2025-01-02'),
    (3, 'Charlie', 'charlie@example.com', DATE '2025-01-03'),
    (4, 'Diana', 'diana@example.com', DATE '2025-01-04')
""")
print("Data inserted successfully!")

Data inserted successfully!


## 6. Query Using Schema.TableName (Current Catalog)

In [22]:
# USE polaris makes polaris the current catalog, so tables resolve with just schema.tablename
spark.sql("USE polaris")

result = spark.sql("SELECT * FROM my_warehouse.users")
result.show()

+---+-------+-------------------+------------+
| id|   name|              email|created_date|
+---+-------+-------------------+------------+
|  1|  Alice|  alice@example.com|  2025-01-01|
|  2|    Bob|    bob@example.com|  2025-01-02|
|  3|Charlie|charlie@example.com|  2025-01-03|
|  4|  Diana|  diana@example.com|  2025-01-04|
+---+-------+-------------------+------------+



## 7. Query with Catalog.Schema.TableName

In [23]:
# You can also use the fully qualified catalog.schema.table name, which works regardless of the current catalog
result = spark.sql("SELECT * FROM polaris.my_warehouse.users WHERE id > 2")
result.show()

+---+-------+-------------------+------------+
| id|   name|              email|created_date|
+---+-------+-------------------+------------+
|  3|Charlie|charlie@example.com|  2025-01-03|
|  4|  Diana|  diana@example.com|  2025-01-04|
+---+-------+-------------------+------------+
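Sections 6 and 7 differ only in how the table name is resolved. The sketch below is a deliberately simplified model of Spark's rule, not Spark's own code: if the first part names a registered catalog, that catalog is used; otherwise the current catalog (set by `USE`) is prepended. It ignores backtick-quoted identifiers, and `resolve_table_name` is a hypothetical helper.

```python
def resolve_table_name(name: str, current_catalog: str,
                       current_namespace: str, catalogs: set) -> str:
    """Simplified model of how Spark turns a table name into catalog.namespace.table."""
    parts = name.split(".")
    if len(parts) >= 2 and parts[0] in catalogs:
        # Already catalog-qualified, as in Section 7
        return name
    if len(parts) == 1:
        # Bare table name: current catalog and current namespace are both filled in
        return f"{current_catalog}.{current_namespace}.{name}"
    # schema.table, as in Section 6: resolved against the current catalog
    return f"{current_catalog}.{name}"
```

The model shows why Section 6 needs `USE polaris`: with the default current catalog `spark_catalog`, `my_warehouse.users` would be looked up in `spark_catalog` instead of Polaris.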



## 8. Verify MinIO Storage

In [24]:
# List tables in the namespace
spark.sql("SHOW TABLES IN polaris.my_warehouse").show()

+------------+---------+-----------+
|   namespace|tableName|isTemporary|
+------------+---------+-----------+
|my_warehouse|    users|      false|
|my_warehouse|   orders|      false|
+------------+---------+-----------+
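Listing tables confirms the catalog entries, not the objects in MinIO. Iceberg exposes a `files` metadata table whose `file_path` column holds each data file's location, so one way to check storage is to read those paths and split them into bucket and key, for example to compare against a listing from the MinIO console or `mc ls`. A sketch; `split_s3_uri` and `data_file_locations` are hypothetical helpers, and the bucket in practice is whatever appears in the paths (the `manifest_list` in Section 13 points at a bucket named `warehouse`).

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple:
    """Split 's3://bucket/key' (or 's3a://bucket/key') into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme not in ("s3", "s3a"):
        raise ValueError(f"Not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


def data_file_locations(spark, table: str = "polaris.my_warehouse.users") -> list:
    """Return (bucket, key, record_count) for every data file in `table`."""
    rows = spark.sql(f"SELECT file_path, record_count FROM {table}.files").collect()
    return [(*split_s3_uri(path), count) for path, count in rows]
```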



## 9. Check Table Metadata

In [25]:
# View table schema
spark.sql("DESCRIBE TABLE polaris.my_warehouse.users").show()

# View table properties
print("\n--- Table Properties ---")
spark.sql("SHOW TBLPROPERTIES polaris.my_warehouse.users").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|                  id|      int|   NULL|
|                name|   string|   NULL|
|               email|   string|   NULL|
|        created_date|     date|   NULL|
|# Partition Infor...|         |       |
|          # col_name|data_type|comment|
|        created_date|     date|   NULL|
+--------------------+---------+-------+


--- Table Properties ---
+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|          created-at|2025-12-11T14:22:...|
| current-snapshot-id| 9196138478529762456|
|              format|     iceberg/parquet|
|      format-version|                   2|
|write.parquet.com...|                zstd|
+--------------------+--------------------+
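`show()` truncates long keys and values, as with `created-at` above. Passing `truncate=False` to `show()` fixes the display; collecting the rows into a dict also keeps full strings and makes individual properties easy to check in code. A sketch; `table_properties` is a hypothetical helper.

```python
def table_properties(spark, table: str = "polaris.my_warehouse.users") -> dict:
    """Return SHOW TBLPROPERTIES output as {key: value}, without truncation."""
    rows = spark.sql(f"SHOW TBLPROPERTIES {table}").collect()
    # Each row has two columns: the property key and its value (both strings)
    return {row[0]: row[1] for row in rows}
```

Based on the output above, `table_properties(spark)["format-version"]` should return the string `'2'`.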



## 10. Query with DataFrame API

In [26]:
# You can also use the DataFrame API; the two-part name resolves against the current catalog (polaris, after USE)
df = spark.table("my_warehouse.users")
df.filter(df.id > 1).select("name", "email").show()

+-------+-------------------+
|   name|              email|
+-------+-------------------+
|    Bob|    bob@example.com|
|Charlie|charlie@example.com|
|  Diana|  diana@example.com|
+-------+-------------------+



## 11. Create Another Table to Demonstrate Multi-table Queries

In [27]:
# Create orders table
spark.sql("""
    CREATE TABLE IF NOT EXISTS polaris.my_warehouse.orders (
        order_id INT,
        user_id INT,
        amount DECIMAL(10,2),
        order_date DATE
    )
    USING ICEBERG
    PARTITIONED BY (order_date)
""")

# Insert sample data (INSERT INTO appends: re-running this cell stores every order again)
spark.sql("""
    INSERT INTO polaris.my_warehouse.orders VALUES
    (101, 1, 150.50, DATE '2025-01-10'),
    (102, 2, 200.00, DATE '2025-01-11'),
    (103, 1, 75.25, DATE '2025-01-12'),
    (104, 3, 300.00, DATE '2025-01-13')
""")

print("Orders table created and populated!")

Orders table created and populated!
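`CREATE TABLE IF NOT EXISTS` makes the DDL safe to re-run, but `INSERT INTO` appends on every run. One way to make the load re-runnable is to insert only rows whose `order_id` is not already present, using a `LEFT ANTI JOIN` against the target table. A sketch, assuming `order_id` is meant to be unique (Iceberg does not enforce this) and `spark` is the session from Section 1; `insert_missing_orders` is a hypothetical helper, and it does not remove duplicates that are already stored.

```python
# Inline VALUES table anti-joined against the target: only unseen order_ids are inserted
INSERT_MISSING_ORDERS = """
    INSERT INTO polaris.my_warehouse.orders
    SELECT n.order_id, n.user_id, n.amount, n.order_date
    FROM VALUES
        (101, 1, 150.50, DATE '2025-01-10'),
        (102, 2, 200.00, DATE '2025-01-11'),
        (103, 1, 75.25, DATE '2025-01-12'),
        (104, 3, 300.00, DATE '2025-01-13')
        AS n(order_id, user_id, amount, order_date)
    LEFT ANTI JOIN polaris.my_warehouse.orders o
        ON n.order_id = o.order_id
"""


def insert_missing_orders(spark) -> None:
    """Append only the sample orders whose order_id is not in the table yet."""
    spark.sql(INSERT_MISSING_ORDERS)
```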


## 12. Join Across Tables

In [28]:
# Join users and orders. Each order appears twice below because orders was populated twice (INSERT INTO appends).
result = spark.sql("""
    SELECT u.name, u.email, o.order_id, o.amount
    FROM my_warehouse.users u
    JOIN my_warehouse.orders o ON u.id = o.user_id
    ORDER BY o.order_id
""")

result.show()

+-------+-------------------+--------+------+
|   name|              email|order_id|amount|
+-------+-------------------+--------+------+
|  Alice|  alice@example.com|     101|150.50|
|  Alice|  alice@example.com|     101|150.50|
|    Bob|    bob@example.com|     102|200.00|
|    Bob|    bob@example.com|     102|200.00|
|  Alice|  alice@example.com|     103| 75.25|
|  Alice|  alice@example.com|     103| 75.25|
|Charlie|charlie@example.com|     104|300.00|
|Charlie|charlie@example.com|     104|300.00|
+-------+-------------------+--------+------+
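Every order in the join result appears twice, while Section 6 shows one row per user, which points at duplicate rows in `orders`: the table was already listed in Section 8, before Section 11 created it, and the insert in Section 11 appends. A sketch of a query that confirms this, assuming `order_id` should be unique; `duplicate_order_ids` is a hypothetical helper.

```python
DUPLICATE_ORDERS_SQL = """
    SELECT order_id, COUNT(*) AS copies
    FROM polaris.my_warehouse.orders
    GROUP BY order_id
    HAVING COUNT(*) > 1
    ORDER BY order_id
"""


def duplicate_order_ids(spark) -> dict:
    """Return {order_id: copies} for every order_id stored more than once."""
    return {row[0]: row[1] for row in spark.sql(DUPLICATE_ORDERS_SQL).collect()}
```

An empty dict means no `order_id` is stored more than once.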



## 13. Advanced: Time Travel (Iceberg Feature)

In [38]:
# List the table's snapshots (Iceberg snapshots metadata table)
spark.sql("SELECT * FROM polaris.my_warehouse.users.snapshots").show()

# You can query specific snapshots if needed
# spark.sql("SELECT * FROM polaris.my_warehouse.users VERSION AS OF 9196138478529762456").show()

+--------------------+-------------------+---------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
|2025-12-11 14:22:...|9196138478529762456|     NULL|   append|s3://warehouse/my...|{spark.app.id -> ...|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
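With Spark 3.3 or later (this notebook uses the Spark 3.4 Iceberg runtime), Iceberg tables accept `VERSION AS OF <snapshot_id>` and `TIMESTAMP AS OF '<timestamp>'` in SQL. A small sketch that builds either form; the snapshot id is forced through `int()` and the timestamp is taken as a `datetime`, so neither is spliced into SQL as arbitrary text. `time_travel_sql` is a hypothetical helper, and the timestamp string is interpreted in the Spark session time zone.

```python
from datetime import datetime
from typing import Optional


def time_travel_sql(table: str, snapshot_id: Optional[int] = None,
                    as_of: Optional[datetime] = None) -> str:
    """Build a SELECT that reads `table` at a snapshot id or at a point in time."""
    if (snapshot_id is None) == (as_of is None):
        raise ValueError("Pass exactly one of snapshot_id or as_of")
    if snapshot_id is not None:
        # int() rejects non-numeric input before it reaches the SQL text
        return f"SELECT * FROM {table} VERSION AS OF {int(snapshot_id)}"
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{as_of:%Y-%m-%d %H:%M:%S}'"
```

For example, `spark.sql(time_travel_sql("polaris.my_warehouse.users", snapshot_id=9196138478529762456)).show()` reads the snapshot listed above.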



## 14. Configuration Check

In [34]:
# Verify your Spark configuration (live session values, which can differ from the builder settings in Section 1)
print("Spark SQL Catalog Config:")
print(f"Default Catalog: {spark.conf.get('spark.sql.defaultCatalog')}")
print(f"Polaris URI: {spark.conf.get('spark.sql.catalog.polaris.uri')}")
print(f"S3 Endpoint: {spark.conf.get('spark.hadoop.fs.s3a.endpoint')}")
print(f"Warehouse Path: {spark.conf.get('spark.sql.catalog.polaris.warehouse')}")

Spark SQL Catalog Config:
Default Catalog: spark_catalog
Polaris URI: http://host.docker.internal:8181/api/catalog
S3 Endpoint: http://minio:9000
Warehouse Path: demo_catalog
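The printed Polaris URI (`http://host.docker.internal:8181/api/catalog`) is not the one set in Section 1 (`http://polaris:8181/api/catalog`), and the default catalog is still `spark_catalog`, so the running session did not pick up every builder setting (see the note on `PYSPARK_SUBMIT_ARGS` in Section 1). A sketch that compares expected values against the live configuration and reports mismatches; `conf_mismatches` is a hypothetical helper that accepts `spark.conf.get`, whose `default` argument avoids an error for unset keys.

```python
def conf_mismatches(conf_get, expected: dict) -> dict:
    """Return {key: (expected, actual)} for settings whose live value differs.

    `conf_get` is any callable shaped like spark.conf.get(key, default);
    unset keys come back as None and are reported as mismatches.
    """
    mismatches = {}
    for key, want in expected.items():
        got = conf_get(key, None)
        if got != want:
            mismatches[key] = (want, got)
    return mismatches
```

For the session printed above, `conf_mismatches(spark.conf.get, {"spark.sql.catalog.polaris.uri": "http://polaris:8181/api/catalog", "spark.sql.defaultCatalog": "polaris"})` would report both keys.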
