Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions bindings/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ classifiers = [
"Programming Language :: Rust",
]

[project.optional-dependencies]
video = [
"av>=17.0,<18.0",
"pillow>=12.0,<13.0",
]

[tool.maturin]
module-name = "pypaimon_rust.pypaimon_rust"
python-source = "python"
Expand All @@ -54,4 +60,6 @@ dev = [
"pytest>=8.0",
"pyarrow>=17.0,<24.0",
"datafusion==53.0.0",
"av>=17.0,<18.0",
"pillow>=12.0,<13.0",
]
44 changes: 43 additions & 1 deletion bindings/python/python/pypaimon_rust/datafusion.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,55 @@
# specific language governing permissions and limitations
# under the License.

from typing import Any, Dict, List
from typing import Any, Callable, Dict, List, Optional, Sequence, TypeAlias, Union

import pyarrow

# A type given as a PyArrow DataType, a PyArrow Field, or a string type name.
ArrowTypeLike: TypeAlias = Union[pyarrow.DataType, pyarrow.Field, str]
# Either a single type-like value or a sequence of them.
# NOTE(review): presumably one entry per UDF argument -- confirm against the
# native implementation.
InputFieldsLike: TypeAlias = Union[ArrowTypeLike, Sequence[ArrowTypeLike]]
# A string or any other object (effectively unconstrained for type checkers).
# NOTE(review): presumably also meant to accept DataFusion's volatility enum --
# confirm.
VolatilityLike: TypeAlias = Union[str, Any]

class PaimonCatalog:
    """Catalog object configured from a mapping of string options.

    NOTE(review): ``__datafusion_catalog_provider__`` is presumably the hook
    DataFusion Python calls to obtain the catalog provider -- confirm against
    the native implementation.
    """

    def __init__(self, catalog_options: Dict[str, str]) -> None: ...
    def __datafusion_catalog_provider__(self, session: Any) -> object: ...

class PythonScalarUDF:
    """Scalar UDF backed by a Python callable that returns a ``pyarrow.Array``.

    Instances are built either through the constructor, which takes the UDF
    name first, or through the ``udf`` static factory, which takes the
    callable first and an optional trailing ``name``.
    """

    def __init__(
        self,
        name: str,
        func: Callable[..., pyarrow.Array],
        input_fields: InputFieldsLike,
        return_field: ArrowTypeLike,
        volatility: VolatilityLike,
    ) -> None: ...
    # Factory with the callable-first argument order; ``name`` may be omitted.
    # NOTE(review): the name used when ``name`` is None is not visible in this
    # stub -- confirm against the native implementation.
    @staticmethod
    def udf(
        func: Callable[..., pyarrow.Array],
        input_fields: InputFieldsLike,
        return_field: ArrowTypeLike,
        volatility: VolatilityLike,
        name: Optional[str] = None,
    ) -> "PythonScalarUDF": ...
    # Read-only name of the UDF.
    @property
    def name(self) -> str: ...

def udf(
    func: Callable[..., pyarrow.Array],
    input_fields: InputFieldsLike,
    return_field: ArrowTypeLike,
    volatility: VolatilityLike,
    name: Optional[str] = None,
) -> PythonScalarUDF:
    """
    Create a scalar UDF.

    This mirrors DataFusion Python's function-style API:
    ``udf(func, input_fields, return_field, volatility, name)``.
    ``input_fields`` and ``return_field`` accept PyArrow DataType or Field
    values. String type names remain accepted for compatibility.

    Args:
        func: Callable producing a ``pyarrow.Array``.
        input_fields: A single type-like value or a sequence of them.
        return_field: Type-like value describing the result.
        volatility: Volatility, given as a string or another object.
        name: Optional UDF name. The name used when omitted is not visible
            in this stub.

    Returns:
        The constructed ``PythonScalarUDF``.
    """
    ...

