Skip to content

Flows annotations in python 3.9 #17520

@pietrodantuono

Description

@pietrodantuono

Bug summary

When annotating a flow in Python 3.9 with `from __future__ import annotations`, I would expect to be able to use the pipe (`|`) syntax instead of typing.Union. Instead, an error is thrown.

Note

The same annotations work without issue on prefect.task.

See this example:

from __future__ import annotations
from prefect import task, flow

@task
def convert_to_int(var: int | float | str) -> int:  # pipe annotations in tasks work as expected
    """Convert a variable to integer based on its type."""
    if isinstance(var, (int, float)):
        return int(var)
    elif isinstance(var, str):
        try:
            if var.startswith('0x'):
                return int(var, 16)
            else:
                return int(var)
        except ValueError:
            # Convert string to its ASCII/Unicode values and sum them
            return sum(ord(c) for c in var)
    else:
        raise TypeError("Input must be numeric or string")

@task
def calculate_hex_sum(val1: int, val2: int) -> str:
    """Calculate the sum of two integers and return it as hex without the '0x' prefix.

    Args:
        val1: First addend.
        val2: Second addend.

    Returns:
        Lowercase hexadecimal digits of ``val1 + val2``, with a leading ``-``
        when the sum is negative.
    """
    # format(..., "x") omits the prefix itself. Slicing hex()[2:] is wrong for
    # negative sums: hex(-5) is "-0x5", so [2:] would yield "x5".
    return format(val1 + val2, "x")

@flow(name="Hex Sum Flow")
def hex_sum(var1: int | float | str, var2: int | float | str) -> str:  # works with typing.Union[int, float, str], raises with int | float | str
    """
    Add two values after integer conversion and report the total in hex.

    Each argument is passed through ``convert_to_int`` (first ``var1``, then
    ``var2``) and the two results are handed to ``calculate_hex_sum``.

    Args:
        var1: First operand (string or numeric)
        var2: Second operand (string or numeric)

    Returns:
        String: Whatever ``calculate_hex_sum`` produces for the two converted values
    """
    first, second = convert_to_int(var1), convert_to_int(var2)
    return calculate_hex_sum(first, second)

# Example usage: run the flow on a few representative input pairs and print
# each result next to a label describing the inputs.
if __name__ == "__main__":
    examples = [
        ("Sum of 10 and 20:", 10, 20),
        ("Sum of 'abc' and 123:", 'abc', 123),
        ("Sum of '0x1a' and 10:", '0x1a', 10),
        ("Sum of 'hello' and 'world':", 'hello', 'world'),
    ]
    for label, left, right in examples:
        print(label, hex_sum(left, right))

Error message

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[1], line 27
     23     """Calculate the sum of two integers and return as hex without '0x' prefix."""
     24     return hex(val1 + val2)[2:]  # Remove '0x' prefix
     26 @flow(name="Hex Sum Flow")
---> 27 def hex_sum(var1: int | float | str, var2: int | float | str) -> str:
     28     """
     29     Convert two variables to hexadecimal and return their sum in hex format.
     30     
   (...)
     36         String: Hexadecimal sum without '0x' prefix
     37     """
     38     val1 = convert_to_int(var1)

File ~/example/.venv/lib/python3.9/site-packages/prefect/flows.py:1904, in FlowDecorator.__call__(self, _FlowDecorator__fn, name, version, flow_run_name, retries, retry_delay_seconds, task_runner, description, timeout_seconds, validate_parameters, persist_result, result_storage, result_serializer, cache_result_in_memory, log_prints, on_completion, on_failure, on_cancellation, on_crashed, on_running)
   1900         method_decorator = type(__fn).__name__
   1901         raise TypeError(
   1902             f"@{method_decorator} should be applied on top of @flow"
   1903         )
-> 1904     return Flow(
   1905         fn=__fn,
   1906         name=name,
   1907         version=version,
   1908         flow_run_name=flow_run_name,
   1909         task_runner=task_runner,
   1910         description=description,
   1911         timeout_seconds=timeout_seconds,
   1912         validate_parameters=validate_parameters,
   1913         retries=retries,
   1914         retry_delay_seconds=retry_delay_seconds,
   1915         persist_result=persist_result,
   1916         result_storage=result_storage,
   1917         result_serializer=result_serializer,
   1918         cache_result_in_memory=cache_result_in_memory,
   1919         log_prints=log_prints,
   1920         on_completion=on_completion,
   1921         on_failure=on_failure,
   1922         on_cancellation=on_cancellation,
   1923         on_crashed=on_crashed,
   1924         on_running=on_running,
   1925     )
   1926 else:
   1927     return cast(
   1928         Callable[[Callable[P, R]], Flow[P, R]],
   1929         partial(
   (...)
   1950         ),
   1951     )

File ~/example/.venv/lib/python3.9/site-packages/prefect/flows.py:356, in Flow.__init__(self, fn, name, version, flow_run_name, retries, retry_delay_seconds, task_runner, description, timeout_seconds, validate_parameters, persist_result, result_storage, result_serializer, cache_result_in_memory, log_prints, on_completion, on_failure, on_cancellation, on_crashed, on_running)
    350 if self.should_validate_parameters:
    351     # Try to create the validated function now so that incompatibility can be
    352     # raised at declaration time rather than at runtime
    353     # We cannot, however, store the validated function on the flow because it
    354     # is not picklable in some environments
    355     try:
--> 356         ValidatedFunction(self.fn, config={"arbitrary_types_allowed": True})
    357     except ConfigError as exc:
    358         raise ValueError(
    359             "Flow function is not compatible with `validate_parameters`. "
    360             "Disable validation or change the argument names."
    361         ) from exc

File ~/example/.venv/lib/python3.9/site-packages/pydantic/v1/decorator.py:78, in ValidatedFunction.__init__(self, function, config)
     75 self.v_args_name = 'args'
     76 self.v_kwargs_name = 'kwargs'
---> 78 type_hints = get_all_type_hints(function)
     79 takes_args = False
     80 takes_kwargs = False

File ~/example/.venv/lib/python3.9/site-packages/pydantic/v1/typing.py:80, in get_all_type_hints(obj, globalns, localns)
     79 def get_all_type_hints(obj: Any, globalns: Any = None, localns: Any = None) -> Any:
---> 80     return get_type_hints(obj, globalns, localns, include_extras=True)

File ~/.local/share/uv/python/cpython-3.9.21-linux-x86_64-gnu/lib/python3.9/typing.py:1497, in get_type_hints(obj, globalns, localns, include_extras)
   1489 if isinstance(value, str):
   1490     # class-level forward refs were handled above, this must be either
   1491     # a module-level annotation or a function argument annotation
   1492     value = ForwardRef(
   1493         value,
   1494         is_argument=not isinstance(obj, types.ModuleType),
   1495         is_class=False,
   1496     )
-> 1497 value = _eval_type(value, globalns, localns)
   1498 if name in defaults and defaults[name] is None:
   1499     value = Optional[value]

File ~/.local/share/uv/python/cpython-3.9.21-linux-x86_64-gnu/lib/python3.9/typing.py:292, in _eval_type(t, globalns, localns, recursive_guard)
    286 """Evaluate all forward references in the given type t.
    287 For use of globalns and localns see the docstring for get_type_hints().
    288 recursive_guard is used to prevent infinite recursion with a recursive
    289 ForwardRef.
    290 """
    291 if isinstance(t, ForwardRef):
--> 292     return t._evaluate(globalns, localns, recursive_guard)
    293 if isinstance(t, (_GenericAlias, GenericAlias)):
    294     ev_args = tuple(_eval_type(a, globalns, localns, recursive_guard) for a in t.__args__)

File ~/.local/share/uv/python/cpython-3.9.21-linux-x86_64-gnu/lib/python3.9/typing.py:554, in ForwardRef._evaluate(self, globalns, localns, recursive_guard)
    549 if self.__forward_module__ is not None:
    550     globalns = getattr(
    551         sys.modules.get(self.__forward_module__, None), '__dict__', globalns
    552     )
    553 type_ = _type_check(
--> 554     eval(self.__forward_code__, globalns, localns),
    555     "Forward references must evaluate to types.",
    556     is_argument=self.__forward_is_argument__,
    557     allow_special_forms=self.__forward_is_class__,
    558 )
    559 self.__forward_value__ = _eval_type(
    560     type_, globalns, localns, recursive_guard | {self.__forward_arg__}
    561 )
    562 self.__forward_evaluated__ = True

File <string>:1

TypeError: unsupported operand type(s) for |: 'type' and 'type'

Version info

Version:             3.2.11
API version:         0.8.4
Python version:      3.9.21
Git commit:          9481694f
Built:               Wed, Mar 5, 2025 10:00 PM
OS/Arch:             linux/x86_64
Profile:             local
Server type:         ephemeral
Pydantic version:    2.10.6
Server:
  Database:          sqlite
  SQLite version:    3.47.1

Additional context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug (Something isn't working), upstream dependency (An upstream issue caused by a bug in one of our dependencies)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions