In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session. Catalog, metastore and object-store settings
# come from spark-defaults.conf, so nothing is configured here.
spark = (
    SparkSession.builder
    .appName("Hive-Iceberg-MinIO")
    .enableHiveSupport()
    .getOrCreate()
)
print("✓ Spark session created!")

# The builder calls above do not themselves verify that the metastore is reachable.
# Listing databases does, so a misconfiguration surfaces here instead of in a later cell.
visible_databases = spark.catalog.listDatabases()
print(f"✓ Connected to Hive Metastore ({len(visible_databases)} databases visible)")

# Report the catalog settings actually in effect rather than asserting "Iceberg ready".
# The tables below are unqualified (demo.users), so they resolve in the current catalog.
current_catalog = spark.catalog.currentCatalog()
catalog_class = spark.conf.get(f"spark.sql.catalog.{current_catalog}", "<not set>")
sql_extensions = spark.conf.get("spark.sql.extensions", "<not set>")

print()
print(f"Spark version: {spark.version}")
print(f"Default catalog: {current_catalog}")
print(f"Catalog implementation: {catalog_class}")
print(f"SQL extensions: {sql_extensions}")

## Create a Database and Table

In [None]:
# The `demo` namespace holds every table in this walkthrough.
# IF NOT EXISTS makes the statement a no-op when the namespace is already there.
create_db_sql = "CREATE DATABASE IF NOT EXISTS demo"
spark.sql(create_db_sql)
print("✓ Database 'demo' created!")

# List the namespaces visible to the session.
databases_df = spark.sql("SHOW DATABASES")
databases_df.show()

In [None]:
# Start from a clean table so Restart & Run All is reproducible. The Hive Metastore and
# object store persist across kernel restarts. With `CREATE TABLE IF NOT EXISTS` alone,
# a re-run would keep the previous rows and the `age` column added further down.
# The seed INSERT would then duplicate rows, or may fail because it supplies only 4 values.
# NOTE(review): a plain DROP may leave old data files in object storage; Iceberg also
# supports `DROP TABLE ... PURGE` if they should be deleted too.
spark.sql("DROP TABLE IF EXISTS demo.users")

# Create an Iceberg table partitioned by the day of `created_at` via the days() transform.
spark.sql("""
    CREATE TABLE IF NOT EXISTS demo.users (
        id INT,
        name STRING,
        email STRING,
        created_at TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (days(created_at))
""")
print("✓ Table 'demo.users' created!")

# Show tables
spark.sql("SHOW TABLES IN demo").show()

## Insert and Query Data

In [None]:
# Seed three users. Spark evaluates current_timestamp() once per query, so all three
# rows carry the same timestamp.
seed_rows_sql = """
    INSERT INTO demo.users VALUES
    (1, 'Alice', 'alice@example.com', current_timestamp()),
    (2, 'Bob', 'bob@example.com', current_timestamp()),
    (3, 'Charlie', 'charlie@example.com', current_timestamp())
"""
spark.sql(seed_rows_sql)
print("✓ Data inserted!")

In [None]:
# Read every row back to confirm the insert.
users_df = spark.sql("SELECT * FROM demo.users")
users_df.show()

## Updates and Deletes (Iceberg feature)

In [None]:
# Row-level changes, applied in order, each followed by its confirmation message.
row_changes = [
    ("UPDATE demo.users SET email = 'alice.new@example.com' WHERE id = 1", "✓ Updated Alice's email"),
    ("DELETE FROM demo.users WHERE id = 3", "✓ Deleted Charlie"),
]
for statement, message in row_changes:
    spark.sql(statement)
    print(message)

# Show the table after both changes.
spark.sql("SELECT * FROM demo.users").show()

## Time Travel (Iceberg feature)

Every write (INSERT, UPDATE, DELETE) commits a new snapshot, and you can query the table as it was at any earlier snapshot.

In [None]:
# Iceberg's `history` metadata table for demo.users; truncate=False keeps full values visible.
history_df = spark.sql("SELECT * FROM demo.users.history")
history_df.show(truncate=False)

In [None]:
# One row per snapshot: its id, commit time and the operation that produced it.
snapshot_columns = ", ".join(["snapshot_id", "committed_at", "operation"])
snapshots_df = spark.sql(f"SELECT {snapshot_columns} FROM demo.users.snapshots")
snapshots_df.show(truncate=False)

In [None]:
# Time travel: read the table as of its earliest snapshot. The snapshot id is looked up
# from the `snapshots` metadata table instead of being pasted in by hand. On a freshly
# created table this should be the snapshot written by the seed INSERT.
earliest_snapshot = spark.sql("""
    SELECT snapshot_id
    FROM demo.users.snapshots
    ORDER BY committed_at
    LIMIT 1
""").first()

if earliest_snapshot is None:
    # first() returns None when the table has no snapshots (nothing written yet).
    print("No snapshots yet - insert some data first.")
else:
    snapshot_id = earliest_snapshot["snapshot_id"]
    print(f"Reading demo.users as of snapshot {snapshot_id}")
    spark.sql(f"SELECT * FROM demo.users VERSION AS OF {snapshot_id}").show()

## Schema Evolution (Iceberg feature)

Iceberg can add, drop, and rename columns as metadata-only changes, without rewriting existing data files.

In [None]:
# Schema evolution: add a nullable `age` column.
# Guarded so re-running this cell does not fail because the column already exists.
if "age" not in spark.table("demo.users").columns:
    spark.sql("ALTER TABLE demo.users ADD COLUMN age INT")
    print("✓ Added 'age' column")
else:
    print("✓ 'age' column already present")

# Describe the table
spark.sql("DESCRIBE demo.users").show()

In [None]:
# Fill the new column for the remaining users (id -> age), then show the evolved rows.
new_ages = {1: 30, 2: 25}
for user_id, age in new_ages.items():
    spark.sql(f"UPDATE demo.users SET age = {age} WHERE id = {user_id}")

spark.sql("SELECT * FROM demo.users").show()

## Table Metadata

In [None]:
# Iceberg's `files` metadata table: the data files backing demo.users.
file_columns = ", ".join(["file_path", "file_size_in_bytes", "record_count"])
files_df = spark.sql(f"SELECT {file_columns} FROM demo.users.files")
files_df.show(truncate=False)

In [None]:
# Iceberg's `partitions` metadata table for demo.users.
partitions_df = spark.sql("SELECT * FROM demo.users.partitions")
partitions_df.show()

## Cleanup (Optional)

In [None]:
# Cleanup is opt-in: set RUN_CLEANUP = True to drop the demo table and database.
RUN_CLEANUP = False

if RUN_CLEANUP:
    spark.sql("DROP TABLE IF EXISTS demo.users")
    print("✓ Table dropped")

    # CASCADE also drops any other tables still left in `demo`.
    spark.sql("DROP DATABASE IF EXISTS demo CASCADE")
    print("✓ Database dropped")

In [None]:
# Stopping the session frees resources, but `spark` cannot be used afterwards until
# the first cell is re-run. It is therefore opt-in: set STOP_SPARK = True to stop it.
STOP_SPARK = False

if STOP_SPARK:
    spark.stop()