# On-Prem Iceberg Warehouse with Spark

This notebook demonstrates how to query Iceberg tables stored in MinIO using Spark connected to an Apache Polaris catalog.
All services run on-prem with no cloud connectivity.

## 1. Initialize Spark Session

The Spark session is initialized automatically with the Polaris catalog configuration, which is passed in through the PYSPARK_SUBMIT_ARGS environment variable.
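PySpark reads PYSPARK_SUBMIT_ARGS when it launches the JVM, splits it shell-style, and requires the string to end with `pyspark-shell`. The sketch below shows one way to build that string from a dictionary of Iceberg REST catalog settings. It is only a sketch under assumptions: the URI, the warehouse value (Polaris's own quickstart uses the Polaris catalog name here) and the helper name `build_submit_args` are hypothetical, authentication settings are omitted, and the Iceberg Spark runtime jar must still be supplied (for example through the `packages` argument) with versions that match your Spark build.

```python
from __future__ import annotations

import os
import shlex

# Hypothetical values: replace with your own Polaris endpoint and catalog name.
POLARIS_CONF = {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.polaris": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.polaris.type": "rest",
    "spark.sql.catalog.polaris.uri": "http://polaris:8181/api/catalog",  # assumed host/port
    "spark.sql.catalog.polaris.warehouse": "my_catalog",  # assumed Polaris catalog name
    "spark.sql.defaultCatalog": "polaris",
}


def build_submit_args(conf: dict[str, str], packages: list[str] | None = None) -> str:
    """Render Spark settings as a PYSPARK_SUBMIT_ARGS string (hypothetical helper)."""
    parts: list[str] = []
    if packages:
        parts += ["--packages", ",".join(packages)]
    for key, value in conf.items():
        parts += ["--conf", f"{key}={value}"]
    # PySpark expects the argument list to end with "pyspark-shell"
    parts.append("pyspark-shell")
    # Quote each piece so PySpark's shell-style split recovers it intact
    return " ".join(shlex.quote(p) for p in parts)


if __name__ == "__main__":
    # Must be set before the first SparkSession/SparkContext is created
    os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(POLARIS_CONF)
    print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

Setting the variable has no effect on a session that is already running, which is why it normally lives in the kernel or container environment rather than in a notebook cell.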

In [None]:
from pyspark.sql import SparkSession

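# Reuse the session created at kernel startup; getOrCreate() returns it if it
# already exists, or builds one from PYSPARK_SUBMIT_ARGS otherwise
spark = SparkSession.builder.getOrCreate()
spark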

## 2. Verify Polaris Catalog Connection

In [None]:
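# List the catalogs registered in this session (catalogs are loaded lazily,
# so a configured catalog may not appear until it has been used)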
spark.sql("SHOW CATALOGS").show()

## 3. Create a Namespace (Schema) in Polaris

In [None]:
# Create a namespace for your tables
spark.sql("CREATE NAMESPACE IF NOT EXISTS polaris.my_warehouse")
spark.sql("SHOW NAMESPACES IN polaris").show()

## 4. Create Sample Iceberg Table

In [None]:
# Create a sample table
spark.sql("""
    CREATE TABLE IF NOT EXISTS polaris.my_warehouse.users (
        id INT,
        name STRING,
        email STRING,
        created_date DATE
    )
    USING ICEBERG
    PARTITIONED BY (created_date)
""")

print("Table created successfully!")

## 5. Insert Sample Data

In [None]:
# Insert sample data; DATE '...' literals match the DATE column type.
# INSERT INTO appends, so re-running this cell adds duplicate rows.
spark.sql("""
    INSERT INTO polaris.my_warehouse.users VALUES
    (1, 'Alice', 'alice@example.com', DATE '2025-01-01'),
    (2, 'Bob', 'bob@example.com', DATE '2025-01-02'),
    (3, 'Charlie', 'charlie@example.com', DATE '2025-01-03'),
    (4, 'Diana', 'diana@example.com', DATE '2025-01-04')
""")

print("Data inserted successfully!")
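When the rows come from Python rather than being typed by hand, the literals have to be rendered with the right SQL types: dates as typed `DATE '...'` literals and strings with quotes escaped. A minimal sketch follows; `to_sql_literal` and `insert_statement` are hypothetical helpers, not part of PySpark, and they cover only the value types used in this notebook.

```python
from __future__ import annotations

from datetime import date, datetime
from decimal import Decimal


def to_sql_literal(value: object) -> str:
    """Render a Python value as a Spark SQL literal (hypothetical helper)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check before int: bool is a subclass of int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, Decimal)):
        return str(value)
    if isinstance(value, datetime):  # check before date: datetime subclasses date
        raise TypeError("TIMESTAMP values are not handled by this sketch")
    if isinstance(value, date):
        # Typed literal, so Spark sees a DATE rather than a STRING
        return f"DATE '{value.isoformat()}'"
    if isinstance(value, str):
        # Spark SQL treats backslash as the escape character in string literals
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"
    raise TypeError(f"unsupported type: {type(value).__name__}")


def insert_statement(table: str, rows: list[tuple]) -> str:
    """Build an INSERT INTO ... VALUES statement from Python tuples."""
    values = ",\n".join(
        "(" + ", ".join(to_sql_literal(v) for v in row) + ")" for row in rows
    )
    return f"INSERT INTO {table} VALUES\n{values}"


rows = [
    (1, "Alice", "alice@example.com", date(2025, 1, 1)),
    (2, "Bob", "bob@example.com", date(2025, 1, 2)),
]
print(insert_statement("polaris.my_warehouse.users", rows))
```

The resulting string can be passed to `spark.sql(...)`. For larger or untrusted inputs, building a DataFrame and writing it with `df.writeTo("polaris.my_warehouse.users").append()` avoids string assembly altogether.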

## 6. Query Using Schema.TableName (Default Catalog)

In [None]:
# Because polaris is set as the default catalog (spark.sql.defaultCatalog),
# a two-part schema.tablename is enough
result = spark.sql("SELECT * FROM my_warehouse.users")
result.show()

## 7. Query with Catalog.Schema.TableName

In [None]:
# You can also use the fully qualified three-part name (catalog.schema.tablename)
result = spark.sql("SELECT * FROM polaris.my_warehouse.users WHERE id > 2")
result.show()
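Both queries above reach the same table. A simplified model of the resolution rule: if the first part of a multi-part name is a registered catalog, the name is already fully qualified; otherwise it is resolved against the current catalog, which is spark.sql.defaultCatalog until a `USE` statement changes it. The function below is a hypothetical illustration of that rule, not Spark's code; it ignores backtick-quoted identifiers and does not model single-part names, which also depend on the current namespace.

```python
from __future__ import annotations


def resolve_table_name(name: str, default_catalog: str, registered_catalogs: set[str]) -> tuple[str, ...]:
    """Simplified model of how Spark qualifies a multi-part table name."""
    parts = name.split(".")  # naive split: ignores backtick-quoted identifiers
    if len(parts) < 2:
        # Single-part names also depend on the current namespace, not modeled here
        raise ValueError("expected schema.table or catalog.schema.table")
    if parts[0] in registered_catalogs:
        return tuple(parts)  # first part names a catalog: already fully qualified
    # Otherwise prepend the current catalog (the default catalog, absent any USE)
    return (default_catalog, *parts)


catalogs = {"spark_catalog", "polaris"}
print(resolve_table_name("my_warehouse.users", "polaris", catalogs))
print(resolve_table_name("polaris.my_warehouse.users", "polaris", catalogs))
```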

## 8. Verify MinIO Storage

In [None]:
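# List tables in the namespace
spark.sql("SHOW TABLES IN polaris.my_warehouse").show()

# The Iceberg "files" metadata table lists every data file with its storage path,
# confirming that the rows were written to the MinIO-backed warehouse
spark.sql("SELECT file_path, record_count FROM polaris.my_warehouse.users.files").show(truncate=False)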
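With Iceberg's default layout, data files for a table live under `<table-location>/data/`, with one subdirectory per identity partition value named `column=value`, and metadata files under `<table-location>/metadata/`. The sketch below predicts the partition directories the users table should have in MinIO. The table location is hypothetical (the real one depends on how the Polaris catalog and namespace were configured), and settings such as `write.object-storage.enabled` change the layout.

```python
from datetime import date

# Hypothetical table location; check the real one in your catalog/MinIO bucket
TABLE_LOCATION = "s3://warehouse/my_warehouse/users"


def partition_dir(table_location: str, column: str, value: date) -> str:
    """Data directory for an identity-partitioned DATE value (default Iceberg layout)."""
    return f"{table_location.rstrip('/')}/data/{column}={value.isoformat()}"


# created_date values inserted in step 5: one distinct date per row
created_dates = [date(2025, 1, day) for day in range(1, 5)]
for path in sorted({partition_dir(TABLE_LOCATION, "created_date", d) for d in created_dates}):
    print(path)
```

Because each sample row has a different `created_date`, each row lands in its own partition directory; in practice, very fine partitioning of small tables produces many small files.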

## 9. Check Table Metadata

In [None]:
# View table schema
spark.sql("DESCRIBE TABLE polaris.my_warehouse.users").show()

# View table properties
print("\n--- Table Properties ---")
spark.sql("SHOW TBLPROPERTIES polaris.my_warehouse.users").show(truncate=False)

## 10. Query with DataFrame API

In [None]:
# You can also use DataFrame API
df = spark.table("my_warehouse.users")
df.filter(df.id > 1).select("name", "email").show()

## 11. Create Another Table to Demonstrate Multi-table Queries

In [None]:
# Create orders table
spark.sql("""
    CREATE TABLE IF NOT EXISTS polaris.my_warehouse.orders (
        order_id INT,
        user_id INT,
        amount DECIMAL(10,2),
        order_date DATE
    )
    USING ICEBERG
    PARTITIONED BY (order_date)
""")

# Insert sample data
spark.sql("""
    INSERT INTO polaris.my_warehouse.orders VALUES
    (101, 1, 150.50, DATE '2025-01-10'),
    (102, 2, 200.00, DATE '2025-01-11'),
    (103, 1, 75.25, DATE '2025-01-12'),
    (104, 3, 300.00, DATE '2025-01-13')
""")

print("Orders table created and populated!")

## 12. Join Across Tables

In [None]:
# Join users and orders
result = spark.sql("""
    SELECT u.name, u.email, o.order_id, o.amount
    FROM my_warehouse.users u
    JOIN my_warehouse.orders o ON u.id = o.user_id
    ORDER BY o.order_id
""")

result.show()
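The join is an inner join, so users without orders drop out: Diana (id 4) has no rows in `orders` and does not appear in the result. The plain-Python hash join below reproduces the expected result over the same sample rows. It mirrors only the result; Spark chooses its own physical join strategy.

```python
from decimal import Decimal

# Same sample rows as the users and orders tables (only the joined columns)
users = [(1, "Alice", "alice@example.com"), (2, "Bob", "bob@example.com"),
         (3, "Charlie", "charlie@example.com"), (4, "Diana", "diana@example.com")]
orders = [(101, 1, Decimal("150.50")), (102, 2, Decimal("200.00")),
          (103, 1, Decimal("75.25")), (104, 3, Decimal("300.00"))]


def inner_join(users, orders):
    """Hash join: build a lookup on users.id, then probe it with each order."""
    by_id = {uid: (name, email) for uid, name, email in users}  # build side
    out = []
    for order_id, user_id, amount in orders:  # probe side
        if user_id in by_id:  # inner join: orders without a matching user drop out
            name, email = by_id[user_id]
            out.append((name, email, order_id, amount))
    return sorted(out, key=lambda row: row[2])  # ORDER BY o.order_id


for row in inner_join(users, orders):
    print(row)
```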

## 13. Advanced: Time Travel (Iceberg Feature)

In [None]:
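# View table history (one row per snapshot that became the current table state)
history = spark.sql("SELECT * FROM polaris.my_warehouse.users.history")
history.show(truncate=False)

# Iceberg snapshot IDs are large generated numbers, not 1, 2, 3, ...
# Read one from the history table, then query the table as of that snapshot
first_snapshot_id = history.orderBy("made_current_at").first()["snapshot_id"]
spark.sql(f"SELECT * FROM polaris.my_warehouse.users VERSION AS OF {first_snapshot_id}").show()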
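Iceberg also accepts `TIMESTAMP AS OF` in Spark SQL (Spark 3.3 or later), for example `SELECT * FROM polaris.my_warehouse.users TIMESTAMP AS OF '2025-01-15 09:30:00'`. It reads the snapshot that was current at that time: the latest history entry made current at or before the given timestamp. The plain-Python sketch below shows that rule; the history rows and snapshot IDs are hypothetical stand-ins for the real history table, and Iceberg performs this lookup itself.

```python
from __future__ import annotations

from datetime import datetime

# Hypothetical (made_current_at, snapshot_id) rows standing in for the history table
history = [
    (datetime(2025, 1, 15, 9, 0), 3_051_729_675_574_597_004),
    (datetime(2025, 1, 15, 10, 0), 8_924_558_786_060_583_479),
]


def snapshot_as_of(history: list[tuple[datetime, int]], as_of: datetime) -> int:
    """Return the snapshot current at `as_of`: the latest entry made current at or before it."""
    earlier = [entry for entry in history if entry[0] <= as_of]
    if not earlier:
        raise ValueError(f"no snapshot was current at {as_of}")
    return max(earlier, key=lambda entry: entry[0])[1]


print(snapshot_as_of(history, datetime(2025, 1, 15, 9, 30)))
```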

## 14. Configuration Check

In [None]:
# Verify your Spark configuration. spark.conf.get() raises an error for an unset key
# unless a default is supplied, so print a placeholder instead.
settings = {
    "Default Catalog": "spark.sql.defaultCatalog",
    "Polaris URI": "spark.sql.catalog.polaris.uri",
    "S3 Endpoint": "spark.hadoop.fs.s3a.endpoint",
    "Warehouse Path": "spark.sql.catalog.polaris.warehouse",
}
print("Spark SQL Catalog Config:")
for label, key in settings.items():
    print(f"{label}: {spark.conf.get(key, '<not set>')}")