Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions paimon-python/dev/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ pyroaring<=0.4.5; python_version == "3.7"
pyroaring>=1.0.0; python_version >= "3.8"
readerwriterlock>=1,<2
zstandard>=0.19,<1
backports.zstd>=1.0.0,<1.4.0; python_version >= "3.9" and python_version < "3.14"
cramjam>=1.3.0,<3; python_version>="3.7"
pyyaml>=5.4,<7
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Test helper: subprocess ``ManifestListManager.read``; prints entry count on stdout.

Usage: ``python manifest_list_zstd_read_subprocess.py <warehouse_path> <table_id> <manifest_list_name>``
(``manifest_list_name`` = basename under the table manifest dir). The pypaimon imports are
deferred into ``main()``."""

import sys


def main():
    """Read one manifest list via ``ManifestListManager`` and print its entry count.

    Expects exactly three command-line arguments after the script name: the
    warehouse path, the table identifier string, and the manifest list file name.

    Returns:
        ``2`` after writing a usage line to stderr when the argument count is
        wrong; otherwise ``0`` after printing the number of entries to stdout.
    """
    cli_args = sys.argv[1:]
    if len(cli_args) != 3:
        usage = (
            'usage: manifest_list_zstd_read_subprocess.py '
            '<warehouse_path> <table_id> <manifest_list_name>'
        )
        print(usage, file=sys.stderr)
        return 2
    warehouse_path, catalog_table_id, manifest_list_name = cli_args

    # pypaimon is imported only after the arguments have been validated.
    from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
    from pypaimon.common.identifier import Identifier
    from pypaimon.common.options import Options
    from pypaimon.common.options.config import CatalogOptions
    from pypaimon.manifest.manifest_list_manager import ManifestListManager

    catalog_options = Options({CatalogOptions.WAREHOUSE.key(): warehouse_path})
    catalog = FileSystemCatalog(catalog_options)
    identifier = Identifier.from_string(catalog_table_id)
    table = catalog.get_table(identifier)
    entries = ManifestListManager(table).read(manifest_list_name)
    print(len(entries))
    return 0


if __name__ == '__main__':
    # Any ordinary exception is reported with a traceback and mapped to exit
    # status 1; otherwise the process exits with whatever ``main()`` returned.
    import traceback

    try:
        exit_status = main()
    except Exception:
        traceback.print_exc()
        exit_status = 1
    raise SystemExit(exit_status)
217 changes: 217 additions & 0 deletions paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import unittest

import pyarrow as pa
Expand All @@ -38,6 +43,141 @@
_EMPTY_STATS = SimpleStats(min_values=_EMPTY_ROW, max_values=_EMPTY_ROW, null_counts=[])


def _paimon_python_root():
return os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))


def _runner_can_write_zstandard_avro():
"""fastavro uses ``backports.zstd`` (Py < 3.14) to *write* zstandard Avro blocks."""
try:
from io import BytesIO
import fastavro
buf = BytesIO()
fastavro.writer(buf, {'type': 'string'}, ['x'], codec='zstandard')
return len(buf.getvalue()) > 0
except Exception:
return False


def _venv_python_executable(venv_dir):
for rel in (('bin', 'python'), ('bin', 'python3'), ('Scripts', 'python.exe')):
path = os.path.join(venv_dir, *rel)
if os.path.isfile(path):
return path
raise FileNotFoundError('no interpreter under venv: ' + venv_dir)


def _subprocess_env_for_pip():
"""Drop proxy variables for venv pip calls (avoids SOCKS optional dependency errors)."""
env = os.environ.copy()
for key in list(env):
if 'PROXY' in key.upper():
env.pop(key, None)
env.setdefault('PIP_DISABLE_PIP_VERSION_CHECK', '1')
return env


# Held while the shared throwaway venv is created or torn down.
_MANIFEST_ZSTD_READ_SUBPROC_VENV_LOCK = threading.Lock()
# Directory of the throwaway venv; ``None`` before creation and after teardown.
_MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR = None
# Interpreter path inside that venv; ``None`` before creation and after teardown.
_MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON = None


def _manifest_zstd_read_subprocess_venv_python():
    """Disposable venv with editable pypaimon for ``manifest_list_zstd_read_subprocess.py``.

    The venv is created on first call and cached in module globals; later calls
    return the cached interpreter path. Creation and the cache check both run
    under the module-level lock.

    ``backports.zstd`` is not requested explicitly here, so the first worker run
    can hit fastavro's missing zstd codec path when reading zstandard-compressed
    manifest lists.

    When ``uv`` is on ``PATH`` it is used to build the venv, with two flags that
    keep it equivalent to the ``python -m venv`` fallback:

    * ``--seed`` installs pip into the venv. ``uv venv`` omits pip by default,
      and callers run ``<venv python> -m pip ...`` against this environment.
    * ``--python sys.executable`` builds the venv on the same interpreter that
      runs the tests, so version-based skip conditions evaluated in the runner
      also describe the venv.

    Returns:
        Path to the Python interpreter inside the venv.

    Raises:
        subprocess.CalledProcessError: if creating the venv or installing into
            it fails (the temporary directory is removed first).
        FileNotFoundError: if no interpreter is found inside the created venv.
    """
    global _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR, _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON
    with _MANIFEST_ZSTD_READ_SUBPROC_VENV_LOCK:
        if _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON is not None:
            return _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON
        repo = _paimon_python_root()
        venv_dir = tempfile.mkdtemp(prefix='paimon-zstd-read-subprocess-')
        pip_install_env = _subprocess_env_for_pip()
        uv_bin = shutil.which('uv')
        try:
            if uv_bin:
                subprocess.check_call(
                    [uv_bin, 'venv', '--seed', '--python', sys.executable, venv_dir],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    env=pip_install_env,
                )
                isolated_venv_python = _venv_python_executable(venv_dir)
                subprocess.check_call(
                    [uv_bin, 'pip', 'install', '-q', '--python', isolated_venv_python,
                     '-e', repo, 'requests'],
                    env=pip_install_env,
                )
            else:
                subprocess.check_call(
                    [sys.executable, '-m', 'venv', venv_dir],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
                isolated_venv_python = _venv_python_executable(venv_dir)
                subprocess.check_call(
                    [isolated_venv_python, '-m', 'pip', 'install', '-q', '-e', repo, 'requests'],
                    env=pip_install_env,
                )
        except Exception:
            # Do not leave a half-built venv behind; the globals stay unset so a
            # later call retries from scratch.
            shutil.rmtree(venv_dir, ignore_errors=True)
            raise
        _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR = venv_dir
        _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON = isolated_venv_python
        return isolated_venv_python


def _tear_down_manifest_zstd_read_subprocess_venv():
    """Delete the cached throwaway venv, if any, and reset the module-level cache.

    Runs under the module-level venv lock. Removal errors are ignored.
    """
    global _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR, _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON
    with _MANIFEST_ZSTD_READ_SUBPROC_VENV_LOCK:
        stale_venv_dir = _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR
        _MANIFEST_ZSTD_READ_SUBPROC_VENV_DIR = None
        _MANIFEST_ZSTD_READ_SUBPROC_VENV_PYTHON = None
        if stale_venv_dir:
            shutil.rmtree(stale_venv_dir, ignore_errors=True)


def tearDownModule():
    """Module-level unittest fixture: remove the throwaway venv created by these tests."""
    _tear_down_manifest_zstd_read_subprocess_venv()


def _write_zstandard_manifest_list_file(table, list_name):
    """Write a manifest list Avro object file using the zstandard codec (same as Java may emit).

    The file holds a single ``ManifestFileMeta`` record with empty partition
    stats and is written to ``<manifest_path>/<list_name>`` through the table's
    ``file_io``.

    Args:
        table: Table whose manifest directory and ``file_io`` are used.
        list_name: File name of the manifest list to create.
    """
    import fastavro
    from io import BytesIO
    from pypaimon.manifest.manifest_list_manager import ManifestListManager
    from pypaimon.manifest.schema.manifest_file_meta import (
        MANIFEST_FILE_META_SCHEMA, ManifestFileMeta)
    from pypaimon.table.row.generic_row import GenericRowSerializer

    meta = ManifestFileMeta(
        file_name='manifest.avro',
        file_size=1024,
        num_added_files=1,
        num_deleted_files=0,
        partition_stats=SimpleStats.empty_stats(),
        schema_id=0,
    )
    stats = meta.partition_stats
    partition_stats_record = {
        '_MIN_VALUES': GenericRowSerializer.to_bytes(stats.min_values),
        '_MAX_VALUES': GenericRowSerializer.to_bytes(stats.max_values),
        '_NULL_COUNTS': stats.null_counts,
    }
    record = {
        '_VERSION': 2,
        '_FILE_NAME': meta.file_name,
        '_FILE_SIZE': meta.file_size,
        '_NUM_ADDED_FILES': meta.num_added_files,
        '_NUM_DELETED_FILES': meta.num_deleted_files,
        '_PARTITION_STATS': partition_stats_record,
        '_SCHEMA_ID': meta.schema_id,
        '_MIN_ROW_ID': meta.min_row_id,
        '_MAX_ROW_ID': meta.max_row_id,
    }

    # Serialize in memory first, then push the bytes through the table's file IO.
    payload = BytesIO()
    fastavro.writer(payload, MANIFEST_FILE_META_SCHEMA, [record], codec='zstandard')

    manifest_dir = ManifestListManager(table).manifest_path
    target_path = '{}/{}'.format(manifest_dir, list_name)
    with table.file_io.new_output_stream(target_path) as stream:
        stream.write(payload.getvalue())


class _ManifestManagerSetup(unittest.TestCase):
"""Shared setup for manifest manager tests.

