Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 43 additions & 75 deletions python/cocoindex/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@

T = TypeVar("T")

try:
import pydantic, pydantic_core
except ImportError:
pass


class ChildFieldPath:
"""Context manager to append a field to field_path on enter and pop it on exit."""
Expand Down Expand Up @@ -777,106 +782,69 @@ def load_engine_object(expected_type: Any, v: Any) -> Any:
# Structs (dataclass, NamedTuple, or Pydantic)
if isinstance(variant, AnalyzedStructType):
struct_type = variant.struct_type
init_kwargs: dict[str, Any] = {}
missing_fields: list[tuple[str, Any]] = []
if dataclasses.is_dataclass(struct_type):
if not isinstance(v, Mapping):
raise ValueError(f"Expected dict for dataclass, got {type(v)}")
# Drop auxiliary discriminator "kind" if present
dc_init_kwargs: dict[str, Any] = {}
field_types = {f.name: f.type for f in dataclasses.fields(struct_type)}
dataclass_fields = {f.name: f for f in dataclasses.fields(struct_type)}

for name, f_type in field_types.items():
if name in v:
dc_init_kwargs[name] = load_engine_object(f_type, v[name])
for dc_field in dataclasses.fields(struct_type):
if dc_field.name in v:
init_kwargs[dc_field.name] = load_engine_object(
dc_field.type, v[dc_field.name]
)
else:
# Field is missing from input, check if it has a default or can use auto-default
field = dataclass_fields[name]
if field.default is not dataclasses.MISSING:
# Field has an explicit default value
dc_init_kwargs[name] = field.default
elif field.default_factory is not dataclasses.MISSING:
# Field has a default factory
dc_init_kwargs[name] = field.default_factory()
else:
# No explicit default, try to get auto-default
type_info = analyze_type_info(f_type)
auto_default, is_supported = _get_auto_default_for_type(
type_info
)
if is_supported:
dc_init_kwargs[name] = auto_default
# If not supported, skip the field (let dataclass constructor handle the error)
return struct_type(**dc_init_kwargs)
if (
dc_field.default is dataclasses.MISSING
and dc_field.default_factory is dataclasses.MISSING
):
missing_fields.append((dc_field.name, dc_field.type))

elif is_namedtuple_type(struct_type):
if not isinstance(v, Mapping):
raise ValueError(f"Expected dict for NamedTuple, got {type(v)}")
# Dict format (from dump/load functions)
annotations = getattr(struct_type, "__annotations__", {})
field_names = list(getattr(struct_type, "_fields", ()))
field_defaults = getattr(struct_type, "_field_defaults", {})
nt_init_kwargs: dict[str, Any] = {}

for name in field_names:
f_type = annotations.get(name, Any)
if name in v:
nt_init_kwargs[name] = load_engine_object(f_type, v[name])
else:
# Field is missing from input, check if it has a default or can use auto-default
if name in field_defaults:
# Field has an explicit default value
nt_init_kwargs[name] = field_defaults[name]
else:
# No explicit default, try to get auto-default
type_info = analyze_type_info(f_type)
auto_default, is_supported = _get_auto_default_for_type(
type_info
)
if is_supported:
nt_init_kwargs[name] = auto_default
# If not supported, skip the field (let NamedTuple constructor handle the error)
return struct_type(**nt_init_kwargs)
init_kwargs[name] = load_engine_object(f_type, v[name])
elif name not in field_defaults:
missing_fields.append((name, f_type))

elif is_pydantic_model(struct_type):
if not isinstance(v, Mapping):
raise ValueError(f"Expected dict for Pydantic model, got {type(v)}")
# Drop auxiliary discriminator "kind" if present
pydantic_init_kwargs: dict[str, Any] = {}
# Type guard: ensure we have model_fields attribute

model_fields: dict[str, pydantic.fields.FieldInfo]
if hasattr(struct_type, "model_fields"):
model_fields = struct_type.model_fields # type: ignore[attr-defined]
else:
model_fields = {}
field_types = {
name: field.annotation for name, field in model_fields.items()
}

for name, f_type in field_types.items():
for name, pyd_field in model_fields.items():
if name in v:
pydantic_init_kwargs[name] = load_engine_object(f_type, v[name])
else:
# Field is missing from input, check if it has a default or can use auto-default
field = model_fields[name]
if (
hasattr(field, "default") and field.default is not ...
): # ... is Pydantic's sentinel for no default
# Field has an explicit default value
pydantic_init_kwargs[name] = field.default
elif (
hasattr(field, "default_factory")
and field.default_factory is not None
):
# Field has a default factory
pydantic_init_kwargs[name] = field.default_factory()
else:
# No explicit default, try to get auto-default
type_info = analyze_type_info(f_type)
auto_default, is_supported = _get_auto_default_for_type(
type_info
)
if is_supported:
pydantic_init_kwargs[name] = auto_default
# If not supported, skip the field (let Pydantic constructor handle the error)
return struct_type(**pydantic_init_kwargs)
return v
init_kwargs[name] = load_engine_object(
pyd_field.annotation, v[name]
)
elif (
getattr(pyd_field, "default", pydantic_core.PydanticUndefined)
is pydantic_core.PydanticUndefined
and getattr(pyd_field, "default_factory") is None
):
missing_fields.append((name, pyd_field.annotation))
else:
assert False, "Unsupported struct type"

for name, f_type in missing_fields:
type_info = analyze_type_info(f_type)
auto_default, is_supported = _get_auto_default_for_type(type_info)
if is_supported:
init_kwargs[name] = auto_default
return struct_type(**init_kwargs)

# Union with discriminator support via "kind"
if isinstance(variant, AnalyzedUnionType):
Expand Down
40 changes: 40 additions & 0 deletions python/cocoindex/tests/test_load_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,18 @@

import numpy as np
from numpy.typing import NDArray
import pytest

from cocoindex.convert import dump_engine_object, load_engine_object

# Optional Pydantic support for testing
try:
import pydantic

PYDANTIC_AVAILABLE = True
except ImportError:
PYDANTIC_AVAILABLE = False


@dataclasses.dataclass
class LocalTargetFieldMapping:
Expand Down Expand Up @@ -181,3 +190,34 @@ class TestClass:
assert False, "Expected TypeError to be raised"
except TypeError:
pass # Expected behavior


@pytest.mark.skipif(not PYDANTIC_AVAILABLE, reason="Pydantic not available")
def test_pydantic_unsupported_type_still_fails() -> None:
"""Test that fields with unsupported types still cause errors when missing."""

class TestPydantic(pydantic.BaseModel):
required_field1: str
required_field2: int # No auto-default for int
optional_field: str | None
list_field: list[str]
dict_field: dict[str, int]
field_with_default: str = "default_value"

# Input missing required_field2 which has no safe auto-default
input_data = {"required_field1": "test_value"}

# Should still raise an error because int has no safe auto-default
with pytest.raises(pydantic.ValidationError):
load_engine_object(TestPydantic, input_data)

assert load_engine_object(
TestPydantic, {"required_field1": "test_value", "required_field2": 1}
) == TestPydantic(
required_field1="test_value",
required_field2=1,
field_with_default="default_value",
optional_field=None,
list_field=[],
dict_field={},
)
Loading