# 🧊 Hotels Iceberg Population Playground

**Why**: Provide a reproducible Booking-style dataset for SQL/PySpark drills.  
**How**: Synthesize booking-like DataFrames, merge them into Iceberg tables, and keep the run idempotent.  
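**Notes**: Safe to rerun; by default the tables live in the `iceberg.hotels_practice` namespace until the final cleanup cell drops them. IDs are deterministic, but date and timestamp columns are anchored to the current UTC date, so their values shift when the notebook is run on a different day.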


## ⚙️ Environment

In [None]:
import os

# Service endpoints and credentials. Each one can be overridden through the
# environment; the literals are the fallbacks used when the variable is unset.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ROOT_USER", "minio")
MINIO_SECRET_KEY = os.environ.get("MINIO_ROOT_PASSWORD", "minio123")
HIVE_METASTORE_URI = os.environ.get("HIVE_METASTORE_URI", "thrift://hive-metastore:9083")
TRINO_URL = os.environ.get("TRINO_URL", "http://trino:8080")
SPARK_MASTER = os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077")
S3_ENDPOINT = os.environ.get("S3_ENDPOINT", "minio:9000")

# Make sure both AWS region variables exist; AWS_DEFAULT_REGION falls back to
# whatever AWS_REGION ends up being.
_aws_region = os.environ.setdefault("AWS_REGION", "us-east-1")
os.environ.setdefault("AWS_DEFAULT_REGION", _aws_region)

# Echo the resolved endpoints so the notebook output records what was used.
for _label, _endpoint in (
    ("MinIO:", MINIO_ENDPOINT),
    ("Hive metastore:", HIVE_METASTORE_URI),
    ("Spark master:", SPARK_MASTER),
    ("Trino:", TRINO_URL),
):
    print(_label, _endpoint)


In [None]:
from pyspark.sql import SparkSession

# Maven coordinates resolved by Spark at session start-up.
packages = [
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "software.amazon.awssdk:bundle:2.20.158"
]

# The catalog's `s3.endpoint` is passed as a URL with a scheme. S3_ENDPOINT
# defaults to a bare "host:port", but an override may already include a
# scheme; only prepend "http://" when none is present so the result never
# becomes "http://http://...".
s3_endpoint_url = S3_ENDPOINT if "://" in S3_ENDPOINT else f"http://{S3_ENDPOINT}"

# REST catalog location. Overridable through the environment like the other
# service endpoints; the default is the previously hard-coded address.
iceberg_rest_uri = os.getenv("ICEBERG_REST_URI", "http://hive-metastore:9001/iceberg")

print("🚀 Creating Spark session with Iceberg support …")

spark = (
    SparkSession.builder
    .appName("HotelsIcebergPopulation")
    .master(SPARK_MASTER)
    .config("spark.jars.packages", ",".join(packages))
    # Session catalog: Iceberg's SparkSessionCatalog configured with type "hive".
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    # Catalog named "iceberg": REST catalog with its warehouse and S3 client settings.
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "rest")
    .config("spark.sql.catalog.iceberg.uri", iceberg_rest_uri)
    .config("spark.sql.catalog.iceberg.warehouse", "s3a://iceberg/warehouse")
    .config("spark.sql.catalog.iceberg.s3.endpoint", s3_endpoint_url)
    .config("spark.sql.catalog.iceberg.s3.access-key-id", MINIO_ACCESS_KEY)
    .config("spark.sql.catalog.iceberg.s3.secret-access-key", MINIO_SECRET_KEY)
    .config("spark.sql.catalog.iceberg.s3.region", os.environ["AWS_REGION"])
    .config("spark.sql.catalog.iceberg.s3.path-style-access", "true")
    .config("spark.sql.catalog.iceberg.s3.connection-ssl-enabled", "false")
    .config("spark.sql.catalog.iceberg.s3.sse.type", "none")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Hadoop-side settings: metastore location and S3A filesystem access.
    .config("spark.hadoop.hive.metastore.uris", HIVE_METASTORE_URI)
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    # Unqualified table names resolve against the "iceberg" catalog.
    .config("spark.sql.defaultCatalog", "iceberg")
    .enableHiveSupport()
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")
# Fix the SQL session time zone to UTC.
spark.conf.set("spark.sql.session.timeZone", "UTC")

print("✅ Spark session ready (Iceberg REST catalog)")
print("   Spark version:", spark.version)


## 📦 Configuration

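Reproducible settings for the synthetic dataset: a fixed random seed, per-table row-count settings, the input directory, and the Iceberg catalog and namespace to write to.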

In [None]:
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict
import random

def _default_volumes() -> Dict[str, int]:
    """Return a fresh dict of the default per-table volume settings."""
    return {
        "hotels": 200,
        "users": 2000,
        "bookings": 8000,
        "reviews": 6000,
        "images": 2500,
    }


@dataclass
class HotelsPracticeConfig:
    """Settings for the synthetic hotels dataset.

    ``input_dir`` and ``namespace`` read their defaults from the
    ``BOOKING_INPUT_DIR`` and ``HOTELS_ICEBERG_NAMESPACE`` environment
    variables once, when the class is defined.
    """

    # Seed for the shared random generator.
    random_seed: int = 7
    # Local working directory for inputs.
    input_dir: str = os.getenv("BOOKING_INPUT_DIR", "/home/jovyan/work/booking_inputs")
    # Spark catalog name and the namespace inside it.
    catalog: str = "iceberg"
    namespace: str = os.getenv("HOTELS_ICEBERG_NAMESPACE", "hotels_practice")
    # Volume settings keyed by table name; each instance gets its own dict.
    volumes: Dict[str, int] = field(default_factory=_default_volumes)

cfg = HotelsPracticeConfig()

# One seeded generator shared by every data-generation cell that follows.
rng = random.Random(cfg.random_seed)

# Create the input directory (and any missing parents) if it is not there yet.
input_dir = Path(cfg.input_dir)
input_dir.mkdir(parents=True, exist_ok=True)

print(cfg, f"Input directory: {input_dir}", sep="\n")


In [None]:
import json
import uuid
from datetime import date, datetime, timedelta
from typing import Optional

from pyspark.sql import functions as F, types as T


def random_date(rng, start: date, end: date) -> date:
    """Draw a date from ``start`` to ``end``, both ends included.

    Exactly one ``rng.randint`` call is made, picking a whole-day offset
    from ``start``.
    """
    span_days = (end - start).days
    day_offset = rng.randint(0, span_days)
    return start + timedelta(days=day_offset)


def random_timestamp(rng, start: datetime, end: datetime) -> datetime:
    """Draw a timestamp from ``start`` to ``end`` at whole-second resolution.

    The window length is truncated to whole seconds, then a single
    ``rng.randint`` call picks the offset from ``start``.
    """
    window = end - start
    second_offset = rng.randint(0, int(window.total_seconds()))
    return start + timedelta(seconds=second_offset)


def deterministic_uuid(prefix: str, index: int) -> str:
    """Return the same UUID string for the same ``prefix``/``index`` pair.

    The UUID is a version-5 UUID of ``"<prefix>-<index>"`` under
    ``uuid.NAMESPACE_URL``.
    """
    seed_name = "{}-{}".format(prefix, index)
    derived = uuid.uuid5(uuid.NAMESPACE_URL, seed_name)
    return str(derived)


# Reference "now" for all generated dates and timestamps.
# A single clock reading is taken so `today` and `current_ts` can never land on
# different calendar days (two separate reads could straddle midnight UTC).
# The value is kept naive (tzinfo stripped) because later cells compare it with
# naive datetimes built via datetime.combine(...).
from datetime import timezone

current_ts = datetime.now(timezone.utc).replace(tzinfo=None)
today = current_ts.date()


In [None]:
# Country code -> candidate cities for that country.
country_cities = {
    "US": ["New York", "San Francisco", "Chicago", "Austin", "Miami"],
    "CA": ["Toronto", "Vancouver", "Montreal", "Calgary", "Ottawa"],
    "GB": ["London", "Manchester", "Edinburgh", "Bristol", "Bath"],
    "FR": ["Paris", "Lyon", "Nice", "Bordeaux", "Lille"],
    "NL": ["Amsterdam", "Rotterdam", "Utrecht", "The Hague", "Eindhoven"],
    "DE": ["Berlin", "Munich", "Frankfurt", "Hamburg", "Cologne"],
    "ES": ["Barcelona", "Madrid", "Seville", "Valencia", "Bilbao"],
    "IT": ["Rome", "Florence", "Milan", "Venice", "Bologna"],
    "IL": ["Tel Aviv", "Jerusalem", "Haifa", "Eilat", "Herzliya"],
    "PT": ["Lisbon", "Porto", "Faro", "Braga", "Coimbra"],
    "SE": ["Stockholm", "Gothenburg", "Malmo", "Uppsala", "Visby"],
    "JP": ["Tokyo", "Kyoto", "Osaka", "Sapporo", "Fukuoka"],
    "AU": ["Sydney", "Melbourne", "Perth", "Brisbane", "Adelaide"],
    "BR": ["Rio", "Sao Paulo", "Brasilia", "Salvador", "Recife"],
    "AE": ["Dubai", "Abu Dhabi", "Sharjah", "Al Ain", "Ras Al Khaimah"],
    "ZA": ["Cape Town", "Johannesburg", "Durban", "Pretoria", "Bloemfontein"],
}

# Building blocks for chain membership and hotel names.
chains = ["StayPro", "BoutiqueX", "UrbanNest", "Skyline", "Coastline", "Heritage", "Aurora"]
adjectives = ["Grand", "Crystal", "Sunset", "Aurora", "Vertex", "Harbor", "Atlas", "Zenith", "Summit", "Velvet"]
nouns = ["Resort", "Suites", "Inn", "Lodge", "Retreat", "Villa", "Boutique", "Haven", "Plaza", "Terrace"]

# Loop invariants, built once instead of on every iteration.
_country_codes = list(country_cities)
_star_levels = [1, 2, 3, 4, 5]
_star_weights = [0.05, 0.15, 0.35, 0.3, 0.15]

# hotel_id -> nightly base rate, consumed later when pricing bookings.
room_rates = {}
hotels_records = []
for hotel_idx in range(cfg.volumes["hotels"]):
    # The order of the rng draws below is part of the dataset's definition:
    # country, city, stars, rooms, chain coin-flip (+ chain), rate, name parts.
    country = rng.choice(_country_codes)
    city = rng.choice(country_cities[country])
    (stars,) = rng.choices(_star_levels, weights=_star_weights)
    num_rooms = rng.randint(35, 420)
    chain = None
    if rng.random() < 0.4:
        chain = rng.choice(chains)
    hotel_id = f"H{100000 + hotel_idx}"
    room_rates[hotel_id] = rng.randint(80, 420)
    name_adjective = rng.choice(adjectives)
    name_noun = rng.choice(nouns)
    hotels_records.append(
        dict(
            hotel_id=hotel_id,
            hotel_name=f"{name_adjective} {name_noun} {city}",
            country=country,
            city=city,
            stars=stars,
            num_rooms=num_rooms,
            chain=chain,
        )
    )

# Only `chain` is nullable.
hotel_schema = T.StructType([
    T.StructField(column, dtype, nullable)
    for column, dtype, nullable in (
        ("hotel_id", T.StringType(), False),
        ("hotel_name", T.StringType(), False),
        ("country", T.StringType(), False),
        ("city", T.StringType(), False),
        ("stars", T.IntegerType(), False),
        ("num_rooms", T.IntegerType(), False),
        ("chain", T.StringType(), True),
    )
])

hotels_df = spark.createDataFrame(hotels_records, schema=hotel_schema)
_hotel_total = hotels_df.count()
print(f"Hotels: {_hotel_total}")
hotels_df.orderBy("hotel_id").show(5, truncate=False)


In [None]:
# Every user column is required.
user_schema = T.StructType([
    T.StructField(column, dtype, False)
    for column, dtype in (
        ("user_id", T.StringType()),
        ("home_country", T.StringType()),
        ("signup_date", T.DateType()),
        ("age", T.IntegerType()),
    )
])

# Loop invariants: candidate home countries and the signup date window.
_home_countries = list(country_cities)
_signup_window = (date(2015, 1, 1), date(2024, 1, 1))

user_ids = []
users_records = []
for user_idx in range(cfg.volumes["users"]):
    user_id = f"U{100000 + user_idx}"
    user_ids.append(user_id)
    # Draw order is fixed: signup date, then home country, then age.
    signup_date = random_date(rng, *_signup_window)
    home_country = rng.choice(_home_countries)
    age = rng.randint(18, 78)
    users_records.append(
        dict(
            user_id=user_id,
            home_country=home_country,
            signup_date=signup_date,
            age=age,
        )
    )

users_df = spark.createDataFrame(users_records, schema=user_schema)
_user_total = users_df.count()
print(f"Users: {_user_total}")
users_df.orderBy("user_id").show(5, truncate=False)


In [None]:
# Every booking column is required.
booking_schema = T.StructType([
    T.StructField(column, dtype, False)
    for column, dtype in (
        ("booking_id", T.StringType()),
        ("user_id", T.StringType()),
        ("hotel_id", T.StringType()),
        ("checkin_date", T.DateType()),
        ("checkout_date", T.DateType()),
        ("nights", T.IntegerType()),
        ("price_usd", T.DoubleType()),
        ("status", T.StringType()),
    )
])

hotel_ids = [row["hotel_id"] for row in hotels_records]
status_weights = [("completed", 0.78), ("cancelled", 0.14), ("no_show", 0.08)]
_status_names = [name for name, _ in status_weights]
_status_probs = [weight for _, weight in status_weights]

# Fraction of the base price charged when a booking was not completed:
# cancelled bookings use their own range, any other status uses the fallback.
_charge_ranges = {"cancelled": (0.1, 0.4)}
_fallback_charge_range = (0.0, 0.2)

# Check-in windows relative to today: the last 180 days ("focus") and the
# older stretch reaching back to 540 days ago.
recent_booking_start = today - timedelta(days=540)
focus_booking_start = today - timedelta(days=180)
primary_end = today - timedelta(days=1)
_older_window_end = max(recent_booking_start, focus_booking_start - timedelta(days=1))

booking_records = []
for booking_idx in range(cfg.volumes["bookings"]):
    # Draw order is fixed: user, hotel, window coin-flip, check-in, nights,
    # status, price jitter, and (for non-completed stays) the charge fraction.
    user_id = rng.choice(user_ids)
    hotel_id = rng.choice(hotel_ids)
    in_focus_window = rng.random() < 0.9
    if in_focus_window:
        window_start, window_end = focus_booking_start, primary_end
    else:
        window_start, window_end = recent_booking_start, _older_window_end
    if window_start > window_end:
        # Safety net for an inverted window: fall back to the full range.
        window_start, window_end = recent_booking_start, primary_end
    checkin = random_date(rng, window_start, window_end)
    # Stays last at most 14 nights and never extend past today.
    days_since_checkin = (today - checkin).days
    max_nights = max(1, min(14, days_since_checkin))
    nights = rng.randint(1, max_nights)
    checkout = checkin + timedelta(days=nights)
    (status,) = rng.choices(_status_names, weights=_status_probs)
    base_price = room_rates[hotel_id] * nights * rng.uniform(0.9, 1.3)
    if status == "completed":
        price = round(base_price, 2)
    else:
        charge_low, charge_high = _charge_ranges.get(status, _fallback_charge_range)
        price = round(base_price * rng.uniform(charge_low, charge_high), 2)
    booking_records.append(
        dict(
            booking_id=deterministic_uuid("booking", booking_idx),
            user_id=user_id,
            hotel_id=hotel_id,
            checkin_date=checkin,
            checkout_date=checkout,
            nights=nights,
            price_usd=float(price),
            status=status,
        )
    )

bookings_df = spark.createDataFrame(booking_records, schema=booking_schema)
_booking_total = bookings_df.count()
print(f"Bookings: {_booking_total}")
bookings_df.groupBy("status").count().show()


In [None]:
# Spark schema for the reviews table. The identifier, rating and timestamp
# columns are required; review_text, review_metadata and lang are nullable.
# review_metadata is stored as a string (the generation loop fills it with
# json.dumps output).
review_schema = T.StructType([
    T.StructField("review_id", T.StringType(), False),
    T.StructField("user_id", T.StringType(), False),
    T.StructField("hotel_id", T.StringType(), False),
    T.StructField("rating", T.IntegerType(), False),
    T.StructField("created_at", T.TimestampType(), False),
    T.StructField("review_text", T.StringType(), True),
    T.StructField("review_metadata", T.StringType(), True),
    T.StructField("lang", T.StringType(), True),
])

# hotel_id -> full hotel record, used to fill hotel names and cities into templates.
hotel_lookup = {record["hotel_id"]: record for record in hotels_records}

# Review text templates keyed by star rating (1-5). Each entry carries the
# template text and the language code stored alongside the review. The
# placeholders ({hotel_name}, {city}, {nights}, {price_per_night}) are filled
# via str.format in build_review_snippet.
rating_templates = {
    5: [
        {"text": "Exceptional stay at {hotel_name} in {city}; staff anticipated every need.", "lang": "en"},
        {"text": "Worth the {price_per_night:.0f} USD nightly rate at {hotel_name}; pure comfort.", "lang": "en"},
        {"text": "Servicio excelente en {hotel_name} ({city}), volveremos pronto.", "lang": "es"},
        {"text": "Esperienza eccellente al {hotel_name} di {city}; ritorneremo certamente.", "lang": "it"},
        {"text": "חוויה מושלמת ב-{hotel_name} שב-{city}, הכל היה מדויק.", "lang": "he"},
    ],
    4: [
        {"text": "{hotel_name} in {city} delivered a smooth check-in and quiet room.", "lang": "en"},
        {"text": "Bon rapport qualité/prix à {hotel_name}; équipe attentionnée.", "lang": "fr"},
        {"text": "Rooftop bar at {hotel_name} made the {nights} nights memorable.", "lang": "en"},
        {"text": "Camere spaziose e personale cordiale al {hotel_name} di {city}.", "lang": "it"},
    ],
    3: [
        {"text": "Stay at {hotel_name} in {city} was decent, though amenities felt basic.", "lang": "en"},
        {"text": "Habitación correcta en {hotel_name}, pero poco encanto.", "lang": "es"},
        {"text": "{hotel_name} bietet solide Lage in {city}, aber Service wirkt routiniert.", "lang": "de"},
        {"text": "Camera nella media al {hotel_name}; niente di speciale.", "lang": "it"},
    ],
    2: [
        {"text": "{hotel_name} in {city} struggled with slow check-in and thin walls.", "lang": "en"},
        {"text": "Peu de pression d'eau et chauffage capricieux à {hotel_name}.", "lang": "fr"},
        {"text": "Servicio frío en {hotel_name}; necesitamos mejorar la limpieza.", "lang": "es"},
        {"text": "Personale distratto al {hotel_name}; esperienza deludente.", "lang": "it"},
    ],
    1: [
        {"text": "Worst check-in at {hotel_name}; {nights} nights felt endless.", "lang": "en"},
        {"text": "Nos fuimos antes: {hotel_name} en {city} fue un desastre.", "lang": "es"},
        {"text": "לעולם לא נחזור ל-{hotel_name}; רעש בלתי נסבל.", "lang": "he"},
        {"text": "Esperienza pessima al {hotel_name}; siamo andati via subito.", "lang": "it"},
    ],
}

def build_review_snippet(rating: int, booking: dict):
    """Compose review text for a booking from the templates of its rating.

    Reads the module-level ``rating_templates``, ``hotel_lookup``,
    ``followup_phrases`` and ``rng``.

    Args:
        rating: Star rating used to select the template list.
        booking: Booking record; ``hotel_id``, ``nights`` and ``price_usd``
            are read from it.

    Returns:
        A ``(text, lang)`` tuple, or ``(None, None)`` when the rating has no
        templates.
    """
    candidates = rating_templates.get(rating)
    if not candidates:
        return None, None

    hotel = hotel_lookup.get(booking["hotel_id"], {})
    nights = booking["nights"]
    total_price = booking["price_usd"]
    # Avoid dividing by zero: with no nights, report the total as the rate.
    if nights:
        price_per_night = total_price / nights
    else:
        price_per_night = total_price

    chosen = rng.choice(candidates)
    text = chosen["text"].format(
        hotel_name=hotel.get("hotel_name", "the hotel"),
        city=hotel.get("city", "the city"),
        chain=hotel.get("chain") or "independent",
        nights=nights,
        price_per_night=price_per_night,
    )

    # Roughly 40% of reviews with a known rating get a follow-up sentence.
    if rating in followup_phrases and rng.random() < 0.4:
        extra_sentence = rng.choice(followup_phrases[rating])
        text = f"{text} {extra_sentence}"
    return text, chosen["lang"]

# Optional second sentence per star rating. build_review_snippet appends one
# of these to the main template text with probability 0.4.
followup_phrases = {
    5: [
        "Breakfast buffet was a highlight.",
        "Spa booking was effortless.",
        "Loved the skyline view from the suite.",
    ],
    4: [
        "Would happily stay again once the gym reopens.",
        "Concierge solved transport within minutes.",
    ],
    3: [
        "Staff tried their best, yet small details were missed.",
        "Good for a short business hop.",
    ],
    2: [
        "Maintenance team eventually helped, but the wait was long.",
        "Noise from hallway kept us up.",
    ],
    1: [
        "Requested refund for the final night.",
        "Left after the first evening despite plans.",
    ],
}

# Vocabularies sampled by the review generation loop when it builds the
# review_metadata JSON ("tags", "source", "device", "stay_purpose").
review_tags_vocab = ["clean", "view", "staff", "breakfast", "location", "spa", "business", "family", "quiet", "food", "design", "access"]
review_sources = ["mobile_app", "desktop_web", "partner_site"]
review_devices = ["ios", "android", "web"]
stay_purposes = ["business", "leisure", "family", "event"]

# Only completed bookings can receive a review. The candidate list is shuffled
# in place with the shared seeded rng and truncated to the configured review
# volume (or to the number of completed bookings, whichever is smaller).
completed_bookings = [rec for rec in booking_records if rec["status"] == "completed"]
num_reviews = min(cfg.volumes["reviews"], len(completed_bookings))
rng.shuffle(completed_bookings)
review_candidates = completed_bookings[:num_reviews]

# Earliest allowed review timestamp: midnight, 540 days before today.
recent_review_floor = datetime.combine(today - timedelta(days=540), datetime.min.time())
# user_id -> created_at of the most recent review generated for that user in this loop.
user_last_review_ts = {}
review_records = []
for idx, booking in enumerate(review_candidates):
    rating = rng.choices([1, 2, 3, 4, 5], weights=[0.04, 0.08, 0.22, 0.36, 0.30])[0]
    # Baseline: checkout day at midnight plus 7-22 hours and 0-59 minutes.
    base_datetime = (
        datetime.combine(booking["checkout_date"], datetime.min.time())
        + timedelta(hours=rng.randint(7, 22), minutes=rng.randint(0, 59))
    )
    # rng.random() is drawn on every iteration (it is the left operand of `and`).
    # When it is below 0.3 and the user already has a review, the new review is
    # placed 5-25 minutes after that one; otherwise it lands 0-240 minutes
    # after the baseline.
    if rng.random() < 0.3 and booking["user_id"] in user_last_review_ts:
        created_at = user_last_review_ts[booking["user_id"]] + timedelta(minutes=rng.randint(5, 25))
    else:
        created_at = base_datetime + timedelta(minutes=rng.randint(0, 240))
    # Push the timestamp after the user's previous review if it is not already later.
    prev_ts = user_last_review_ts.get(booking["user_id"])
    if prev_ts and created_at <= prev_ts:
        created_at = prev_ts + timedelta(minutes=rng.randint(10, 240))
    # Clamp timestamps that would fall in the future to shortly before current_ts.
    # NOTE(review): this clamp runs after the ordering check above, so it can
    # move created_at back to or before prev_ts — confirm whether strict
    # per-user ordering is required.
    if created_at > current_ts:
        created_at = current_ts - timedelta(minutes=rng.randint(5, 240))
    # Clamp timestamps older than the floor to within ~30 days after it.
    if created_at < recent_review_floor:
        created_at = recent_review_floor + timedelta(days=rng.randint(0, 30), minutes=rng.randint(0, 120))
    # About 10% of reviews carry a rating only (no text, no language).
    if rng.random() < 0.1:
        review_text, lang = None, None
    else:
        review_text, lang = build_review_snippet(rating, booking)
    # Ratings 4-5 are positive, 1-2 negative, 3 neutral.
    sentiment = "positive" if rating >= 4 else "negative" if rating <= 2 else "neutral"
    # One to three distinct tags; k is drawn before the sample itself.
    tags = rng.sample(review_tags_vocab, k=rng.randint(1, min(3, len(review_tags_vocab))))
    # Metadata is serialized to a JSON string to match the StringType column.
    metadata = json.dumps({
        "tags": tags,
        "source": rng.choice(review_sources),
        "device": rng.choice(review_devices),
        "stay_purpose": rng.choice(stay_purposes),
        "sentiment": sentiment,
    })
    review_records.append({
        "review_id": deterministic_uuid("review", idx),
        "user_id": booking["user_id"],
        "hotel_id": booking["hotel_id"],
        "rating": int(rating),
        "created_at": created_at,
        "review_text": review_text,
        "review_metadata": metadata,
        "lang": lang,
    })
    # Remember this review's timestamp for the user's next review.
    user_last_review_ts[booking["user_id"]] = created_at

reviews_df = spark.createDataFrame(review_records, schema=review_schema)
print(f"Reviews: {reviews_df.count()}")
reviews_df.groupBy("rating").count().orderBy("rating").show()


In [None]:
# Every image column is required.
image_schema = T.StructType([
    T.StructField(column, dtype, False)
    for column, dtype in (
        ("image_id", T.StringType()),
        ("hotel_id", T.StringType()),
        ("url", T.StringType()),
        ("width", T.IntegerType()),
        ("height", T.IntegerType()),
        ("aspect_ratio", T.DoubleType()),
        ("tag", T.StringType()),
        ("quality_score", T.DoubleType()),
        ("created_at", T.TimestampType()),
    )
])

image_tags = ["room", "lobby", "pool", "restaurant", "gym", "spa", "exterior", "suite", "bar", "conference"]
width_choices = [800, 1024, 1280, 1600, 1920, 2560]
height_choices = [600, 720, 900, 1080, 1440]

# Upload window: the past year, with the last 90 days counted as "recent".
image_window_start = current_ts - timedelta(days=365)
image_recent_threshold = current_ts - timedelta(days=90)
image_end = current_ts


def _image_record(sequence, hotel_id, file_name, width, height, created_at, quality):
    """Assemble one image row; the tag is drawn from ``rng`` here, last."""
    return {
        "image_id": deterministic_uuid("image", sequence),
        "hotel_id": hotel_id,
        "url": f"https://cdn.practice.example/hotels/{hotel_id}/{file_name}",
        "width": width,
        "height": height,
        "aspect_ratio": float(round(width / height, 4)),
        "tag": rng.choice(image_tags),
        "quality_score": round(quality, 3),
        "created_at": created_at,
    }


image_records = []
image_counter = 0
for hotel in hotels_records:
    owner_id = hotel["hotel_id"]

    # Base set: 6-10 images spread over the whole window. Draw order per
    # image: width, height, timestamp, quality (+ recent boost), tag.
    base_images = rng.randint(6, 10)
    for base_idx in range(base_images):
        width = rng.choice(width_choices)
        height = rng.choice(height_choices)
        created_at = random_timestamp(rng, image_window_start, image_end)
        quality = rng.uniform(0.6, 0.97)
        if created_at >= image_recent_threshold:
            # Recent uploads get a second draw and keep the better score.
            quality = max(quality, rng.uniform(0.75, 0.98))
        image_records.append(
            _image_record(
                image_counter, owner_id, f"img_{base_idx:03d}.jpg",
                width, height, created_at, quality,
            )
        )
        image_counter += 1

    # Guaranteed recent set: 4-6 images inside the last 90 days.
    recent_min = rng.randint(4, 6)
    for recent_idx in range(recent_min):
        width = rng.choice(width_choices)
        height = rng.choice(height_choices)
        created_at = random_timestamp(rng, image_recent_threshold, image_end)
        quality = rng.uniform(0.78, 0.98)
        image_records.append(
            _image_record(
                image_counter, owner_id, f"recent_{recent_idx:03d}.jpg",
                width, height, created_at, quality,
            )
        )
        image_counter += 1

images_df = spark.createDataFrame(image_records, schema=image_schema)
_image_total = images_df.count()
print(f"Images: {_image_total}")
images_df.groupBy("tag").count().orderBy("count", ascending=False).show(5)


In [None]:
# Sanity check: the stored night count must equal checkout minus check-in.
_nights_from_dates = F.datediff("checkout_date", "checkin_date")
mismatch = bookings_df.where(_nights_from_dates != F.col("nights"))
_mismatch_total = mismatch.count()
assert _mismatch_total == 0, "Night count mismatch detected"
print("✅ Nights column validated")


## 🧊 Iceberg Population

Merge the freshly generated DataFrames directly into Iceberg tables and inspect the row counts before cleanup.


In [None]:
# Temp view name -> generated DataFrame; the MERGE statements read these views.
staging_views = dict(
    stg_hotels=hotels_df,
    stg_users=users_df,
    stg_bookings=bookings_df,
    stg_reviews=reviews_df,
    stg_images=images_df,
)

for view_name, staged_df in staging_views.items():
    staged_df.createOrReplaceTempView(view_name)

print("Staging counts:")
for view_name, staged_df in staging_views.items():
    table_label = view_name.replace('stg_', '')
    row_total = staged_df.count()
    print(f" - {table_label}: {row_total}")


In [None]:
# Fully qualified namespace, e.g. "<catalog>.<namespace>".
namespace = f"{cfg.catalog}.{cfg.namespace}"

spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {namespace}")

# One CREATE TABLE IF NOT EXISTS statement per target table, executed in order.
_create_table_statements = [
    f"""
CREATE TABLE IF NOT EXISTS {namespace}.hotels (
    hotel_id STRING,
    hotel_name STRING,
    country STRING,
    city STRING,
    stars INT,
    num_rooms INT,
    chain STRING
)
USING ICEBERG
PARTITIONED BY (country)
TBLPROPERTIES ('format-version'='2')
""",
    f"""
CREATE TABLE IF NOT EXISTS {namespace}.users (
    user_id STRING,
    home_country STRING,
    signup_date DATE,
    age INT
)
USING ICEBERG
PARTITIONED BY (home_country)
TBLPROPERTIES ('format-version'='2')
""",
    f"""
CREATE TABLE IF NOT EXISTS {namespace}.bookings (
    booking_id STRING,
    user_id STRING,
    hotel_id STRING,
    checkin_date DATE,
    checkout_date DATE,
    nights INT,
    price_usd DOUBLE,
    status STRING
)
USING ICEBERG
PARTITIONED BY (months(checkin_date), bucket(16, hotel_id))
TBLPROPERTIES ('format-version'='2')
""",
    f"""
CREATE TABLE IF NOT EXISTS {namespace}.reviews (
    review_id STRING,
    user_id STRING,
    hotel_id STRING,
    rating INT,
    created_at TIMESTAMP,
    review_text STRING,
    review_metadata STRING,
    lang STRING
)
USING ICEBERG
PARTITIONED BY (months(created_at))
TBLPROPERTIES ('format-version'='2')
""",
    f"""
CREATE TABLE IF NOT EXISTS {namespace}.images (
    image_id STRING,
    hotel_id STRING,
    url STRING,
    width INT,
    height INT,
    aspect_ratio DOUBLE,
    tag STRING,
    quality_score DOUBLE,
    created_at TIMESTAMP
)
USING ICEBERG
PARTITIONED BY (bucket(32, hotel_id))
TBLPROPERTIES ('format-version'='2')
""",
]

for _ddl in _create_table_statements:
    spark.sql(_ddl)

print(f"✅ Tables ensured in {namespace}")


In [None]:
# One MERGE per table, keyed on the table's id column: matching rows are
# overwritten with the staged values, new ids are inserted. Executed in order.
_merge_statements = [
    f"""
MERGE INTO {namespace}.hotels AS target
USING stg_hotels AS source
ON target.hotel_id = source.hotel_id
WHEN MATCHED THEN UPDATE SET
  hotel_name = source.hotel_name,
  country = source.country,
  city = source.city,
  stars = source.stars,
  num_rooms = source.num_rooms,
  chain = source.chain
WHEN NOT MATCHED THEN INSERT (hotel_id, hotel_name, country, city, stars, num_rooms, chain)
VALUES (source.hotel_id, source.hotel_name, source.country, source.city, source.stars, source.num_rooms, source.chain)
""",
    f"""
MERGE INTO {namespace}.users AS target
USING stg_users AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN UPDATE SET
  home_country = source.home_country,
  signup_date = source.signup_date,
  age = source.age
WHEN NOT MATCHED THEN INSERT (user_id, home_country, signup_date, age)
VALUES (source.user_id, source.home_country, source.signup_date, source.age)
""",
    f"""
MERGE INTO {namespace}.bookings AS target
USING stg_bookings AS source
ON target.booking_id = source.booking_id
WHEN MATCHED THEN UPDATE SET
  user_id = source.user_id,
  hotel_id = source.hotel_id,
  checkin_date = source.checkin_date,
  checkout_date = source.checkout_date,
  nights = source.nights,
  price_usd = source.price_usd,
  status = source.status
WHEN NOT MATCHED THEN INSERT (booking_id, user_id, hotel_id, checkin_date, checkout_date, nights, price_usd, status)
VALUES (source.booking_id, source.user_id, source.hotel_id, source.checkin_date, source.checkout_date, source.nights, source.price_usd, source.status)
""",
    f"""
MERGE INTO {namespace}.reviews AS target
USING stg_reviews AS source
ON target.review_id = source.review_id
WHEN MATCHED THEN UPDATE SET
  user_id = source.user_id,
  hotel_id = source.hotel_id,
  rating = source.rating,
  created_at = source.created_at,
  review_text = source.review_text,
  review_metadata = source.review_metadata,
  lang = source.lang
WHEN NOT MATCHED THEN INSERT (review_id, user_id, hotel_id, rating, created_at, review_text, review_metadata, lang)
VALUES (source.review_id, source.user_id, source.hotel_id, source.rating, source.created_at, source.review_text, source.review_metadata, source.lang)
""",
    f"""
MERGE INTO {namespace}.images AS target
USING stg_images AS source
ON target.image_id = source.image_id
WHEN MATCHED THEN UPDATE SET
  hotel_id = source.hotel_id,
  url = source.url,
  width = source.width,
  height = source.height,
  aspect_ratio = source.aspect_ratio,
  tag = source.tag,
  quality_score = source.quality_score,
  created_at = source.created_at
WHEN NOT MATCHED THEN INSERT (image_id, hotel_id, url, width, height, aspect_ratio, tag, quality_score, created_at)
VALUES (source.image_id, source.hotel_id, source.url, source.width, source.height, source.aspect_ratio, source.tag, source.quality_score, source.created_at)
""",
]

for _merge_sql in _merge_statements:
    spark.sql(_merge_sql)

print("✅ Iceberg tables merged")


In [None]:
# Report how many rows each Iceberg table holds after the merge.
print("Row counts in Iceberg namespace:")
for table in ("hotels", "users", "bookings", "reviews", "images"):
    count_row = spark.sql(f"SELECT COUNT(*) AS rows FROM {namespace}.{table}").collect()[0]
    print(f" - {table}: {count_row['rows']}")

# Booking totals broken down by status.
status_breakdown = spark.sql(
    f"SELECT status, COUNT(*) AS cnt FROM {namespace}.bookings GROUP BY status ORDER BY status"
)
status_breakdown.show()


In [None]:
print("🧹 Cleaning up Iceberg namespace...")
print("   (Comment out this cell if you want to keep the generated tables.)")

# Drop every generated table first, then the namespace itself.
tables = ['hotels', 'users', 'bookings', 'reviews', 'images']
_drop_statements = [f"DROP TABLE IF EXISTS {namespace}.{table_name}" for table_name in tables]
_drop_statements.append(f"DROP NAMESPACE IF EXISTS {namespace}")
for _drop_sql in _drop_statements:
    spark.sql(_drop_sql)

print("✅ Cleanup complete.")
