
Dynamic task mapping zip() iterates unexpected number of times #26499

Closed
BasPH opened this issue Sep 19, 2022 · 8 comments · Fixed by #26636
Labels
affected_version:2.4 · area:core · kind:bug

Comments

@BasPH
Contributor

BasPH commented Sep 19, 2022

Apache Airflow version

2.4.0

What happened

When running zip() with different-length lists, I get an unexpected result:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="demo_dynamic_task_mapping_zip",
    start_date=datetime(2022, 1, 1),
    schedule=None,
):

    @task
    def push_letters():
        return ["a", "b", "c"]

    @task
    def push_numbers():
        return [1, 2, 3, 4]

    @task
    def pull(value):
        print(value)

    pull.expand(value=push_letters().zip(push_numbers()))

This iterates over [("a", 1), ("b", 2), ("c", 3), ("a", 1)]: it runs for the length of the longest collection, but starts over from the first elements once the length of the shortest collection is reached.

I would expect it to behave like Python's built-in zip() and iterate for the length of the shortest collection, i.e. three times in the example above: [("a", 1), ("b", 2), ("c", 3)].

Additionally, I went digging in the source code and found the fillvalue argument, which works as expected:

pull.expand(value=push_letters().zip(push_numbers(), fillvalue="foo"))

This iterates over [("a", 1), ("b", 2), ("c", 3), ("foo", 4)].

However, with fillvalue not set, I would expect it to iterate only for the length of the shortest collection.
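For reference, the two behaviours being compared correspond to Python's built-in zip() and the standard library's itertools.zip_longest(), whose fillvalue keyword pads the shorter inputs. A quick plain-Python sketch with the same lists as the DAG above (no Airflow involved):

```python
from itertools import zip_longest

letters = ["a", "b", "c"]
numbers = [1, 2, 3, 4]

# Built-in zip() stops as soon as the shortest input is exhausted.
shortest = list(zip(letters, numbers))
print(shortest)  # [('a', 1), ('b', 2), ('c', 3)]

# zip_longest() runs to the end of the longest input, padding gaps with fillvalue.
longest = list(zip_longest(letters, numbers, fillvalue="foo"))
print(longest)  # [('a', 1), ('b', 2), ('c', 3), ('foo', 4)]
```

Neither standard-library function wraps around to the start of a shorter list, which is what the 2.4.0 result above does.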

What you think should happen instead

I expect zip() to iterate over the number of elements of the shortest collection (without fillvalue set).

How to reproduce

See above.

Operating System

macOS

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

OSS Airflow

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

BasPH added the kind:bug and area:core labels Sep 19, 2022
uranusjr self-assigned this Sep 20, 2022
@tirkarthi
Contributor

As far as I can tell from debugging, the NOTSET object is a different instance across task runs in the scheduler, which causes identity checks such as the one below to return False. The exact case reported is already covered by a test case, but I guess that since pytest runs it in a single process, the bug only surfaces when using the scheduler.

if self.fillvalue is NOTSET:

import pytest

from airflow.utils.types import NOTSET


# dag_maker and session are pytest fixtures provided by Airflow's test suite.
@pytest.mark.parametrize(
    "fillvalue, expected_results",
    [
        (NOTSET, {("a", 1), ("b", 2), ("c", 3)}),
        (None, {("a", 1), ("b", 2), ("c", 3), (None, 4)}),
    ],
)
def test_xcom_zip(dag_maker, session, fillvalue, expected_results):
    results = set()
    with dag_maker(session=session) as dag:

        @dag.task
        def push_letters():
            return ["a", "b", "c"]

        @dag.task
        def push_numbers():
            return [1, 2, 3, 4]

        @dag.task
        def pull(value):
            results.add(value)

        pull.expand(value=push_letters().zip(push_numbers(), fillvalue=fillvalue))

    dr = dag_maker.create_dagrun()

    # Run "push_letters" and "push_numbers".
    decision = dr.task_instance_scheduling_decisions(session=session)
    assert decision.schedulable_tis and all(ti.task_id.startswith("push_") for ti in decision.schedulable_tis)
    for ti in decision.schedulable_tis:
        ti.run(session=session)
    session.commit()

    # Run "pull".
    decision = dr.task_instance_scheduling_decisions(session=session)
    assert decision.schedulable_tis and all(ti.task_id == "pull" for ti in decision.schedulable_tis)
    for ti in decision.schedulable_tis:
        ti.run(session=session)

    assert results == expected_results

@rjmcginness
Contributor

rjmcginness commented Sep 21, 2022

Hello, I am new here, but I have been wanting to contribute to this project. I do not want to open an unsolicited pull request, so I am showing my changes here. I did run the pre-commit checks and the unit test shown above. However, I was not able to run the DAG above in Docker on macOS. My thought is that there may be different instances of NOTSET, so the comparison does not work: it checks object identity, not value. To solve this, perhaps checking against the class ArgNotSet (with isinstance) would be more reliable. I would create a PR, but I would like to test against the failing use case first and do not want to violate the contribution etiquette.

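from __future__ import annotations

from typing import Any, Sequence

from airflow.utils.types import NOTSET, ArgNotSet


class _ZipResult(Sequence):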
    def __init__(self, values: Sequence[Sequence | dict], *, fillvalue: Any = NOTSET) -> None:
        self.values = values
        self.fillvalue = fillvalue
        # use the generator here, rather than in __len__ to improve efficiency
        lengths = (len(v) for v in self.values)
        self.length = min(lengths) if isinstance(self.fillvalue, ArgNotSet) else max(lengths)

    @staticmethod
    def _get_or_fill(container: Sequence | dict, index: Any, fillvalue: Any) -> Any:
        try:
            return container[index]
        except (IndexError, KeyError):
            return fillvalue

    def __getitem__(self, index: Any) -> Any:
        if index >= len(self):
            raise IndexError(index)
        return tuple(self._get_or_fill(value, index, self.fillvalue) for value in self.values)

    def __len__(self) -> int:
        return self.length

ashb added the affected_version:2.4 label Sep 21, 2022
rjmcginness added a commit to rjmcginness/airflow that referenced this issue Sep 22, 2022
@uranusjr
Member

@rjmcginness Your patch would break fillvalue and needs some additional work.

@tirkarthi I would expect the resulting value to contain NOTSET instances instead of wrapping over to the beginning, if the NOTSET identity is the cause. (I may be missing something.) The issue not being observed in tests does seem to indicate there’s something related to serialisation though.
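To make the identity argument concrete, here is a simplified model, not Airflow's code: a hypothetical zip_with_identity_check() that picks min() or max() based on `fillvalue is NOTSET`, mirroring the check quoted earlier, with a stand-in ArgNotSet class. copy.deepcopy() is used only as a stand-in for whatever round-trip might re-create the sentinel; Airflow's own serialization works differently.

```python
import copy


class ArgNotSet:
    """Hypothetical stand-in for Airflow's sentinel type."""


NOTSET = ArgNotSet()


def zip_with_identity_check(values, fillvalue=NOTSET):
    """Hypothetical model of zip() whose length depends on `fillvalue is NOTSET`."""
    lengths = [len(v) for v in values]
    length = min(lengths) if fillvalue is NOTSET else max(lengths)
    # Pad missing positions with fillvalue, similar to _get_or_fill() in the patch above.
    return [tuple(v[i] if i < len(v) else fillvalue for v in values) for i in range(length)]


letters, numbers = ["a", "b", "c"], [1, 2, 3, 4]

# A different ArgNotSet instance, standing in for a re-created sentinel.
recreated = copy.deepcopy(NOTSET)
print(recreated is NOTSET)  # False

result = zip_with_identity_check([letters, numbers], fillvalue=recreated)
print(len(result))                # 4: the identity check failed, so max() was used
print(result[3][0] is recreated)  # True: the padding is the sentinel, not "a"
```

Under this model, a re-created sentinel would explain the extra (fourth) iteration, but not the wrap-around to ("a", 1).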

@rjmcginness
Contributor

rjmcginness commented Sep 22, 2022

@uranusjr Thank you. I was thinking that fillvalue would remain the same: it still receives NOTSET or a value. My code changes the check to test against the class rather than against one particular instance of ArgNotSet. I did test this against the use case successfully. It seems strange that there would be multiple instances of NOTSET, but serialization/deserialization would explain it, since it may create new instances. What was your thought on breaking fillvalue?

@uranusjr
Member

Ah, I missed the self.length part you changed in __init__. That could be the solution (if the identity is indeed the issue), although I’d prefer the logic to live in __len__ instead of __init__. But if serialisation is the issue (and again, I am not yet entirely convinced that’s the case), the pre-commit checks and unit tests would not be enough to verify the solution. The easiest way to check would be for someone to actually run a DAG against this patch.
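A minimal sketch of the suggestion to keep the length logic in __len__, combined with the isinstance() check from the patch above. ZipResultSketch and the stand-in ArgNotSet/NOTSET are hypothetical and only mirror the idea of Airflow's _ZipResult; this is not the change that was eventually merged, and it ignores dict inputs.

```python
import copy
from collections.abc import Sequence
from typing import Any


class ArgNotSet:
    """Hypothetical stand-in for Airflow's sentinel type."""


NOTSET = ArgNotSet()


class ZipResultSketch(Sequence):
    """Hypothetical zip-like view whose length is decided in __len__."""

    def __init__(self, values: Sequence, *, fillvalue: Any = NOTSET) -> None:
        self.values = values
        self.fillvalue = fillvalue

    def __len__(self) -> int:
        lengths = (len(v) for v in self.values)
        # isinstance() rather than `is NOTSET`, so any ArgNotSet instance means "not set".
        if isinstance(self.fillvalue, ArgNotSet):
            return min(lengths, default=0)  # like built-in zip()
        return max(lengths, default=0)  # like itertools.zip_longest()

    def __getitem__(self, index: int) -> tuple:
        if not 0 <= index < len(self):
            raise IndexError(index)
        # Pad shorter inputs with fillvalue (only reachable when fillvalue is set).
        return tuple(v[index] if index < len(v) else self.fillvalue for v in self.values)


letters, numbers = ["a", "b", "c"], [1, 2, 3, 4]
print(list(ZipResultSketch([letters, numbers])))
# [('a', 1), ('b', 2), ('c', 3)]
print(list(ZipResultSketch([letters, numbers], fillvalue="foo")))
# [('a', 1), ('b', 2), ('c', 3), ('foo', 4)]

# A re-created sentinel (a different instance) still counts as "not set".
print(len(ZipResultSketch([letters, numbers], fillvalue=copy.deepcopy(NOTSET))))  # 3
```

The trade-off versus caching self.length in __init__ is that the lengths are recomputed on every len() call, which here includes every index lookup.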

@rjmcginness
Contributor

I did. I asked on Slack where the DAG file should go. I reproduced the error in v2.4.0, then ran the DAG with my new code and got the expected output. I committed the changes to my forked repository but have not opened a pull request yet.

@uranusjr
Member

A pull request would be a good idea then. Maybe it’d be easier to figure out what exactly triggered the error if the full diff is presented.

@rjmcginness
Contributor

@uranusjr OK, thank you. I have to look into how to do the rebase, then I will open the pull request. I appreciate your help.

rjmcginness added a commit to rjmcginness/airflow that referenced this issue Sep 23, 2022
rjmcginness added a commit to rjmcginness/airflow that referenced this issue Sep 23, 2022
rjmcginness added a commit to rjmcginness/airflow that referenced this issue Sep 26, 2022
rjmcginness added a commit to rjmcginness/airflow that referenced this issue Sep 26, 2022
5 participants