# TEEHR Integration with Iceberg Data Warehouse

This notebook demonstrates how to use TEEHR with the Iceberg-based data warehouse for large-scale hydrologic evaluation following the TEEHR coding guidelines.

## Overview
- Connect to deployed Iceberg REST catalog
- Configure PySpark for Iceberg operations
- Generate sample hydrologic time series data
- Insert and query data using PySpark SQL
- Demonstrate time series analysis patterns

## Import Required Libraries

Import all necessary libraries for TEEHR Iceberg operations including PySpark, PyIceberg, and data manipulation tools.

In [1]:
import sys
import json
import subprocess
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Tuple
import pandas as pd
import numpy as np

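# Add project root to path. Path().name is '' for Path('.'), so inspect the
# resolved working directory instead (works from the repo root or examples/)
cwd = Path.cwd()
project_root = cwd.parent if cwd.name == 'examples' else cwd
sys.path.append(str(project_root))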

print(f"Project root: {project_root}")

Project root: /Users/mdenno/repos/teehr-eval-sys/examples


In [2]:
# Set in the kernel process; a `!export` runs in a throwaway subshell and does not persist
%env AWS_PROFILE=ciroh_mdenno
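
In IPython, each `!` command runs in its own short-lived subshell, so an `export` there never reaches the kernel process. Setting the variable in the kernel (with `%env` or `os.environ`) does persist, and child processes started afterwards inherit it. PySpark starts its local JVM as a child process, so it should see the profile too. The sketch below checks both behaviours with a plain Python child process; `AWS_PROFILE_DEMO` is a hypothetical variable name chosen so the real `AWS_PROFILE` is left untouched.

```python
import os
import subprocess
import sys


def child_sees(name: str) -> str:
    """Value of environment variable `name` as seen by a freshly started child process."""
    probe = f"import os; print(os.environ.get({name!r}, ''))"
    result = subprocess.run([sys.executable, "-c", probe], capture_output=True, text=True, check=True)
    return result.stdout.strip()


DEMO_VAR = "AWS_PROFILE_DEMO"  # hypothetical name, so the real AWS_PROFILE is untouched
os.environ.pop(DEMO_VAR, None)

# Like `!export`: the variable exists only inside the subshell, which then exits.
subprocess.run(["sh", "-c", f"export {DEMO_VAR}=ciroh_mdenno"], check=True)
after_subshell = child_sees(DEMO_VAR)

# Like `%env` / os.environ: set in this process and inherited by later children.
os.environ[DEMO_VAR] = "ciroh_mdenno"
after_environ = child_sees(DEMO_VAR)

print(repr(after_subshell), repr(after_environ))  # '' 'ciroh_mdenno'
```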

In [3]:
# Import Iceberg dependencies
try:
    from pyiceberg.catalog import load_catalog
    from pyiceberg.schema import Schema
    from pyiceberg.types import (
        NestedField, StringType, DoubleType, TimestampType
    )
    import pyarrow as pa
    print("✅ PyIceberg and PyArrow available")
except ImportError as e:
    print(f"❌ Missing dependencies: {e}")
    print("Install with: pip install pyiceberg pyarrow")

✅ PyIceberg and PyArrow available


In [4]:
# Import PySpark for production workflows
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, avg, count, stddev, min as spark_min, max as spark_max
    SPARK_AVAILABLE = True
    print("✅ PySpark available")
except ImportError:
    print("⚠️ PySpark not available. Install with: pip install pyspark==4.0.0")
    SPARK_AVAILABLE = False

✅ PySpark available


In [5]:
# Import TEEHR when available
try:
    import teehr
    TEEHR_AVAILABLE = True
    print("✅ TEEHR available")
except ImportError:
    print("⚠️ TEEHR not installed. Install with: pip install teehr")
    TEEHR_AVAILABLE = False

⚠️ TEEHR not installed. Install with: pip install teehr


## Configuration Functions

Functions that read the catalog configuration from Terraform outputs (falling back to hard-coded dev endpoints when Terraform is unavailable) and open the Iceberg REST catalog connection.

In [6]:
def get_terraform_outputs() -> Tuple[str, str]:
    """Get catalog configuration from terraform outputs."""
    try:
        result = subprocess.run(
            ["terraform", "output", "-json"],
            cwd=project_root / "infrastructure" / "environments" / "dev",
            capture_output=True,
            text=True,
            check=True
        )
        outputs = json.loads(result.stdout)
        
        catalog_uri = outputs["catalog_endpoint"]["value"]
        warehouse_bucket = outputs["warehouse_bucket_name"]["value"]
        warehouse_location = f"s3://{warehouse_bucket}/warehouse/"
        
        print(f"📡 Catalog URI: {catalog_uri}")
        print(f"🪣 Warehouse: {warehouse_location}")
        
        return catalog_uri, warehouse_location
        
    except Exception as e:
        print(f"⚠️ Could not get terraform outputs: {e}")
        # Fall back to the known dev deployment endpoints
        catalog_uri = "http://dev-teehr-sys-iceberg-alb-2105268770.us-east-2.elb.amazonaws.com"
        warehouse_location = "s3://dev-teehr-sys-iceberg-warehouse/warehouse/"
        return catalog_uri, warehouse_location
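
`terraform output -json` prints a JSON object keyed by output name, where each entry carries the output's `value` alongside metadata such as `sensitive` and `type`. The sketch below separates the parsing step from the `subprocess` call in `get_terraform_outputs()` so it can be checked without Terraform installed. The output names `catalog_endpoint` and `warehouse_bucket_name` come from that function; the helper name `parse_catalog_outputs` and the example values are hypothetical.

```python
import json
from typing import Tuple


def parse_catalog_outputs(raw_json: str) -> Tuple[str, str]:
    """Extract (catalog_uri, warehouse_location) from `terraform output -json` text."""
    outputs = json.loads(raw_json)
    catalog_uri = outputs["catalog_endpoint"]["value"]
    bucket = outputs["warehouse_bucket_name"]["value"]
    # Same warehouse layout the notebook uses: s3://<bucket>/warehouse/
    return catalog_uri, f"s3://{bucket}/warehouse/"


# Hypothetical example of Terraform's JSON output shape
example = json.dumps({
    "catalog_endpoint": {"sensitive": False, "type": "string", "value": "http://catalog.example.internal"},
    "warehouse_bucket_name": {"sensitive": False, "type": "string", "value": "example-warehouse"},
})
print(parse_catalog_outputs(example))
# ('http://catalog.example.internal', 's3://example-warehouse/warehouse/')
```

Keeping the parser pure makes the fallback branch the only place that deals with a missing Terraform directory or binary.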

In [7]:
def setup_iceberg_catalog():
    """Configure Iceberg catalog connection following TEEHR patterns."""
    catalog_uri, warehouse_location = get_terraform_outputs()
    
    # Standard TEEHR Iceberg catalog configuration
    catalog_config = {
        'uri': catalog_uri,
        'credential': 'default',
        'warehouse': warehouse_location
    }
    
    try:
        catalog = load_catalog("rest", **catalog_config)
        
        # Verify connection
        namespaces = catalog.list_namespaces()
        print(f"✅ Connected to Iceberg catalog. Namespaces: {namespaces}")
        
        return catalog, warehouse_location
        
    except Exception as e:
        print(f"❌ Failed to connect to Iceberg catalog: {e}")
        raise

## PySpark Configuration for Iceberg

Create a local PySpark session for TEEHR Iceberg evaluation, with the Iceberg SQL extensions enabled, the REST catalog registered under a configurable catalog name, and S3 access through Iceberg's `S3FileIO`.

In [8]:
def create_spark_session_for_iceberg(catalog_uri: str, warehouse_location: str, catalog_name: str) -> SparkSession:
    """
    Create Spark session configured for TEEHR Iceberg evaluation.
    Follows TEEHR coding guidelines for PySpark 4.0.0 + Iceberg 1.10.0 (iceberg-spark-runtime-4.0_2.13).
    """
    if not SPARK_AVAILABLE:
        raise ImportError("PySpark not available. Install with: pip install pyspark==4.0.0")
    
    try:
        # Clean up any existing sessions
        if SparkSession._instantiatedSession is not None:
            SparkSession._instantiatedSession.stop()
            SparkSession._instantiatedSession = None
        
        # TEEHR standard Spark configuration for Iceberg
        spark = SparkSession.builder \
            .appName("teehr-iceberg-evaluation") \
            .master("local[*]") \
            .config("spark.driver.host", "localhost") \
            .config("spark.driver.bindAddress", "127.0.0.1") \
            .config("spark.network.timeout", "800s") \
            .config("spark.executor.heartbeatInterval", "60s") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .config("spark.jars.packages", 
                    "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0,"
                    "org.apache.hadoop:hadoop-aws:3.4.0,"
                    "com.amazonaws:aws-java-sdk-bundle:1.12.772") \
            .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
            .config(f"spark.sql.catalog.{catalog_name}.type", "rest") \
            .config(f"spark.sql.catalog.{catalog_name}.uri", catalog_uri) \
            .config(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_location) \
            .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .config("spark.hadoop.fs.s3a.aws.credentials.provider", 
                "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
            .getOrCreate()
        
        print(f"✅ Created Spark {spark.version} session with Iceberg support")
        return spark
        
    except Exception as e:
        print(f"❌ Failed to create Spark session: {e}")
        raise
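
Iceberg registers a Spark catalog through a family of `spark.sql.catalog.<name>.*` properties: the bare key names the catalog implementation class, and the suffixed keys configure it. A minimal sketch, assuming the same REST catalog and `S3FileIO` settings used in `create_spark_session_for_iceberg()` above, builds those properties as a plain dictionary so they can be inspected or reused. `iceberg_catalog_conf` and the example URI and bucket are hypothetical.

```python
from typing import Dict


def iceberg_catalog_conf(catalog_name: str, catalog_uri: str, warehouse_location: str) -> Dict[str, str]:
    """Spark properties that register an Iceberg REST catalog under `catalog_name`."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": catalog_uri,
        f"{prefix}.warehouse": warehouse_location,
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }


conf = iceberg_catalog_conf("iceberg", "http://localhost:8181", "s3://example-bucket/warehouse/")
for key, value in sorted(conf.items()):
    print(f"{key} = {value}")
```

Each pair can then be applied with `builder = builder.config(key, value)` in a loop before `getOrCreate()`, which keeps the catalog name in one place.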

## Sample Data Generation

Generate synthetic daily streamflow series that mimic USGS gage observations and National Water Model (NWM) retrospective simulations.

In [9]:
def generate_sample_data(num_locations: int = 5, num_days: int = 30) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Generate sample hydrologic time series data for demonstration.
    
    Returns:
        Tuple of (observed_data, simulated_data) DataFrames
    """
    np.random.seed(42)
    
    # Generate location IDs (USGS gage format)
    locations = [f"USGS-0{8000000 + i:07d}" for i in range(num_locations)]
    
    # Generate time series
    start_date = datetime(2023, 1, 1)
    dates = [start_date + timedelta(days=i) for i in range(num_days)]
    
    observed_data = []
    simulated_data = []
    
    for location in locations:
        # Base flow scaled by one sinusoidal cycle (±30%) over the period
        base_flow = np.random.uniform(10, 100)
        seasonal_factor = 1 + 0.3 * np.sin(np.linspace(0, 2*np.pi, num_days))
        noise = np.random.normal(0, 0.1, num_days)
        
        observed_values = base_flow * seasonal_factor * (1 + noise)
        # Simulated has some bias and different noise
        simulated_values = observed_values * np.random.uniform(0.9, 1.1) + np.random.normal(0, 2, num_days)
        
        for i, date in enumerate(dates):
            observed_data.append({
                'location_id': location,
                'timestamp': date,
                'value': max(0, observed_values[i]),  # No negative flows
                'variable_name': 'streamflow_daily_mean',
                'configuration': 'observed',
                'measurement_unit': 'cfs',
                'reference_time': date
            })
            
            simulated_data.append({
                'location_id': location,
                'timestamp': date,
                'value': max(0, simulated_values[i]),  # No negative flows
                'variable_name': 'streamflow_daily_mean',
                'configuration': 'nwm_retrospective',
                'measurement_unit': 'cfs',
                'reference_time': date
            })
    
    return pd.DataFrame(observed_data), pd.DataFrame(simulated_data)
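
`generate_sample_data()` builds records one dictionary at a time, which is easy to read but slows down as `num_locations * num_days` grows. A vectorized alternative, sketched here under the same assumptions (one sinusoidal cycle over the period, 10% multiplicative noise on observations, a per-location bias of 0.9 to 1.1 plus additive noise for the simulation), draws whole arrays at once with NumPy's `Generator` API and returns the observed and simulated rows already combined. Because it uses `default_rng` instead of the global `np.random.seed`, its values will not match the function above exactly; `synthetic_streamflow` is a hypothetical name.

```python
import numpy as np
import pandas as pd


def synthetic_streamflow(num_locations: int = 5, num_days: int = 30, seed: int = 42) -> pd.DataFrame:
    """Long-format observed + simulated daily flows, generated with array operations."""
    rng = np.random.default_rng(seed)
    locations = np.array([f"USGS-{8000000 + i:08d}" for i in range(num_locations)])
    dates = pd.date_range("2023-01-01", periods=num_days, freq="D").to_numpy()

    # Arrays of shape (num_locations, num_days): one row per gage, one column per day.
    seasonal = 1 + 0.3 * np.sin(np.linspace(0, 2 * np.pi, num_days))
    base = rng.uniform(10, 100, size=(num_locations, 1))
    observed = base * seasonal * (1 + rng.normal(0, 0.1, size=(num_locations, num_days)))
    bias = rng.uniform(0.9, 1.1, size=(num_locations, 1))
    simulated = observed * bias + rng.normal(0, 2, size=(num_locations, num_days))

    frames = []
    for configuration, values in (("observed", observed), ("nwm_retrospective", simulated)):
        frames.append(pd.DataFrame({
            # ravel() is row-major, so repeat locations and tile dates to match
            "location_id": np.repeat(locations, num_days),
            "timestamp": np.tile(dates, num_locations),
            "value": np.clip(values, 0, None).ravel(),  # no negative flows
            "variable_name": "streamflow_daily_mean",
            "configuration": configuration,
            "measurement_unit": "cfs",
        }))
    df = pd.concat(frames, ignore_index=True)
    df["reference_time"] = df["timestamp"]
    return df
```

The column order matches `generate_sample_data()`, so the result can stand in for `all_data` when testing larger loads.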

## Data Operations Functions

Functions for inserting and querying data from Iceberg tables using PySpark.

In [10]:
def insert_data_with_pyspark(spark: SparkSession, df: pd.DataFrame, catalog_name: str, table_name: str):
    """Insert data into Iceberg table using PySpark."""
    try:
        # Convert pandas DataFrame to Spark DataFrame
        spark_df = spark.createDataFrame(df)
        
        print(f"📝 Inserting {len(df)} records into {table_name}")
        print(f"   Data shape: {df.shape}")
        print(f"   Columns: {list(df.columns)}")
        
        # Write via Spark SQL; SELECT * inserts by position, so the DataFrame
        # column order must match the table schema
        spark_df.createOrReplaceTempView("temp_data")
        
        insert_sql = f"""
        INSERT INTO {catalog_name}.{table_name}
        SELECT * FROM temp_data
        """
        
        spark.sql(insert_sql)
        print(f"✅ Successfully inserted data into {table_name}")
        
        # Verify insertion
        count_sql = f"SELECT COUNT(*) as record_count FROM {catalog_name}.{table_name}"
        result = spark.sql(count_sql).collect()
        total_records = result[0]['record_count']
        print(f"   Total records in table: {total_records}")
        
    except Exception as e:
        print(f"❌ Failed to insert data into {table_name}: {e}")
        raise
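
`INSERT INTO ... SELECT * FROM temp_data` matches columns by position, not by name, so the pandas column order must agree with the table schema or values land in the wrong columns (or fail to cast). One defensive option, sketched below, reorders the DataFrame to a target column list before the insert and fails loudly on any mismatch; in the notebook that list could come from `spark.table(f"{catalog_name}.{table_name}").columns`. The helper `align_to_table_columns` is hypothetical. Alternatively, Spark's `DataFrameWriterV2` (`spark_df.writeTo(name).append()`) resolves columns by name, to our understanding.

```python
from typing import Sequence

import pandas as pd


def align_to_table_columns(df: pd.DataFrame, table_columns: Sequence[str]) -> pd.DataFrame:
    """Reorder `df` to `table_columns`, raising on missing or extra columns."""
    missing = [c for c in table_columns if c not in df.columns]
    extra = [c for c in df.columns if c not in table_columns]
    if missing or extra:
        raise ValueError(f"column mismatch: missing={missing}, extra={extra}")
    return df.loc[:, list(table_columns)]


# Example: the table stores `value` last, but the DataFrame has it second.
df = pd.DataFrame({"location_id": ["USGS-08000000"], "value": [38.8], "configuration": ["observed"]})
aligned = align_to_table_columns(df, ["location_id", "configuration", "value"])
print(list(aligned.columns))  # ['location_id', 'configuration', 'value']
```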

In [11]:
def query_data_with_pyspark(spark: SparkSession, catalog_name: str, table_name: str, location_id: Optional[str] = None):
    """Query per-location, per-configuration summary statistics from an Iceberg table."""
    try:
        print(f"🔍 Querying data from {table_name}")
        
        # Optional location filter, bound as a named parameter (:location_id)
        # rather than interpolated into the SQL string
        where_clause = "WHERE location_id = :location_id" if location_id else ""
        args = {"location_id": location_id} if location_id else None
        if location_id:
            print(f"   Filtering by location: {location_id}")
        
        query_sql = f"""
        SELECT location_id, configuration,
               COUNT(*) as record_count,
               MIN(value) as min_value,
               MAX(value) as max_value,
               AVG(value) as avg_value
        FROM {catalog_name}.{table_name}
        {where_clause}
        GROUP BY location_id, configuration
        ORDER BY location_id, configuration
        """
        
        result_df = spark.sql(query_sql, args=args)
        
        print("📊 Query Results:")
        result_df.show(20, truncate=False)
        
        return result_df
        
    except Exception as e:
        print(f"❌ Failed to query {table_name}: {e}")
        raise

## Time Series Analysis Functions

Demonstrate time-series specific queries and analysis patterns on Iceberg tables.

In [12]:
def demonstrate_time_series_operations(spark: SparkSession, table: str = "iceberg.teehr.timeseries"):
    """Demonstrate time-series specific queries on an Iceberg table (fully qualified name)."""
    try:
        print("\n🕐 Time Series Analysis Examples")
        print("-" * 40)
        
        # 1. Time range query
        print("1️⃣ Querying specific time range:")
        time_range_sql = f"""
        SELECT location_id, configuration, timestamp, value
        FROM {table}
        WHERE timestamp >= '2023-01-15' AND timestamp <= '2023-01-20'
        ORDER BY location_id, configuration, timestamp
        """
        spark.sql(time_range_sql).show(10)
        
        # 2. Daily statistics by configuration
        print("\n2️⃣ Daily averages by configuration:")
        daily_stats_sql = f"""
        SELECT DATE(timestamp) as date,
               configuration,
               COUNT(DISTINCT location_id) as locations_count,
               AVG(value) as daily_avg_flow,
               MIN(value) as daily_min_flow,
               MAX(value) as daily_max_flow
        FROM {table}
        GROUP BY DATE(timestamp), configuration
        ORDER BY date, configuration
        """
        spark.sql(daily_stats_sql).show(10)
        
        # 3. Location comparison
        print("\n3️⃣ Observed vs Simulated by location:")
        comparison_sql = f"""
        WITH obs AS (
            SELECT location_id, AVG(value) as observed_avg
            FROM {table}
            WHERE configuration = 'observed'
            GROUP BY location_id
        ),
        sim AS (
            SELECT location_id, AVG(value) as simulated_avg
            FROM {table}
            WHERE configuration = 'nwm_retrospective'
            GROUP BY location_id
        )
        SELECT obs.location_id,
               ROUND(obs.observed_avg, 2) as observed_avg,
               ROUND(sim.simulated_avg, 2) as simulated_avg,
               ROUND(sim.simulated_avg - obs.observed_avg, 2) as difference
        FROM obs
        JOIN sim ON obs.location_id = sim.location_id
        ORDER BY obs.location_id
        """
        spark.sql(comparison_sql).show()
        
    except Exception as e:
        print(f"❌ Time series analysis failed: {e}")
        raise
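
The third query compares each location's mean observed and simulated flow. When the data still fits in memory, as the 300 sample rows do, the same comparison can be reproduced in pandas to sanity-check the Spark output. This sketch assumes the long-format columns produced by `generate_sample_data()` (`location_id`, `configuration`, `value`); `compare_means` and the tiny sample frame are hypothetical.

```python
import pandas as pd


def compare_means(df: pd.DataFrame) -> pd.DataFrame:
    """Per-location mean observed vs simulated flow, mirroring the comparison SQL."""
    means = (
        df[df["configuration"].isin(["observed", "nwm_retrospective"])]
        .groupby(["location_id", "configuration"])["value"]
        .mean()
        .unstack("configuration")
        .dropna()  # inner-join semantics: keep locations that have both series
    )
    out = pd.DataFrame({
        "observed_avg": means["observed"],
        "simulated_avg": means["nwm_retrospective"],
    }).rename_axis("location_id")
    # Difference of unrounded means, then round, as ROUND(sim - obs, 2) does in SQL
    out["difference"] = out["simulated_avg"] - out["observed_avg"]
    return out.round(2).sort_index().reset_index()


sample = pd.DataFrame({
    "location_id": ["A", "A", "A", "A", "B"],
    "configuration": ["observed", "observed", "nwm_retrospective", "nwm_retrospective", "observed"],
    "value": [10.0, 20.0, 12.0, 16.0, 5.0],
})
print(compare_means(sample))
```

Passing `all_data` instead of `sample` should give the same numbers that `spark.sql(comparison_sql).show()` prints, assuming the table holds only the rows inserted by this notebook.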

## Main Demonstration

Now let's run the complete demonstration of TEEHR Iceberg data warehouse operations.

In [13]:
print("🌊 TEEHR Iceberg Data Warehouse with PySpark")
print("=" * 60)

# 1. Connect to Iceberg catalog
print("1️⃣ Connecting to Iceberg catalog...")
catalog, warehouse_location = setup_iceberg_catalog()
catalog_uri, _ = get_terraform_outputs()

🌊 TEEHR Iceberg Data Warehouse with PySpark
1️⃣ Connecting to Iceberg catalog...
⚠️ Could not get terraform outputs: [Errno 2] No such file or directory: PosixPath('/Users/mdenno/repos/teehr-eval-sys/examples/infrastructure/environments/dev')
✅ Connected to Iceberg catalog. Namespaces: [('teehr',)]
⚠️ Could not get terraform outputs: [Errno 2] No such file or directory: PosixPath('/Users/mdenno/repos/teehr-eval-sys/examples/infrastructure/environments/dev')


In [14]:
# 2. Create Spark session
print("2️⃣ Creating PySpark session...")
spark = create_spark_session_for_iceberg(catalog_uri, warehouse_location, catalog_name="iceberg")

2️⃣ Creating PySpark session...


:: loading settings :: url = jar:file:/Users/mdenno/repos/teehr-eval-sys/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/mdenno/.ivy2.5.2/cache
The jars for the packages stored in: /Users/mdenno/.ivy2.5.2/jars
org.apache.iceberg#iceberg-spark-runtime-4.0_2.13 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0d2db201-a6bc-41c2-b3bf-4b0611771028;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.0 in central
	found org.apache.hadoop#hadoop-aws;3.4.0 in central
	found software.amazon.awssdk#bundle;2.23.19 in central
	found org.wildfly.openssl#wildfly-openssl;1.1.3.Final in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.772 in central

✅ Created Spark 4.0.0 session with Iceberg support


In [15]:
# 3. Verify table exists
print("3️⃣ Verifying Iceberg tables...")
tables_sql = "SHOW TABLES IN iceberg.teehr"
tables_df = spark.sql(tables_sql)
print("Available tables:")
tables_df.show()

3️⃣ Verifying Iceberg tables...
Available tables:
+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|    teehr|configurations|      false|
|    teehr|     locations|      false|
|    teehr|    timeseries|      false|
+---------+--------------+-----------+




In [16]:
# 4. Generate sample data
print("4️⃣ Generating sample hydrologic data...")
observed_df, simulated_df = generate_sample_data(num_locations=5, num_days=30)

print("Generated data:")
print(f"  - Observed: {len(observed_df)} records across {observed_df['location_id'].nunique()} locations")
print(f"  - Simulated: {len(simulated_df)} records across {simulated_df['location_id'].nunique()} locations")
print(f"  - Time range: {observed_df['timestamp'].min()} to {observed_df['timestamp'].max()}")

# Show sample of generated data
print("\nSample observed data:")
print(observed_df.head())

4️⃣ Generating sample hydrologic data...
Generated data:
  - Observed: 150 records across 5 locations
  - Simulated: 150 records across 5 locations
  - Time range: 2023-01-01 00:00:00 to 2023-01-30 00:00:00

Sample observed data:
     location_id  timestamp      value          variable_name configuration  \
0  USGS-08000000 2023-01-01  38.848737  streamflow_daily_mean      observed   
1  USGS-08000000 2023-01-02  48.011198  streamflow_daily_mean      observed   
2  USGS-08000000 2023-01-03  50.587728  streamflow_daily_mean      observed   
3  USGS-08000000 2023-01-04  56.862714  streamflow_daily_mean      observed   
4  USGS-08000000 2023-01-05  50.583062  streamflow_daily_mean      observed   

  measurement_unit reference_time  
0              cfs     2023-01-01  
1              cfs     2023-01-02  
2              cfs     2023-01-03  
3              cfs     2023-01-04  
4              cfs     2023-01-05  


In [17]:
all_data = pd.concat([observed_df, simulated_df], ignore_index=True)

In [18]:
# 5. Insert data
print("5️⃣ Inserting data into Iceberg tables...")

# Insert the combined observed + simulated records (all_data from the previous cell)
insert_data_with_pyspark(spark, all_data, catalog_name="iceberg", table_name="teehr.timeseries")

5️⃣ Inserting data into Iceberg tables...
📝 Inserting 300 records into teehr.timeseries
   Data shape: (300, 7)
   Columns: ['location_id', 'timestamp', 'value', 'variable_name', 'configuration', 'measurement_unit', 'reference_time']


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
25/09/24 22:18:09 ERROR Utils: Aborting task
java.io.UncheckedIOException: Failed to close current writer

❌ Failed to insert data into teehr.timeseries: An error occurred while calling o61.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 0.0 failed 1 times, most recent failure: Lost task 11.0 in stage 0.0 (TID 11) (rti-lv2d02p06x executor driver): java.io.UncheckedIOException: Failed to close current writer

Py4JJavaError: An error occurred while calling o61.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 0.0 failed 1 times, most recent failure: Lost task 11.0 in stage 0.0 (TID 11) (rti-lv2d02p06x executor driver): java.io.UncheckedIOException: Failed to close current writer
	at org.apache.iceberg.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:128)
	at org.apache.iceberg.io.RollingFileWriter.close(RollingFileWriter.java:156)
	at org.apache.iceberg.io.RollingDataWriter.close(RollingDataWriter.java:32)
	at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.close(SparkWrite.java:792)
	at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.commit(SparkWrite.java:774)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:510)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1323)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:535)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:466)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:584)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:427)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: software.amazon.awssdk.services.s3.model.S3Exception: Access Denied (Service: S3, Status Code: 403, Request ID: 2DGPV97T9AE3ZMR8, Extended Request ID: W/NyIZnRnAMDVCOttiIgBifpZm76duAfcm5YeH+cqu81vTvlD4JgOxHq+P9AAq/L2oUsT8H+7SQ=)
	at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:41)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.close(ParquetFileWriter.java:1830)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1816)
	at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:261)
	at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
	at org.apache.iceberg.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:126)
	... 21 more
Caused by: software.amazon.awssdk.services.s3.model.S3Exception: Access Denied (Service: S3, Status Code: 403, Request ID: 2DGPV97T9AE3ZMR8, Extended Request ID: W/NyIZnRnAMDVCOttiIgBifpZm76duAfcm5YeH+cqu81vTvlD4JgOxHq+P9AAq/L2oUsT8H+7SQ=)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:93)
	at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:279)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:224)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
	at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:10191)
	at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:443)
	at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:269)
	at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:255)
	at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:40)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:424)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:397)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:237)
	at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:360)
	at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:358)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:237)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.classic.Dataset.<init>(Dataset.scala:277)
	at org.apache.spark.sql.classic.Dataset$.$anonfun$ofRows$5(Dataset.scala:140)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.classic.Dataset$.ofRows(Dataset.scala:136)
	at org.apache.spark.sql.classic.SparkSession.$anonfun$sql$1(SparkSession.scala:462)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.classic.SparkSession.sql(SparkSession.scala:449)
	at org.apache.spark.sql.classic.SparkSession.sql(SparkSession.scala:467)
	at org.apache.spark.sql.classic.SparkSession.sql(SparkSession.scala:91)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.UncheckedIOException: Failed to close current writer
	at org.apache.iceberg.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:128)
	at org.apache.iceberg.io.RollingFileWriter.close(RollingFileWriter.java:156)
	at org.apache.iceberg.io.RollingDataWriter.close(RollingDataWriter.java:32)
	at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.close(SparkWrite.java:792)
	at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.commit(SparkWrite.java:774)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:510)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1323)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:535)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:466)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:584)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:427)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.io.IOException: software.amazon.awssdk.services.s3.model.S3Exception: Access Denied (Service: S3, Status Code: 403, Request ID: 2DGPV97T9AE3ZMR8, Extended Request ID: W/NyIZnRnAMDVCOttiIgBifpZm76duAfcm5YeH+cqu81vTvlD4JgOxHq+P9AAq/L2oUsT8H+7SQ=)
	at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:41)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.close(ParquetFileWriter.java:1830)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1816)
	at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:261)
	at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
	at org.apache.iceberg.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:126)
	... 21 more
Caused by: software.amazon.awssdk.services.s3.model.S3Exception: Access Denied (Service: S3, Status Code: 403, Request ID: 2DGPV97T9AE3ZMR8, Extended Request ID: W/NyIZnRnAMDVCOttiIgBifpZm76duAfcm5YeH+cqu81vTvlD4JgOxHq+P9AAq/L2oUsT8H+7SQ=)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:93)
	at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:279)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:224)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
	at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:10191)
	at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:443)
	at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:269)
	at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:255)
	at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:40)
	... 26 more
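The root cause at the bottom of the trace is an S3 `403 Access Denied` raised by `putObject`. In other words, the Spark task could not write its Parquet data file to the warehouse bucket using whatever AWS credentials it resolved. One possibility worth ruling out, which this run does not confirm: the `!export AWS_PROFILE=ciroh_mdenno` cell near the top runs in a short-lived child shell, so the variable never reaches the Python kernel or the JVM that PySpark launches from it. The sketch below demonstrates that behavior. It uses a hypothetical variable name, `TEEHR_DEMO_PROFILE`, so it does not touch your real AWS settings.

```python
import os
import subprocess

DEMO_VAR = "TEEHR_DEMO_PROFILE"  # hypothetical name, used only for this demonstration


def export_in_subshell(name: str, value: str) -> None:
    """Mimic `!export NAME=value`: the export runs in a child shell that exits immediately."""
    subprocess.run(f"export {name}={value}", shell=True, check=True)


def set_for_kernel(name: str, value: str) -> None:
    """Set the variable in the kernel process itself (IPython's `%env NAME=value` does the same)."""
    os.environ[name] = value


os.environ.pop(DEMO_VAR, None)

export_in_subshell(DEMO_VAR, "ciroh_mdenno")
print("after !export:", os.environ.get(DEMO_VAR))  # None: the child shell's change is lost

set_for_kernel(DEMO_VAR, "ciroh_mdenno")
print("after os.environ:", os.environ.get(DEMO_VAR))
```

If this is the cause, the fix would be to run `os.environ["AWS_PROFILE"] = "ciroh_mdenno"` (or `%env AWS_PROFILE=ciroh_mdenno`) before the first `SparkSession` is created, because PySpark copies the kernel's environment when it starts the JVM. If the profile is already correct, the next thing to check would be whether that identity is allowed `s3:PutObject` on the warehouse bucket.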



In [None]:
# 6. Query data back
print("6️⃣ Querying data from Iceberg...")

# Query all data summary
print("Summary by location and configuration:")
query_data_with_pyspark(spark, catalog_name="iceberg", table_name="teehr.timeseries")

In [None]:
# Query specific location
sample_location = observed_df['location_id'].iloc[0]
print(f"\nDetailed view for location {sample_location}:")
query_data_with_pyspark(spark, "iceberg", "teehr.timeseries", sample_location)
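`query_data_with_pyspark` is defined earlier in the notebook, and its body is not shown here. As a rough sketch of the kind of query it might issue, the hypothetical helper below builds a per-location, per-configuration summary. It passes the optional location filter as a named parameter marker instead of formatting the value into the SQL string. The column names (`location_id`, `configuration`, `value`, `value_time`) are assumptions about the `teehr.timeseries` schema.

```python
from typing import Any, Dict, Optional, Tuple


def build_summary_query(
    table: str = "iceberg.teehr.timeseries",
    location_id: Optional[str] = None,
) -> Tuple[str, Optional[Dict[str, Any]]]:
    """Return (sql, args) for a per-location, per-configuration summary.

    Column names are assumptions about the teehr.timeseries schema. The optional
    location filter uses the named parameter marker :location_id, so Spark binds
    the value instead of it being pasted into the SQL text.
    """
    where = ""
    args: Optional[Dict[str, Any]] = None
    if location_id is not None:
        where = "WHERE location_id = :location_id\n"
        args = {"location_id": location_id}

    sql = (
        "SELECT location_id, configuration,\n"
        "       COUNT(*) AS n_records,\n"
        "       AVG(value) AS mean_value,\n"
        "       MIN(value_time) AS first_time,\n"
        "       MAX(value_time) AS last_time\n"
        f"FROM {table}\n"
        f"{where}"
        "GROUP BY location_id, configuration\n"
        "ORDER BY location_id, configuration"
    )
    return sql, args


sql, args = build_summary_query(location_id="loc-001")  # "loc-001" is a made-up ID
print(sql)
print(args)
```

Against the live session, this would run as `spark.sql(sql, args=args).show()`. The `args` parameter for named markers exists in PySpark 3.4 and later, which includes the 4.0 release this notebook installs.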

In [None]:
# 7. Demonstrate time series operations
demonstrate_time_series_operations(spark)

## Summary and Cleanup

Complete the demonstration and clean up resources.

In [None]:
print("\n✅ TEEHR Iceberg PySpark demonstration completed successfully!")
print("🚀 Data successfully stored and queried from persistent Iceberg warehouse")
print(f"📊 Total records processed: {len(all_data)}")

# Clean up Spark session
spark.stop()
print("🧹 Spark session stopped")
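In a notebook, the cleanup cell only runs if you reach it. In a script version of this workflow, a `try`/`finally` guarantees that `spark.stop()` runs even when a step such as the Iceberg write raises. The following is a minimal sketch: `run_then_stop` and `FakeSession` are hypothetical names, and the fake session stands in for a real `SparkSession` so the example runs without Spark.

```python
from typing import Callable, Protocol, TypeVar

T = TypeVar("T")


class Stoppable(Protocol):
    """Anything with a stop() method, e.g. a SparkSession."""

    def stop(self) -> None: ...


def run_then_stop(session: Stoppable, work: Callable[[Stoppable], T]) -> T:
    """Run work(session) and always call session.stop(), even if work raises."""
    try:
        return work(session)
    finally:
        session.stop()


class FakeSession:
    """Stand-in for SparkSession so the sketch runs without a cluster."""

    def __init__(self) -> None:
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True


def failing_write(session: Stoppable) -> None:
    raise IOError("simulated S3 403 during write")


fake = FakeSession()
try:
    run_then_stop(fake, failing_write)
except IOError as err:
    print("write failed:", err)
print("session stopped:", fake.stopped)  # True: stop() ran in the finally block
```

With a real session, the call would look like `run_then_stop(spark, lambda s: s.sql(sql).collect())`. PySpark's `SparkSession` also supports the `with` statement and calls `stop()` on exit, so `with SparkSession.builder.getOrCreate() as spark:` gives the same guarantee.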

## What's Stored Where?

After running this notebook:

### 📊 In S3 Bucket (`dev-teehr-sys-iceberg-warehouse`):
- **Data Files**: Parquet files containing your time series data
- **Metadata Files**: Iceberg metadata (manifests, table metadata JSON)
- **Warehouse Structure**: Organized by namespace and table
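To see these files for yourself, you could list the table's prefix in the bucket and group the keys by role. The sketch below assumes the layout Iceberg's JDBC/REST catalogs commonly use by default: `<warehouse>/<namespace>/<table>/data/...` for data and `.../metadata/...` for metadata. Under that layout, manifest lists are named `snap-*.avro`, and other `.avro` files in `metadata/` are manifests. The helper name `classify_iceberg_keys` and the example keys are hypothetical. Fetching the keys themselves (for example with boto3's `list_objects_v2` paginator) is left out so the sketch runs offline.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def classify_iceberg_keys(keys: Iterable[str]) -> Dict[str, List[str]]:
    """Group object keys under an Iceberg table prefix by the role each file plays."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for key in keys:
        name = key.rsplit("/", 1)[-1]  # file name without the prefix
        if "/data/" in key and name.endswith(".parquet"):
            groups["data files"].append(key)
        elif "/metadata/" in key and name.endswith(".metadata.json"):
            groups["table metadata"].append(key)
        elif "/metadata/" in key and name.startswith("snap-") and name.endswith(".avro"):
            groups["manifest lists"].append(key)
        elif "/metadata/" in key and name.endswith(".avro"):
            groups["manifests"].append(key)
        else:
            groups["other"].append(key)
    return dict(groups)


example_keys = [
    "teehr/timeseries/data/00000-0-abc.parquet",
    "teehr/timeseries/metadata/00001-def.metadata.json",
    "teehr/timeseries/metadata/snap-123-1-ghi.avro",
    "teehr/timeseries/metadata/ghi-m0.avro",
]
for role, files in classify_iceberg_keys(example_keys).items():
    print(f"{role}: {len(files)}")
```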

### 🗃️ In PostgreSQL Database:
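- **Table Registry**: Which namespaces and tables exist
- **Metadata Pointers**: The S3 location of each table's current metadata JSON file; each commit atomically swaps this pointer
- **Namespace Properties**: Key/value properties attached to namespaces

Schemas, snapshots (the transaction history), and table properties such as compression, file format, and partitioning are recorded in the table metadata JSON in S3. PostgreSQL only points to that file.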
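The catalog's role in commits can be pictured as a compare-and-swap on the metadata pointer. A writer first writes a new metadata file to S3. It then asks the catalog to move the pointer to that file, and the catalog agrees only if the pointer still references the version the writer started from. Otherwise the writer must refresh and retry. The toy model below illustrates that optimistic-concurrency idea in plain Python. `PointerCatalog` and the `s3://example-bucket/...` paths are hypothetical stand-ins for the PostgreSQL row and real metadata files, not Iceberg's actual code.

```python
import threading
from typing import Dict, Optional


class CommitConflict(Exception):
    """Raised when another writer committed first."""


class PointerCatalog:
    """Toy model of a catalog table: table name -> current metadata file location."""

    def __init__(self) -> None:
        self._pointers: Dict[str, str] = {}
        self._lock = threading.Lock()  # stands in for the database's atomic row update

    def current(self, table: str) -> Optional[str]:
        return self._pointers.get(table)

    def commit(self, table: str, expected: Optional[str], new: str) -> None:
        """Swap the pointer to `new` only if it still equals `expected`."""
        with self._lock:
            if self._pointers.get(table) != expected:
                raise CommitConflict(f"{table}: base {expected!r} is stale")
            self._pointers[table] = new


prefix = "s3://example-bucket/teehr/timeseries/metadata"
toy_catalog = PointerCatalog()
toy_catalog.commit("teehr.timeseries", None, f"{prefix}/00000.metadata.json")  # create

base = toy_catalog.current("teehr.timeseries")  # both writers start from this version
toy_catalog.commit("teehr.timeseries", base, f"{prefix}/00001.metadata.json")  # writer A wins
try:
    toy_catalog.commit("teehr.timeseries", base, f"{prefix}/00001-b.metadata.json")  # writer B
except CommitConflict as err:
    print("writer B must refresh and retry:", err)
```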

The Iceberg architecture provides:
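- ✅ **ACID transactions**: each commit atomically replaces the table's current snapshot, so readers never see a partial write
- ✅ **Schema evolution** (adding, dropping, renaming columns) without rewriting existing data files
- ✅ **Time travel** to query earlier snapshots of a table
- ✅ **Efficient querying** through partition pruning and file skipping based on column statistics stored in manifests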
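With the Iceberg Spark runtime, time travel and snapshot history are reachable from plain Spark SQL. Each table exposes metadata tables such as `<table>.snapshots`, and `VERSION AS OF` / `TIMESTAMP AS OF` read an earlier snapshot (Spark 3.3 and later). The hypothetical helpers below only build the query strings, so they run without a cluster. The table name matches the one used in this notebook, and the snapshot ID is cast to `int` so arbitrary text cannot be spliced into the query.

```python
from datetime import datetime
from typing import Optional


def snapshots_query(table: str = "iceberg.teehr.timeseries") -> str:
    """List the table's snapshots via the Iceberg `snapshots` metadata table."""
    return (
        "SELECT committed_at, snapshot_id, parent_id, operation "
        f"FROM {table}.snapshots ORDER BY committed_at"
    )


def time_travel_query(
    table: str = "iceberg.teehr.timeseries",
    snapshot_id: Optional[int] = None,
    as_of: Optional[datetime] = None,
) -> str:
    """Read the table as of one snapshot ID or one point in time (exactly one must be given)."""
    if (snapshot_id is None) == (as_of is None):
        raise ValueError("pass exactly one of snapshot_id or as_of")
    if snapshot_id is not None:
        return f"SELECT * FROM {table} VERSION AS OF {int(snapshot_id)}"
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{as_of:%Y-%m-%d %H:%M:%S}'"


print(snapshots_query())
print(time_travel_query(as_of=datetime(2025, 9, 24, 22, 0)))
```

A typical flow would be `spark.sql(snapshots_query()).show(truncate=False)` to pick a `snapshot_id`, then `spark.sql(time_travel_query(snapshot_id=...))` to read the table as it was at that commit.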