
Provide consumer DAGs with context about the dag run(s) of its producer(s) #33088

Open
1 of 2 tasks
RNHTTR opened this issue Aug 3, 2023 · 3 comments
Labels
area:datasets Issues related to the datasets feature kind:feature Feature Requests

Comments

@RNHTTR
Contributor

RNHTTR commented Aug 3, 2023

Description

As far as I can tell, there's currently no way for a consumer DAG to access information about the producer DAG(s) that triggered it. It would be helpful if there were a producers context variable available to task instances in a consumer DAG, containing information such as each producer's DAG run and task instance (i.e., the task instance in the producer DAG that wrote to the outlet).
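To make the request concrete, here is a minimal, purely illustrative sketch of the shape such a producers context variable could take. ProducerInfo, its fields, and the "producers" context key are hypothetical names for this proposal, not existing Airflow APIs; plain dataclasses stand in for Airflow's DagRun and TaskInstance models so the sketch runs on its own.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ProducerInfo:
    """Hypothetical record describing one producer that triggered a consumer run."""

    dataset_uri: str  # the outlet dataset the producer task updated
    dag_id: str  # producer DAG
    run_id: str  # producer DAG run
    task_id: str  # producer task instance that wrote to the outlet
    map_index: int = -1  # -1 means "not a mapped task", as in Airflow


def summarize_producers(context: dict[str, Any]) -> list[str]:
    """What a consumer task might do with the proposed "producers" context key."""
    # Hypothetical key: Airflow does not populate context["producers"] today.
    producers: list[ProducerInfo] = context.get("producers", [])
    return [f"{p.dag_id}/{p.run_id}/{p.task_id} -> {p.dataset_uri}" for p in producers]


# Simulated context for a consumer run triggered by a single producer.
context = {
    "producers": [
        ProducerInfo(
            dataset_uri="s3://bucket/one",
            dag_id="producer_dag",
            run_id="scheduled__2023-08-03T00:00:00+00:00",
            task_id="write_one",
        )
    ]
}
print(summarize_producers(context))
```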

Use case/motivation

See the associated discussion.

Related issues

No response

Are you willing to submit a PR?

  • Yes, I am willing to submit a PR!

Code of Conduct

RNHTTR added the kind:feature, needs-triage, and area:datasets labels, then removed needs-triage, on Aug 3, 2023
@scr-oath

scr-oath commented Aug 4, 2023

Yes, I agree. And if there are multiple triggers, such as schedule=[Dataset("one"), Dataset("two")], it would be generally useful to get information about each triggering event. I'm not sure exactly which "coordinates" identify all the information about a DAG or task "instance", but if a task has a Dataset as an outlet, then the downstream ("consumer") DAGs and their tasks should be able to learn about the task instance that triggered them, including its XCom information.
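With several datasets in the schedule, the consumer would need one entry per dataset, and possibly several events per dataset if a producer updated it more than once before the consumer ran. A minimal sketch of that grouping, assuming a hypothetical DatasetEvent record that carries the producer's coordinates (dag_id, run_id, task_id) and, for simplicity, a schedule given as a list of URI strings rather than Dataset objects:

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class DatasetEvent:
    """Hypothetical record of one dataset update and the task that produced it."""

    dataset_uri: str
    source_dag_id: str
    source_run_id: str
    source_task_id: str


def group_events(schedule: list[str], events: list[DatasetEvent]) -> dict[str, list[DatasetEvent]]:
    """Map every dataset URI in the consumer's schedule to the events that triggered it."""
    grouped: dict[str, list[DatasetEvent]] = defaultdict(list)
    for event in events:
        # Ignore events for datasets this consumer is not scheduled on.
        if event.dataset_uri in schedule:
            grouped[event.dataset_uri].append(event)
    # Return a plain dict in the schedule's order.
    return {uri: grouped[uri] for uri in schedule}


events = [
    DatasetEvent("one", "producer_a", "run_1", "write_one"),
    DatasetEvent("two", "producer_b", "run_7", "write_two"),
    DatasetEvent("one", "producer_a", "run_2", "write_one"),
]
by_dataset = group_events(["one", "two"], events)
for uri, evts in by_dataset.items():
    print(uri, [(e.source_dag_id, e.source_run_id, e.source_task_id) for e in evts])
```

Keeping a list per dataset, rather than a single event, avoids silently dropping earlier updates when a dataset changed more than once between consumer runs.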

@scr-oath

scr-oath commented Aug 4, 2023

It seems (I could be wrong) that task_id could be enough to get the exact XCom information.

So if a TaskInstance were able to know how it was triggered and, when its DAG is scheduled on datasets, had a corresponding list or map from each dataset to the task_id that had it as an "outlet", that would be a reasonable way to hook into the existing constructs and just xcom_pull from there as needed.

For reference, the TaskInstance.xcom_pull docstring:

Pull XComs that optionally meet certain criteria.

        :param key: A key for the XCom. If provided, only XComs with matching
            keys will be returned. The default key is ``'return_value'``, also
            available as constant ``XCOM_RETURN_KEY``. This key is automatically
            given to XComs returned by tasks (as opposed to being pushed
            manually). To remove the filter, pass *None*.
        :param task_ids: Only XComs from tasks with matching ids will be
            pulled. Pass *None* to remove the filter.
        :param dag_id: If provided, only pulls XComs from this DAG. If *None*
            (default), the DAG of the calling task is used.
        :param map_indexes: If provided, only pull XComs with matching indexes.
            If *None* (default), this is inferred from the task(s) being pulled
            (see below for details).
        :param include_prior_dates: If False, only XComs from the current
            execution_date are returned. If *True*, XComs from previous dates
            are returned as well.
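A minimal, self-contained sketch of how the suggested dataset-to-task map could feed an XCom lookup: an in-memory dict stands in for the XCom table, and producer_map plays the role of the hypothetical per-dataset map on the consumer's TaskInstance. The sketch keys each XCom by dag_id and run_id as well as task_id, because the same producer task pushes a separate XCom on every run; whether task_id alone would be enough in practice is the open question above. None of these names are Airflow APIs.

```python
from __future__ import annotations

from typing import Any

XCOM_RETURN_KEY = "return_value"  # same default key name the xcom_pull docstring uses

# In-memory stand-in for the XCom table: (dag_id, run_id, task_id, key) -> value.
XComKey = tuple[str, str, str, str]
xcom_store: dict[XComKey, Any] = {
    ("producer_dag", "run_1", "write_one", XCOM_RETURN_KEY): {"rows": 10},
    ("producer_dag", "run_2", "write_one", XCOM_RETURN_KEY): {"rows": 12},
    ("producer_dag", "run_2", "write_two", XCOM_RETURN_KEY): {"rows": 3},
}

# Hypothetical: for each scheduled dataset, the (dag_id, run_id, task_id)
# of the producer task whose outlet triggered this consumer run.
producer_map: dict[str, tuple[str, str, str]] = {
    "one": ("producer_dag", "run_2", "write_one"),
    "two": ("producer_dag", "run_2", "write_two"),
}


def pull_from_producers(
    producer_map: dict[str, tuple[str, str, str]],
    store: dict[XComKey, Any],
    key: str = XCOM_RETURN_KEY,
) -> dict[str, Any]:
    """Fetch, per dataset, the XCom the triggering producer task stored under ``key``."""
    result: dict[str, Any] = {}
    for uri, (dag_id, run_id, task_id) in producer_map.items():
        # In this sketch a missing entry yields None rather than raising.
        result[uri] = store.get((dag_id, run_id, task_id, key))
    return result


print(pull_from_producers(producer_map, xcom_store))
```

Note that run_1's value for write_one is never returned: only the run recorded in producer_map is consulted, which is exactly the information a consumer cannot get today.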

@scr-oath

scr-oath commented Aug 4, 2023

Just thinking out loud: if TaskInstance could have a copy of its DAG's schedule, and a field (say, outlet_task_id) were added to the Dataset class and populated when a task is triggered by a dataset, that might also be a good place to put it…

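from __future__ import annotations

from typing import Any, ClassVar

import attr

@attr.define()
class Dataset:
    """A Dataset is used for marking data dependencies between workflows."""

    uri: str = attr.field(validator=[attr.validators.min_len(1), attr.validators.max_len(3000)])
    extra: dict[str, Any] | None = None
    # Proposed field: id of the producer task whose outlet emitted this dataset event.
    outlet_task_id: str | None = None

    version: ClassVar[int] = 1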
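How the proposed outlet_task_id might be filled in can be sketched without Airflow: when a producer task's outlet fires, a copy of the dataset is stamped with the producer's task_id, and the consumer TaskInstance's copy of its schedule is built from those stamped copies. This sketch uses a frozen stdlib dataclass in place of the attrs class above; stamp_outlet and resolve_schedule are hypothetical helpers, not Airflow functions.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class Dataset:
    """Stdlib stand-in for the proposed Dataset with an outlet_task_id field."""

    uri: str
    extra: dict[str, Any] | None = None
    outlet_task_id: str | None = None  # proposed field, unset until an event arrives


def stamp_outlet(dataset: Dataset, task_id: str) -> Dataset:
    """Hypothetical: return a copy of the dataset tagged with the producer task."""
    return replace(dataset, outlet_task_id=task_id)


def resolve_schedule(schedule: list[Dataset], updated: dict[str, str]) -> list[Dataset]:
    """Hypothetical: the consumer TaskInstance's per-run copy of its DAG schedule.

    ``updated`` maps a dataset URI to the task_id whose outlet produced it.
    """
    return [
        stamp_outlet(ds, updated[ds.uri]) if ds.uri in updated else ds
        for ds in schedule
    ]


schedule = [Dataset("one"), Dataset("two")]
resolved = resolve_schedule(schedule, {"one": "write_one", "two": "write_two"})
for ds in resolved:
    print(ds.uri, ds.outlet_task_id)
```

Returning copies instead of mutating the Dataset objects in the DAG definition keeps that definition identical across runs, so one run's producer information cannot leak into another's.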