Expand Down Expand Up @@ -191,6 +331,83 @@ def test_read_base_returns_only_base_manifest(self):
self.assertEqual(len(result), 1)
self.assertEqual(result[0].file_name, "manifest-base.avro")

@unittest.skipIf(
    sys.version_info < (3, 9),
    'PyPI backports.zstd only supports Python 3.9–3.13',
)
@unittest.skipIf(
    sys.version_info >= (3, 14),
    'fastavro uses stdlib compression.zstd on Python 3.14+, not backports.zstd',
)
def test_zstd_manifest_list_fastavro_requires_backports_zstd(self):
    """Reading a zstandard manifest list in a child venv depends on ``backports.zstd``.

    The child venv runs ``manifest_list_zstd_read_subprocess.py`` with the
    warehouse path, table id and list file name as arguments. With
    ``backports.zstd`` uninstalled the read must fail with fastavro's
    missing-codec message; after ``pip install backports.zstd`` the same
    command must succeed and print ``1``.
    """
    if not _runner_can_write_zstandard_avro():
        self.skipTest('runner cannot write zstandard Avro')

    list_name = 'zstd-manifest-list-backports-integration'
    _write_zstandard_manifest_list_file(self.table, list_name)

    # Sanity check in the current process: the file exists and is readable here.
    manager = ManifestListManager(self.table)
    entries = manager.read(list_name)
    self.assertEqual(len(entries), 1)
    list_abspath = os.path.join(manager.manifest_path, list_name)
    self.assertTrue(os.path.isfile(list_abspath), msg=list_abspath)

    reader_script = os.path.join(
        os.path.dirname(__file__), 'manifest_list_zstd_read_subprocess.py')
    table_id = 'default.{}'.format(self._table_name)
    venv_python = _manifest_zstd_read_subprocess_venv_python()
    pip_env = _subprocess_env_for_pip()
    # The same reader command is run before and after installing the backend.
    reader_command = [
        venv_python,
        reader_script,
        self.catalog.warehouse,
        table_id,
        list_name,
    ]

    # Unchecked on purpose: the package may simply not be installed yet.
    subprocess.run(
        [venv_python, '-m', 'pip', 'uninstall', '-y', 'backports.zstd'],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        env=pip_env,
    )

    without_backend = subprocess.run(reader_command, capture_output=True, text=True)
    without_backend_output = without_backend.stdout + without_backend.stderr
    self.assertNotEqual(without_backend.returncode, 0, msg=without_backend_output)
    self.assertIn(
        'zstandard codec is supported but you need to install', without_backend_output)
    self.assertIn('backports.zstd', without_backend_output)

    subprocess.check_call(
        [venv_python, '-m', 'pip', 'install', '-q', 'backports.zstd'],
        env=pip_env,
    )
    with_backend = subprocess.run(reader_command, capture_output=True, text=True)
    self.assertEqual(
        with_backend.returncode, 0,
        msg=with_backend.stdout + with_backend.stderr)
    self.assertEqual(with_backend.stdout.strip(), '1')


# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()
Loading