Dynamic dataset definition #40039

Open · 2 tasks done
uranusjr opened this issue Jun 4, 2024 · 0 comments
Labels
kind:feature Feature Requests

Comments


uranusjr commented Jun 4, 2024

Description

This proposes a new class DatasetAlias that can be used in attributes accepting dataset values, including outlets and inlets on a task, and schedule on a DAG. A dataset alias is resolved at runtime (when the task executes) to emit an event of a “real” dataset, with an association to the alias. Downstreams can depend on either the resolved dataset or the alias itself.

Use case/motivation

This is inspired by #34206, but I want to still record some information on the dynamic resolution. It also allows the task to still be depended on as a whole, without needing to declare a separate dataset.

The new class has a single argument, name, that uniquely identifies the group, similar to Dataset.uri, but it does not need to follow any particular format. (Maybe with the same restrictions as dag_id etc.) Alias names share the same namespace as dataset URIs to avoid user confusion. (Not sure how best to enforce this though.)

DatasetAlias(name: str)

(Note: Better ideas for the class name are welcomed.)
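For illustration only, a minimal sketch of what the class could look like (whether it ends up as a dataclass, and whether any name validation happens in the constructor, are assumptions here, not part of the proposal):

from dataclasses import dataclass

@dataclass(frozen=True)
class DatasetAlias:
    """A runtime-resolved alias usable wherever a Dataset is accepted."""

    name: str  # Uniquely identifies the alias; shares a namespace with dataset URIs.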

Emitting an event associated with a dataset alias

Events are not emitted against an alias; a task emits “normal” dataset events instead, and they are associated with the alias. The task must first declare the alias as an outlet, and use outlet_events to add events to it:

@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task(*, ds, outlet_events):
    outlet_events["my-task-outputs"].add(Dataset(f"s3://bucket/my-task/{ds}"))

This creates a dataset event against the S3 URI normally, but also associates the event with the alias my-task-outputs. A stub lineage entry airflow://datasets/my-task-outputs will also be emitted in addition to the normal lineage values.

You can also attach extra information when adding the event with the extra parameter:

def add(
    self,
    dataset: Dataset | str,
    extra: dict[str, Any] | None = None,
) -> None:
    ...
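For example (the extra keys here are purely illustrative):

@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task(*, ds, outlet_events):
    # The extra dict is stored on the dataset event that gets emitted.
    outlet_events["my-task-outputs"].add(
        Dataset(f"s3://bucket/my-task/{ds}"),
        extra={"row_count": 42},
    )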

This can also be done with Metadata. Note that arguments are arranged a bit differently here:

@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task(*, ds):
    s3_dataset = Dataset(f"s3://bucket/my-task/{ds}")
    yield Metadata(s3_dataset, extra={"k": "v"}, alias="my-task-outputs")

It should also be possible to resolve multiple aliases into one dataset by passing a list of aliases:

@task(outlets=[DatasetAlias("out1"), DatasetAlias("out2")])
def my_task(*, ds):
    s3_dataset = Dataset(f"s3://bucket/my-task/{ds}")
    yield Metadata(s3_dataset, extra={"k": "v"}, alias=["out1", "out2"])

Only one dataset event is emitted for an added dataset, even if it is added to the same alias multiple times or to multiple aliases. Extra information from each addition is merged.
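A sketch of that merging behaviour (the exact semantics for conflicting keys are not specified here; non-conflicting keys are assumed to be combined):

@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task(*, ds, outlet_events):
    dataset = Dataset(f"s3://bucket/my-task/{ds}")
    outlet_events["my-task-outputs"].add(dataset, extra={"rows": 10})
    outlet_events["my-task-outputs"].add(dataset, extra={"bytes": 2048})
    # One event is emitted, with the merged extra {"rows": 10, "bytes": 2048}.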

Adding an event to multiple aliases

A problem arises when multiple aliases are resolved into the same dataset, but with different event extra information. An example:

@task(outlets=[DatasetAlias("out1"), DatasetAlias("out2")])
def my_task(*, ds, outlet_events):
    dataset = Dataset(f"s3://bucket/my-task/{ds}")
    outlet_events["out1"].add(dataset, extra={"foo": 1})
    outlet_events["out2"].add(dataset, extra={"foo": 2})

As mentioned above, only one dataset event is generally emitted for one dataset, even if it is associated with multiple aliases. This is not possible when each alias requires a different extra. In this case, two dataset events will be emitted: one with extra {"foo": 1} and associated with alias out1, the other with {"foo": 2} and associated with out2.

Depending on a dataset alias

Since events added to an alias are just simple events, a downstream depending on the actual dataset can read its events normally, without considering the associated aliases. For example:

DAG(..., schedule=Dataset("s3://bucket/my-task/2024-05-30"))

This DAG can be triggered by a task with outlets=DatasetAlias("xxx") if and only if the alias is resolved into Dataset("s3://bucket/my-task/2024-05-30").
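Spelled out, the upstream and downstream could look like this (the alias name and bucket path are just examples):

@task(outlets=[DatasetAlias("xxx")])
def producer(*, ds, outlet_events):
    # Resolves the alias to a concrete dataset at runtime.
    outlet_events["xxx"].add(Dataset(f"s3://bucket/my-task/{ds}"))

# Triggered only by the run where the alias resolves to this exact URI,
# i.e. where ds == "2024-05-30".
DAG(..., schedule=Dataset("s3://bucket/my-task/2024-05-30"))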

A downstream can also depend on a dataset alias, either to use it for scheduling or as an inlet data source. In either case, the authoring syntax is straightforward: reference the DatasetAlias by name, and the associated events are picked up for scheduling:

DAG(..., schedule=DatasetAlias("out"))

The DAG runs whenever a task with outlet DatasetAlias("out") gets associated with at least one dataset at runtime, regardless of the dataset’s identity. The downstream DAG is not triggered if no datasets are associated with the alias for a given task run. This also means we can do conditional dataset-triggering!
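For example, a task could decide at runtime whether to associate any dataset with the alias at all, which is what makes the triggering conditional (do_work below is a hypothetical helper):

@task(outlets=[DatasetAlias("out")])
def maybe_publish(*, ds, outlet_events):
    rows_written = do_work(ds)  # hypothetical helper returning the number of rows written
    if rows_written:
        # Only associate a dataset when there is something to publish; otherwise the
        # downstream scheduled on DatasetAlias("out") is not triggered for this run.
        outlet_events["out"].add(Dataset(f"s3://bucket/out/{ds}"))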

Database changes

A new table will be added to store the aliases, similar to datasets. When an alias is resolved, the corresponding dataset event (and the dataset, if needed) is created. A through-table will be added so dataset events and dataset aliases have a many-to-many relationship, tracking which aliases are associated with a given event.
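A rough sketch of what the schema could look like (table and column names are placeholders, and the event model below stands in for the existing dataset event table):

from sqlalchemy import Column, ForeignKey, Integer, String, Table
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

# Through-table giving dataset events and dataset aliases a many-to-many relationship.
alias_event_assoc = Table(
    "dataset_alias_dataset_event",
    Base.metadata,
    Column("alias_id", ForeignKey("dataset_alias.id"), primary_key=True),
    Column("event_id", ForeignKey("dataset_event.id"), primary_key=True),
)

class DatasetAliasModel(Base):
    __tablename__ = "dataset_alias"
    id = Column(Integer, primary_key=True)
    name = Column(String(1500), unique=True, nullable=False)

class DatasetEventModel(Base):
    __tablename__ = "dataset_event"  # placeholder for the existing event table
    id = Column(Integer, primary_key=True)
    aliases = relationship(DatasetAliasModel, secondary=alias_event_assoc, backref="events")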

UI and API considerations

Since it is not possible to know what an alias resolves into at DAG-parsing time, we can’t connect an upstream declaring a DatasetAlias outlet to a downstream declaring dependency on a “real” dataset. (It is of course not a problem if the downstream depends on the alias itself.) Do we want to do something with this in the Datasets View?

It is possible to look into the upstream’s history to know which datasets it has emitted previously, but fetching that information could be resource-intensive. It is also unclear how the information can best be displayed; simply adding lines for all past events can be unclear (and confusing alongside the non-alias dependencies). Not sure how this can best be handled. I’m leaning toward not bothering with this for now, and discussing possible presentations with users later, after we have real-world use cases.

I have not thought this through, to be honest. We’ll need to figure this out as we go.

Related issues

See also #34206.

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

uranusjr added the kind:feature (Feature Requests) and needs-triage labels on Jun 4, 2024
uranusjr added this to the Airflow 2.10.0 milestone on Jun 4, 2024
uranusjr removed the needs-triage label on Jun 4, 2024