Description
`GenericResult[int]` is a runtime-created Pydantic generic specialization. When
that specialization is created at module top level, Pydantic registers it in
`ccflow.result.generic` under the generated name `GenericResult[int]` so pickle
can serialize it by global reference.
That global registration is process-local. If one process cloudpickles a
`GenericResult[int]` instance by reference, and a fresh process or Ray worker
tries to unpickle it before that worker has created the same specialization,
unpickling fails with:
```
AttributeError: Can't get attribute 'GenericResult[int]' on <module 'ccflow.result.generic' ...>
```
This is a base serialization issue for runtime-created Pydantic generic result
classes. `@Flow.model` (from #206) and Ray make the bug much easier to hit,
because generated model state or distributed worker payloads can contain
`GenericResult[T]` instances without importing or re-evaluating the source
module in a way that materializes the specialization first.
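The by-reference failure can be approximated in a single process with plain `pickle` and Pydantic alone. This is a sketch under stated assumptions: `Box` is a hypothetical stand-in for `GenericResult`, the top-level registration is mimicked by hand with `setattr` instead of relying on where Pydantic is called from, and deleting the attribute stands in for a fresh process or Ray worker that never created the global. It also assumes the code runs as a script, so `Box.__module__` is present in `sys.modules`.

```python
import pickle
import sys
from typing import Generic, TypeVar

from pydantic import BaseModel

T = TypeVar("T")


class Box(BaseModel, Generic[T]):
    """Hypothetical stand-in for GenericResult."""

    value: T


def make_instance():
    # Created inside a function, so Pydantic itself adds no module global.
    return Box[int](value=5)


module = sys.modules[Box.__module__]
obj = make_instance()
name = type(obj).__qualname__  # "Box[int]"

# Mimic Pydantic's top-level registration by hand (assumption of this sketch).
setattr(module, name, type(obj))
payload = pickle.dumps(obj)  # the class is stored as a (module, name) reference

# Stand-in for a fresh process or Ray worker: the process-local global is gone.
delattr(module, name)
try:
    pickle.loads(payload)
    error = ""
except AttributeError as exc:
    error = str(exc)

print(error)
```

Unpickling raises the same kind of `AttributeError` as the ccflow report, because the pickle stream stores only the module name and the generated class name, not the class itself.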
The behavior is sensitive to where `GenericResult[int]` is first evaluated:
- If `GenericResult[int]` is evaluated at module top level, Pydantic installs a
module global and cloudpickle may serialize the class by global reference. This
fails in a fresh process that lacks that process-local global.
- If `GenericResult[int]` is created only inside a function, cloudpickle may
serialize the dynamic class by value instead. That can pass, which masks the
issue and makes the failure look import-order dependent.
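The function-local path from the second bullet can be checked the same way. In this sketch `Box` is again a hypothetical stand-in for `GenericResult`. It shows that a specialization created inside a function leaves no module global behind, so plain `pickle`, which can only store classes by reference, refuses to serialize it. cloudpickle, as described above, may instead pickle such a class by value.

```python
import pickle
import sys
from typing import Generic, TypeVar

from pydantic import BaseModel

T = TypeVar("T")


class Box(BaseModel, Generic[T]):
    """Hypothetical stand-in for GenericResult."""

    value: T


def build_inside_function():
    # Not evaluated at module top level, so Pydantic installs no module global.
    return Box[int](value=5)


obj = build_inside_function()
registered = hasattr(sys.modules[Box.__module__], type(obj).__qualname__)

# Plain pickle always stores classes by reference, so it cannot save this one.
try:
    pickle.dumps(obj)
    by_reference_ok = True
except (pickle.PicklingError, AttributeError):
    by_reference_ok = False

print("registered as module global:", registered)
print("picklable by reference:", by_reference_ok)
```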
Steps to Reproduce
Repro 1: plain cloudpickle, no Ray, no `@Flow.model`
- Save this script as `repro_generic_result_cloudpickle.py`:
```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "ccflow",
#   "cloudpickle",
# ]
# ///
import base64
import subprocess
import sys


def run_case(label, dumps_kwargs):
    # Creator process: GenericResult[int] is evaluated at the top level of this
    # -c script, so Pydantic registers it as a module global and cloudpickle
    # may pickle the class by reference.
    creator_script = f"""
import base64
import cloudpickle
from ccflow import GenericResult
print(base64.b64encode(cloudpickle.dumps(GenericResult[int](value=5), **{dumps_kwargs!r})).decode())
"""
    creator = subprocess.run([sys.executable, "-c", creator_script], capture_output=True, text=True)
    blob = creator.stdout.strip()

    # Loader process: a fresh interpreter that imports GenericResult but never
    # evaluates GenericResult[int] before unpickling.
    loader_script = f"""
import base64
import cloudpickle
import ccflow.result.generic as generic_module
from ccflow import GenericResult
print("has GenericResult[int] before unpickle:", hasattr(generic_module, "GenericResult[int]"))
obj = cloudpickle.loads(base64.b64decode({blob!r}))
print("unpickled:", obj)
"""
    result = subprocess.run([sys.executable, "-c", loader_script], capture_output=True, text=True)
    print(f"\n=== {label} ===")
    print("returncode:", result.returncode)
    print("stdout:", result.stdout.strip() or "<empty>")
    print("stderr:", result.stderr.strip() or "<empty>")


run_case("cloudpickle default protocol", {})
run_case("cloudpickle protocol=5", {"protocol": 5})
```
- Run it with uv:
```shell
uv run python repro_generic_result_cloudpickle.py
```
- Observe that both the default cloudpickle protocol and `protocol=5` fail in
the fresh loader process.
Repro 2: Ray worker, no `@Flow.model`
- Save this script as `repro_generic_result_ray_cloudpickle.py`:
```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "ccflow",
#   "cloudpickle",
#   "ray",
# ]
# ///
import base64
from typing import Any

import cloudpickle
import ray

from ccflow import GenericResult

# Toggle this flag:
# - True: materializes GenericResult[int] at module top level and reproduces the failure.
# - False: avoids the top-level global-reference path and can pass, which shows why
#   this bug is sensitive to where the specialization is first evaluated.
top_level_eval = True

if top_level_eval:
    SPECIALIZED_RESULT = GenericResult[int]


@ray.remote
def load_payload(label: str, encoded_payload: str) -> tuple[str, bool, str]:
    import base64
    import traceback

    import ccflow.result.generic as generic_module
    import cloudpickle
    from ccflow import GenericResult  # noqa: F401

    # Record whether this worker already has the module global pickle looks up.
    has_specialization = hasattr(generic_module, "GenericResult[int]")
    try:
        obj = cloudpickle.loads(base64.b64decode(encoded_payload))
    except Exception:
        return label, has_specialization, traceback.format_exc()
    return label, has_specialization, f"unpickled: {obj!r}"


def make_payload(*, dumps_kwargs: dict[str, Any]) -> str:
    if top_level_eval:
        payload = cloudpickle.dumps(SPECIALIZED_RESULT(value=5), **dumps_kwargs)
    else:
        # Created inside a function: no module global is installed, so
        # cloudpickle may pickle the class by value.
        payload = cloudpickle.dumps(GenericResult[int](value=5), **dumps_kwargs)
    return base64.b64encode(payload).decode()


ray.init(num_cpus=1)
try:
    cases = {
        "cloudpickle default protocol": make_payload(dumps_kwargs={}),
        "cloudpickle protocol=5": make_payload(dumps_kwargs={"protocol": 5}),
    }
    refs = [load_payload.remote(label, payload) for label, payload in cases.items()]
    for label, has_specialization, result in ray.get(refs):
        print(f"\n=== {label} ===")
        print("worker has GenericResult[int] before unpickle:", has_specialization)
        print(result.strip())
finally:
    ray.shutdown()
```
- In this repo, run Ray repros with uv targeting the active venv so that Ray
workers inherit `ray` correctly:
```shell
source .venv/bin/activate
uv run --active python repro_generic_result_ray_cloudpickle.py
```
- With `top_level_eval = True`, observe the worker-side unpickle failure.
- Set `top_level_eval = False` and run again. The script can pass because the
specialization is no longer serialized through the same top-level
global-reference path. This confirms the bug is tied to Pydantic's
process-local global registration side effect, not to `@Flow.model`.
Expected Behavior
A `GenericResult[T]` instance should be portable through cloudpickle across a
fresh process or Ray worker.
In particular, this should work regardless of whether `GenericResult[int]` has
already been materialized in the receiving process:
```python
import cloudpickle
from ccflow import GenericResult

payload = cloudpickle.dumps(GenericResult[int](value=5))
# The receiving process may not have evaluated GenericResult[int] yet.
restored = cloudpickle.loads(payload)
assert restored == GenericResult[int](value=5)
```
The same should hold when the payload is sent to a Ray worker.
Actual Behavior
The receiving process imports `GenericResult`, but `GenericResult[int]` is not
present in `ccflow.result.generic` before unpickling:
```
has GenericResult[int] before unpickle: False
```
Plain cloudpickle/fresh-process output:
```
=== cloudpickle default protocol ===
returncode: 1
stdout: has GenericResult[int] before unpickle: False
stderr: Traceback (most recent call last):
  File "<string>", line 11, in <module>
AttributeError: Can't get attribute 'GenericResult[int]' on <module 'ccflow.result.generic' from '/Users/neej/dev/ccflow/ccflow/result/generic.py'>

=== cloudpickle protocol=5 ===
returncode: 1
stdout: has GenericResult[int] before unpickle: False
stderr: Traceback (most recent call last):
  File "<string>", line 11, in <module>
AttributeError: Can't get attribute 'GenericResult[int]' on <module 'ccflow.result.generic' from '/Users/neej/dev/ccflow/ccflow/result/generic.py'>
```
Ray worker output with `top_level_eval = True`:
```
=== cloudpickle default protocol ===
worker has GenericResult[int] before unpickle: False
Traceback (most recent call last):
  File "/Users/neej/dev/ccflow/repro_generic_result_ray_cloudpickle.py", line 55, in load_payload
    obj = cloudpickle.loads(base64.b64decode(encoded_payload))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'GenericResult[int]' on <module 'ccflow.result.generic' from '/Users/neej/dev/ccflow/ccflow/result/generic.py'>

=== cloudpickle protocol=5 ===
worker has GenericResult[int] before unpickle: False
Traceback (most recent call last):
  File "/Users/neej/dev/ccflow/repro_generic_result_ray_cloudpickle.py", line 55, in load_payload
    obj = cloudpickle.loads(base64.b64decode(encoded_payload))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'GenericResult[int]' on <module 'ccflow.result.generic' from '/Users/neej/dev/ccflow/ccflow/result/generic.py'>
```
This reproduces with both cloudpickle's default protocol and `protocol=5`, so
the failure is not caused by protocol 5.
Environment
- OS: macOS 26.2, build 25C56
- Python version: Python 3.12.12 (`uv run python --version`)
- Package version: ccflow 0.8.3
- cloudpickle: 3.1.2
- ray: 2.55.1
- pydantic: 2.13.4
- ccflow: main (commit: 17905fb)

Package versions were collected with:
```shell
uv run python -c "import importlib.metadata as m; print('ccflow', m.version('ccflow')); print('cloudpickle', m.version('cloudpickle')); print('ray', m.version('ray')); print('pydantic', m.version('pydantic'))"
```
Additional Context
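Pydantic's generic specialization creation mutates module globals only in some
call contexts, namely when the specialization is first evaluated at module top
level. This can be observed directly:
```python
# Run at module top level in a fresh interpreter; inside a function the
# specialization is not registered as a module global.
import ccflow.result.generic as generic_module
from ccflow import GenericResult

assert not hasattr(generic_module, "GenericResult[int]")
cls = GenericResult[int]
assert hasattr(generic_module, "GenericResult[int]")
assert generic_module.__dict__["GenericResult[int]"] is cls
```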
That side effect is process-local. A fresh Ray worker may import
`ccflow.result.generic` and `GenericResult` without ever evaluating
`GenericResult[int]`, so the module global that pickle expects does not exist.
The robust fix should avoid relying on Pydantic's process-local global
registration for ccflow generic result/context instances. Instead, generic
instances should be serialized by contract:
- the origin class, e.g. `GenericResult`;
- the generic args, e.g. `(int,)`;
- the Pydantic instance state.
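Conceptually:
```python
def restore_generic_pydantic_instance(origin, args, state):
    # Re-create (or fetch from Pydantic's cache) the specialization locally.
    cls = origin[args] if args else origin
    # Skip validation and restore the already-validated Pydantic state.
    obj = cls.__new__(cls)
    obj.__setstate__(state)
    return obj
```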
This should probably be fixed at the `ResultBase`/`ContextBase` serialization
layer.
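A minimal sketch of serialization by contract at a base-class layer, using plain `pickle` and Pydantic v2. `ContractPicklingBase` is a hypothetical stand-in for `ResultBase`/`ContextBase` and `Box` for `GenericResult`. Overriding `__reduce_ex__` and reading `__pydantic_generic_metadata__` is one possible hook, not necessarily what ccflow should ship. It pickles the origin class, the type args, and the `__getstate__()` output, so the receiving side rebuilds the specialization itself. Like the other sketches, it assumes it runs as a script so module-level names are importable by reference.

```python
import pickle
import sys
from typing import Any, Generic, TypeVar

from pydantic import BaseModel

T = TypeVar("T")


def restore_generic_pydantic_instance(origin: type, args: tuple, state: dict) -> Any:
    # Re-create (or fetch from Pydantic's cache) the specialization locally.
    cls = origin[args] if args else origin
    # Skip validation and restore the already-validated Pydantic state.
    obj = cls.__new__(cls)
    obj.__setstate__(state)
    return obj


class ContractPicklingBase(BaseModel):
    """Hypothetical stand-in for a ResultBase/ContextBase-level fix."""

    def __reduce_ex__(self, protocol: int) -> Any:
        meta = type(self).__pydantic_generic_metadata__
        if meta["origin"] is None:
            # Not a parametrized generic: keep Pydantic's default pickling.
            return super().__reduce_ex__(protocol)
        # Store the origin class and type args (both picklable by reference)
        # instead of the runtime-generated class such as "Box[int]".
        return (
            restore_generic_pydantic_instance,
            (meta["origin"], meta["args"], self.__getstate__()),
        )


class Box(ContractPicklingBase, Generic[T]):
    """Hypothetical stand-in for GenericResult."""

    value: T


def make_box():
    # Function-local specialization: no "Box[int]" module global is created.
    return Box[int](value=5)


original = make_box()
restored = pickle.loads(pickle.dumps(original))
has_global = hasattr(sys.modules[Box.__module__], "Box[int]")
print(type(restored).__name__, restored.value, restored == original, has_global)
```

Because only `Box` and `(int,)` are stored by reference, this round-trip does not depend on a `Box[int]` module global existing on either side, while non-generic models fall through to Pydantic's default pickling.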
One uv/Ray caveat from this repo: plain
`uv run python repro_generic_result_ray_cloudpickle.py` may cause Ray workers
to bootstrap a temporary uv environment that does not include `ray`, producing
`ModuleNotFoundError: No module named 'ray'` before the intended repro executes.
Using `uv run --active` from the repo venv reaches the actual cloudpickle
failure shown above.