diff --git a/paimon-python/pypaimon/stats/__init__.py b/paimon-python/pypaimon/stats/__init__.py new file mode 100644 index 000000000000..e2a622f6df32 --- /dev/null +++ b/paimon-python/pypaimon/stats/__init__.py @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pypaimon.stats.col_stats import ColStats +from pypaimon.stats.statistics import Statistics +from pypaimon.stats.stats_file_handler import StatsFileHandler +from pypaimon.stats.statistics_collector import StatisticsCollector + +__all__ = [ + "ColStats", + "Statistics", + "StatsFileHandler", + "StatisticsCollector", +] diff --git a/paimon-python/pypaimon/stats/col_stats.py b/paimon-python/pypaimon/stats/col_stats.py new file mode 100644 index 000000000000..c455cfc76cb0 --- /dev/null +++ b/paimon-python/pypaimon/stats/col_stats.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class ColStats:
    """Statistics of a single column.

    Counterpart of Java org.apache.paimon.stats.ColStats. Only ``col_id`` is
    mandatory; every other attribute defaults to ``None`` and is left out of
    the serialized dictionary while it is ``None``.
    """

    col_id: int
    distinct_count: Optional[int] = None
    min: Optional[str] = None
    max: Optional[str] = None
    null_count: Optional[int] = None
    avg_len: Optional[int] = None
    max_len: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the camelCase dictionary form, skipping attributes that are None."""
        # Listed in emission order so the resulting key order never changes.
        optional_entries = (
            ("distinctCount", self.distinct_count),
            ("min", self.min),
            ("max", self.max),
            ("nullCount", self.null_count),
            ("avgLen", self.avg_len),
            ("maxLen", self.max_len),
        )
        payload: Dict[str, Any] = {"colId": self.col_id}
        payload.update(
            (key, value) for key, value in optional_entries if value is not None
        )
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ColStats':
        """Build an instance from the camelCase dictionary.

        ``colId`` is required (a missing key raises ``KeyError``); all other
        keys are optional and become ``None`` when absent.
        """
        column_id = data["colId"]
        lookup = data.get
        return cls(
            col_id=column_id,
            distinct_count=lookup("distinctCount"),
            min=lookup("min"),
            max=lookup("max"),
            null_count=lookup("nullCount"),
            avg_len=lookup("avgLen"),
            max_len=lookup("maxLen"),
        )


# diff --git a/paimon-python/pypaimon/stats/statistics.py b/paimon-python/pypaimon/stats/statistics.py
# new file mode 100644
# index 000000000000..b6b9dd1605dd
# --- /dev/null
# +++ b/paimon-python/pypaimon/stats/statistics.py
# @@ -0,0 +1,68 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.
# See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import json
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

from pypaimon.stats.col_stats import ColStats


@dataclass
class Statistics:
    """Table-level statistics. Mirrors Java org.apache.paimon.stats.Statistics.

    ``col_stats`` maps a column name to its :class:`ColStats`. The two
    ``merged_record_*`` attributes are optional and are omitted from the
    serialized form while they are ``None``; ``colStats`` is always emitted,
    even when empty.
    """

    snapshot_id: int
    schema_id: int
    merged_record_count: Optional[int] = None
    merged_record_size: Optional[int] = None
    col_stats: Dict[str, ColStats] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize to a JSON string (non-ASCII characters are kept as-is)."""
        return json.dumps(self.to_dict(), ensure_ascii=False)

    def to_dict(self) -> Dict[str, Any]:
        """Return the camelCase dictionary form used for the JSON file."""
        result: Dict[str, Any] = {
            "snapshotId": self.snapshot_id,
            "schemaId": self.schema_id,
        }
        if self.merged_record_count is not None:
            result["mergedRecordCount"] = self.merged_record_count
        if self.merged_record_size is not None:
            result["mergedRecordSize"] = self.merged_record_size
        result["colStats"] = {
            name: cs.to_dict() for name, cs in self.col_stats.items()
        }
        return result

    @classmethod
    def from_json(cls, json_str: str) -> 'Statistics':
        """Parse a JSON string produced by :meth:`to_json` (or an equivalent writer)."""
        return cls.from_dict(json.loads(json_str))

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Statistics':
        """Build an instance from the camelCase dictionary.

        ``snapshotId`` and ``schemaId`` are required (``KeyError`` when
        missing). A missing or explicitly null ``colStats`` entry yields an
        empty ``col_stats`` mapping.
        """
        # ``or {}`` also covers an explicit JSON null, which ``.get(key, {})``
        # would pass through as None and then fail on ``.items()``.
        raw_col_stats = data.get("colStats") or {}
        col_stats = {
            name: ColStats.from_dict(cs_data)
            for name, cs_data in raw_col_stats.items()
        }
        return cls(
            snapshot_id=data["snapshotId"],
            schema_id=data["schemaId"],
            merged_record_count=data.get("mergedRecordCount"),
            merged_record_size=data.get("mergedRecordSize"),
            col_stats=col_stats,
        )


# diff --git a/paimon-python/pypaimon/stats/statistics_collector.py b/paimon-python/pypaimon/stats/statistics_collector.py
# new file mode 100644
# index 000000000000..987a8663feca
# --- /dev/null
# +++ b/paimon-python/pypaimon/stats/statistics_collector.py
# @@ -0,0 +1,254 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import date, datetime, time
from decimal import Decimal
from typing import Any, Dict, List, Optional, Set

import pyarrow as pa
import pyarrow.compute as pc

from pypaimon.stats.col_stats import ColStats
from pypaimon.stats.statistics import Statistics

logger = logging.getLogger(__name__)

# Exact distinct counting is abandoned once a column holds more values than this.
_MAX_EXACT_DISTINCT_VALUES = 10_000_000

# Reference day for serializing DATE values as "days since epoch".
_EPOCH_DATE = date(1970, 1, 1)


def _is_binary_like(arrow_type: pa.DataType) -> bool:
    """Return True for string, binary and fixed-size-binary types (incl. large variants)."""
    return (pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type)
            or pa.types.is_binary(arrow_type) or pa.types.is_large_binary(arrow_type)
            or pa.types.is_fixed_size_binary(arrow_type))


def _is_binary_only(arrow_type: pa.DataType) -> bool:
    """Binary (not string) — min/max not meaningful for Java interop."""
    return pa.types.is_binary(arrow_type) or pa.types.is_large_binary(arrow_type)


def _skip_min_max(arrow_type: pa.DataType) -> bool:
    """String and binary types do not get min/max (intended to match Java/Spark hasMinMax)."""
    return _is_binary_like(arrow_type)


def _is_analyzable_type(arrow_type: pa.DataType) -> bool:
    """Return True if the type supports column statistics, i.e. it is not nested.

    ``pa.types.is_nested`` already covers struct, list, large list and map,
    so no separate checks for those are needed.
    """
    return not pa.types.is_nested(arrow_type)


def _scalar_less(left: Any, right: Any) -> bool:
    """Return True when pyarrow scalar ``left`` orders strictly before ``right``.

    pyarrow scalars do not implement Python's ``<`` / ``>`` operators, so the
    comparison goes through the ``less`` compute kernel. If no kernel exists
    for the type, the converted Python values are compared instead.
    """
    try:
        return bool(pc.less(left, right).as_py())
    except pa.ArrowNotImplementedError:
        return left.as_py() < right.as_py()


def _scalar_to_str(scalar: Any) -> Optional[str]:
    """Convert a pyarrow scalar to the string form stored in ``ColStats.min``/``max``.

    The formats are intended to be readable by the Java implementation:

    - timestamp: ``YYYY-MM-DD HH:MM:SS`` plus a fractional part (trailing
      zeros stripped) when the microsecond component is non-zero
    - date: days since 1970-01-01
    - time: milliseconds of the day
    - bytes: hexadecimal string
    - everything else: ``str(value)``

    Returns None for a missing or null scalar.
    """
    if scalar is None or not scalar.is_valid:
        return None
    val = scalar.as_py()
    if val is None:
        return None
    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(val, datetime):
        text = val.strftime("%Y-%m-%d %H:%M:%S")
        if val.microsecond:
            text += "." + f"{val.microsecond:06d}".rstrip('0')
        return text
    if isinstance(val, date):
        return str((val - _EPOCH_DATE).days)
    if isinstance(val, time):
        millis = (val.hour * 3600 + val.minute * 60 + val.second) * 1000 + val.microsecond // 1000
        return str(millis)
    if isinstance(val, bytes):
        return val.hex()
    if isinstance(val, Decimal):
        return str(val)
    return str(val)


class StatisticsCollector:
    """Collects column statistics by reading merged table data via the read path."""

    def __init__(self, table):
        self._table = table

    def collect(self, columns: Optional[List[str]] = None) -> Statistics:
        """Compute statistics by reading the full merged table.

        All splits of a fresh scan plan are read into one Arrow table; the
        record count is its row count and the record size is its ``nbytes``.

        Args:
            columns: Column names to compute stats for. If None or empty,
                all columns of the table schema are used.

        Returns:
            A Statistics object ready to commit. When the table has no
            snapshot, the result carries ``snapshot_id=0`` and no column stats.

        Raises:
            ValueError: If a requested column is not part of the table schema.
        """
        snapshot = self._table.snapshot_manager().get_latest_snapshot()
        if snapshot is None:
            return Statistics(
                snapshot_id=0, schema_id=self._table.table_schema.id,
                merged_record_count=0, merged_record_size=0, col_stats={},
            )

        schema = self._table.table_schema
        all_fields = schema.fields
        field_name_to_id = {f.name: f.id for f in all_fields}

        target_columns: List[str]
        if columns:
            for col in columns:
                if col not in field_name_to_id:
                    raise ValueError(f"Column '{col}' not found in table schema. "
                                     f"Available: {list(field_name_to_id.keys())}")
            target_columns = columns
        else:
            target_columns = [f.name for f in all_fields]

        # Read merged data
        read_builder = self._table.new_read_builder()
        splits = read_builder.new_scan().plan().splits()

        if not splits:
            return Statistics(
                snapshot_id=snapshot.id,
                schema_id=schema.id,
                merged_record_count=0,
                merged_record_size=0,
                col_stats={},
            )

        arrow_table = read_builder.new_read().to_arrow(splits)
        available_columns = set(arrow_table.column_names)

        col_stats: Dict[str, ColStats] = {}
        for col_name in target_columns:
            if col_name not in available_columns:
                continue
            col_array = arrow_table.column(col_name)
            if not _is_analyzable_type(col_array.type):
                logger.debug("Skipping column '%s' with unsupported type %s", col_name, col_array.type)
                continue
            acc = _ColumnAccumulator(col_name)
            acc.add_array(col_array)
            col_stats[col_name] = acc.finalize(field_name_to_id[col_name])

        return Statistics(
            snapshot_id=snapshot.id,
            schema_id=schema.id,
            merged_record_count=arrow_table.num_rows,
            merged_record_size=arrow_table.nbytes,
            col_stats=col_stats,
        )