class SQLContext:
def __init__(self) -> None: ...
def register_catalog(
Expand All @@ -31,4 +72,5 @@ class SQLContext:
def set_current_catalog(self, catalog_name: str) -> None: ...
def set_current_database(self, database_name: str) -> None: ...
def register_batch(self, name: str, batch: pyarrow.RecordBatch) -> None: ...
def register_udf(self, udf: PythonScalarUDF) -> None: ...
def sql(self, sql: str) -> List[pyarrow.RecordBatch]: ...
179 changes: 179 additions & 0 deletions bindings/python/python/pypaimon_rust/functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import io
import logging
import struct
from typing import Any, BinaryIO

logger = logging.getLogger(__name__)
# Container format names that mark an input as a still image rather than a
# video. _decode_video_snapshot splits ``container.format.name`` on commas and
# returns None when any component appears in this set.
_STILL_IMAGE_FORMATS = {
    "apng",
    "bmp_pipe",
    "gif",
    "ico",
    "image2",
    "image2pipe",
    "jpeg_pipe",
    "png_pipe",
    "tiff_pipe",
    "webp_pipe",
}


class _BlobDescriptorProbe:
    """Heuristic detector for serialized BlobDescriptor headers.

    Only the first nine bytes of a value are inspected: one version byte
    followed by an 8-byte little-endian magic number.
    """

    # Highest descriptor version this probe accepts.
    CURRENT_VERSION = 2
    # Expected magic; as a big-endian integer it spells ASCII "BLOBDESC".
    MAGIC = 0x424C4F4244455343
    # One version byte plus the 8-byte magic.
    _HEADER_SIZE = 9

    @classmethod
    def is_blob_descriptor(cls, data: Any) -> bool:
        """Return True when *data* starts with a recognised descriptor header.

        Args:
            data: Candidate value. Anything other than ``bytes``,
                ``bytearray`` or ``memoryview`` is rejected.

        Returns:
            True when the value is at least nine bytes long, its version byte
            is neither 1 nor greater than ``CURRENT_VERSION``, and bytes 1..8
            decode (little-endian) to ``MAGIC``; False otherwise.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            return False

        # Copy only the header: the payload may be an entire inline video, so
        # materialising all of it just to read nine bytes would be wasteful.
        # The ``with`` releases the temporary view immediately so a bytearray
        # argument is not left with a dangling buffer export.
        with memoryview(data) as view:
            if view.nbytes < cls._HEADER_SIZE:
                return False
            header = bytes(view[: cls._HEADER_SIZE])

        version = header[0]
        # Version 1 has no magic header, so it cannot be distinguished safely
        # from arbitrary inline video bytes in this heuristic.
        if version == 1 or version > cls.CURRENT_VERSION:
            return False

        (magic,) = struct.unpack_from("<Q", header, 1)
        return magic == cls.MAGIC


def is_blob_descriptor(data: Any) -> bool:
    """Return True when *data* carries a recognised BlobDescriptor header."""
    return _BlobDescriptorProbe.is_blob_descriptor(data)


def open_blob_descriptor_stream(
    raw_value: bytes,
    blob_reader_registry=None,
) -> BinaryIO:
    """Open a readable binary stream for a raw column value.

    A registry, when supplied, gets the first chance to resolve the value.
    If it yields nothing (or no registry is given), a value that carries a
    BlobDescriptor header is rejected because it cannot be dereferenced
    here; anything else is treated as inline bytes and wrapped in a
    ``BytesIO``.

    Args:
        raw_value: The raw bytes of the value.
        blob_reader_registry: Optional object exposing
            ``open_blob_descriptor_stream(raw_value)``; a ``None`` result
            from it means "not handled".

    Returns:
        The registry's stream, or an in-memory stream over ``raw_value``.

    Raises:
        RuntimeError: The value is a BlobDescriptor and no registry stream
            was obtained for it.
    """
    registry_stream = (
        None
        if blob_reader_registry is None
        else blob_reader_registry.open_blob_descriptor_stream(raw_value)
    )
    if registry_stream is not None:
        return registry_stream

    if not _BlobDescriptorProbe.is_blob_descriptor(raw_value):
        return io.BytesIO(bytes(raw_value))
    raise RuntimeError(
        "BlobDescriptor input requires a registered Paimon table FileIO"
    )


def _decode_video_snapshot(
    stream: BinaryIO,
    image_format: str,
    timestamp_ms: int = 0,
) -> bytes | None:
    """Decode one frame from a video stream and encode it as an image.

    Args:
        stream: Binary stream handed to ``av.open``.
        image_format: Format name passed to the image's ``save`` call.
        timestamp_ms: Requested position in milliseconds. Values ``<= 0``
            select the first decoded frame without seeking.

    Returns:
        The encoded image bytes, or None when the container is a still-image
        format, has no video stream, or yields no frames.

    Raises:
        ImportError: PyAV cannot be imported, or converting the frame to an
            image raises ImportError (reported as Pillow missing).
    """
    try:
        import av
    except ImportError as e:
        raise ImportError("PyAV is required to decode video snapshots") from e

    seek_requested = timestamp_ms > 0
    target_seconds = timestamp_ms / 1000

    with av.open(stream, mode="r") as container:
        container_format = container.format.name
        if _STILL_IMAGE_FORMATS.intersection((container_format or "").split(",")):
            logger.debug(
                "video_snapshot input is a still image format: %s",
                container_format,
            )
            return None
        if not container.streams.video:
            return None

        if seek_requested:
            # NOTE(review): ms * 1000 presumably converts to PyAV's
            # microsecond time base -- confirm against the PyAV seek docs.
            container.seek(timestamp_ms * 1000, backward=True, any_frame=False)

        # Walk forward until the first frame at or past the target. If the
        # stream ends earlier, the last frame seen is used instead.
        chosen = None
        for frame in container.decode(video=0):
            chosen = frame
            before_target = (
                seek_requested
                and frame.time is not None
                and frame.time < target_seconds
            )
            if not before_target:
                break

        if chosen is None:
            return None

        try:
            image = chosen.to_image()
        except ImportError as e:
            raise ImportError(
                "Pillow is required to encode video_snapshot images"
            ) from e
        encoded = io.BytesIO()
        image.save(encoded, format=image_format)
        return encoded.getvalue()


def _make_video_snapshot(image_format: str = "PNG", blob_reader_registry=None):
    """Build the ``video_snapshot`` callable for a given output image format.

    Args:
        image_format: Image format name; upper-cased once here and used for
            every row.
        blob_reader_registry: Optional registry forwarded to
            ``open_blob_descriptor_stream`` for each row.

    Returns:
        A function ``video_snapshot(values, timestamps_ms=None)`` that returns
        a ``pyarrow`` binary array with exactly one entry per input row.
    """
    image_format = image_format.upper()

    def _snapshot_row(raw_value, timestamp_ms):
        """Return the encoded snapshot for one row, or None.

        Producing the result as a return value (rather than appending inside
        the ``try``) guarantees a single result per row even when closing the
        stream fails after a successful decode; that failure is logged and
        the row becomes None, like any other per-row failure.
        """
        if raw_value is None or timestamp_ms is None:
            return None

        try:
            timestamp_ms = int(timestamp_ms)
            if timestamp_ms < 0:
                return None
            stream = open_blob_descriptor_stream(raw_value, blob_reader_registry)
            try:
                return _decode_video_snapshot(stream, image_format, timestamp_ms)
            finally:
                stream.close()
        except ImportError:
            # A missing optional dependency affects every row; surface it.
            raise
        except Exception as e:
            logger.warning("Failed to decode video snapshot: %s", e)
            return None

    def video_snapshot(values, timestamps_ms=None):
        """Decode a snapshot image for each row of ``values``.

        Rows that are null, have a null or negative timestamp, or fail to
        decode produce null. ``timestamps_ms`` defaults to 0 for every row.

        Raises:
            ImportError: pyarrow, PyAV or Pillow is unavailable.
            ValueError: ``timestamps_ms`` has a different row count.
        """
        try:
            import pyarrow as pa
        except ImportError as e:
            raise ImportError(
                "pyarrow is required to return video_snapshot results"
            ) from e

        raw_values = values.to_pylist()
        if timestamps_ms is None:
            timestamp_values = [0] * len(raw_values)
        else:
            timestamp_values = timestamps_ms.to_pylist()
        if len(timestamp_values) != len(raw_values):
            raise ValueError(
                "video_snapshot timestamp argument must have the same row count"
            )

        # v1 intentionally decodes rows serially; callers should filter or limit
        # large scans before applying video_snapshot.
        frames = [
            _snapshot_row(raw_value, timestamp_ms)
            for raw_value, timestamp_ms in zip(raw_values, timestamp_values)
        ]
        return pa.array(frames, type=pa.binary())

    return video_snapshot
24 changes: 24 additions & 0 deletions bindings/python/python/pypaimon_rust/functions.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Any, BinaryIO

# Returns True when ``data`` is bytes-like and begins with a recognised
# BlobDescriptor header (a version byte followed by an 8-byte magic number).
# Version-1 descriptors carry no magic and are reported as False.
def is_blob_descriptor(data: Any) -> bool: ...
# Opens a binary stream for ``raw_value``. A supplied registry is consulted
# first; otherwise inline bytes are wrapped in an in-memory stream, and a
# BlobDescriptor value that no registry resolved raises RuntimeError.
def open_blob_descriptor_stream(
    raw_value: bytes,
    blob_reader_registry: Any | None = None,
) -> BinaryIO: ...
Loading
Loading