[BUG] map_task / ArrayNode workflows cannot be used with FlyteRemote #5359

Closed
2 tasks done
cosmicBboy opened this issue May 13, 2024 · 1 comment
Labels
backlogged For internal use. Reserved for contributor team workflow. bug Something isn't working flytekit FlyteKit Python related issue

Comments

@cosmicBboy
Contributor

cosmicBboy commented May 13, 2024

Describe the bug

Doing remote.wait(ex) on an execution that contains an ArrayNode errors out:

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/remote/remote.py:1891: in wait
    execution = self.sync_execution(execution, sync_nodes=sync_nodes)
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/remote/remote.py:1986: in sync_execution
    node_execs[n.id.node_id] = self.sync_node_execution(n, node_mapping)  # noqa
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/remote/remote.py:2116: in sync_node_execution
    logger.error(f"NE {execution} undeterminable, {type(execution._node)}, {execution._node}")
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/common.py:54: in __str__
    return self.verbose_string()
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/common.py:71: in verbose_string
    return self.short_string()
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/common.py:63: in short_string
    literal_str = re.sub(r"\s+", " ", str(self.to_flyte_idl())).strip()
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/core/workflow.py:569: in to_flyte_idl
    array_node=self.array_node.to_flyte_idl() if self.array_node else None,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <[AttributeError("'int' object has no attribute 'to_flyte_idl'") raised in repr()] FlyteArrayNode object at 0x111393750>

    def to_flyte_idl(self) -> _core_workflow.ArrayNode:
        return _core_workflow.ArrayNode(
>           node=self._node.to_flyte_idl() if self._node is not None else None,
            parallelism=self._parallelism,
            min_successes=self._min_successes,
            min_success_ratio=self._min_success_ratio,
        )
E       AttributeError: 'int' object has no attribute 'to_flyte_idl'

../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/core/workflow.py:400: AttributeError

This should work.
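The failure mode in the traceback can be reproduced without flytekit. Below is a minimal sketch, assuming a hypothetical stand-in class (`ArrayNodeModel` and `describe` are illustrative names, not flytekit's): a model whose string form goes through `to_flyte_idl()`, with an `int` sitting where a nested node model is expected. As in the traceback, the exception surfaces while an f-string formats the object for a log line. How an `int` reached that slot in flytekit is not established in this report.

```python
class ArrayNodeModel:
    """Hypothetical stand-in for a model with a to_flyte_idl-style serializer."""

    def __init__(self, node, parallelism=None):
        self._node = node
        self._parallelism = parallelism

    def to_flyte_idl(self):
        # Mirrors the failing line: assumes self._node is itself a model.
        return {
            "node": self._node.to_flyte_idl() if self._node is not None else None,
            "parallelism": self._parallelism,
        }

    def __str__(self):
        # String formatting is routed through serialization, as in the traceback.
        return str(self.to_flyte_idl())


def describe(model) -> str:
    """Format a model for a log line; the f-string triggers __str__."""
    return f"NE undeterminable, {model}"


# An int ended up where a node model was expected.
broken = ArrayNodeModel(node=3)

try:
    describe(broken)
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)
```

Because the error is raised while building the log message, the traceback also suggests that `sync_node_execution` had already reached its "undeterminable" branch for this node before the `AttributeError` occurred.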

Expected behavior

Users should be able to use FlyteRemote to fetch/sync/wait on executions containing ArrayNodes.

Additional context to reproduce

Workflow file:

from functools import partial
from flytekit import task, map_task, workflow


@task
def fn(x: int, y: int) -> int:
    return x + y


@workflow
def workflow_with_maptask(data: list[int], y: int) -> list[int]:
    partial_fn = partial(fn, y=y)
    return map_task(partial_fn)(x=data)

pytest test to reproduce:

import re
from datetime import timedelta
from subprocess import run
from time import sleep

from flytekit import WorkflowExecutionPhase
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config


def test_workflow_with_maptask(workflows_dir):
    """Check the map task example.

    1. Runs map_tasks.py remotely
    2. Checks that the output is a list of [x + y, ...]
    """
    result = run(
        [
            "unionai",
            "--config",
            "<path/to/config.yaml>",
            "run",
            "--remote",
            "map_tasks.py",
            "workflow_with_maptask",
            "--data",
            "[1, 2, 3, 4, 5]",
            "--y",
            "1",
        ],
        cwd=workflows_dir, text=True, check=True, capture_output=True
    )
    match = re.search(r"executions/(\w+)", result.stdout)

    execution_id = match.group(1)
    remote = FlyteRemote(
        config=Config.for_endpoint("dns:///demo.hosted.unionai.cloud"),
        default_project="flytesnacks",
        default_domain="development",
    )

    ex = remote.fetch_execution(name=execution_id)
    ex = remote.wait(ex, poll_interval=timedelta(seconds=1))  # 👈 error happens here

    assert ex.closure.phase == WorkflowExecutionPhase.SUCCEEDED
    assert ex.outputs["o0"] == [2, 3, 4, 5, 6]
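A possible interim workaround, sketched under assumptions: the traceback shows `wait` forwarding a `sync_nodes` value to `sync_execution`, so passing `sync_nodes=False` may skip the node-level sync that fails. That `wait` accepts `sync_nodes` as a keyword is inferred from the traceback rather than confirmed in this issue, and whether `ex.outputs` is still populated without node syncing is untested here. The helper name `wait_without_node_sync` is hypothetical.

```python
from datetime import timedelta


def wait_without_node_sync(remote, execution, poll_interval=timedelta(seconds=1)):
    """Wait on an execution while asking the remote to skip node-level syncing.

    `remote` is expected to be a FlyteRemote-like object. The `sync_nodes`
    keyword is an assumption inferred from the traceback in this issue.
    """
    return remote.wait(execution, poll_interval=poll_interval, sync_nodes=False)
```

In the test above, this would replace the failing call with `ex = wait_without_node_sync(remote, ex)`. It does not fix ArrayNode handling; it only avoids the code path that raises.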

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@cosmicBboy cosmicBboy added bug Something isn't working untriaged This issues has not yet been looked at by the Maintainers labels May 13, 2024

runllm bot commented May 13, 2024

The issue you're encountering with FlyteRemote on ArrayNode workflows comes from how these node types are handled. The error is raised inside the ArrayNode's to_flyte_idl method, which serializes the node for communication with the Flyte backend: it calls to_flyte_idl on a nested value that is an integer rather than a node model, which results in an AttributeError.

This points to a bug in FlyteKit's handling of ArrayNode objects, particularly during serialization. The traceback you provided suggests that the ArrayNode class, or the code that constructs it, assumes every property of the node can be serialized directly, which is not the case here.

Addressing this issue would require a code change in the FlyteKit library so that ArrayNode objects are handled correctly during serialization and deserialization. This might involve checking that each property is an instance of a class with a to_flyte_idl method before calling it.
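The type check described above could look roughly like the following. This is a minimal sketch, not flytekit code: `serialize_child` and `FakeNode` are hypothetical names used for illustration. It only makes the failure more descriptive; it does not explain or fix why an `int` ends up where a node model is expected.

```python
from typing import Any, Optional


def serialize_child(child: Any) -> Optional[Any]:
    """Serialize a nested model, raising a descriptive error for other values."""
    if child is None:
        return None
    to_idl = getattr(child, "to_flyte_idl", None)
    if not callable(to_idl):
        raise TypeError(
            f"expected a model with to_flyte_idl(), got {type(child).__name__}: {child!r}"
        )
    return to_idl()


class FakeNode:
    """Hypothetical stand-in for a serializable node model."""

    def to_flyte_idl(self) -> dict:
        return {"id": "n0"}
```

With this guard, `serialize_child(FakeNode())` returns the serialized form, `serialize_child(None)` returns `None`, and `serialize_child(3)` raises a `TypeError` that names the offending type instead of an `AttributeError` from deep inside string formatting.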

Would you like more detailed information on potential fixes or workarounds for this issue?


@thomasjpfan thomasjpfan added flytekit FlyteKit Python related issue backlogged For internal use. Reserved for contributor team workflow. and removed untriaged This issues has not yet been looked at by the Maintainers labels May 16, 2024
@wild-endeavor wild-endeavor self-assigned this May 28, 2024