### MinIO explained

In [1]:
import boto3
import os

# These come from your docker-compose env vars
aws_access_key_id = os.environ["AWS_ACCESS_KEY_ID"]     # "admin"
aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]  # "password"
aws_region = os.environ["AWS_REGION"] # us-east-1

# Mocked S3 client that connects to local MinIO
s3 = boto3.client(
    "s3",
    endpoint_url="http://minio:9000",  # Local MinIO service
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=aws_region
)

# Create a bucket
s3.create_bucket(Bucket="poc")

# Create a blob (upload a file)
s3.put_object(Bucket="poc", Key="demo.txt", Body=b"Hello, Iceberg!")

{'ResponseMetadata': {'RequestId': '184BFD72F6D07E16',
  'HostId': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'accept-ranges': 'bytes',
   'content-length': '0',
   'etag': '"3af42309382afb590d9143564b4bb8b8"',
   'server': 'MinIO',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'vary': 'Origin, Accept-Encoding',
   'x-amz-checksum-crc32': 'fW13PQ==',
   'x-amz-id-2': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
   'x-amz-request-id': '184BFD72F6D07E16',
   'x-content-type-options': 'nosniff',
   'x-ratelimit-limit': '1036',
   'x-ratelimit-remaining': '1036',
   'x-xss-protection': '1; mode=block',
   'date': 'Tue, 24 Jun 2025 13:25:50 GMT'},
  'RetryAttempts': 0},
 'ETag': '"3af42309382afb590d9143564b4bb8b8"',
 'ChecksumCRC32': 'fW13PQ=='}

In [2]:
# List all the blobs

response = s3.list_objects_v2(Bucket="poc")
for obj in response.get("Contents", []):
    print(f"Found object: {obj['Key']}")

Found object: demo.txt


In [3]:
# Read the blob

response = s3.get_object(Bucket="poc", Key="demo.txt")
print(response["Body"].read().decode())

Hello, Iceberg!


In [4]:
# Clean it up

# Delete blob
s3.delete_object(Bucket="poc", Key="demo.txt")

# Delete bucket
s3.delete_bucket(Bucket="poc")

{'ResponseMetadata': {'RequestId': '184BFD7312AA1DF2',
  'HostId': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'accept-ranges': 'bytes',
   'server': 'MinIO',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'vary': 'Origin, Accept-Encoding',
   'x-amz-id-2': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
   'x-amz-request-id': '184BFD7312AA1DF2',
   'x-content-type-options': 'nosniff',
   'x-ratelimit-limit': '1036',
   'x-ratelimit-remaining': '1036',
   'x-xss-protection': '1; mode=block',
   'date': 'Tue, 24 Jun 2025 13:25:50 GMT'},
  'RetryAttempts': 0}}

### Iceberg time!

In [5]:
import pyarrow as pa
from pyiceberg.catalog import load_rest
from pyiceberg.exceptions import NamespaceAlreadyExistsError, TableAlreadyExistsError

In [6]:
import pyarrow as pa
from pyiceberg.catalog import load_rest
from pyiceberg.exceptions import NamespaceAlreadyExistsError, TableAlreadyExistsError

catalog = load_rest(
    name="rest",
    conf = {
        "uri": "http://rest:8181/",
        "s3.endpoint": "http://minio:9000",
        "s3.access-key": aws_access_key_id,
        "s3.secret-key": aws_secret_access_key
    }
)

In [7]:
from pyiceberg.exceptions import NamespaceAlreadyExistsError

namespace = "rideshare"

try:
    catalog.create_namespace(namespace)
except NamespaceAlreadyExistsError:
    pass  # It's fine if it already exists

In [8]:
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

Namespaces: [('rideshare',)]


In [9]:
# First, imports
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
    NestedField, UUIDType, StringType, TimestampType, DoubleType
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.exceptions import NamespaceAlreadyExistsError

In [10]:
rides_schema = Schema(
    NestedField(field_id=1, name="ride_id", field_type=UUIDType(), required=True),
    NestedField(field_id=2, name="driver_id", field_type=StringType()),
    NestedField(field_id=3, name="customer_id", field_type=StringType()),
    NestedField(field_id=4, name="pickup_time", field_type=TimestampType()),
    NestedField(field_id=5, name="dropoff_time", field_type=TimestampType()),
    NestedField(field_id=6, name="fare", field_type=DoubleType()),
    NestedField(field_id=7, name="pickup_location", field_type=StringType()),
    NestedField(field_id=8, name="dropoff_location", field_type=StringType()),
    NestedField(field_id=9, name="status", field_type=StringType())
)

In [11]:
rides_partition_spec = PartitionSpec(
    fields=[
        PartitionField(
            source_id=4,       # pickup_time
            field_id=1000,     # unique ID for this partition
            transform="day",   # group by day
            name="pickup_day"
        )
    ]
)

In [12]:
catalog.create_table(
    identifier=f"{namespace}.rides",
    schema=rides_schema,
    partition_spec=rides_partition_spec
)

rides(
  1: ride_id: required uuid,
  2: driver_id: optional string,
  3: customer_id: optional string,
  4: pickup_time: optional timestamp,
  5: dropoff_time: optional timestamp,
  6: fare: optional double,
  7: pickup_location: optional string,
  8: dropoff_location: optional string,
  9: status: optional string
),
partition by: [pickup_day],
sort order: [],
snapshot: null

In [13]:
import json

response = s3.list_objects_v2(Bucket="warehouse", Prefix="rideshare/rides/metadata/")

for obj in response.get("Contents", []):
    obj_name = obj["Key"]
    print(f"Found file: {obj_name}")
    
    if obj_name.endswith("metadata.json"):
        response = s3.get_object(Bucket="warehouse", Key=obj_name)
        content = response["Body"].read().decode()
        metadata = json.loads(content)
        
        print("\n--- Parsed metadata.json ---\n")
        print(json.dumps(metadata, indent=2))

Found file: rideshare/rides/metadata/00000-f41ba4cc-4686-4c8b-98c1-290e8742bcdc.metadata.json

--- Parsed metadata.json ---

{
  "format-version": 2,
  "table-uuid": "4fce4b55-e6b4-4ea0-9597-3603e62e9fec",
  "location": "s3://warehouse/rideshare/rides",
  "last-sequence-number": 0,
  "last-updated-ms": 1750771559065,
  "last-column-id": 9,
  "current-schema-id": 0,
  "schemas": [
    {
      "type": "struct",
      "schema-id": 0,
      "fields": [
        {
          "id": 1,
          "name": "ride_id",
          "required": true,
          "type": "uuid"
        },
        {
          "id": 2,
          "name": "driver_id",
          "required": false,
          "type": "string"
        },
        {
          "id": 3,
          "name": "customer_id",
          "required": false,
          "type": "string"
        },
        {
          "id": 4,
          "name": "pickup_time",
          "required": false,
          "type": "timestamp"
        },
        {
          "id": 5,
        

In [14]:
sql = """
CREATE CATALOG iceberg_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='http://rest:8181',
  'warehouse'='s3://warehouse',
  'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
  's3.endpoint'='http://minio:9000',
  's3.path-style-access'='true',
  'aws.region'='us-east-1',
  'aws.access-key-id'='admin',
  'aws.secret-access-key'='password'
);

USE CATALOG iceberg_catalog;
USE rideshare;

INSERT INTO rides VALUES
  ('r1', TIMESTAMP '2025-06-01 08:15:00', 19.80),
  ('r2', TIMESTAMP '2025-06-02 14:30:00', 13.50);
"""

with open("/sql/insert.sql", "w") as f:
    f.write(sql)

import subprocess
subprocess.run([
    "docker", "exec", "-i", "flink_sql_client",
    "/opt/flink/bin/sql-client.sh",
    "-f", "/opt/flink/sql/insert.sql"
])

FileNotFoundError: [Errno 2] No such file or directory: 'docker'

In [15]:
import subprocess

In [16]:
subprocess.run([
    "/opt/flink/bin/sql-client.sh",
    "-f", "/sql/insert.sql"
])

Jun 24, 2025 1:26:54 PM org.jline.utils.Log logr


[34;1m[INFO] Executing SQL from file.[0m

Command history file path: /root/.flink-sql-history
Flink SQL> CREATE CATALOG iceberg_catalog WITH (
>   'type'='iceberg',
>   'catalog-type'='rest',
>   'uri'='http://rest:8181',
>   'warehouse'='s3://warehouse',
>   'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
>   's3.endpoint'='http://minio:9000',
>   's3.path-style-access'='true',
>   'aws.region'='us-east-1',
>   'aws.access-key-id'='admin',
>   'aws.secret-access-key'='password'
> )[31;1m[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'iceberg' that implements 'org.apache.flink.table.factories.CatalogFactory' in the classpath.

Available factory identifiers are:

generic_in_memory[0m

Shutting down the session...
done.


CompletedProcess(args=['/opt/flink/bin/sql-client.sh', '-f', '/sql/insert.sql'], returncode=0)

In [106]:
import duckdb
print(duckdb.__version__)

1.3.1


In [108]:
tables = catalog.list_tables("rideshare")
for namespace, table_name in catalog.list_tables("rideshare"):
    print(f"{namespace}.{table_name}")

rideshare.rides


In [109]:
table = catalog.load_table(("rideshare", "rides"))  # or "rideshare.rides"
print(table.location())

s3://warehouse/rideshare/rides


In [131]:

import duckdb

con = duckdb.connect()

con.execute("INSTALL iceberg;")
con.execute("LOAD iceberg;")
con.execute("UPDATE EXTENSIONS;")

con.execute("SET s3_region TO 'us-east-1'")
con.execute("SET s3_access_key_id TO 'admin'")
con.execute("SET s3_secret_access_key TO 'password'")
con.execute("SET s3_endpoint TO 'minio:9000'")
con.execute("SET s3_use_ssl TO false")  # ✅ THIS is the key!

<duckdb.duckdb.DuckDBPyConnection at 0x7fcadd4ca8b0>

In [132]:
# Check if DuckDB can write to storage
df = pd.DataFrame([{"x": 1, "y": "test"}])
con.register("mydf", df)

con.execute("""
    COPY mydf TO 's3://warehouse/test-parquet/test.parquet' (FORMAT 'parquet');
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7fcadd4ca8b0>

In [133]:
import pandas as pd
from datetime import datetime
import uuid

df = pd.DataFrame([
    {
        "ride_id": str(uuid.uuid4()),
        "driver_id": "d1",
        "customer_id": "c1",
        "pickup_time": datetime(2024, 6, 1, 8, 15),
        "dropoff_time": datetime(2024, 6, 1, 8, 45),
        "fare": 19.80,
        "pickup_location": "Downtown",
        "dropoff_location": "Airport",
        "status": "completed"
    },
    {
        "ride_id": str(uuid.uuid4()),
        "driver_id": "d2",
        "customer_id": "c2",
        "pickup_time": datetime(2024, 6, 2, 14, 30),
        "dropoff_time": datetime(2024, 6, 2, 14, 55),
        "fare": 13.50,
        "pickup_location": "Midtown",
        "dropoff_location": "Suburbs",
        "status": "completed"
    }
])

In [134]:
con.register("rides_df", df)

<duckdb.duckdb.DuckDBPyConnection at 0x7fcadd4ca8b0>

In [135]:
con.execute("""
    CREATE TABLE 's3://warehouse/rideshare/rides' AS
    SELECT * FROM rides_df
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7fcadd4ca8b0>

In [136]:
con.execute("SELECT * FROM 's3://warehouse/rideshare/rides'").fetchdf()

Unnamed: 0,ride_id,driver_id,customer_id,pickup_time,dropoff_time,fare,pickup_location,dropoff_location,status
0,39945a6b-9cd1-4389-9e93-5e55f39fbf0d,d1,c1,2024-06-01 08:15:00,2024-06-01 08:45:00,19.8,Downtown,Airport,completed
1,8c164803-8e00-485e-95d4-4cfd81344fc8,d2,c2,2024-06-02 14:30:00,2024-06-02 14:55:00,13.5,Midtown,Suburbs,completed


In [118]:
con.execute("""
ATTACH 's3://warehouse' AS warehouse
(
    TYPE ICEBERG,
    CATALOG_TYPE HADOOP,
    STORAGE_NAMESPACE 's3://warehouse',
    AUTHORIZATION_TYPE 'none'
)
""")

Error: Unhandled options found: catalog_type, storage_namespace

In [115]:
# Insert more sample rides
df2 = pd.DataFrame([
    {
        "ride_id": str(uuid.uuid4()),
        "driver_id": "d3",
        "customer_id": "c3",
        "pickup_time": datetime(2024, 6, 3, 9, 15),
        "dropoff_time": datetime(2024, 6, 3, 9, 45),
        "fare": 29.99,
        "pickup_location": "OldTown",
        "dropoff_location": "CityCenter",
        "status": "completed"
    }
])

con.register("rides_df2", df2)

con.execute("""
    INSERT INTO 's3://warehouse/rideshare/rides'
    SELECT * FROM rides_df2
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7fcadde9e830>

In [116]:
con.execute("SELECT * FROM 's3://warehouse/rideshare/rides'").fetchdf()

Unnamed: 0,ride_id,driver_id,customer_id,pickup_time,dropoff_time,fare,pickup_location,dropoff_location,status
0,bf986a84-6769-4008-aa9b-cc74c5df4297,d1,c1,2024-06-01 08:15:00,2024-06-01 08:45:00,19.8,Downtown,Airport,completed
1,5ac2471f-6cec-41cc-a1c2-86435c01cda2,d2,c2,2024-06-02 14:30:00,2024-06-02 14:55:00,13.5,Midtown,Suburbs,completed
2,7d82f8fa-6198-4358-854b-2cc5ace076b6,d3,c3,2024-06-03 09:15:00,2024-06-03 09:45:00,29.99,OldTown,CityCenter,completed


In [103]:
con.execute('DROP TABLE "s3://warehouse/rideshare/rides"')

<duckdb.duckdb.DuckDBPyConnection at 0x7fcad823bdf0>

In [73]:
con.execute("""
    CREATE TABLE 's3://warehouse/rideshare/rides' AS
    SELECT * FROM rides_df2
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7fcadd539230>

In [75]:
con.execute("SELECT * FROM 's3://warehouse/rideshare/rides'").fetchdf()

Unnamed: 0,ride_id,driver_id,customer_id,pickup_time,dropoff_time,fare,pickup_location,dropoff_location,status
0,5c3136f2-8f59-48b4-9b01-0055348a92e8,d3,c3,2024-06-03 09:15:00,2024-06-03 09:45:00,29.99,OldTown,CityCenter,completed


In [74]:
response = s3.list_objects_v2(Bucket="warehouse", Prefix="rideshare/rides/data/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])

In [81]:
con.execute('DROP TABLE IF EXISTS "s3://warehouse/rideshare/rides"')

<duckdb.duckdb.DuckDBPyConnection at 0x7fcadc118770>

In [96]:
df

Unnamed: 0,x,y
0,1,test


In [98]:
import duckdb

con = duckdb.connect()


con.execute("SET s3_region TO 'us-east-1'")
con.execute("SET s3_access_key_id TO 'admin'")
con.execute("SET s3_secret_access_key TO 'password'")
con.execute("SET s3_endpoint TO 'minio:9000'")
con.execute("SET s3_url_style TO 'path'")
con.execute("SET s3_use_ssl TO false")  # ✅ THIS is the key!

con.register("rides_df", df)

# ✅ Use the raw S3 path to the Iceberg table
con.execute("""
    CREATE TABLE 's3://warehouse/rideshare/rides' AS
    SELECT * FROM rides_df
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7fcad823bdf0>

In [99]:
df = pd.DataFrame([{"x": 1, "y": "test"}])
con.register("mydf", df)

con.execute("""
    COPY mydf TO 's3://warehouse/test-parquet/test.parquet' (FORMAT 'parquet');
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7fcad823bdf0>

In [100]:
con.execute("SELECT * FROM 's3://warehouse/rideshare/rides'").fetchdf()

Unnamed: 0,x,y
0,1,test


In [101]:
df = pd.DataFrame([{"x": 1, "y": "test"}])
con.register("mydf", df)

con.execute("""
    COPY mydf TO 's3://warehouse/test-parquet/test.parquet' (FORMAT 'parquet');
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7fcad823bdf0>

In [91]:
s3.create_bucket(Bucket="warehouse")

BucketAlreadyOwnedByYou: An error occurred (BucketAlreadyOwnedByYou) when calling the CreateBucket operation: Your previous request to create the named bucket succeeded and you already own it.

In [90]:
s3.list_buckets()

{'ResponseMetadata': {'RequestId': '184BEFCA3E0C4B10',
  'HostId': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'accept-ranges': 'bytes',
   'content-length': '367',
   'content-type': 'application/xml',
   'server': 'MinIO',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'vary': 'Origin, Accept-Encoding',
   'x-amz-id-2': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
   'x-amz-request-id': '184BEFCA3E0C4B10',
   'x-content-type-options': 'nosniff',
   'x-ratelimit-limit': '1588',
   'x-ratelimit-remaining': '1588',
   'x-xss-protection': '1; mode=block',
   'date': 'Tue, 24 Jun 2025 09:15:32 GMT'},
  'RetryAttempts': 0},
 'Buckets': [{'Name': 'warehouse',
   'CreationDate': datetime.datetime(2025, 6, 23, 14, 34, 21, 19000, tzinfo=tzlocal())}],
 'Owner': {'DisplayName': 'minio',
  'ID': '02d6176db174dc93cb1b899f7c6078f08654445fe8cf1b6ce98d8855f66bdbf4'}}

In [77]:
import duckdb
import pandas as pd
import uuid
from datetime import datetime

# Sample row
df = pd.DataFrame([{
    "ride_id": str(uuid.uuid4()),
    "driver_id": "d100",
    "pickup_time": datetime.now(),
    "fare": 99.99
}])

# Connect
con = duckdb.connect()

# Register
con.register("df", df)

# Try writing directly to S3 (just as Parquet)
con.execute("""
    COPY df TO 's3://warehouse/test-output/rides.parquet'
    (FORMAT 'parquet');
""")

IOException: IO Error: Cannot open file "s3://warehouse/test-output/rides.parquet": No such file or directory

In [62]:
table = catalog.load_table("rideshare.rides")
print(table.snapshots())

[]


In [41]:
con.execute("SELECT * FROM 's3://warehouse/rideshare/rides'").fetchdf()

CatalogException: Catalog Error: Table with name s3://warehouse/rideshare/rides does not exist!
Did you mean "pg_sequence"?

LINE 1: SELECT * FROM 's3://warehouse/rideshare/rides'
                      ^

In [27]:
con.execute("""
    ATTACH 's3://warehouse' AS warehouse
    (TYPE ICEBERG,
     CATALOG_TYPE 'rest',
     URI 'http://localhost:8181',
     STORAGE_NAMESPACE 's3://warehouse',
     AUTHORIZATION_TYPE 'none');
""")

Error: Unhandled options found: catalog_type, storage_namespace, uri

In [22]:
con.register("rides_df", df)

<duckdb.duckdb.DuckDBPyConnection at 0x7fddf3d58630>

In [23]:
con.execute("""
    INSERT INTO 's3://warehouse/rideshare/rides'
    SELECT * FROM rides_df
""")

CatalogException: Catalog Error: Table with name s3://warehouse/rideshare/rides does not exist!
Did you mean "pg_sequence"?

In [28]:
# Define schema using NestedField

# ❗ Iceberg requires all fields to have stable, explicit IDs.
# This is critical for schema evolution and tracking changes over time.
# That's why we use NestedField() — each field has:
# - field_id: required, stable numeric ID
# - name: field name
# - field_type: Iceberg data type
# - required: whether the field is NOT NULL

rides_schema = Schema(
    NestedField(field_id=1, name="ride_id", field_type=UUIDType(), required=True),
    NestedField(field_id=2, name="driver_id", field_type=StringType(), required=False),
    NestedField(field_id=3, name="customer_id", field_type=StringType(), required=False),
    NestedField(field_id=4, name="pickup_time", field_type=TimestampType(), required=False),
    NestedField(field_id=5, name="dropoff_time", field_type=TimestampType(), required=False),
    NestedField(field_id=6, name="fare", field_type=DoubleType(), required=False),
    NestedField(field_id=7, name="pickup_location", field_type=StringType(), required=False),
    NestedField(field_id=8, name="dropoff_location", field_type=StringType(), required=False)
)

from pyiceberg.partitioning import PartitionSpec

rides_partition_spec = PartitionSpec(
    fields=[
        PartitionField(
            source_id=4,
            field_id=1000,
            transform="identity",
            name="pickup_time"
        )
    ]
)

# Drop if exists (optional)
# try:
#     catalog.drop_table(identifier=f"{namespace}.rides")
# except NoSuchTableError:
#     pass

catalog.create_table(
    identifier=f"{namespace}.rides",
    schema=rides_schema,
    partition_spec=rides_partition_spec
)

rides(
  1: ride_id: required uuid,
  2: driver_id: optional string,
  3: customer_id: optional string,
  4: pickup_time: optional timestamp,
  5: dropoff_time: optional timestamp,
  6: fare: optional double,
  7: pickup_location: optional string,
  8: dropoff_location: optional string
),
partition by: [pickup_time],
sort order: [],
snapshot: null

In [34]:
drivers_schema = Schema(
    NestedField(field_id=1, name="driver_id", field_type=StringType(), required=True),
    NestedField(field_id=2, name="full_name", field_type=StringType(), required=False),
    NestedField(field_id=3, name="city", field_type=StringType(), required=False),
    NestedField(field_id=4, name="active", field_type=BooleanType(), required=False),
    NestedField(field_id=5, name="rating", field_type=IntegerType(), required=False),
    NestedField(field_id=6, name="last_updated", field_type=TimestampType(), required=False)
)

# Partition by city (field_id=3)
drivers_partition_spec = PartitionSpec(
    fields=[
        PartitionField(
            source_id=3,
            field_id=1001,
            transform="identity",
            name="city"
        )
    ]
)

# Create the table
catalog.create_table(
    identifier=f"{namespace}.drivers",
    schema=drivers_schema,
    partition_spec=drivers_partition_spec
)

drivers(
  1: driver_id: required string,
  2: full_name: optional string,
  3: city: optional string,
  4: active: optional boolean,
  5: rating: optional int,
  6: last_updated: optional timestamp
),
partition by: [city],
sort order: [],
snapshot: null

In [36]:
payments_schema = Schema(
    NestedField(field_id=1, name="payment_id", field_type=UUIDType(), required=True),
    NestedField(field_id=2, name="ride_id", field_type=StringType(), required=False),
    NestedField(field_id=3, name="customer_id", field_type=StringType(), required=False),
    NestedField(field_id=4, name="amount", field_type=DecimalType(precision=10, scale=2), required=False),
    NestedField(field_id=5, name="status", field_type=StringType(), required=False),  # e.g. "paid", "refunded"
    NestedField(field_id=6, name="timestamp", field_type=TimestampType(), required=False)
)

# Partition by 'timestamp' (field_id=6)
payments_partition_spec = PartitionSpec(
    fields=[
        PartitionField(
            source_id=6,
            field_id=1002,
            transform="identity",
            name="timestamp"
        )
    ]
)


catalog.create_table(
    identifier=f"{namespace}.payments",
    schema=payments_schema,
    partition_spec=payments_partition_spec
)

payments(
  1: payment_id: required uuid,
  2: ride_id: optional string,
  3: customer_id: optional string,
  4: amount: optional decimal(10, 2),
  5: status: optional string,
  6: timestamp: optional timestamp
),
partition by: [timestamp],
sort order: [],
snapshot: null

In [None]:
import pyarrow as pa
from pyiceberg.catalog import load_rest
from pyiceberg.exceptions import NamespaceAlreadyExistsError, TableAlreadyExistsError
import boto3

aws_access_key_id = os.environ["AWS_ACCESS_KEY_ID"] # admin
aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"] # password

catalog = load_rest(
    name="rest",
    conf = {
        "uri": "http://rest:8181/",
        "s3.endpoint": "http://minio:9000",
        "s3.access-key": aws_access_key_id,
        "s3.secret-key": aws_secret_access_key
    }
)

# Create a S3 "mocked" client with iceberg user credentials
s3 = boto3.client(
    "s3",
    endpoint_url="http://minio:9000",  # ✅ Use the container name
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name="us-east-1"
)

namespace = "poc_new"
try:
    catalog.create_namespace(namespace)
except NamespaceAlreadyExistsError as e:
    pass

namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

def list_blobs(bucket=None):
    """
    Lists blobs (objects) in a specific S3 bucket or in all buckets.

    Parameters:
        bucket (str, optional): Bucket name. If not provided, lists objects in all buckets.
    """
    if bucket:
        print(f"\nObjects in bucket: {bucket}")
        _print_bucket_objects(bucket)
    else:
        buckets = s3.list_buckets()["Buckets"]
        for b in buckets:
            bucket_name = b["Name"]
            print(f"\nObjects in bucket: {bucket_name}")
            _print_bucket_objects(bucket_name)


def _print_bucket_objects(bucket_name):
    response = s3.list_objects_v2(Bucket=bucket_name)
    if "Contents" in response:
        for obj in response["Contents"]:
            print(f" - {obj['Key']}")
    else:
        print(" (Empty)")

list_blobs()

df = pa.Table.from_pylist(
    [
        {"lat": 52.371807, "long": 4.896029},
        {"lat": 52.387386, "long": 4.646219},
        {"lat": 52.078663, "long": 4.288788},
    ],
)
schema = df.schema

table_name = "coordinates"
table_identifier = f"{namespace}.{table_name}"

try:
    table = catalog.create_table(
        identifier=table_identifier,
        schema=schema,
    )
except TableAlreadyExistsError as e:
    pass

table = catalog.load_table(table_identifier)
table.append(df)

result = table.scan().to_arrow()
print(result)

list_blobs()

In [4]:
# Create a S3 "mocked" client with iceberg user credentials
s3 = boto3.client(
    "s3",
    endpoint_url="http://minio:9000",  # ✅ Use the container name
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name="us-east-1"
)

In [5]:
namespace = "poc_new"
try:
    catalog.create_namespace(namespace)
except NamespaceAlreadyExistsError as e:
    pass

In [6]:
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

Namespaces: [('poc_new',)]


In [7]:
def list_blobs(bucket=None):
    """
    Lists blobs (objects) in a specific S3 bucket or in all buckets.

    Parameters:
        bucket (str, optional): Bucket name. If not provided, lists objects in all buckets.
    """
    if bucket:
        print(f"\nObjects in bucket: {bucket}")
        _print_bucket_objects(bucket)
    else:
        buckets = s3.list_buckets()["Buckets"]
        for b in buckets:
            bucket_name = b["Name"]
            print(f"\nObjects in bucket: {bucket_name}")
            _print_bucket_objects(bucket_name)


def _print_bucket_objects(bucket_name):
    response = s3.list_objects_v2(Bucket=bucket_name)
    if "Contents" in response:
        for obj in response["Contents"]:
            print(f" - {obj['Key']}")
    else:
        print(" (Empty)")


In [8]:
list_blobs()


Objects in bucket: warehouse
 (Empty)


In [9]:
df = pa.Table.from_pylist(
    [
        {"lat": 52.371807, "long": 4.896029},
        {"lat": 52.387386, "long": 4.646219},
        {"lat": 52.078663, "long": 4.288788},
    ],
)
schema = df.schema

table_name = "coordinates"
table_identifier = f"{namespace}.{table_name}"

In [10]:
try:
    table = catalog.create_table(
        identifier=table_identifier,
        schema=schema,
    )
except TableAlreadyExistsError as e:
    pass

In [11]:
table = catalog.load_table(table_identifier)
table.append(df)

In [12]:
result = table.scan().to_arrow()
print(result)

pyarrow.Table
lat: double
long: double
----
lat: [[52.371807,52.387386,52.078663]]
long: [[4.896029,4.646219,4.288788]]




In [13]:
list_blobs()


Objects in bucket: warehouse
 - poc_new/coordinates/data/00000-0-977d2bf6-fc86-443a-bd63-5e7b06caffbd.parquet
 - poc_new/coordinates/metadata/00000-ab97f938-d7c6-4d14-8142-eb88f3da9569.metadata.json
 - poc_new/coordinates/metadata/00001-565668f6-fe20-4ee2-98f6-0fc10bba87c7.metadata.json
 - poc_new/coordinates/metadata/977d2bf6-fc86-443a-bd63-5e7b06caffbd-m0.avro
 - poc_new/coordinates/metadata/snap-1981489265837032690-0-977d2bf6-fc86-443a-bd63-5e7b06caffbd.avro
