Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions paimon-python/pypaimon/table/system/buckets_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""The ``$buckets`` system table — per-bucket aggregated stats."""

from typing import List, Optional

import pyarrow

from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.schema.data_types import AtomicType, DataField, RowType
from pypaimon.table.system.system_table import SystemTable


# Row schema of the ``$buckets`` system table. Field ids are simply the
# column positions, so they are derived with ``enumerate`` from a compact
# (column name, type name, nullable) listing.
TABLE_TYPE = RowType(False, [
    DataField(field_id, column, AtomicType(type_name, nullable=is_nullable))
    for field_id, (column, type_name, is_nullable) in enumerate((
        ("partition", "STRING", True),
        ("bucket", "INT", False),
        ("record_count", "BIGINT", False),
        ("file_size_in_bytes", "BIGINT", False),
        ("file_count", "BIGINT", False),
        ("last_update_time", "TIMESTAMP(3)", True),
    ))
])


# Millisecond-resolution Arrow type used for the ``last_update_time`` column.
_TIMESTAMP_TYPE = pyarrow.timestamp("ms")


class BucketsTable(SystemTable):
    """The ``$buckets`` system table.

    Aggregates the manifest entries of the latest snapshot by
    (partition, bucket) and reports, for every bucket, its record count,
    total file size, file count and most recent file creation time.
    """

    def system_table_name(self) -> str:
        """Return the short name of this system table."""
        return "buckets"

    def row_type(self) -> RowType:
        """Return the fixed row schema of the table."""
        return TABLE_TYPE

    def primary_keys(self) -> List[str]:
        """Return the columns that identify a row: partition and bucket."""
        return ["partition", "bucket"]

    def _build_arrow_table(self) -> pyarrow.Table:
        """Build the table contents from the latest snapshot.

        Returns:
            One row per (partition, bucket), ordered by rendered partition
            string and then bucket id. An empty table with the same columns
            is returned when the base table has no snapshot.
        """
        snapshot = self.base_table.snapshot_manager().get_latest_snapshot()
        if snapshot is None:
            return self._empty_table()

        manifest_files = ManifestListManager(self.base_table).read_all(
            snapshot)
        entries = ManifestFileManager(self.base_table).read_entries_parallel(
            manifest_files, drop_stats=True)

        # A missing partition string (unpartitioned table) sorts as "".
        # ``sorted`` is stable, so ties keep first-seen order.
        rows = sorted(
            self._aggregate_by_bucket(entries).values(),
            key=lambda stats: (stats["partition"] or "", stats["bucket"]))
        return self._rows_to_arrow(rows)

    @staticmethod
    def _aggregate_by_bucket(entries) -> dict:
        """Fold manifest entries into per-(partition, bucket) statistics.

        Args:
            entries: Iterable of manifest entries exposing ``partition``
                (with parallel ``fields`` / ``values``), ``bucket`` and
                ``file`` (with ``row_count``, ``file_size`` and
                ``creation_time_epoch_millis()``).

        Returns:
            A dict, in first-seen order, mapping ``(partition spec, bucket
            id)`` to a stats dict whose keys are the output column names.
        """
        buckets: dict = {}
        for entry in entries:
            partition = entry.partition
            # The raw values (not their strings) form the key. None is
            # hashable and never equals a real value, so a null partition
            # value stays distinct from e.g. the string "None".
            spec = tuple(
                (field.name, value)
                for field, value in zip(partition.fields, partition.values))
            bucket_id = int(entry.bucket)
            key = (spec, bucket_id)

            stats = buckets.get(key)
            if stats is None:
                # Render the partition once, when the bucket is first seen;
                # the string is reused for both sorting and output.
                stats = buckets[key] = {
                    "partition": _render_partition(tuple(
                        (name, None if value is None else str(value))
                        for name, value in spec)),
                    "bucket": bucket_id,
                    "record_count": 0,
                    "file_size_in_bytes": 0,
                    "file_count": 0,
                    "last_update_time": None,
                }

            data_file = entry.file
            stats["record_count"] += int(data_file.row_count)
            stats["file_size_in_bytes"] += int(data_file.file_size)
            stats["file_count"] += 1

            # Keep the newest creation time; files without one are ignored.
            created_ms = data_file.creation_time_epoch_millis()
            latest = stats["last_update_time"]
            if created_ms is not None and (
                    latest is None or created_ms > latest):
                stats["last_update_time"] = created_ms
        return buckets

    @staticmethod
    def _rows_to_arrow(rows) -> pyarrow.Table:
        """Convert stats dicts into an Arrow table with the fixed columns.

        This is the single place where Arrow column names, order and types
        are defined, so the populated and the empty table cannot drift
        apart.

        Args:
            rows: Sequence of stats dicts keyed by output column name.
        """
        columns = (
            ("partition", pyarrow.string()),
            ("bucket", pyarrow.int32()),
            ("record_count", pyarrow.int64()),
            ("file_size_in_bytes", pyarrow.int64()),
            ("file_count", pyarrow.int64()),
            ("last_update_time", _TIMESTAMP_TYPE),
        )
        return pyarrow.table({
            name: pyarrow.array([row[name] for row in rows], type=arrow_type)
            for name, arrow_type in columns
        })

    @staticmethod
    def _empty_table() -> pyarrow.Table:
        """Return a zero-row table with the ``$buckets`` columns."""
        return BucketsTable._rows_to_arrow([])


def _render_partition(spec_items) -> Optional[str]:
"""Render a partition spec as ``pt=v/pt2=v2`` or None when empty.

Null partition values are rendered as ``__NULL__`` to distinguish them
from the literal string ``"None"``. A partition whose value is
literally ``"__NULL__"`` will produce the same rendered string —
aggregation keys are still distinct, but the displayed partition
column will collide. This is a display-only limitation.
"""
if not spec_items:
return None
return "/".join(
"{}={}".format(name, "__NULL__" if value is None else value)
for name, value in spec_items)
4 changes: 3 additions & 1 deletion paimon-python/pypaimon/table/system/system_table_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
The following short names are intentionally not registered here yet:

audit_log, binlog, read_optimized, consumers, statistics,
aggregation_fields, buckets, file_key_ranges, table_indexes,
aggregation_fields, file_key_ranges, table_indexes,
row_tracking, all_tables, all_partitions, all_table_options,
catalog_options
"""
Expand All @@ -44,6 +44,7 @@
"manifests",
"files",
"partitions",
"buckets",
"tags",
"branches",
)
Expand All @@ -66,6 +67,7 @@ def factory(base_table: "FileStoreTable") -> "SystemTable":
"manifests": _lazy("pypaimon.table.system.manifests_table", "ManifestsTable"),
"files": _lazy("pypaimon.table.system.files_table", "FilesTable"),
"partitions": _lazy("pypaimon.table.system.partitions_table", "PartitionsTable"),
"buckets": _lazy("pypaimon.table.system.buckets_table", "BucketsTable"),
"tags": _lazy("pypaimon.table.system.tags_table", "TagsTable"),
"branches": _lazy("pypaimon.table.system.branches_table", "BranchesTable"),
}
Expand Down
136 changes: 136 additions & 0 deletions paimon-python/pypaimon/tests/system/buckets_table_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""End-to-end tests for the ``$buckets`` system table."""

import os
import shutil
import tempfile
import unittest

import pyarrow as pa

from pypaimon import CatalogFactory, Schema
from pypaimon.schema.data_types import DataField
from pypaimon.table.system.buckets_table import BucketsTable


def _read(table):
rb = table.new_read_builder()
return rb.new_read().to_arrow(rb.new_scan().plan().splits())


class BucketsTableTest(unittest.TestCase):
    """End-to-end checks of ``db.t$buckets`` on a temp-directory warehouse."""

    def setUp(self):
        """Create a catalog with database ``db`` in a fresh temp directory."""
        self.tmp = tempfile.mkdtemp(prefix="buckets_sys_")
        # Register the cleanup immediately: tearDown is skipped when setUp
        # raises, whereas cleanups registered so far still run, so the
        # directory is removed even if catalog creation below fails.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        warehouse = os.path.join(self.tmp, "warehouse")
        self.catalog = CatalogFactory.create({"warehouse": warehouse})
        self.catalog.create_database("db", False)

    def _create_partitioned_table(self, num_buckets=2):
        """Create ``db.t`` (id INT, v STRING, dt STRING) partitioned by dt.

        Args:
            num_buckets: Value stored in the table's ``bucket`` option.
        """
        fields = [
            DataField.from_dict({"id": 0, "name": "id", "type": "INT"}),
            DataField.from_dict({"id": 1, "name": "v", "type": "STRING"}),
            DataField.from_dict({"id": 2, "name": "dt", "type": "STRING"}),
        ]
        self.catalog.create_table(
            "db.t",
            Schema(
                fields=fields,
                partition_keys=["dt"],
                options={"bucket": str(num_buckets)},
            ),
            False,
        )

    def _write_data(self):
        """Commit four rows to ``db.t``, two in each of two ``dt`` values."""
        table = self.catalog.get_table("db.t")
        write_builder = table.new_batch_write_builder()
        writer = write_builder.new_write()
        commit = write_builder.new_commit()
        # Close both handles even when the write or the commit raises, so a
        # failing test does not leave them open.
        try:
            writer.write_arrow(pa.table({
                "id": pa.array([1, 2, 3, 4], type=pa.int32()),
                "v": ["a", "b", "c", "d"],
                "dt": ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"],
            }))
            commit.commit(writer.prepare_commit())
        finally:
            try:
                writer.close()
            finally:
                commit.close()

    def test_buckets_table_loaded_via_catalog(self):
        self._create_partitioned_table()
        table = self.catalog.get_table("db.t$buckets")
        self.assertIsInstance(table, BucketsTable)

    def test_schema_column_layout(self):
        self._create_partitioned_table()
        table = self.catalog.get_table("db.t$buckets")
        row_type = table.row_type()
        expected = [
            ("partition", True), ("bucket", False),
            ("record_count", False), ("file_size_in_bytes", False),
            ("file_count", False), ("last_update_time", True),
        ]
        self.assertEqual([n for n, _ in expected],
                         [f.name for f in row_type.fields])
        for field, (_, expected_nullable) in zip(row_type.fields, expected):
            self.assertEqual(expected_nullable, field.type.nullable,
                             "field {} nullability".format(field.name))
        self.assertEqual(["partition", "bucket"], table.primary_keys())

    def test_empty_when_no_snapshot_exists(self):
        self._create_partitioned_table()
        arrow_table = _read(self.catalog.get_table("db.t$buckets"))
        self.assertEqual(0, arrow_table.num_rows)

    def test_aggregates_by_partition_and_bucket(self):
        self._create_partitioned_table(num_buckets=2)
        self._write_data()

        arrow_table = _read(self.catalog.get_table("db.t$buckets"))
        self.assertGreater(arrow_table.num_rows, 0)

        partitions = arrow_table.column("partition").to_pylist()
        # Checked row by row so a failure names the offending value.
        for partition in partitions:
            self.assertIn(partition, ("dt=2024-01-01", "dt=2024-01-02"))

        # Aggregation must yield exactly one row per (partition, bucket).
        keys = list(zip(partitions,
                        arrow_table.column("bucket").to_pylist()))
        self.assertEqual(len(keys), len(set(keys)))

        for size in arrow_table.column("file_size_in_bytes").to_pylist():
            self.assertGreater(size, 0)
        for count in arrow_table.column("file_count").to_pylist():
            self.assertGreaterEqual(count, 1)

        total_records = sum(arrow_table.column("record_count").to_pylist())
        self.assertEqual(4, total_records)

    def test_rows_sorted_by_partition_then_bucket(self):
        self._create_partitioned_table(num_buckets=2)
        self._write_data()

        arrow_table = _read(self.catalog.get_table("db.t$buckets"))
        partitions = arrow_table.column("partition").to_pylist()
        buckets = arrow_table.column("bucket").to_pylist()

        rows = list(zip(partitions, buckets))
        self.assertEqual(rows, sorted(rows))


# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"manifests",
"files",
"partitions",
"buckets",
"tags",
"branches",
)
Expand All @@ -41,7 +42,6 @@
"consumers",
"statistics",
"aggregation_fields",
"buckets",
"file_key_ranges",
"table_indexes",
"row_tracking",
Expand Down
Loading