class _ColumnAccumulator:
    """Accumulates statistics for a single column."""

    def __init__(self, name: str):
        self.name = name
        self._null_count = 0
        self._min_scalar = None
        self._max_scalar = None
        # Set of Python values seen so far; None once the exact count was given up.
        self._distinct_values: Optional[Set] = set()
        self._total_len = 0
        self._max_len_value = 0
        self._row_count = 0
        self._is_binary_like = False
        self._skip_min_max = False
        self._too_many_distinct = False

    def add_array(self, array: pa.ChunkedArray):
        """Fold every chunk of ``array`` into the running statistics."""
        self._is_binary_like = _is_binary_like(array.type)
        self._skip_min_max = _skip_min_max(array.type)

        for chunk in array.chunks:
            self._null_count += chunk.null_count
            self._row_count += len(chunk)

            valid = pc.drop_null(chunk)
            if len(valid) == 0:
                continue

            # min/max (skipped for string and binary columns)
            if not self._skip_min_max:
                self._merge_min_max(pc.min(valid), pc.max(valid))

            # distinct count (exact up to the limit, then give up)
            if self._distinct_values is not None:
                self._distinct_values.update(pc.unique(valid).to_pylist())
                if len(self._distinct_values) > _MAX_EXACT_DISTINCT_VALUES:
                    self._too_many_distinct = True
                    self._distinct_values = None

            # binary/string length stats
            if self._is_binary_like:
                lengths = pc.binary_length(valid)
                self._total_len += pc.sum(lengths).as_py() or 0
                chunk_max_len = pc.max(lengths)
                if chunk_max_len.is_valid:
                    self._max_len_value = max(self._max_len_value, chunk_max_len.as_py())

    def _merge_min_max(self, chunk_min: Any, chunk_max: Any) -> None:
        """Widen the running [min, max] range with one chunk's extremes."""
        if chunk_min.is_valid and (
                self._min_scalar is None or _scalar_less(chunk_min, self._min_scalar)):
            self._min_scalar = chunk_min
        if chunk_max.is_valid and (
                self._max_scalar is None or _scalar_less(self._max_scalar, chunk_max)):
            self._max_scalar = chunk_max

    def finalize(self, col_id: int) -> ColStats:
        """Build the ColStats for field ``col_id`` from the accumulated state.

        ``distinct_count`` is None when exact counting was abandoned.
        ``avg_len`` (floor of total length / non-null rows, 0 without non-null
        rows) and ``max_len`` are only set for string/binary columns.
        """
        distinct_count = None
        if self._distinct_values is not None:
            distinct_count = len(self._distinct_values)

        avg_len = None
        max_len = None
        if self._is_binary_like:
            non_null_count = self._row_count - self._null_count
            avg_len = self._total_len // non_null_count if non_null_count > 0 else 0
            max_len = self._max_len_value

        return ColStats(
            col_id=col_id,
            distinct_count=distinct_count,
            min=_scalar_to_str(self._min_scalar),
            max=_scalar_to_str(self._max_scalar),
            null_count=self._null_count,
            avg_len=avg_len,
            max_len=max_len,
        )


# diff --git a/paimon-python/pypaimon/stats/stats_file_handler.py b/paimon-python/pypaimon/stats/stats_file_handler.py
# new file mode 100644
# index 000000000000..13649bab0b40
# --- /dev/null
# +++ b/paimon-python/pypaimon/stats/stats_file_handler.py
# @@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import uuid
from typing import Optional

from pypaimon.common.file_io import FileIO
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.stats.statistics import Statistics


class StatsFileHandler:
    """Reads and writes statistics JSON files in the table's ``statistics`` directory."""

    def __init__(self, file_io: FileIO, table_path: str):
        self._file_io = file_io
        # Trailing slashes are dropped so the joined path has a single separator.
        self._statistics_path = f"{table_path.rstrip('/')}/statistics"

    def read_stats(self, snapshot: Snapshot) -> Optional[Statistics]:
        """Load the statistics file named by ``snapshot.statistics``.

        Returns None when the snapshot references no statistics file.
        """
        stats_file = snapshot.statistics
        return None if stats_file is None else self._read_file(stats_file)

    def write_stats(self, stats: Statistics) -> str:
        """Write ``stats`` as JSON to a new uniquely named file and return that name.

        The file is written with ``overwrite=False``.
        """
        stats_file = f"statistics-{uuid.uuid4().hex}"
        target = f"{self._statistics_path}/{stats_file}"
        self._file_io.write_file(target, stats.to_json(), overwrite=False)
        return stats_file

    def _read_file(self, file_name: str) -> Statistics:
        """Read ``file_name`` from the statistics directory and parse it."""
        source = f"{self._statistics_path}/{file_name}"
        return Statistics.from_json(self._file_io.read_file_utf8(source))


