# S3 Tables Catalog REST Service Example

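This notebook demonstrates a complete workflow with the S3 Tables Catalog REST service, combining direct REST calls (via `requests`) with PySpark and Apache Iceberg. It assumes the catalog REST service and an S3-compatible endpoint are already running and reachable at the URLs used in the cells below.
1. Initialize connections (Spark session and a catalog health check)
2. Create a namespace and a table through the catalog's REST API
3. Write sample data to the table with Spark
4. Read the data back and run a small aggregation
5. Clean up resources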

In [None]:
!pip install requests pyspark==3.5.0 'urllib3<2.0.0'
!mkdir -p jars
!wget -P jars https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.0/iceberg-spark-runtime-3.5_2.12-1.5.0.jar
!wget -P jars https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.29.26/bundle-2.29.26.jar
!wget -P jars https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.29.26/url-connection-client-2.29.26.jar

: 

In [None]:
import json
import os

import requests
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Start a Spark session with an Iceberg catalog named "demo" backed by the REST
# catalog service, plus S3 settings for the object store.
# Endpoints and credentials can be overridden via environment variables; the
# defaults are the original local values (catalog on localhost:8080, S3 endpoint
# on localhost:4566, "test"/"test" credentials).
# NOTE(review): the requests-based cells in this notebook call the catalog at
# http://catalog-rest:8080 — make sure CATALOG_URI reaches the same service.
CATALOG_URI = os.environ.get("CATALOG_URI", "http://localhost:8080")
S3_ENDPOINT = os.environ.get("S3_ENDPOINT", "http://localhost:4566")
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "test")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "test")
# The AWS SDK client behind Iceberg's S3FileIO needs a region to be set.
AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")

# Jars downloaded by the install cell.
ICEBERG_JARS = ",".join([
    "jars/iceberg-spark-runtime-3.5_2.12-1.5.0.jar",
    "jars/bundle-2.29.26.jar",
    "jars/url-connection-client-2.29.26.jar",
])

spark = (
    SparkSession.builder
    .appName("S3TablesCatalogExample")
    .config("spark.jars", ICEBERG_JARS)
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.demo.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
    .config("spark.sql.catalog.demo.uri", CATALOG_URI)
    # Iceberg's S3FileIO is configured through catalog properties (s3.*), not
    # through Hadoop fs.s3a.* settings; hadoop-aws is also not among the jars
    # downloaded above, so s3a:// paths would not resolve on their own.
    .config("spark.sql.catalog.demo.s3.endpoint", S3_ENDPOINT)
    .config("spark.sql.catalog.demo.s3.path-style-access", "true")
    .config("spark.sql.catalog.demo.s3.access-key-id", AWS_ACCESS_KEY_ID)
    .config("spark.sql.catalog.demo.s3.secret-access-key", AWS_SECRET_ACCESS_KEY)
    .config("spark.sql.catalog.demo.client.region", AWS_REGION)
    # Hadoop S3A settings, kept for any s3a:// access outside Iceberg.
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
    .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

In [None]:
# Smoke-test the catalog REST service: fetch its /v1/config document and
# pretty-print it. The timeout (seconds) and raise_for_status make the cell fail
# fast if the service is unreachable or answers with an HTTP error.
response = requests.get("http://catalog-rest:8080/v1/config", timeout=10)
response.raise_for_status()
print("Catalog Configuration:")
print(json.dumps(response.json(), indent=2))

In [None]:
# Create the `demo.test` namespace through the catalog REST API, then list all
# namespaces to confirm it is there.
namespace_data = {
    "namespace": "demo.test",
    "properties": {
        "comment": "Demo namespace for testing",
        "owner": "data_team",
    },
}

response = requests.post(
    "http://catalog-rest:8080/v1/namespaces", json=namespace_data, timeout=10
)
print(f"Create namespace status: {response.status_code}")
# 409 is treated as "already exists" so the notebook can be re-run; this assumes
# the service follows the Iceberg REST convention — TODO confirm. Any other
# error status stops here instead of surfacing later as a confusing Spark error.
if response.status_code != 409:
    response.raise_for_status()

# List namespaces to verify creation
response = requests.get("http://catalog-rest:8080/v1/namespaces", timeout=10)
response.raise_for_status()
print("\nAvailable namespaces:")
print(json.dumps(response.json(), indent=2))

In [None]:
# Register the `sales` table in namespace `demo.test` through the REST API.
# Columns, field ids, and types mirror the Spark schema used for the sample data.
# NOTE(review): this payload resembles Iceberg's JSON schema, where the 32-bit
# integer type is spelled "int" (not "integer") and every field carries an
# explicit "required" flag — confirm which format this service expects.
table_data = {
    "namespace": "demo.test",
    "name": "sales",
    "schema": {
        "type": "struct",
        "fields": [
            {"id": 1, "name": "sale_id", "type": "long", "required": True},
            {"id": 2, "name": "product", "type": "string"},
            {"id": 3, "name": "quantity", "type": "integer"},
            {"id": 4, "name": "price", "type": "double"},
            {"id": 5, "name": "sale_date", "type": "date"},
        ],
    },
    "properties": {
        "write.format.default": "parquet",
        "write.metadata.compression-codec": "gzip",
    },
}

response = requests.post(
    "http://catalog-rest:8080/v1/tables", json=table_data, timeout=10
)
print(f"Create table status: {response.status_code}")
# A 409 presumably means the table exists from an earlier run (TODO confirm);
# any other error status stops the notebook here.
if response.status_code != 409:
    response.raise_for_status()

print("Table creation response:")
try:
    print(json.dumps(response.json(), indent=2))
except ValueError:
    # Body was empty or not JSON — show it verbatim instead of crashing.
    print(response.text)

In [None]:
# Build a small in-memory sales DataFrame. Column names, types, and nullability
# match the table schema registered above; only sale_id is non-nullable.
from datetime import date

sales_schema = StructType([
    StructField(column_name, column_type, nullable)
    for column_name, column_type, nullable in (
        ("sale_id", LongType(), False),
        ("product", StringType(), True),
        ("quantity", IntegerType(), True),
        ("price", DoubleType(), True),
        ("sale_date", DateType(), True),
    )
])

sample_rows = [
    # (sale_id, product, quantity, price, sale_date)
    (1, "Laptop", 1, 999.99, date(2024, 1, 1)),
    (2, "Mouse", 2, 24.99, date(2024, 1, 1)),
    (3, "Keyboard", 1, 89.99, date(2024, 1, 2)),
    (4, "Monitor", 2, 299.99, date(2024, 1, 2)),
    (5, "Headphones", 3, 79.99, date(2024, 1, 3)),
]

df = spark.createDataFrame(sample_rows, schema=sales_schema)
print("Sample DataFrame:")
df.show()

In [None]:
# Persist the sample rows as Iceberg table `sales` in namespace `demo.test` of
# the Spark catalog `demo`. createOrReplace (rather than create) lets this cell
# run against a table that already exists.
(
    df.writeTo("demo.demo.test.sales")
    .using("iceberg")
    .createOrReplace()
)
print("Data written successfully!")

In [None]:
# Read the table back through the Iceberg catalog and check the round trip.
sales_read = spark.table("demo.demo.test.sales")
print("Reading data from table:")
sales_read.show()
assert sales_read.count() == df.count(), "row count differs from the DataFrame that was written"

# Summarise per product. Revenue is unit price x quantity; summing `price`
# alone would ignore how many units were sold. Explicit aliases avoid relying
# on Spark's auto-generated "sum(...)" column names, and ordering keeps the
# output deterministic across runs.
print("\nSales summary by product:")
(
    sales_read.groupBy("product")
    .agg(
        F.sum("quantity").alias("total_quantity"),
        F.sum(F.col("price") * F.col("quantity")).alias("total_revenue"),
    )
    .orderBy("product")
    .show()
)

In [None]:
# Clean up: drop the demo table through the REST API, list what is left in the
# namespace, then release the Spark session.
response = requests.delete(
    "http://catalog-rest:8080/v1/tables/demo.test/sales", timeout=10
)
print(f"Drop table status: {response.status_code}")
# Tolerate 404 so the cell can be re-run once the table is gone (assumes the
# service reports a missing table as 404 — TODO confirm).
if response.status_code != 404:
    response.raise_for_status()

# List remaining tables
response = requests.get(
    "http://catalog-rest:8080/v1/tables",
    params={"namespace": "demo.test"},
    timeout=10,
)
response.raise_for_status()
print("\nRemaining tables:")
print(json.dumps(response.json(), indent=2))

# NOTE(review): the `demo.test` namespace created earlier is not removed here;
# add a namespace delete once the service's endpoint for it is confirmed.
spark.stop()