# Using Iceberg and Query Engines

This notebook demonstrates how you can use Iceberg and Spark / Trino to query your data.

We will:
- Configure Spark with Iceberg runtime libraries
- Explore how Iceberg works
- Explore the data we just generated
- Come up with a plan to run a Spark job with Spark Operator


## Setting up Spark and Iceberg

In [1]:
from pyspark.sql import SparkSession
import os

# These packages are necessary for running this section on EKS. The jars are already available within the container image, so the line below is commented out.
# packages = "org.apache.iceberg:iceberg-spark:1.10.0,org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0,org.apache.iceberg:iceberg-spark-extensions-4.0_2.13:1.10.0,com.amazonaws:aws-java-sdk-bundle:1.12.791,software.amazon.awssdk:bundle:2.34.0,org.apache.hadoop:hadoop-aws:3.4.1"
spark = SparkSession.builder \
    .appName("IcebergInspector") \
    .config("spark.sql.catalog.workshop", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.workshop.type", "glue") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.defaultCatalog", "workshop") \
    .config("spark.sql.catalog.workshop.warehouse", "s3a://data-on-eks-spark-logs-20251001184655839600000005/iceberg-warehouse/") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem").getOrCreate()
    # .config("spark.sql.catalog.workshop.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \


spark.sparkContext.setLogLevel("DEBUG")

## Explore Iceberg Tables

In [None]:
# We read database information from the catalog
spark.sql("show databases").show()
# Then use the database for all queries going forward.
spark.sql("use data_on_eks")
# Show all tables available within the database
spark.sql("show tables").show()

## Peeking Under the Hood: Exploring Metadata Tables
One of Iceberg's most powerful features is its transparency. Unlike table formats that hide their underlying structure, Iceberg lets us query its metadata directly to understand exactly what's going on.
We'll use special sub-tables like `.files`, `.history`, and `.snapshots` to inspect the table's physical layout and history.

Let's start by looking at the actual data files on disk. Every time we write data, Iceberg creates one or more files (in this case, Parquet files). The `.files` metadata table gives us a complete list of every data file that makes up the current snapshot of the table.
The file sizes and record counts in the output show how small, frequent writes can lead to many small files.



In [None]:
spark.sql("""
SELECT
    file_path, file_format, file_size_in_bytes, record_count
FROM
    data_on_eks.cat_locations_raw.files
ORDER BY file_path DESC
LIMIT 10
""").show()

## The History Table
The `.history` table is like a commit log for your data. Every change made to the table, such as our INSERT statements, is recorded as an entry. This provides a clear, chronological audit trail of how the table has evolved. The query below will show the timeline of when each snapshot became the current version of the table.

While `.history` shows the timeline, the `.snapshots` table gives us the rich details for each snapshot. A snapshot is an immutable view of the table's complete state at a specific point in time. Querying `.snapshots` shows what operation created each snapshot (append, overwrite, etc.) and a summary of the changes, like how many records and files were added. This table is the key that enables powerful features like time travel.
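
A query against `.snapshots` could look like the sketch below. It assumes the same `cat_locations_raw` table used earlier; `SNAPSHOTS_SQL` and `time_travel_sql` are hypothetical helper names introduced here, and the time travel helper relies on the `VERSION AS OF` syntax available in Spark 3.3 and later.

```python
# Table name taken from the earlier cells in this notebook.
TABLE = "data_on_eks.cat_locations_raw"

# `summary` is a map of strings; 'added-records' and 'added-data-files' are
# standard keys Iceberg writes for append operations (they may be NULL for other operations).
SNAPSHOTS_SQL = f"""
SELECT
    committed_at,
    snapshot_id,
    parent_id,
    operation,
    summary['added-records']    AS added_records,
    summary['added-data-files'] AS added_data_files
FROM
    {TABLE}.snapshots
ORDER BY committed_at
"""


def time_travel_sql(table, snapshot_id):
    """Build a query that counts the rows visible as of the given snapshot."""
    return f"SELECT COUNT(*) AS row_count FROM {table} VERSION AS OF {int(snapshot_id)}"


# `spark` is the session created at the top of this notebook; the guard simply
# skips execution if this snippet is read or run outside of it.
if "spark" in globals():
    spark.sql(SNAPSHOTS_SQL).show(truncate=False)
```

Passing one of the returned `snapshot_id` values to `time_travel_sql(TABLE, snapshot_id)` and running the result with `spark.sql(...)` should show the row count as it was at that snapshot.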

The `is_current_ancestor` column is a boolean flag in the history table that answers a simple question: "Is this historical snapshot part of the direct timeline that leads to the table's current state?"

  * If the value is **`true`**, the snapshot is a direct ancestor.
  * If the value is **`false`**, the snapshot belongs to an abandoned branch of history, most often created after a table rollback.

The diagrams below illustrate how this works.

-----

### Scenario 1: Linear History

Initially, your table has three commits (snapshots). Each is a direct ancestor of the current version (`S3`), so `is_current_ancestor` is **true** for all of them.

```
(Main Timeline)
+-------------+      +-------------+      +-------------+
| Snapshot S1 |----->| Snapshot S2 |----->| Snapshot S3 |
| ancestor: T |      | ancestor: T |      | ancestor: T |
+-------------+      +-------------+      +-------------+
                                                    ^
                                                    |
                                                 (current)
```


### Scenario 2: After Rolling Back to S2

Now, you roll the table back to `S2`. The main timeline is now shorter, and `S2` is the new current version.

Snapshot `S3` still exists in the table's history, but it's now on an **abandoned branch**. Its `is_current_ancestor` flag flips to **false**.

```
(Main Timeline)                               (Abandoned Branch)
+-------------+      +-------------+                 +-------------+
| Snapshot S1 |----->| Snapshot S2 |                 | Snapshot S3 |
| ancestor: T |      | ancestor: T |                 | ancestor: F |
+-------------+      +-------------+                 +-------------+
                           ^
                           |
                        (current)
```

In [None]:
spark.sql("""
SELECT
    *
FROM
    data_on_eks.cat_locations_raw.history
""").show()

### Now that we understand how to view the metadata, let's enrich the raw data

**Be sure to replace the bucket name in `S3_WAREHOUSE_PATH` with your own bucket ID.**


#### ETL Pipeline: Daily Cat Summary

This cell executes a Spark batch job that performs a daily ETL (Extract, Transform, Load) process to create an aggregated summary of cat activity.

*   **Extract**: Reads raw data from the `cat_wellness_raw`, `cat_interactions_raw`, and `cats_profiles_raw` Iceberg tables.
*   **Transform**: Aggregates metrics for each cat by day, calculating:
    *   Average activity level
    *   Maximum hours since last drink
    *   Total interaction count
    *   Total "like" interactions
*   **Load**: Joins the aggregated data to create a comprehensive daily summary, which is then saved to the `daily_cat_summary` Iceberg table for easier querying and analysis.

In [1]:
# Set necessary configurations

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, avg, max, count, lit, sum, when

# ==============================================================================
#  1. Configuration
# ==============================================================================
# These should be configured to match your environment
S3_WAREHOUSE_PATH = os.getenv('S3_WAREHOUSE_PATH', 's3a://data-on-eks-spark-logs-20251007153910952700000001/iceberg-warehouse/') # replace the S3_BUCKET with your bucket id
ICEBERG_CATALOG_NAME = 'workshop'
GLUE_DATABASE_NAME = 'data_on_eks'



In [None]:
"""
A Spark batch job that reads from raw Iceberg tables, creates a summary for each cat
"""
print("Starting Spark session for Daily Cat Summary job...")
spark = (
    SparkSession.builder
    .appName("DailyCatSummaryETL")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config(f"spark.sql.catalog.{ICEBERG_CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{ICEBERG_CATALOG_NAME}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config(f"spark.sql.catalog.{ICEBERG_CATALOG_NAME}.warehouse", S3_WAREHOUSE_PATH)
    .config(f"spark.sql.catalog.{ICEBERG_CATALOG_NAME}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)
print("Spark session created.")

# Define the full path to our source and destination tables
wellness_table = f"{ICEBERG_CATALOG_NAME}.{GLUE_DATABASE_NAME}.cat_wellness_raw"
interactions_table = f"{ICEBERG_CATALOG_NAME}.{GLUE_DATABASE_NAME}.cat_interactions_raw"
profiles_table = f"{ICEBERG_CATALOG_NAME}.{GLUE_DATABASE_NAME}.cats_profiles_raw"
summary_table = f"{ICEBERG_CATALOG_NAME}.{GLUE_DATABASE_NAME}.daily_cat_summary"

try:
    # ==============================================================================
    #  2. EXTRACT: Read the raw source tables from Iceberg
    # ==============================================================================
    print(f"Reading from source tables: {wellness_table}, {interactions_table}, and {profiles_table}")
    wellness_df = spark.table(wellness_table)
    interactions_df = spark.table(interactions_table)
    profiles_df = spark.table(profiles_table)

    # ==============================================================================
    #  3. TRANSFORM: Perform aggregations and joins
    # ==============================================================================
    print("Transforming data: Aggregating daily wellness and interaction metrics...")

    # --- Aggregate wellness data by day ---
    wellness_daily_df = (
        wellness_df
        .withColumn("day", to_date(col("event_time")))
        .groupBy("cat_id", "day")
        .agg(
            avg("activity_level").alias("avg_activity_level"),
            max("hours_since_last_drink").alias("max_hours_since_drink")
        )
    )

    # --- Aggregate interaction data by day ---
    interactions_daily_df = (
        interactions_df
        .withColumn("day", to_date(col("event_time")))
        .groupBy("cat_id", "day")
        .agg(
            count("*").alias("total_interaction_count"),
            sum(when(col("interaction_type") == "like", 1).otherwise(0)).alias("like_count")
        )
    )

    # --- Join the summaries together ---
    print("Joining daily summaries...")
    daily_summary_df = (
        wellness_daily_df
        .join(interactions_daily_df, ["cat_id", "day"], "full_outer")
        # Join with profiles to get the cat's name
        .join(profiles_df.select("cat_id", "name"), "cat_id", "left")
        .na.fill(0) # Fill nulls with 0 for counts/metrics where one side of the join had no data
        .select(
            "day",
            "cat_id",
            "name",
            "avg_activity_level",
            "max_hours_since_drink",
            "total_interaction_count",
            "like_count"
        )
    )

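    daily_summary_df.show()

    # ==============================================================================
    #  4. LOAD: Save the summary to the destination Iceberg table
    # ==============================================================================
    print(f"Writing daily summary to: {summary_table}")
    # createOrReplace() creates the table on the first run and replaces its contents on later runs
    daily_summary_df.writeTo(summary_table).createOrReplace()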

except Exception as e:
    print(f"An error occurred during the ETL job: {e}")
    raise
finally:
    print("\nStopping Spark session.")
    spark.stop()


### ETL Jobs

While running Spark code in a notebook is excellent for development and interactive analysis, you wouldn't typically run a scheduled production job this way.

In a real-world scenario, this ETL logic would be packaged and defined as a `SparkApplication` custom resource for Kubernetes. This allows the Spark on Kubernetes Operator to
manage the job's entire lifecycle, including scheduling, resource allocation, and retries, making it a robust and automated part of a data pipeline.
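
As a rough idea of what such a resource contains, the sketch below builds a `SparkApplication` manifest as a Python dictionary and prints it as JSON. It assumes the Kubeflow Spark Operator's `sparkoperator.k8s.io/v1beta2` API; the job name, namespace, image, script path, service account, Spark version, and resource sizes are all hypothetical placeholders rather than values from this workshop.

```python
import json


def build_spark_application(name, namespace, image, main_file, executors=2):
    """Return a SparkApplication manifest (v1beta2) for a PySpark batch job."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "type": "Python",
            "pythonVersion": "3",
            "mode": "cluster",
            "image": image,
            "mainApplicationFile": main_file,
            "sparkVersion": "4.0.0",  # assumption: should match the Spark version in the image
            # Let the operator retry the job if it fails.
            "restartPolicy": {
                "type": "OnFailure",
                "onFailureRetries": 3,
                "onFailureRetryInterval": 10,
            },
            "driver": {"cores": 1, "memory": "2g", "serviceAccount": "spark"},
            "executor": {"cores": 1, "instances": executors, "memory": "2g"},
        },
    }


# All argument values below are placeholders for illustration.
manifest = build_spark_application(
    name="daily-cat-summary",
    namespace="spark-jobs",
    image="example.com/spark-iceberg:latest",
    main_file="local:///opt/spark/jobs/daily_cat_summary.py",
)
print(json.dumps(manifest, indent=2))
```

Because `kubectl apply -f` accepts JSON as well as YAML, the printed output could be saved to a file and applied directly once the placeholders are replaced.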

---

**Next Steps**

Great work! Now that the summary table has been created, please **return to your terminal** to proceed with the next module of the workshop.