# diff --git a/paimon-python/pypaimon/tests/test_statistics.py b/paimon-python/pypaimon/tests/test_statistics.py
# new file mode 100644
# index 000000000000..ae79ce3d6870
# --- /dev/null
# +++ b/paimon-python/pypaimon/tests/test_statistics.py
# @@ -0,0 +1,118 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.
See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +import shutil +import tempfile +import unittest + +from pypaimon.stats.col_stats import ColStats +from pypaimon.stats.statistics import Statistics +from pypaimon.stats.stats_file_handler import StatsFileHandler + + +class TestColStatsSerialization(unittest.TestCase): + + def test_roundtrip(self): + cs = ColStats(col_id=3, distinct_count=100, min="1", max="999", + null_count=5, avg_len=8, max_len=12) + d = cs.to_dict() + restored = ColStats.from_dict(d) + self.assertEqual(cs, restored) + + def test_optional_fields(self): + cs = ColStats(col_id=1, null_count=10) + d = cs.to_dict() + self.assertNotIn("distinctCount", d) + self.assertNotIn("min", d) + restored = ColStats.from_dict(d) + self.assertIsNone(restored.distinct_count) + self.assertEqual(restored.null_count, 10) + + +class TestStatisticsSerialization(unittest.TestCase): + + def test_roundtrip(self): + stats = Statistics( + snapshot_id=5, + schema_id=0, + merged_record_count=1000, + merged_record_size=4096, + col_stats={ + "id": ColStats(col_id=0, distinct_count=1000, min="1", max="1000", null_count=0), + "name": ColStats(col_id=1, distinct_count=500, null_count=3, avg_len=10, max_len=50), + } + ) + json_str = stats.to_json() + restored = Statistics.from_json(json_str) + self.assertEqual(restored.snapshot_id, 5) + self.assertEqual(restored.merged_record_count, 1000) + self.assertEqual(len(restored.col_stats), 2) + self.assertEqual(restored.col_stats["id"].distinct_count, 1000) + + def test_java_compatible_keys(self): + stats = Statistics(snapshot_id=1, schema_id=0, merged_record_count=10) + d = stats.to_dict() + self.assertIn("snapshotId", d) + self.assertIn("schemaId", d) + self.assertIn("mergedRecordCount", d) + + def test_empty_col_stats_always_emitted(self): + stats = Statistics(snapshot_id=1, schema_id=0) + d = stats.to_dict() + self.assertIn("colStats", d) + 
self.assertEqual(d["colStats"], {}) + + +class TestStatsFileHandler(unittest.TestCase): + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.table_path = os.path.join(self.temp_dir, "test_table") + os.makedirs(os.path.join(self.table_path, "statistics")) + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def test_write_and_read(self): + from pypaimon.filesystem.local_file_io import LocalFileIO + file_io = LocalFileIO() + handler = StatsFileHandler(file_io, self.table_path) + + stats = Statistics( + snapshot_id=1, schema_id=0, merged_record_count=42, + col_stats={"x": ColStats(col_id=0, min="0", max="100", null_count=0)} + ) + file_name = handler.write_stats(stats) + self.assertTrue(file_name.startswith("statistics-")) + + from pypaimon.snapshot.snapshot import Snapshot + mock_snapshot = Snapshot( + version=3, id=1, schema_id=0, + base_manifest_list="", delta_manifest_list="", + total_record_count=42, delta_record_count=42, + commit_user="test", commit_identifier=0, + commit_kind="ANALYZE", time_millis=0, + statistics=file_name, + ) + read_back = handler.read_stats(mock_snapshot) + self.assertEqual(read_back.merged_record_count, 42) + self.assertEqual(read_back.col_stats["x"].max, "100") + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index d88beb164e03..673d1c23e580 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -239,6 +239,53 @@ def truncate_table(self, commit_identifier: int) -> None: allow_rollback=False, ) + def commit_statistics(self, stats, commit_identifier: int) -> None: + """Commit statistics without data changes (ANALYZE commit). + + Creates a new snapshot that references the stats file but carries + the same manifest lists as the previous snapshot. 
+ """ + from pypaimon.stats.stats_file_handler import StatsFileHandler + + latest_snapshot = self.snapshot_manager.get_latest_snapshot() + if latest_snapshot is None: + raise RuntimeError("Cannot commit statistics: table has no snapshots yet.") + + stats_file_handler = StatsFileHandler(self.table.file_io, self.table.table_path) + stats_file_name = stats_file_handler.write_stats(stats) + + new_snapshot_id = latest_snapshot.id + 1 + snapshot_data = Snapshot( + version=3, + id=new_snapshot_id, + schema_id=self.table.table_schema.id, + base_manifest_list=latest_snapshot.base_manifest_list, + delta_manifest_list=latest_snapshot.delta_manifest_list, + total_record_count=latest_snapshot.total_record_count, + delta_record_count=0, + commit_user=self.commit_user, + commit_identifier=commit_identifier, + commit_kind="ANALYZE", + time_millis=int(time.time() * 1000), + index_manifest=latest_snapshot.index_manifest, + statistics=stats_file_name, + watermark=latest_snapshot.watermark, + next_row_id=latest_snapshot.next_row_id, + ) + + with self.snapshot_commit: + success = self.snapshot_commit.commit(snapshot_data, []) + if not success: + raise RuntimeError( + f"Failed to commit ANALYZE snapshot #{new_snapshot_id}. " + "Concurrent commit conflict detected." 
+ ) + + logger.info( + "Successfully committed ANALYZE snapshot #%d with statistics file '%s'.", + new_snapshot_id, stats_file_name, + ) + def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan, detect_conflicts=False, allow_rollback=False): @@ -382,6 +429,12 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str if latest_snapshot and commit_kind == "APPEND": index_manifest = latest_snapshot.index_manifest + # Inherit statistics from previous snapshot if schema unchanged + inherited_statistics = None + if latest_snapshot and latest_snapshot.statistics: + if latest_snapshot.schema_id == self.table.table_schema.id: + inherited_statistics = latest_snapshot.statistics + snapshot_data = Snapshot( version=3, id=new_snapshot_id, @@ -396,6 +449,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str time_millis=int(time.time() * 1000), next_row_id=next_row_id, index_manifest=index_manifest, + statistics=inherited_statistics, ) # Generate partition statistics for the commit statistics = self._generate_partition_statistics(commit_entries) diff --git a/paimon-python/pypaimon/write/table_commit.py b/paimon-python/pypaimon/write/table_commit.py index 54298f3b19f5..f3449f68f698 100644 --- a/paimon-python/pypaimon/write/table_commit.py +++ b/paimon-python/pypaimon/write/table_commit.py @@ -107,6 +107,11 @@ def truncate_partitions(self, partitions: List[Dict[str, str]]) -> None: self._check_committed() self.file_store_commit.drop_partitions(partitions, BATCH_COMMIT_IDENTIFIER) + def update_statistics(self, stats) -> None: + """Commit statistics (ANALYZE) without data changes.""" + self._check_committed() + self.file_store_commit.commit_statistics(stats, BATCH_COMMIT_IDENTIFIER) + def _check_committed(self): if self.batch_committed: raise RuntimeError("BatchTableCommit only supports one-time committing.")