From 7690ab06b7c5531eb10fa0d3a6c74988f509ede2 Mon Sep 17 00:00:00 2001
From: Pramila Bishnoi
Date: Tue, 18 Nov 2025 16:25:54 +0530
Subject: [PATCH] docs: Add comprehensive Data Write Operations and Transaction API documentation

---
 mkdocs/docs/api.md | 395 ++++++++++++++++++++-------------------------
 1 file changed, 171 insertions(+), 224 deletions(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index c36c24bc3e..23dc9b1475 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md

The patch replaces the former "Write to a table" section of `mkdocs/docs/api.md` with the revised section below, along with a few small formatting fixes elsewhere in the file.

## Data Write Operations

PyIceberg tables offer several high-level methods for writing, deleting, and updating data. These methods are available directly on the `Table` object, and the same operations can be grouped into a single atomic commit through the Transaction API described at the end of this section.
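The two entry points look like this in practice. This is a minimal sketch only, assuming a loaded table `tbl` and a PyArrow table `df` such as the ones built in the Common Setup below; direct calls commit on their own, while calls staged on a transaction are committed together when the block exits.

```python
# Direct Table API: the call commits on its own.
tbl.append(df)

# Transaction API: operations staged on `tx` are committed together
# when the `with` block exits successfully.
with tbl.transaction() as tx:
    tx.append(df)
```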
### Common Setup for Examples

All of the examples below run against the sample table created here: a temporary in-memory catalog, a partitioned table, and an initial load of three rows. The examples build on one another, so the comments describe the table state assuming they are run in order.

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import LongType, NestedField, StringType
import pyarrow as pa
import uuid

# Set up a temporary in-memory catalog and create a sample table
CATALOG = load_catalog("local", type="in-memory")
CATALOG.create_namespace_if_not_exists("default")

# Define a simple schema (id, data_col, partition_col)
SCHEMA = Schema(
    NestedField(1, "id", LongType(), required=False),
    NestedField(2, "data_col", StringType(), required=False),
    NestedField(3, "partition_col", StringType(), required=False),
)

# Create a table partitioned by an identity transform on partition_col
TABLE = CATALOG.create_table(
    f"default.write_test_{uuid.uuid4().hex[:6]}",
    schema=SCHEMA,
    partition_spec=PartitionSpec(
        PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name="partition_col")
    ),
)

# Initial data load
initial_data = pa.table({
    "id": pa.array([1, 2, 3]),
    "data_col": pa.array(["A", "B", "C"]),
    "partition_col": pa.array(["X", "Y", "Z"]),
})
TABLE.append(initial_data)
```

---

### Write Mode: append()

The `append` operation is the most straightforward write: it **adds new rows** to the table without modifying or deleting any existing records.

**Key Parameters:**

- **`df` (PyArrow Table):** The data to append.
- **`branch` (str, optional):** The branch to append the data to. Defaults to the table's main branch.

**Example: Appending New Rows**

```python
# Data to append
new_rows = pa.table({
    "id": pa.array([4, 5]),
    "data_col": pa.array(["D", "E"]),
    "partition_col": pa.array(["X", "Y"]),
})

# Perform the append operation
TABLE.append(new_rows)
# The table now contains rows 1, 2, 3, 4, and 5.
```

---

### Write Mode: overwrite()

The `overwrite` operation **replaces** data in the table. It first deletes the rows that match the **`overwrite_filter`** and then appends the new input data (`df`).

**Key Parameters:**

- **`df` (PyArrow Table):** The new data to insert after the delete step.
- **`overwrite_filter` (BooleanExpression or str, optional):** Controls which existing rows are deleted before the new data is written.
    - **Default (`ALWAYS_TRUE`):** Deletes and replaces the entire table content.
    - **Custom filter:** Deletes only the rows matching the expression.

**Example: Partial Overwrite**

```python
# Data intended to update the row with id=1
update_data = pa.table({
    "id": pa.array([1]),
    "data_col": pa.array(["UPDATED_A"]),
    "partition_col": pa.array(["X"]),
})

# Define a filter matching the existing row(s) where id = 1
overwrite_expression = EqualTo("id", 1)

# The operation first deletes the rows where id=1, then inserts update_data.
TABLE.overwrite(update_data, overwrite_filter=overwrite_expression)
# The row with id=1 now has data_col='UPDATED_A'.
```
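Filters can also be written as strings instead of expression objects, since the parameter accepts a `BooleanExpression` or `str` as noted above. A minimal sketch repeating the same partial overwrite with a string filter; the end state is the same as the expression-based call above:

```python
# Equivalent to EqualTo("id", 1): delete the matching rows, then append update_data.
TABLE.overwrite(update_data, overwrite_filter="id == 1")
# The row with id=1 still has data_col='UPDATED_A'.
```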
---

### Write Mode: delete()

The `delete` operation removes the rows that match the given filter expression; no new data is added. PyIceberg uses the Iceberg metadata to open only the data files that can contain matching rows, and a file whose entire contents match the predicate is dropped without being rewritten.

**Key Parameters:**

- **`delete_filter` (BooleanExpression or str):** The filter defining which rows to remove.

**Example: Deleting Rows**

```python
# Define a filter matching all rows in partition 'Z'
delete_expression = EqualTo("partition_col", "Z")

# Perform the delete operation
TABLE.delete(delete_filter=delete_expression)
# All rows where partition_col='Z' (row id=3 from the initial load) are removed.
```

---

### Write Mode: dynamic_partition_overwrite()

This is a specialized form of overwrite: it detects the partition values present in the input data (`df`), deletes all existing data in those partitions, and then appends the new data.

**Important note:** This mode only works on partitioned tables whose partition fields use an identity transform (the column value is used directly as the partition key, as in the setup example above).

**Example: Dynamic Overwrite (replaces partitions 'X' and 'Y')**

```python
# New data covering partition values 'X' and 'Y'
data_for_partitions_xy = pa.table({
    "id": pa.array([100, 101]),
    "data_col": pa.array(["NEW_X", "NEW_Y"]),
    "partition_col": pa.array(["X", "Y"]),  # Partition values to overwrite
})

# The operation detects 'X' and 'Y', deletes all existing data in those two
# partitions, and then writes the new data.
TABLE.dynamic_partition_overwrite(data_for_partitions_xy)
# All rows in partitions 'X' and 'Y' (ids 1, 2, 4, and 5 at this point) are
# replaced by rows 100 and 101.
```

---

### Write Mode: upsert()

The `upsert` (update or insert) operation performs a merge: rows in the input data (`df`) that match existing rows on `join_cols` update them, rows that do not match are inserted, and matching rows that are already identical are left untouched.

**Key Parameters:**

- **`df` (PyArrow Table):** The input data for the merge.
- **`join_cols` (list[str], optional):** The columns used to match rows between the input data and the table. Defaults to the table's identifier fields if not provided.

**Example: Upsert Operation**

```python
# Data to upsert: id=100 already exists at this point, id=99 is new
upsert_data = pa.table({
    "id": pa.array([100, 99]),
    "data_col": pa.array(["UPDATED_X", "NEW_F"]),
    "partition_col": pa.array(["X", "A"]),
})

# Perform the upsert using 'id' as the join key
result = TABLE.upsert(upsert_data, join_cols=["id"])

print(f"Rows updated: {result.rows_updated}")    # 1 (id=100)
print(f"Rows inserted: {result.rows_inserted}")  # 1 (id=99)
```

---

### Transaction API

The Transaction API provides a context manager that groups multiple mutations (append, delete, overwrite) into a single atomic commit: either all of the staged operations become visible together, or, if the block raises, none of them are applied.

**Example: Atomic Delete and Append**

```python
# Continue with the same sample table used above

# Start a transaction block using the 'with' context manager
with TABLE.transaction() as tx:
    # 1. Delete the rows in partition 'X' (staged, not yet committed)
    delete_expression = EqualTo("partition_col", "X")
    tx.delete(delete_filter=delete_expression)

    # 2. Append new, replacement data for partition 'X' (staged, not yet committed)
    replacement_data = pa.table({
        "id": pa.array([200, 201]),
        "data_col": pa.array(["R_1", "R_2"]),
        "partition_col": pa.array(["X", "X"]),
    })
    tx.append(replacement_data)

# Both the delete and the append are committed to the table in a single
# operation when the 'with' block finishes successfully.
```
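A transaction is not limited to mixing different operations; it can also batch several writes so that they become visible in one commit, and an exception raised inside the block means nothing is committed, per the roll-back behaviour described above. A minimal sketch reusing the sample `TABLE`; the `first_batch`/`second_batch` tables are hypothetical extra data, not part of the walkthrough state:

```python
# Hypothetical extra batches; both are staged on the transaction and become
# visible together only when the block exits without an error.
first_batch = pa.table({
    "id": pa.array([300, 301]),
    "data_col": pa.array(["S_1", "S_2"]),
    "partition_col": pa.array(["Y", "Y"]),
})
second_batch = pa.table({
    "id": pa.array([302]),
    "data_col": pa.array(["S_3"]),
    "partition_col": pa.array(["Z"]),
})

with TABLE.transaction() as tx:
    tx.append(first_batch)
    tx.append(second_batch)
# Had an exception been raised inside the block, neither batch would have
# been committed and the table would keep its previous state.
```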
## Inspecting tables

!!! info
    Content refers to the type of content stored by the data file: `0` - `Data`, `1` - `Position Deletes`, `2` - `Equality Deletes`

To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively.

### Usage

| Parameter               | Required? | Type           | Description                                                              |
| ----------------------- | --------- | -------------- | ------------------------------------------------------------------------ |
| `file_paths`            | ✔️        | List[str]      | The list of full file paths to be added as data files to the table      |
| `snapshot_properties`   |           | Dict[str, str] | Properties to set for the new snapshot. Defaults to an empty dictionary |
| `check_duplicate_files` |           | bool           | Whether to check for duplicate files. Defaults to `True`                |

### Example

!!! warning "Experimental Feature"

    The DataFusion integration is considered **experimental**.

The integration has a few caveats: