diff --git a/paimon-python/pypaimon/table/system/buckets_table.py b/paimon-python/pypaimon/table/system/buckets_table.py new file mode 100644 index 000000000000..ddd5e8c6c6f0 --- /dev/null +++ b/paimon-python/pypaimon/table/system/buckets_table.py @@ -0,0 +1,165 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""The ``$buckets`` system table — per-bucket aggregated stats.""" + +from typing import List, Optional + +import pyarrow + +from pypaimon.manifest.manifest_file_manager import ManifestFileManager +from pypaimon.manifest.manifest_list_manager import ManifestListManager +from pypaimon.schema.data_types import AtomicType, DataField, RowType +from pypaimon.table.system.system_table import SystemTable + + +TABLE_TYPE = RowType(False, [ + DataField(0, "partition", AtomicType("STRING", nullable=True)), + DataField(1, "bucket", AtomicType("INT", nullable=False)), + DataField(2, "record_count", AtomicType("BIGINT", nullable=False)), + DataField(3, "file_size_in_bytes", AtomicType("BIGINT", nullable=False)), + DataField(4, "file_count", AtomicType("BIGINT", nullable=False)), + DataField(5, "last_update_time", AtomicType("TIMESTAMP(3)", nullable=True)), +]) + + +_TIMESTAMP_TYPE = pyarrow.timestamp("ms") + + +class BucketsTable(SystemTable): + """The ``$buckets`` system table. + + Aggregates manifest entries by (partition, bucket) to show per-bucket + record counts, file sizes, file counts and last update times. + """ + + def system_table_name(self) -> str: + return "buckets" + + def row_type(self) -> RowType: + return TABLE_TYPE + + def primary_keys(self) -> List[str]: + return ["partition", "bucket"] + + def _build_arrow_table(self) -> pyarrow.Table: + snapshot = self.base_table.snapshot_manager().get_latest_snapshot() + if snapshot is None: + return self._empty_table() + + manifest_list_manager = ManifestListManager(self.base_table) + manifest_files = manifest_list_manager.read_all(snapshot) + manifest_file_manager = ManifestFileManager(self.base_table) + entries = manifest_file_manager.read_entries_parallel( + manifest_files, drop_stats=True) + + _NULL = object() + + bucket_map: dict = {} + for entry in entries: + raw_key = tuple( + (field.name, _NULL if value is None else value) + for field, value in zip( + entry.partition.fields, entry.partition.values)) + bucket_id = int(entry.bucket) + key = (raw_key, bucket_id) + + stats = bucket_map.get(key) + if stats is None: + render_items = tuple( + (name, str(val) if val is not _NULL else None) + for name, val in raw_key) + stats = { + "render_items": render_items, + "bucket": bucket_id, + "record_count": 0, + "file_size_in_bytes": 0, + "file_count": 0, + "last_update_time": None, + } + bucket_map[key] = stats + + stats["record_count"] += int(entry.file.row_count) + stats["file_size_in_bytes"] += int(entry.file.file_size) + stats["file_count"] += 1 + ct_ms = entry.file.creation_time_epoch_millis() + if ct_ms is not None: + if (stats["last_update_time"] is None + or ct_ms > stats["last_update_time"]): + stats["last_update_time"] = ct_ms + + sorted_keys = sorted( + bucket_map.keys(), + key=lambda k: ( + _render_partition(bucket_map[k]["render_items"]) or "", + k[1])) + + partition_strings: List[Optional[str]] = [] + buckets: List[int] = [] + record_counts: List[int] = [] + file_sizes: List[int] = [] + file_counts: List[int] = [] + last_update_times: List[Optional[int]] = [] + + for key in sorted_keys: + stats = bucket_map[key] + partition_strings.append(_render_partition(stats["render_items"])) + buckets.append(stats["bucket"]) + record_counts.append(stats["record_count"]) + file_sizes.append(stats["file_size_in_bytes"]) + file_counts.append(stats["file_count"]) + last_update_times.append(stats["last_update_time"]) + + return pyarrow.table({ + "partition": pyarrow.array( + partition_strings, type=pyarrow.string()), + "bucket": pyarrow.array(buckets, type=pyarrow.int32()), + "record_count": pyarrow.array( + record_counts, type=pyarrow.int64()), + "file_size_in_bytes": pyarrow.array( + file_sizes, type=pyarrow.int64()), + "file_count": pyarrow.array(file_counts, type=pyarrow.int64()), + "last_update_time": pyarrow.array( + last_update_times, type=_TIMESTAMP_TYPE), + }) + + @staticmethod + def _empty_table() -> pyarrow.Table: + return pyarrow.table({ + "partition": pyarrow.array([], type=pyarrow.string()), + "bucket": pyarrow.array([], type=pyarrow.int32()), + "record_count": pyarrow.array([], type=pyarrow.int64()), + "file_size_in_bytes": pyarrow.array([], type=pyarrow.int64()), + "file_count": pyarrow.array([], type=pyarrow.int64()), + "last_update_time": pyarrow.array([], type=_TIMESTAMP_TYPE), + }) + + +def _render_partition(spec_items) -> Optional[str]: + """Render a partition spec as ``pt=v/pt2=v2`` or None when empty. + + Null partition values are rendered as ``__NULL__`` to distinguish them + from the literal string ``"None"``. A partition whose value is + literally ``"__NULL__"`` will produce the same rendered string — + aggregation keys are still distinct, but the displayed partition + column will collide. This is a display-only limitation. + """ + if not spec_items: + return None + return "/".join( + "{}={}".format(name, "__NULL__" if value is None else value) + for name, value in spec_items) diff --git a/paimon-python/pypaimon/table/system/system_table_loader.py b/paimon-python/pypaimon/table/system/system_table_loader.py index 9d0576aec1c3..72b758947d2e 100644 --- a/paimon-python/pypaimon/table/system/system_table_loader.py +++ b/paimon-python/pypaimon/table/system/system_table_loader.py @@ -24,7 +24,7 @@ The following short names are intentionally not registered here yet: audit_log, binlog, read_optimized, consumers, statistics, - aggregation_fields, buckets, file_key_ranges, table_indexes, + aggregation_fields, file_key_ranges, table_indexes, row_tracking, all_tables, all_partitions, all_table_options, catalog_options """ @@ -44,6 +44,7 @@ "manifests", "files", "partitions", + "buckets", "tags", "branches", ) @@ -66,6 +67,7 @@ def factory(base_table: "FileStoreTable") -> "SystemTable": "manifests": _lazy("pypaimon.table.system.manifests_table", "ManifestsTable"), "files": _lazy("pypaimon.table.system.files_table", "FilesTable"), "partitions": _lazy("pypaimon.table.system.partitions_table", "PartitionsTable"), + "buckets": _lazy("pypaimon.table.system.buckets_table", "BucketsTable"), "tags": _lazy("pypaimon.table.system.tags_table", "TagsTable"), "branches": _lazy("pypaimon.table.system.branches_table", "BranchesTable"), } diff --git a/paimon-python/pypaimon/tests/system/buckets_table_test.py b/paimon-python/pypaimon/tests/system/buckets_table_test.py new file mode 100644 index 000000000000..57a352392a24 --- /dev/null +++ b/paimon-python/pypaimon/tests/system/buckets_table_test.py @@ -0,0 +1,136 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""End-to-end tests for the ``$buckets`` system table.""" + +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema +from pypaimon.schema.data_types import DataField +from pypaimon.table.system.buckets_table import BucketsTable + + +def _read(table): + rb = table.new_read_builder() + return rb.new_read().to_arrow(rb.new_scan().plan().splits()) + + +class BucketsTableTest(unittest.TestCase): + + def setUp(self): + self.tmp = tempfile.mkdtemp(prefix="buckets_sys_") + warehouse = os.path.join(self.tmp, "warehouse") + self.catalog = CatalogFactory.create({"warehouse": warehouse}) + self.catalog.create_database("db", False) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def _create_partitioned_table(self, num_buckets=2): + fields = [ + DataField.from_dict({"id": 0, "name": "id", "type": "INT"}), + DataField.from_dict({"id": 1, "name": "v", "type": "STRING"}), + DataField.from_dict({"id": 2, "name": "dt", "type": "STRING"}), + ] + self.catalog.create_table( + "db.t", + Schema( + fields=fields, + partition_keys=["dt"], + options={"bucket": str(num_buckets)}, + ), + False, + ) + + def _write_data(self): + table = self.catalog.get_table("db.t") + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write() + commit = write_builder.new_commit() + writer.write_arrow(pa.table({ + "id": pa.array([1, 2, 3, 4], type=pa.int32()), + "v": ["a", "b", "c", "d"], + "dt": ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"], + })) + commit.commit(writer.prepare_commit()) + writer.close() + commit.close() + + def test_buckets_table_loaded_via_catalog(self): + self._create_partitioned_table() + table = self.catalog.get_table("db.t$buckets") + self.assertIsInstance(table, BucketsTable) + + def test_schema_column_layout(self): + self._create_partitioned_table() + table = self.catalog.get_table("db.t$buckets") + row_type = table.row_type() + expected = [ + ("partition", True), ("bucket", False), + ("record_count", False), ("file_size_in_bytes", False), + ("file_count", False), ("last_update_time", True), + ] + self.assertEqual([n for n, _ in expected], + [f.name for f in row_type.fields]) + for field, (_, expected_nullable) in zip(row_type.fields, expected): + self.assertEqual(expected_nullable, field.type.nullable, + "field {} nullability".format(field.name)) + self.assertEqual(["partition", "bucket"], table.primary_keys()) + + def test_empty_when_no_snapshot_exists(self): + self._create_partitioned_table() + arrow_table = _read(self.catalog.get_table("db.t$buckets")) + self.assertEqual(0, arrow_table.num_rows) + + def test_aggregates_by_partition_and_bucket(self): + self._create_partitioned_table(num_buckets=2) + self._write_data() + + arrow_table = _read(self.catalog.get_table("db.t$buckets")) + self.assertGreater(arrow_table.num_rows, 0) + + partitions = arrow_table.column("partition").to_pylist() + self.assertTrue(all(p in ("dt=2024-01-01", "dt=2024-01-02") + for p in partitions)) + + for size in arrow_table.column("file_size_in_bytes").to_pylist(): + self.assertGreater(size, 0) + for count in arrow_table.column("file_count").to_pylist(): + self.assertGreaterEqual(count, 1) + + total_records = sum(arrow_table.column("record_count").to_pylist()) + self.assertEqual(4, total_records) + + def test_rows_sorted_by_partition_then_bucket(self): + self._create_partitioned_table(num_buckets=2) + self._write_data() + + arrow_table = _read(self.catalog.get_table("db.t$buckets")) + partitions = arrow_table.column("partition").to_pylist() + buckets = arrow_table.column("bucket").to_pylist() + + rows = list(zip(partitions, buckets)) + self.assertEqual(rows, sorted(rows)) + + +if __name__ == "__main__": + unittest.main() diff --git a/paimon-python/pypaimon/tests/system/system_table_loader_test.py b/paimon-python/pypaimon/tests/system/system_table_loader_test.py index 1c8690a2bcb8..07b959a2c6c2 100644 --- a/paimon-python/pypaimon/tests/system/system_table_loader_test.py +++ b/paimon-python/pypaimon/tests/system/system_table_loader_test.py @@ -28,6 +28,7 @@ "manifests", "files", "partitions", + "buckets", "tags", "branches", ) @@ -41,7 +42,6 @@ "consumers", "statistics", "aggregation_fields", - "buckets", "file_key_ranges", "table_indexes", "row_tracking",