Skip to content

Commit

Permalink
clean up the API
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 committed Mar 8, 2024
1 parent 68c2cc8 commit d78ce00
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 35 deletions.
14 changes: 13 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3034,6 +3034,16 @@ def bulk_sync_to_db(
)
return cls.bulk_write_to_db(dags=dags, session=session)

def simplify_dataset_expression(self, dataset_expression) -> dict | None:
"""Simplifies a nested dataset expression into a 'any' or 'all' format with URIs."""
if dataset_expression is None:
return None
if dataset_expression.get("__type") == "dataset":
return dataset_expression["__var"]["uri"]

new_key = "any" if dataset_expression["__type"] == "dataset_any" else "all"
return {new_key: [self.simplify_dataset_expression(item) for item in dataset_expression["__var"]]}

@classmethod
@provide_session
def bulk_write_to_db(
Expand Down Expand Up @@ -3119,7 +3129,9 @@ def bulk_write_to_db(
)
orm_dag.schedule_interval = dag.schedule_interval
orm_dag.timetable_description = dag.timetable.description
orm_dag.dataset_expression = BaseSerialization.serialize(dag.dataset_triggers) or None
orm_dag.dataset_expression = dag.simplify_dataset_expression(
BaseSerialization.serialize(dag.dataset_triggers)
)

orm_dag.processor_subdir = processor_subdir

Expand Down
42 changes: 8 additions & 34 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,23 +154,10 @@ def _create_dag_model_for_details_endpoint_with_dataset_expression(self, dag_id,
is_active=True,
is_paused=False,
dataset_expression={
"__type": "dataset_any",
"__var": [
{"__type": "dataset", "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag1/output_1.txt"}},
{
"__type": "dataset_all",
"__var": [
{
"__type": "dataset",
"__var": {"extra": {"hi": "bye"}, "uri": "s3://dag2/output_1.txt"},
},
{
"__type": "dataset",
"__var": {"extra": {"hi": "bye"}, "uri": "s3://dag3/output_3.txt"},
},
],
},
],
"any": [
"s3://dag1/output_1.txt",
{"all": ["s3://dag2/output_1.txt", "s3://dag3/output_3.txt"]},
]
},
)
session.add(dag_model)
Expand Down Expand Up @@ -401,23 +388,10 @@ def test_should_respond_200_with_dataset_expression(self, url_safe_serializer):
"dag_id": "test_dag",
"dag_run_timeout": None,
"dataset_expression": {
"__type": "dataset_any",
"__var": [
{"__type": "dataset", "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag1/output_1.txt"}},
{
"__type": "dataset_all",
"__var": [
{
"__type": "dataset",
"__var": {"extra": {"hi": "bye"}, "uri": "s3://dag2/output_1.txt"},
},
{
"__type": "dataset",
"__var": {"extra": {"hi": "bye"}, "uri": "s3://dag3/output_3.txt"},
},
],
},
],
"any": [
"s3://dag1/output_1.txt",
{"all": ["s3://dag2/output_1.txt", "s3://dag3/output_3.txt"]},
]
},
"default_view": None,
"description": None,
Expand Down

0 comments on commit d78ce00

Please sign in to comment.