In [1]:
import os
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (NestedField,
                             StringType, FloatType)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
from pyiceberg.expressions import EqualTo
import pyarrow as pa
import pandas as pd

In [2]:
# Local, file-based setup: the catalog's metadata lives in a SQLite database and
# the table files are written under `warehouse_path`.
# NOTE(review): both locations are relative, so they presumably resolve against
# the kernel's working directory — confirm before running from another folder.
warehouse_path = "."

catalog_properties = dict(
    uri="sqlite:///pyiceberg_cat.db",
    warehouse=f"file://{warehouse_path}",
)
catalog = SqlCatalog("default", **catalog_properties)

In [3]:
# Echo the catalog's properties to confirm the URI and warehouse settings
configured_properties = catalog.properties
print(configured_properties)

{'uri': 'sqlite:///pyiceberg_cat.db', 'warehouse': 'file://.'}


In [4]:
# Iceberg schema for the ingested POS sales records. All three columns are
# required, and `pos_id` (field id 1) is declared as the identifier field,
# i.e. it acts as the table's primary key.
pos_sales_fields = (
    NestedField(1, "pos_id", StringType(), required=True),
    NestedField(2, "total_amount", FloatType(), required=True),
    NestedField(3, "device_make", StringType(), required=True),
)
schema = Schema(*pos_sales_fields, identifier_field_ids=[1])

In [5]:
# Partition the table on `pos_id` (schema field 1) using the identity
# transform, so the unmodified pos_id value is the partition value.
pos_id_partition = PartitionField(
    source_id=1,
    field_id=1000,
    transform=IdentityTransform(),
    name="pos_id",
)
partition_spec = PartitionSpec(pos_id_partition)

In [6]:
# Make sure the namespace exists, then create the sales table inside it.
# Both calls are the "if not exists" variants, so re-running this cell is safe.
catalog.create_namespace_if_not_exists('pos_ns')

table_options = {
    "identifier": 'pos_ns.pos_sales',
    "schema": schema,
    "partition_spec": partition_spec,
}
pos_table = catalog.create_table_if_not_exists(**table_options)

In [7]:
# First batch of in-memory Arrow records, built column by column in the same
# order as the schema fields: pos_id, total_amount, device_make.
initial_columns = [
    pa.array(['wi27', 'wi35', 'wi86']),
    pa.array([350.5, 564.32, 101.85]),
    pa.array(['INGENICO', 'VERIFONE', 'MSWIPE']),
]
initial_data = pa.table(initial_columns, schema=schema.as_arrow())

In [8]:
# Load the first batch into the table. Note this uses `overwrite`, not an append.
# NOTE(review): presumably chosen so that re-running the cell replaces the
# table's rows instead of duplicating them — confirm against the PyIceberg docs.
pos_table.overwrite(initial_data)

In [9]:
# Read the whole table back and show it as a pandas DataFrame
print("\nInsert operation completed")
inserted_rows = pos_table.scan().to_pandas()
print(inserted_rows)


Insert operation completed
  pos_id  total_amount device_make
0   wi27    350.500000    INGENICO
1   wi35    564.320007    VERIFONE
2   wi86    101.849998      MSWIPE


In [10]:
# UPSERT batch of Arrow records: same keys and amounts as the first batch, but
# one of the device_make values is changed (wi86: MSWIPE -> PINELABS).
upsert_columns = [
    pa.array(['wi27', 'wi35', 'wi86']),
    pa.array([350.5, 564.32, 101.85]),
    pa.array(['INGENICO', 'VERIFONE', 'PINELABS']),
]
upsert_data = pa.table(upsert_columns, schema=schema.as_arrow())

In [11]:
# UPSERT the changed batch, matching incoming rows to existing rows on `pos_id`.
# The join columns are now passed explicitly (they were previously assigned but
# never used). Errors are deliberately NOT caught here: the next cell reads
# `upsert_result`, so a swallowed failure would only resurface later as a
# confusing NameError, or silently reuse a stale result from an earlier run.
join_columns = ["pos_id"]
upsert_result = pos_table.upsert(upsert_data, join_cols=join_columns)

In [12]:
# Show the table contents after the upsert, then how many rows it updated
print("\nUpsert operation completed")
rows_after_upsert = pos_table.scan().to_pandas()
print(rows_after_upsert)
print("\n")
print(f"Rows Updated: {upsert_result.rows_updated}")


Upsert operation completed
  pos_id  total_amount device_make
0   wi86    101.849998    PINELABS
1   wi27    350.500000    INGENICO
2   wi35    564.320007    VERIFONE


Rows Updated: 1


In [13]:
# Filter rows: scan only the records whose device_make equals PINELABS
print("\nFilter records with device_make == PINELABS ")
pinelabs_filter = EqualTo('device_make', 'PINELABS')
pinelabs_rows = pos_table.scan(row_filter=pinelabs_filter).to_pandas()
print(pinelabs_rows)
print("\n")


Filter records with device_make == PINELABS 
  pos_id  total_amount device_make
0   wi86    101.849998    PINELABS




In [14]:
# Delete every row whose device_make is INGENICO, then show what remains
ingenico_filter = EqualTo('device_make', 'INGENICO')
pos_table.delete(delete_filter=ingenico_filter)
print("\n After Delete")
remaining_rows = pos_table.scan().to_pandas()
print(remaining_rows)


 After Delete
  pos_id  total_amount device_make
0   wi86    101.849998    PINELABS
1   wi35    564.320007    VERIFONE
