Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f8e561e
[python] Introduce read-only SystemTable base class
TheR1sing3un May 19, 2026
981dc8a
[python] Add SystemTableLoader registry
TheR1sing3un May 19, 2026
9400b2f
[python] Route $-suffixed identifiers to SystemTableLoader in Filesys…
TheR1sing3un May 19, 2026
8b8658c
[python] Route $-suffixed identifiers to SystemTableLoader in RESTCat…
TheR1sing3un May 19, 2026
b230060
[python] Add in-memory read pipeline for system tables
TheR1sing3un May 19, 2026
7f27c6c
[python] Add bulk-listing and mtime helpers needed by system tables
TheR1sing3un May 19, 2026
4e47569
[python] Implement $options system table
TheR1sing3un May 19, 2026
a5d1ac5
[python] Implement $branches system table
TheR1sing3un May 19, 2026
c66ab8f
[python] Implement $tags system table
TheR1sing3un May 19, 2026
0b86981
[python] Implement $schemas system table
TheR1sing3un May 19, 2026
46f7492
[python] Implement $snapshots system table
TheR1sing3un May 19, 2026
1ec12f8
[python] Implement $manifests system table
TheR1sing3un May 19, 2026
1a14a8b
[python] Implement $partitions system table
TheR1sing3un May 19, 2026
57ea99c
[python] Implement $files system table
TheR1sing3un May 19, 2026
92d4ded
[python] Regression: list_tables hides system-table suffixes
TheR1sing3un May 19, 2026
b0f5cec
[python] Document PyPaimon system tables
TheR1sing3un May 19, 2026
027c6b2
[python] Drop internal phasing language from system-table comments an…
TheR1sing3un May 19, 2026
c21ed2c
[python] Tighten system-tables doc example
TheR1sing3un May 19, 2026
9951236
[python] Align $branches with Java by reading mtime via FileIO directly
TheR1sing3un May 19, 2026
bd6f04b
[python] Drop cross-runtime alignment phrasing from system-table notes
TheR1sing3un May 19, 2026
311c64f
[python] Trigger CI
TheR1sing3un May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 235 additions & 0 deletions docs/content/pypaimon/system-tables.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
---
title: "System Tables"
weight: 6
type: docs
aliases:
- /pypaimon/system-tables.html
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# System Tables

PyPaimon exposes `table$<name>` system tables through the existing
catalog and read-builder APIs. The supported short names are:
`snapshots`, `schemas`, `options`, `manifests`, `files`, `partitions`,
`tags`, and `branches`. Global tables under the `sys` database
(`sys.all_tables`, `sys.catalog_options`, ...) and the streaming
`audit_log` / `binlog` family are not exposed yet.

## Basic Usage

Reuse a single read builder for both the scan and the read so that any
projection or limit set on it is honoured by both sides:

```python
from pypaimon import CatalogFactory

catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
snapshots = catalog.get_table('default.my_table$snapshots')

read_builder = snapshots.new_read_builder()
splits = read_builder.new_scan().plan().splits()
print(read_builder.new_read().to_pandas(splits))
```

`with_projection` and `with_limit` chain on the same builder:

```python
read_builder = (
snapshots.new_read_builder()
.with_projection(['snapshot_id', 'commit_user', 'commit_time'])
.with_limit(10)
)
splits = read_builder.new_scan().plan().splits()
arrow_table = read_builder.new_read().to_arrow(splits)
```

The returned object exposes the regular `Table` surface, so the same
read builder works with `to_pandas`, `to_arrow`, `to_iterator`,
`to_record_batch_iterator`, and `to_duckdb`. Writes raise
`NotImplementedError` — system tables are read-only.

## Available Tables

Each system table is listed below with its column layout (including
nullability) and primary-key choice. Tables are listed in the order
they appear in `SystemTableLoader`.

### `$snapshots`

One row per persisted snapshot.

| Column | Type | Notes |
|---------------------------|-----------------|--------------------------------|
| `snapshot_id` | BIGINT NOT NULL | Primary key |
| `schema_id` | BIGINT NOT NULL | |
| `commit_user` | STRING NOT NULL | |
| `commit_identifier` | BIGINT NOT NULL | |
| `commit_kind` | STRING NOT NULL | `APPEND`, `COMPACT`, ... |
| `commit_time` | TIMESTAMP(3) NOT NULL | |
| `base_manifest_list` | STRING NOT NULL | |
| `delta_manifest_list` | STRING NOT NULL | |
| `changelog_manifest_list` | STRING | |
| `total_record_count` | BIGINT | |
| `delta_record_count` | BIGINT | |
| `changelog_record_count` | BIGINT | |
| `watermark` | BIGINT | |
| `next_row_id` | BIGINT | |

### `$schemas`

Every committed schema version, with `fields` / `partition_keys` /
`primary_keys` / `options` encoded as compact JSON strings.

| Column | Type | Notes |
|------------------|-----------------|--------------|
| `schema_id` | BIGINT NOT NULL | Primary key |
| `fields` | STRING NOT NULL | JSON |
| `partition_keys` | STRING NOT NULL | JSON list |
| `primary_keys` | STRING NOT NULL | JSON list |
| `options` | STRING NOT NULL | JSON map |
| `comment` | STRING | |
| `update_time` | TIMESTAMP(3) NOT NULL | |

### `$options`

Two columns echoing the active table options.

| Column | Type | Notes |
|---------|-----------------|--------------|
| `key` | STRING NOT NULL | Primary key |
| `value` | STRING NOT NULL | |

### `$manifests`

Manifest list for the latest snapshot.

| Column | Type | Notes |
|-----------------------|-----------------|-------------------------------|
| `file_name` | STRING NOT NULL | Primary key |
| `file_size` | BIGINT NOT NULL | |
| `num_added_files` | BIGINT NOT NULL | |
| `num_deleted_files` | BIGINT NOT NULL | |
| `schema_id` | BIGINT NOT NULL | |
| `min_partition_stats` | STRING | Placeholder (see Limitations) |
| `max_partition_stats` | STRING | Placeholder (see Limitations) |
| `min_row_id` | BIGINT | |
| `max_row_id` | BIGINT | |

### `$files`

One row per ADD entry surviving the latest snapshot. Stats columns are
compact JSON dictionaries keyed by column name. The wire name
`deleteRowCount` is intentionally camelCase.

| Column | Type | Notes |
|-------------------------|---------------------|-----------------------------|
| `partition` | STRING | `pt=v/pt2=v2` |
| `bucket` | INT NOT NULL | |
| `file_path` | STRING NOT NULL | Primary key |
| `file_format` | STRING NOT NULL | |
| `schema_id` | BIGINT NOT NULL | |
| `level` | INT NOT NULL | |
| `record_count` | BIGINT NOT NULL | |
| `file_size_in_bytes` | BIGINT NOT NULL | |
| `min_key` | STRING | JSON list (PK tables only) |
| `max_key` | STRING | JSON list (PK tables only) |
| `null_value_counts` | STRING NOT NULL | JSON map |
| `min_value_stats` | STRING NOT NULL | JSON map |
| `max_value_stats` | STRING NOT NULL | JSON map |
| `min_sequence_number` | BIGINT | |
| `max_sequence_number` | BIGINT | |
| `creation_time` | TIMESTAMP(3) | |
| `deleteRowCount` | BIGINT | camelCase wire name |
| `file_source` | STRING | |
| `first_row_id` | BIGINT | |
| `write_cols` | ARRAY<STRING> | |

### `$partitions`

Aggregated partition statistics for the latest snapshot.

| Column | Type | Notes |
|-----------------------|-----------------------|--------------------------------|
| `partition` | STRING | `pt=v/pt2=v2`; primary key |
| `record_count` | BIGINT NOT NULL | |
| `file_size_in_bytes` | BIGINT NOT NULL | |
| `file_count` | BIGINT NOT NULL | |
| `last_update_time` | TIMESTAMP(3) | |
| `created_at` | TIMESTAMP(3) | Filesystem path returns `NULL` |
| `created_by` | STRING | Filesystem path returns `NULL` |
| `updated_by` | STRING | Filesystem path returns `NULL` |
| `options` | STRING | Filesystem path returns `NULL` |
| `total_buckets` | INT NOT NULL | |
| `done` | BOOLEAN NOT NULL | Filesystem path returns `False`|

### `$tags`

Snapshot metadata for every tag.

| Column | Type | Notes |
|-----------------|-----------------------|--------------------------------|
| `tag_name` | STRING NOT NULL | Primary key |
| `snapshot_id` | BIGINT NOT NULL | |
| `schema_id` | BIGINT NOT NULL | |
| `commit_time` | TIMESTAMP(3) NOT NULL | |
| `record_count` | BIGINT | |
| `create_time` | TIMESTAMP(3) | Currently emitted as `NULL` |
| `time_retained` | STRING | Currently emitted as `NULL` |

### `$branches`

Every named branch with the branch directory's modification time.

| Column | Type | Notes |
|---------------|-----------------------|--------------|
| `branch_name` | STRING NOT NULL | Primary key |
| `create_time` | TIMESTAMP(3) NOT NULL | |

## Limitations

* **Predicate pushdown is not yet implemented.** Calling
`with_filter(...)` is accepted, but invoking `new_read()` later will
raise `NotImplementedError` rather than silently dropping the
predicate. Filter the resulting Arrow table / DataFrame on the
client side instead.
* **`min_partition_stats` / `max_partition_stats` in `$manifests`**
are emitted as `NULL`. PyPaimon does not yet ship a helper that casts
a partition row to its string form.
* **`tag.time_retained` and `tag.create_time` are `NULL`.** PyPaimon's
`Tag` dataclass does not yet carry these fields — matching
`FileSystemCatalog.get_tag`'s current behaviour.
* **`branch.create_time` falls back to epoch 0** when the underlying
store cannot provide an mtime (some remote object stores via
`PyArrowFileIO`). Local filesystem catalogs always populate the
real time.
* **`partitions.created_at / created_by / updated_by / options / done`**
are filled with placeholders for the filesystem path. REST-managed
catalogs that expose those fields will be wired in a follow-up.
* **`list_tables` does not enumerate system tables.** System tables
remain accessible through `get_table('db.t$name')`.

## Supported via Catalogs

* `FilesystemCatalog` — fully supported.
* `RESTCatalog` — fully supported; columns that depend on catalog
metadata (such as `$partitions.created_by`) are populated via the
REST API where the server exposes them.
21 changes: 21 additions & 0 deletions paimon-python/pypaimon/catalog/filesystem_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ def list_tables(self, database_name: str) -> list:
def get_table(self, identifier: Union[str, Identifier]) -> Table:
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
if identifier.is_system_table():
return self._load_system_table(identifier)
return self._load_data_table(identifier)

def _load_data_table(self, identifier: Identifier) -> FileStoreTable:
if self.catalog_options.contains(CoreOptions.SCAN_FALLBACK_BRANCH):
raise ValueError(f"Unsupported CoreOption {CoreOptions.SCAN_FALLBACK_BRANCH}")
table_path = self.get_table_path(identifier)
Expand All @@ -147,6 +152,22 @@ def get_table(self, identifier: Union[str, Identifier]) -> Table:

return FileStoreTable(self.file_io, identifier, table_path, table_schema, catalog_environment)

def _load_system_table(self, identifier: Identifier) -> Table:
from pypaimon.table.system import system_table_loader

base_identifier = Identifier.create(
identifier.get_database_name(),
identifier.get_table_name(),
branch=identifier.get_branch_name(),
)
base_table = self._load_data_table(base_identifier)
sys_table = system_table_loader.load(
identifier.get_system_table_name(), base_table
)
if sys_table is None:
raise TableNotExistException(identifier)
return sys_table

def create_table(self, identifier: Union[str, Identifier], schema: 'Schema', ignore_if_exists: bool):
if schema.options and schema.options.get(CoreOptions.AUTO_CREATE.key()):
raise ValueError(f"The value of {CoreOptions.AUTO_CREATE.key()} property should be False.")
Expand Down
21 changes: 21 additions & 0 deletions paimon-python/pypaimon/catalog/rest/rest_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,34 @@ def list_tables_paged(
def get_table(self, identifier: Union[str, Identifier]):
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
if identifier.is_system_table():
return self._load_system_table(identifier)
return self._load_data_table(identifier)

def _load_data_table(self, identifier: Identifier):
return self.load_table(
identifier,
lambda path: self.file_io_for_data(path, identifier),
self.file_io_from_options,
self.load_table_metadata,
)

def _load_system_table(self, identifier: Identifier):
from pypaimon.table.system import system_table_loader

base_identifier = Identifier.create(
identifier.get_database_name(),
identifier.get_table_name(),
branch=identifier.get_branch_name(),
)
base_table = self._load_data_table(base_identifier)
sys_table = system_table_loader.load(
identifier.get_system_table_name(), base_table
)
if sys_table is None:
raise TableNotExistException(identifier)
return sys_table

def create_table(self, identifier: Union[str, Identifier], schema: Schema, ignore_if_exists: bool):
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
Expand Down
14 changes: 14 additions & 0 deletions paimon-python/pypaimon/schema/schema_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,20 @@ def latest(self) -> Optional['TableSchema']:
except Exception as e:
raise RuntimeError(f"Failed to load schema from path: {self.schema_path}") from e

def list_all(self) -> List['TableSchema']:
"""Return every committed schema in ascending ID order.

Missing IDs (deleted on disk after expiry, for instance) are
skipped.
"""
ids = sorted(self._list_versioned_files())
schemas: List['TableSchema'] = []
for schema_id in ids:
schema = self.get_schema(schema_id)
if schema is not None:
schemas.append(schema)
return schemas

def create_table(self, schema: Schema) -> TableSchema:
while True:
latest = self.latest()
Expand Down
33 changes: 33 additions & 0 deletions paimon-python/pypaimon/snapshot/snapshot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,39 @@ def earlier_or_equal_time_mills(self, timestamp: int) -> Optional[Snapshot]:

return final_snapshot

def list_snapshots(self) -> List[Snapshot]:
"""Return every persisted snapshot in ascending ID order.

Scans ``snapshot_dir`` for ``snapshot-N`` files and decodes
each. IDs whose file is missing (because a previous expire
cleaned them) are skipped silently, so the result reflects
only snapshots that can still be inspected.
"""
import re

if not self.file_io.exists(self.snapshot_dir):
return []

file_infos = self.file_io.list_status(self.snapshot_dir)
if file_infos is None:
return []

pattern = re.compile(r'^snapshot-(\d+)$')
ids = []
for file_info in file_infos:
name = file_info.path.split('/')[-1]
match = pattern.match(name)
if match:
ids.append(int(match.group(1)))
ids.sort()

snapshots: List[Snapshot] = []
for snapshot_id in ids:
snapshot = self.get_snapshot_by_id(snapshot_id)
if snapshot is not None:
snapshots.append(snapshot)
return snapshots

def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
"""
Get a snapshot by its ID.
Expand Down
20 changes: 20 additions & 0 deletions paimon-python/pypaimon/table/system/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from pypaimon.table.system.system_table import SystemTable

__all__ = ["SystemTable"]
Loading
Loading