# Apache Iceberg Hands-on Lab: Cafe Chain Data Management

Software Engineering, Shenkar - BDE Course L6

In this lab, you'll work with Apache Iceberg to manage data for GlobalCafe, a multinational coffee chain. You'll learn how to:
- Set up an Iceberg environment
- Create and manage tables
- Handle schema evolution
- Perform time travel queries
- Implement data maintenance tasks

## Prerequisites
- Docker and Docker Compose installed
- Basic understanding of SQL and PySpark
- Git installed

## Part 1: Environment Setup

First, let's set up our environment. Run these commands in your terminal:

In [1]:
# Run these commands in your terminal, not in this notebook
'''
git clone https://github.com/your-repo/cafe-iceberg-lab
cd cafe-iceberg-lab
docker-compose up -d
'''


Now let's initialize our PySpark session with Iceberg support:

In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CafeIcebergLab") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/warehouse/cafe") \
    .getOrCreate()


## Part 2: Creating Tables

Let's create tables for our cafe chain data model:

In [8]:
# Create database
spark.sql("CREATE DATABASE IF NOT EXISTS cafe_ops")
spark.sql("USE cafe_ops")

# Create sales transactions table
spark.sql("""
CREATE TABLE IF NOT EXISTS sales_transactions (
    transaction_id BIGINT,
    store_id BIGINT,
    cashier_id BIGINT,
    transaction_timestamp TIMESTAMP,
    total_amount DECIMAL(10,2),
    payment_method STRING,
    items ARRAY<STRUCT<
        item_id: BIGINT,
        quantity: INT,
        unit_price: DECIMAL(10,2)
    >>
) USING iceberg
PARTITIONED BY (days(transaction_timestamp))
""")

# Create store locations table
spark.sql("""
CREATE TABLE IF NOT EXISTS store_locations (
    store_id BIGINT,
    store_name STRING,
    city STRING,
    country STRING,
    opening_date DATE,
    store_type STRING,
    square_footage INT
) USING iceberg
""")

# Create menu items table
spark.sql("""
CREATE TABLE IF NOT EXISTS menu_items (
    item_id BIGINT,
    item_name STRING,
    category STRING,
    price DECIMAL(10,2),
    is_seasonal BOOLEAN,
    available_from DATE,
    available_until DATE,
    calories INT
) USING iceberg
""")


## Exercise 1: Insert Sample Data

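Insert sample data into your tables using both the DataFrame API and SQL. The cell below loads three stores through the DataFrame API; Exercise 1.1 further down asks you to insert menu items with SQL.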

In [8]:
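from datetime import date

# Import functions under an alias: a wildcard import would shadow
# Python built-ins such as sum, min, and max.
from pyspark.sql import functions as F
from pyspark.sql.types import (
    DateType, IntegerType, LongType, StringType, StructField, StructType,
)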

# Define the schema explicitly
schema = StructType([
    StructField("store_id", LongType(), False),
    StructField("store_name", StringType(), False),
    StructField("city", StringType(), False),
    StructField("country", StringType(), False),
    StructField("opening_date", DateType(), False),
    StructField("store_type", StringType(), False),
    StructField("square_footage", IntegerType(), False)
])

# Sample store rows; opening dates are Python date objects to match DateType
stores_data = [
    (1, "Downtown Plaza", "New York", "USA", date(2020, 1, 15), "flagship", 2500),
    (2, "Airport Terminal 3", "London", "UK", date(2021, 3, 20), "kiosk", 800),
    (3, "Shopping Mall", "Tokyo", "Japan", date(2019, 11, 30), "standard", 1500)
]

# Create DataFrame with schema
stores_df = spark.createDataFrame(stores_data, schema=schema)

# Now append to the table
stores_df.writeTo("cafe_ops.store_locations").append()
stores_df.show()

+--------+------------------+--------+-------+------------+----------+--------------+
|store_id|        store_name|    city|country|opening_date|store_type|square_footage|
+--------+------------------+--------+-------+------------+----------+--------------+
|       1|    Downtown Plaza|New York|    USA|  2020-01-15|  flagship|          2500|
|       2|Airport Terminal 3|  London|     UK|  2021-03-20|     kiosk|           800|
|       3|     Shopping Mall|   Tokyo|  Japan|  2019-11-30|  standard|          1500|
+--------+------------------+--------+-------+------------+----------+--------------+



## Explore MinIO

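Open the MinIO console in your browser to browse the files Iceberg wrote for the `cafe_ops` tables:

http://localhost:9001/browser/warehouse/cafe_ops/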

In [5]:
# TODO: Exercise 1.1
# Insert menu items using SQL
# Hint: Use spark.sql() with INSERT INTO statement

# Your code here
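
One possible starting point for Exercise 1.1 is sketched here. The three rows are hypothetical sample data, not part of the lab, and the helper name `insert_menu_items` is an assumption made for illustration. The helper takes the `spark` session as a parameter so the SQL text can be inspected on its own.

```python
# Hypothetical sample rows; column order matches the menu_items definition:
# item_id, item_name, category, price, is_seasonal,
# available_from, available_until, calories
INSERT_MENU_ITEMS = """
INSERT INTO cafe_ops.menu_items VALUES
    (1, 'Espresso', 'coffee', 2.50, false, NULL, NULL, 5),
    (2, 'Pumpkin Spice Latte', 'coffee', 4.75, true, DATE '2024-09-01', DATE '2024-11-30', 380),
    (3, 'Butter Croissant', 'pastry', 3.20, false, NULL, NULL, 270)
"""


def insert_menu_items(spark):
    """Run the INSERT on the given session, then read the table back."""
    spark.sql(INSERT_MENU_ITEMS)
    return spark.sql("SELECT * FROM cafe_ops.menu_items ORDER BY item_id")
```

In the notebook this would be called as `insert_menu_items(spark).show()`. Non-seasonal items leave both availability dates as `NULL`.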

## Exercise 2: Schema Evolution

Add nutritional information to the menu items table and update existing records.

In [None]:
# TODO: Exercise 2.1
# Add new columns for nutritional info
# Update existing records with the new information

# Your code here
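
A minimal sketch of the schema evolution step, assuming two hypothetical nutrition columns (`protein_g`, `sugar_g`) and placeholder values. In Iceberg, adding a column is a metadata-only change, so existing data files are not rewritten and older rows read the new columns as `NULL` until they are updated. The `UPDATE` statement relies on the Iceberg SQL extensions configured in Part 1.

```python
# Column names and values are hypothetical examples.
SCHEMA_EVOLUTION_STEPS = [
    # Metadata-only change: existing data files are not rewritten.
    "ALTER TABLE cafe_ops.menu_items ADD COLUMNS (protein_g INT, sugar_g INT)",
    # Rows written before the ALTER read the new columns as NULL until updated.
    "UPDATE cafe_ops.menu_items SET protein_g = 0, sugar_g = 0 WHERE item_id = 1",
]


def evolve_menu_schema(spark):
    """Apply each statement in order, then return the new table description."""
    for statement in SCHEMA_EVOLUTION_STEPS:
        spark.sql(statement)
    return spark.sql("DESCRIBE TABLE cafe_ops.menu_items")
```

Calling `evolve_menu_schema(spark).show()` in the notebook would list the table's columns, including the two new ones.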

## Exercise 3: Time Travel Queries

Perform time travel queries to explore different versions of your data.

In [None]:
# TODO: Exercise 3.1
# 1. Query the current state of menu_items
# 2. Make some changes to the data
# 3. Query a previous version using a timestamp
# 4. Query a previous version using a snapshot ID

# Your code here
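
The queries for this exercise could be built along the following lines. This is a sketch, not the lab's reference solution: the helper names are hypothetical, and it assumes the `snapshots` metadata table can be addressed as `<table>.snapshots` in your catalog setup. `TIMESTAMP AS OF` and `VERSION AS OF` are the Spark SQL time travel clauses available from Spark 3.3 onward.

```python
def snapshots_query(table):
    """List the table's snapshots, oldest first, to pick a version to read."""
    return (
        f"SELECT snapshot_id, committed_at, operation "
        f"FROM {table}.snapshots ORDER BY committed_at"
    )


def as_of_timestamp_query(table, timestamp):
    """Read the table as it was at a 'YYYY-MM-DD HH:MM:SS' timestamp."""
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{timestamp}'"


def as_of_snapshot_query(table, snapshot_id):
    """Read the table as of a specific snapshot ID."""
    return f"SELECT * FROM {table} VERSION AS OF {int(snapshot_id)}"
```

A typical flow is to run `spark.sql(snapshots_query("cafe_ops.menu_items")).show()`, copy a `snapshot_id` or `committed_at` value from the output, and pass it to one of the other two helpers.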

## Exercise 4: Data Maintenance

Implement data maintenance tasks including snapshot expiration and file compaction.

In [None]:
# TODO: Exercise 4.1
# 1. Expire old snapshots
# 2. Rewrite data files (compaction)

# Your code here
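
Both maintenance tasks are exposed as Iceberg stored procedures, which Spark runs through `CALL`. The sketch below builds the two statements. The helper names are hypothetical, and the catalog name is an assumption: `spark_catalog` fits tables created in the default catalog after `USE cafe_ops`, so adjust it if your tables live elsewhere.

```python
from datetime import datetime


def expire_snapshots_call(catalog, table, older_than, retain_last=1):
    """Build a CALL that expires snapshots older than the given datetime,
    while always keeping at least `retain_last` snapshots."""
    cutoff = older_than.strftime("%Y-%m-%d %H:%M:%S")
    return (
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{table}', older_than => TIMESTAMP '{cutoff}', "
        f"retain_last => {retain_last})"
    )


def rewrite_data_files_call(catalog, table):
    """Build a CALL that compacts small data files with default settings."""
    return f"CALL {catalog}.system.rewrite_data_files(table => '{table}')"
```

In the notebook, each statement would be passed to `spark.sql(...)`, for example with `older_than=datetime.now() - timedelta(days=7)` after importing `timedelta`. Expired snapshots can no longer be used for time travel, so run Exercise 3 first.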

In [None]:
print("Available tables:")
spark.sql("SHOW TABLES FROM cafe_ops").show()

## Bonus Exercise: Data Quality and Analytics

Implement data quality checks and write analytical queries.

In [None]:
# TODO: Bonus 1
# Implement data quality checks:
# 1. Check for negative prices
# 2. Check for future dates
# 3. Validate store_ids

# Your code here
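
The three checks can each be phrased as a query that counts violating rows, so that a result of zero means the check passed. The sketch below is one way to organize that. The check names and helper functions are hypothetical, and "future dates" is interpreted here as transactions timestamped after the current time.

```python
# Each query returns a single number: how many rows violate the rule.
QUALITY_CHECKS = {
    "negative_prices": (
        "SELECT COUNT(*) FROM cafe_ops.menu_items WHERE price < 0"
    ),
    "future_transactions": (
        "SELECT COUNT(*) FROM cafe_ops.sales_transactions "
        "WHERE transaction_timestamp > current_timestamp()"
    ),
    # Anti join keeps only transactions whose store_id has no matching store.
    "unknown_store_ids": (
        "SELECT COUNT(*) FROM cafe_ops.sales_transactions t "
        "LEFT ANTI JOIN cafe_ops.store_locations s ON t.store_id = s.store_id"
    ),
}


def run_quality_checks(spark):
    """Return {check_name: number_of_violating_rows}."""
    return {
        name: spark.sql(query).first()[0]
        for name, query in QUALITY_CHECKS.items()
    }


def failed_checks(results):
    """Names of the checks that found at least one violating row."""
    return sorted(name for name, count in results.items() if count > 0)
```

Usage in the notebook would look like `failed_checks(run_quality_checks(spark))`, which returns an empty list when the data is clean.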

In [None]:
# TODO: Bonus 2
# Write analytics queries:
# 1. Top selling items by revenue
# 2. Sales patterns by store type
# 3. Seasonal item performance

# Your code here
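
As a hedged starting point for the first analytics query, the sketch below flattens the `items` array with `explode` so that each line item becomes its own row, then joins to `menu_items` to sum revenue per item. The CTE name `line_items`, the limit of 10, and the helper name are illustrative choices. Revenue is computed from the `unit_price` recorded on the transaction, not the current menu price.

```python
TOP_ITEMS_BY_REVENUE = """
WITH line_items AS (
    -- One row per purchased item instead of one row per transaction
    SELECT store_id, explode(items) AS item
    FROM cafe_ops.sales_transactions
)
SELECT m.item_name,
       SUM(li.item.quantity * li.item.unit_price) AS revenue
FROM line_items li
JOIN cafe_ops.menu_items m ON m.item_id = li.item.item_id
GROUP BY m.item_name
ORDER BY revenue DESC
LIMIT 10
"""


def top_items_by_revenue(spark):
    """Run the query on the given session and return the result DataFrame."""
    return spark.sql(TOP_ITEMS_BY_REVENUE)
```

The same `line_items` CTE can be reused for the other two queries by joining on `store_locations.store_type` or filtering on `menu_items.is_seasonal`.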

## Submission Questions

Please answer the following questions:

1. How does Iceberg handle schema evolution differently from traditional databases?
2. What are the benefits of time travel in this cafe chain context?
3. How would you implement a data retention policy using Iceberg features?