In [None]:
pip install "pyiceberg[glue]" pyarrow



In [None]:
from pyiceberg.catalog import load_catalog
import os, getpass

catalog_properties = {
    "client.region": "us-east-1",
    "client.access-key-id":"",
    "client.secret-access-key": "",
}

catalog = load_catalog(type="glue", **catalog_properties)

In [None]:
namespace = "trials"
catalog.create_namespace_if_not_exists(namespace)

In [None]:
from pyiceberg.schema import Schema
from pyiceberg.types import (
    LongType,
    StringType,
    DoubleType,
    BooleanType,
    DateType,
    TimestampType,
    NestedField,
)


s3_bucket   = "open-table-demo/"
tables_path = "tables/"
table_name  = "observations"

schema = Schema(
    NestedField(1,  "file_no",      LongType(),     required=True),
    NestedField(2,  "id",           LongType(),     required=True),
    NestedField(3,  "name",         StringType(),   required=False),
    NestedField(4,  "category",     StringType(),   required=False),
    NestedField(5,  "region",       StringType(),   required=False),
    NestedField(6,  "value",        LongType(),     required=False),
    NestedField(7,  "price",        DoubleType(),   required=False),
    NestedField(8,  "is_active",    BooleanType(),  required=False),
    NestedField(9,  "ts",           TimestampType(),required=False),
    NestedField(10, "event_date",   DateType(),     required=False),
    NestedField(11, "note",         StringType(),   required=False),
)

table = catalog.create_table_if_not_exists(
    "{}.{}".format(namespace, table_name),
    schema,
    location="s3://{}{}".format(s3_bucket, tables_path),
)

print("Created/loaded table:", table.name, "->", table.location())


Created/loaded table: <bound method Table.name of observations(
  1: file_no: required long,
  2: id: required long,
  3: name: optional string,
  4: category: optional string,
  5: region: optional string,
  6: value: optional long,
  7: price: optional double,
  8: is_active: optional boolean,
  9: ts: optional timestamp,
  10: event_date: optional date,
  11: note: optional string
),
partition by: [],
sort order: [],
snapshot: null> -> s3://open-table-demo/tables


In [None]:
import pyarrow as pa

files_path = "inputs/"

fs = pa.fs.S3FileSystem(
    access_key="",
    secret_key="",
    region= "us-east-1",
)

selector = pa.fs.FileSelector("{}{}".format(s3_bucket, files_path), recursive=True)



In [None]:
file_list = [f.path for f in fs.get_file_info(selector)]

In [None]:
print(file_list)

['open-table-demo/inputs/events_part_01.parquet', 'open-table-demo/inputs/events_part_02.parquet', 'open-table-demo/inputs/events_part_03.parquet', 'open-table-demo/inputs/events_part_04.parquet', 'open-table-demo/inputs/events_part_05.parquet', 'open-table-demo/inputs/events_part_06.parquet', 'open-table-demo/inputs/events_part_07.parquet', 'open-table-demo/inputs/events_part_08.parquet', 'open-table-demo/inputs/events_part_09.parquet', 'open-table-demo/inputs/events_part_10.parquet']


In [None]:
import pyarrow as pa
from pyarrow import parquet as pq

start_idx = 1
end_idx   = 3

input_schema = pa.schema([
    ("file_no",    pa.int64(),  False),
    ("id",         pa.int64(),  False),
    ("name",       pa.string(), True),
    ("category",   pa.string(), True),
    ("region",     pa.string(), True),
    ("value",      pa.int64(),  True),
    ("price",      pa.float64(),True),
    ("is_active",  pa.bool_(),  True),
    ("ts",         pa.timestamp("us"), True),
    ("event_date", pa.date32(), True),
    ("note",       pa.string(), True),
])

for file_path in file_list[start_idx:end_idx]:
    tbl = pq.read_table(file_path, schema=input_schema, filesystem=fs)
    table.append(tbl)

print(f"Appended {end_idx - start_idx} file(s) into {table.name() if callable(getattr(table,'name',None)) else table.name}")


Appended 2 file(s) into ('trials', 'observations')


**1. ACID Transactions**

In [None]:
def load_as_arrow(path: str):
    t = pq.read_table(path, filesystem=fs)
    return t.cast(input_schema)

input_schema = pa.schema([
    ("file_no",    pa.int64(),  False),
    ("id",         pa.int64(),  False),
    ("name",       pa.string(), True),
    ("category",   pa.string(), True),
    ("region",     pa.string(), True),
    ("value",      pa.int64(),  True),
    ("price",      pa.float64(),True),
    ("is_active",  pa.bool_(),  True),
    ("ts",         pa.timestamp("us"), True),
    ("event_date", pa.date32(), True),
    ("note",       pa.string(), True),
])

In [None]:
from pyarrow import parquet as pq
start_snapshot = table.current_snapshot()

batch_a = load_as_arrow(file_list[8])
table.append(batch_a)
snap_after_a = table.current_snapshot()

batch_b = load_as_arrow(file_list[9])
table.append(batch_b)
snap_after_b = table.current_snapshot()

print("Start snapshot:", start_snapshot.snapshot_id if start_snapshot else None)
print("After A:", snap_after_a.snapshot_id)
print("After B:", snap_after_b.snapshot_id)
print("Total snapshots:", len(list(table.snapshots())))


Start snapshot: 6258188288543477695
After A: 7864378507232075376
After B: 4951251162120782992
Total snapshots: 4


In [None]:
print("Start snapshot:", start_snapshot.snapshot_id if start_snapshot else None)
print("After A:", snap_after_a.snapshot_id)
print("After B:", snap_after_b.snapshot_id)
print("Total snapshots:", len(list(table.snapshots())))

Start snapshot: 6258188288543477695
After A: 7864378507232075376
After B: 4951251162120782992
Total snapshots: 4


In [None]:
for snap in table.snapshots():
    print("\nSnapshot ID:", snap.snapshot_id)
    print("Timestamp:", snap.timestamp_ms)
    print("Operation:", snap.summary.get('operation', 'N/A'))
    print("Manifest List file:", snap.manifest_list)


Snapshot ID: 1995780804529788759
Timestamp: 1755503024722
Operation: Operation.APPEND
Manifest List file: s3://open-table-demo/tables/metadata/snap-1995780804529788759-0-057eb60f-4e4e-426c-af1c-343ad20c6bdc.avro

Snapshot ID: 6258188288543477695
Timestamp: 1755503026778
Operation: Operation.APPEND
Manifest List file: s3://open-table-demo/tables/metadata/snap-6258188288543477695-0-3035acb8-676f-431b-bc5f-d2d915531cf8.avro

Snapshot ID: 7864378507232075376
Timestamp: 1755503028693
Operation: Operation.APPEND
Manifest List file: s3://open-table-demo/tables/metadata/snap-7864378507232075376-0-e475e69b-2db0-4d76-999f-29642d97735d.avro

Snapshot ID: 4951251162120782992
Timestamp: 1755503030543
Operation: Operation.APPEND
Manifest List file: s3://open-table-demo/tables/metadata/snap-4951251162120782992-0-57189eef-0cee-415b-9d61-7b877445861a.avro


In [None]:
snap = table.current_snapshot()

print("Latest snapshot:", snap.snapshot_id)
print("Manifest list file:", snap.manifest_list)

manifests = snap.manifests(io=table.io)

for i, m in enumerate(manifests, 1):
    path = getattr(m, "manifest_path", None) or getattr(m, "path", None)
    print(f"\nManifest {i}: {path}")
    print("  added   :", getattr(m, "added_files_count", None))
    print("  existing:", getattr(m, "existing_files_count", None))
    print("  deleted :", getattr(m, "deleted_files_count", None))


Latest snapshot: 4951251162120782992
Manifest list file: s3://open-table-demo/tables/metadata/snap-4951251162120782992-0-57189eef-0cee-415b-9d61-7b877445861a.avro

Manifest 1: s3://open-table-demo/tables/metadata/57189eef-0cee-415b-9d61-7b877445861a-m0.avro
  added   : 1
  existing: 0
  deleted : 0

Manifest 2: s3://open-table-demo/tables/metadata/e475e69b-2db0-4d76-999f-29642d97735d-m0.avro
  added   : 1
  existing: 0
  deleted : 0

Manifest 3: s3://open-table-demo/tables/metadata/3035acb8-676f-431b-bc5f-d2d915531cf8-m0.avro
  added   : 1
  existing: 0
  deleted : 0

Manifest 4: s3://open-table-demo/tables/metadata/057eb60f-4e4e-426c-af1c-343ad20c6bdc-m0.avro
  added   : 1
  existing: 0
  deleted : 0


In [None]:
def file_paths_for_snapshot(tbl, snapshot_id):
    tasks = tbl.scan(snapshot_id=snapshot_id).plan_files()
    return {t.file.file_path for t in tasks}

def snapshots_sorting(tbl):
    snaps = list(tbl.snapshots())
    return sorted(snaps, key=lambda s: s.timestamp_ms or 0)


In [None]:
snaps = snapshots_sorting(table)

print(f"Total there are {len(snaps)} snapshots\n")

for snap in snaps:
    print(f"\n Snapshot ID={snap.snapshot_id}  ts_ms={snap.timestamp_ms}")

    print(f"Manifest list: {snap.manifest_list}")

    manifests = snap.manifests(io=table.io)
    for i, m in enumerate(manifests, 1):
        path = getattr(m, "manifest_path", None) or getattr(m, "path", None)
        added   = getattr(m, "added_files_count", None)
        existing= getattr(m, "existing_files_count", None)
        deleted = getattr(m, "deleted_files_count", None)
        print(f"  Manifest {i}: {path}  (added={added}, existing={existing}, deleted={deleted})")

    files = file_paths_for_snapshot(table, snap.snapshot_id)
    print(f"Data files in this snapshot: {len(files)}")
    for p in files:
        print(f"  {p}")

print("\nAdded data files per snapshot")
prev_set = set()
for idx, snap in enumerate(snaps):
    curr_set = set(file_paths_for_snapshot(table, snap.snapshot_id))
    added = curr_set - prev_set if idx > 0 else curr_set
    print(f"\nSnapshot {snap.snapshot_id}: added {len(added)} file(s)")
    for p in sorted(added):
        print(f"  {p}")
    prev_set = curr_set


Total there are 4 snapshots


 Snapshot ID=1995780804529788759  ts_ms=1755503024722
Manifest list: s3://open-table-demo/tables/metadata/snap-1995780804529788759-0-057eb60f-4e4e-426c-af1c-343ad20c6bdc.avro
  Manifest 1: s3://open-table-demo/tables/metadata/057eb60f-4e4e-426c-af1c-343ad20c6bdc-m0.avro  (added=1, existing=0, deleted=0)
Data files in this snapshot: 1
  s3://open-table-demo/tables/data/00000-0-057eb60f-4e4e-426c-af1c-343ad20c6bdc.parquet

 Snapshot ID=6258188288543477695  ts_ms=1755503026778
Manifest list: s3://open-table-demo/tables/metadata/snap-6258188288543477695-0-3035acb8-676f-431b-bc5f-d2d915531cf8.avro
  Manifest 1: s3://open-table-demo/tables/metadata/3035acb8-676f-431b-bc5f-d2d915531cf8-m0.avro  (added=1, existing=0, deleted=0)
  Manifest 2: s3://open-table-demo/tables/metadata/057eb60f-4e4e-426c-af1c-343ad20c6bdc-m0.avro  (added=1, existing=0, deleted=0)
Data files in this snapshot: 2
  s3://open-table-demo/tables/data/00000-0-3035acb8-676f-431b-bc5f-d2d915531cf8

**2. Time Travel**

In [None]:
snaps = list(table.snapshots())
for i, s in enumerate(snaps):
    print(f"{i}: snapshot_id={s.snapshot_id}  ts_ms={s.timestamp_ms}")

0: snapshot_id=1995780804529788759  ts_ms=1755503024722
1: snapshot_id=6258188288543477695  ts_ms=1755503026778
2: snapshot_id=7864378507232075376  ts_ms=1755503028693
3: snapshot_id=4951251162120782992  ts_ms=1755503030543


In [None]:
old_id = snaps[-2].snapshot_id
old_tbl = table.scan(snapshot_id=old_id).to_arrow()
cur_tbl = table.scan().to_arrow()

print("Old row count   :", old_tbl.num_rows)
print("Current row count:", cur_tbl.num_rows)

old_tbl.slice(0, 5), cur_tbl.slice(0, 5)

Old row count   : 1500
Current row count: 2000


(pyarrow.Table
 file_no: int64 not null
 id: int64 not null
 name: large_string
 category: large_string
 region: large_string
 value: int64
 price: double
 is_active: bool
 ts: timestamp[us]
 event_date: date32[day]
 note: large_string
 ----
 file_no: [[9,9,9,9,9]]
 id: [[4000,4001,4002,4003,4004]]
 name: [["Name_4000","Name_4001","Name_4002","Name_4003","Name_4004"]]
 category: [["C","C","C","A","D"]]
 region: [["LATAM","EMEA","EMEA","LATAM","APAC"]]
 value: [[771,444,964,629,858]]
 price: [[559.12,700.13,711.5,185.47,686.54]]
 is_active: [[false,true,false,true,false]]
 ts: [[2025-01-10 00:00:00.000000,2025-01-10 01:00:00.000000,2025-01-10 02:00:00.000000,2025-01-10 03:00:00.000000,2025-01-10 04:00:00.000000]]
 event_date: [[2025-01-10,2025-01-10,2025-01-10,2025-01-10,2025-01-10]]
 ...,
 pyarrow.Table
 file_no: int64 not null
 id: int64 not null
 name: large_string
 category: large_string
 region: large_string
 value: int64
 price: double
 is_active: bool
 ts: timestamp[us]
 event_da

In [None]:
def count_files(snapshot_id):
    tasks = table.scan(snapshot_id=snapshot_id).plan_files()
    return sum(t.file.record_count or 0 for t in tasks), len(list(tasks))

old_rows, old_files = count_files(old_id)
cur_rows, cur_files = count_files(table.current_snapshot().snapshot_id)

print(f"Old: {old_rows} rows across {old_files} files")
print(f"Cur: {cur_rows} rows across {cur_files} files")


Old: 1500 rows across 3 files
Cur: 2000 rows across 4 files


In [None]:

old_idx = -2
old_id = snaps[old_idx].snapshot_id

print(f"\n Switching to older snapshot (ID: {old_id})")
old_data = table.scan(snapshot_id=old_id).to_arrow()
print(f"Row count at this snapshot: {old_data.num_rows}")

old_data.to_pandas().head()



 Switching to older snapshot (ID: 7864378507232075376)
Row count at this snapshot: 1500


Unnamed: 0,file_no,id,name,category,region,value,price,is_active,ts,event_date,note
0,9,4000,Name_4000,C,LATAM,771,559.12,False,2025-01-10 00:00:00,2025-01-10,note_0
1,9,4001,Name_4001,C,EMEA,444,700.13,True,2025-01-10 01:00:00,2025-01-10,note_1
2,9,4002,Name_4002,C,EMEA,964,711.5,False,2025-01-10 02:00:00,2025-01-10,
3,9,4003,Name_4003,A,LATAM,629,185.47,True,2025-01-10 03:00:00,2025-01-10,note_3
4,9,4004,Name_4004,D,APAC,858,686.54,False,2025-01-10 04:00:00,2025-01-10,note_4


In [None]:
latest_id = snaps[-1].snapshot_id

print(f"\n Switching to latest snapshot (ID: {latest_id})")
latest_data = table.scan(snapshot_id=latest_id).to_arrow()
print(f"Row count at this snapshot: {latest_data.num_rows}")

latest_data.to_pandas().head()



 Switching to latest snapshot (ID: 4951251162120782992)
Row count at this snapshot: 2000


Unnamed: 0,file_no,id,name,category,region,value,price,is_active,ts,event_date,note
0,10,4500,Name_4500,A,APAC,249,32.83,False,2025-01-11 00:00:00,2025-01-11,
1,10,4501,Name_4501,D,EMEA,734,75.88,False,2025-01-11 01:00:00,2025-01-11,note_1
2,10,4502,Name_4502,D,APAC,558,402.19,False,2025-01-11 02:00:00,2025-01-11,note_2
3,10,4503,Name_4503,B,EMEA,727,817.48,True,2025-01-11 03:00:00,2025-01-11,note_3
4,10,4504,Name_4504,B,LATAM,441,945.71,True,2025-01-11 04:00:00,2025-01-11,note_4


In [None]:
import pandas as pd

df_old = old_data.to_pandas()
df_latest = latest_data.to_pandas()

print("\n Comparing row counts:")
print("Old snapshot:", len(df_old))
print("Latest snapshot:", len(df_latest))

print("\n First 5 rows from old snapshot:")
display(df_old.head())

print("\n First 5 rows from latest snapshot:")
display(df_latest.head())



 Comparing row counts:
Old snapshot: 1500
Latest snapshot: 2000

 First 5 rows from old snapshot:


Unnamed: 0,file_no,id,name,category,region,value,price,is_active,ts,event_date,note
0,9,4000,Name_4000,C,LATAM,771,559.12,False,2025-01-10 00:00:00,2025-01-10,note_0
1,9,4001,Name_4001,C,EMEA,444,700.13,True,2025-01-10 01:00:00,2025-01-10,note_1
2,9,4002,Name_4002,C,EMEA,964,711.5,False,2025-01-10 02:00:00,2025-01-10,
3,9,4003,Name_4003,A,LATAM,629,185.47,True,2025-01-10 03:00:00,2025-01-10,note_3
4,9,4004,Name_4004,D,APAC,858,686.54,False,2025-01-10 04:00:00,2025-01-10,note_4



 First 5 rows from latest snapshot:


Unnamed: 0,file_no,id,name,category,region,value,price,is_active,ts,event_date,note
0,10,4500,Name_4500,A,APAC,249,32.83,False,2025-01-11 00:00:00,2025-01-11,
1,10,4501,Name_4501,D,EMEA,734,75.88,False,2025-01-11 01:00:00,2025-01-11,note_1
2,10,4502,Name_4502,D,APAC,558,402.19,False,2025-01-11 02:00:00,2025-01-11,note_2
3,10,4503,Name_4503,B,EMEA,727,817.48,True,2025-01-11 03:00:00,2025-01-11,note_3
4,10,4504,Name_4504,B,LATAM,441,945.71,True,2025-01-11 04:00:00,2025-01-11,note_4


**3. Schema Evolution**

In [None]:
old_snap = table.current_snapshot()
print("Snapshot BEFORE schema evolution:", old_snap.snapshot_id)


Snapshot BEFORE schema evolution: 4951251162120782992


In [None]:
[f.name for f in table.schema().fields]


['file_no',
 'id',
 'name',
 'category',
 'region',
 'value',
 'price',
 'is_active',
 'ts',
 'event_date',
 'note']

In [None]:
from pyiceberg.types import StringType

us = table.update_schema()
us = us.add_column("new_column", StringType(), required=False)

us.commit()
print("Schema updated. New schema:")
for f in table.schema().fields:
    print(f"{f.field_id}: {f.name} - {type(f.field_type).__name__}, required={f.required}")


Schema updated. New schema:
1: file_no - LongType, required=True
2: id - LongType, required=True
3: name - StringType, required=False
4: category - StringType, required=False
5: region - StringType, required=False
6: value - LongType, required=False
7: price - DoubleType, required=False
8: is_active - BooleanType, required=False
9: ts - TimestampType, required=False
10: event_date - DateType, required=False
11: note - StringType, required=False
12: new_column - StringType, required=False


In [None]:
print(table.current_snapshot())

Operation.APPEND: id=4951251162120782992, parent_id=7864378507232075376, schema_id=0


In [None]:
us = table.update_schema()
us = us.delete_column("new_column")
us.commit()

print("Column has been deleted.")


Column has been deleted.


In [None]:
[f.name for f in table.schema().fields]


['file_no',
 'id',
 'name',
 'category',
 'region',
 'value',
 'price',
 'is_active',
 'ts',
 'event_date',
 'note']

In [None]:
snapshots_before = list(table.snapshots())
old_snapshot_id = snapshots_before[-1].snapshot_id

update = table.update_schema()
update = update.rename_column("note", "comment")
update.commit()

snapshots_after = list(table.snapshots())
new_snapshot_id = snapshots_after[-1].snapshot_id

In [None]:
print("Old snapshot ID:", old_snapshot_id)
print("New snapshot ID:", new_snapshot_id)

Old snapshot ID: 4951251162120782992
New snapshot ID: 4951251162120782992


In [None]:
old_arrow = table.scan(snapshot_id=old_snapshot_id).to_arrow()
print("\nSchema from old snapshot:")
for field in old_arrow.schema:
    print(f"{field.name} - {field.type}")


new_arrow = table.scan().to_arrow()
print("\nSchema from new snapshot:")
for field in new_arrow.schema:
    print(f"{field.name} - {field.type}")


Schema from old snapshot:
file_no - int64
id - int64
name - large_string
category - large_string
region - large_string
value - int64
price - double
is_active - bool
ts - timestamp[us]
event_date - date32[day]
note - large_string

Schema from new snapshot:
file_no - int64
id - int64
name - large_string
category - large_string
region - large_string
value - int64
price - double
is_active - bool
ts - timestamp[us]
event_date - date32[day]
comment - large_string


In [None]:
update = table.update_schema()
update = update.rename_column("comment", "note")
update.commit()

**4. Hidden Partitioning**

In [58]:
from pyiceberg.partitioning import PartitionSpec, PartitionField
# source_id : event_date - 10th one and field_id : virtual id for the new field
spec = PartitionSpec(
    PartitionField(source_id=10, field_id=2001, transform="year", name="event_year")
)

partitioned_name = f"{table_name}_by_year"
partitioned_loc = "s3://open-table-demo/results_by_year/"

pt = catalog.create_table_if_not_exists(
    f"{namespace}.{partitioned_name}",
    schema=table.schema(),
    location=partitioned_loc,
    partition_spec=spec
)

print("Partitioned table created:", pt.name())
print("Partition spec:", [ (f.name, f.transform) for f in pt.spec().fields ])


Partitioned table created: ('trials', 'observations_by_year')
Partition spec: [('event_year', YearTransform())]


In [59]:
subset = [load_as_arrow(fp) for fp in file_list[4:6]]
pt.append(pa.concat_tables(subset, promote=True))
print("Appended 2 files to partitioned table")


Appended 2 files to partitioned table
