Skip to content

Commit

Permalink
Fix based on PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 committed Mar 7, 2024
1 parent 65a0c8e commit 28cddb0
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 17 deletions.
1 change: 0 additions & 1 deletion airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ def get_dag_details(
for key, value in dag.__dict__.items():
if not key.startswith("_") and not hasattr(dag_model, key):
setattr(dag_model, key, value)

try:
dag_detail_schema = DAGDetailSchema(only=fields) if fields else DAGDetailSchema()
except ValueError as e:
Expand Down
7 changes: 1 addition & 6 deletions airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class DAGDetailSchema(DAGSchema):
orientation = fields.String()
concurrency = fields.Method("get_concurrency") # TODO: Remove in Airflow 3.0
max_active_tasks = fields.Integer()
dataset_expression = fields.Method("get_dataset_expression", dump_only=True)
dataset_expression = fields.String(allow_none=True)
start_date = fields.DateTime()
dag_run_timeout = fields.Nested(TimeDeltaSchema, attribute="dagrun_timeout")
doc_md = fields.String()
Expand Down Expand Up @@ -149,11 +149,6 @@ def get_params(obj: DAG):
params = obj.params
return {k: v.dump() for k, v in params.items()}

@staticmethod
def get_dataset_expression(obj: DagModel):
"""Get the dataset expression."""
return obj.get_dataset_expression(obj=obj)


class DAGCollection(NamedTuple):
"""List of DAGs with metadata."""
Expand Down
26 changes: 16 additions & 10 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import copy
import functools
import itertools
import json
import logging
import os
import pathlib
Expand Down Expand Up @@ -1279,6 +1280,16 @@ def max_active_tasks(self, value: int):
def access_control(self):
return self._access_control

@property
def get_dataset_expression(self) -> Any | None:
"""Serialize the dataset_triggers structure from DAG into a JSON-compatible format."""
from airflow.serialization.serialized_objects import BaseSerialization # avoid circular import

dataset_triggers = self.dataset_triggers
if dataset_triggers:
return BaseSerialization.serialize(dataset_triggers)
return None

@access_control.setter
def access_control(self, value):
self._access_control = DAG._upgrade_outdated_dag_access_control(value)
Expand Down Expand Up @@ -3115,6 +3126,9 @@ def bulk_write_to_db(
)
orm_dag.schedule_interval = dag.schedule_interval
orm_dag.timetable_description = dag.timetable.description
orm_dag.dataset_expression = (
json.dumps(dag.get_dataset_expression) if dag.get_dataset_expression else None
)
orm_dag.processor_subdir = processor_subdir

last_automated_run: DagRun | None = latest_runs.get(dag.dag_id)
Expand Down Expand Up @@ -3565,6 +3579,8 @@ class DagModel(Base):
schedule_interval = Column(Interval)
# Timetable/Schedule Interval description
timetable_description = Column(String(1000), nullable=True)
# Dataset expression based on dataset triggers
dataset_expression = Column(Text, nullable=True)
# Tags for view filter
tags = relationship("DagTag", cascade="all, delete, delete-orphan", backref=backref("dag"))
# Dag owner links for DAGs view
Expand Down Expand Up @@ -3901,16 +3917,6 @@ def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[st
return None
return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id]

@staticmethod
def get_dataset_expression(obj: DAG) -> Any | None:
"""Deserialize the dataset_triggers structure from DAG into a dataset_expression."""
from airflow.serialization.serialized_objects import BaseSerialization # avoid circular import

dataset_triggers = obj.dataset_triggers
if dataset_triggers:
return BaseSerialization.serialize(dataset_triggers)
return None


# NOTE: Please keep the list of arguments in sync with DAG.__init__.
# Only exception: dag_id here should have a default value, but not in DAG.
Expand Down

0 comments on commit 28cddb0

Please sign in to comment.