Skip to content

Commit

Permalink
Add the dataset_expression as part of DagModel and DAGDetailSchema (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 authored and howardyoo committed Mar 18, 2024
1 parent b7ffec4 commit b39e4d9
Show file tree
Hide file tree
Showing 8 changed files with 435 additions and 180 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class DAGDetailSchema(DAGSchema):
# Graph-view layout orientation string (value comes from the DAG model).
orientation = fields.String()
concurrency = fields.Method("get_concurrency")  # TODO: Remove in Airflow 3.0
max_active_tasks = fields.Integer()
# Simplified dataset-trigger expression; None when the DAG is not dataset-scheduled.
dataset_expression = fields.Dict(allow_none=True)
start_date = fields.DateTime()
# Exposed as "dag_run_timeout" but read from the model's "dagrun_timeout" attribute.
dag_run_timeout = fields.Nested(TimeDeltaSchema, attribute="dagrun_timeout")
doc_md = fields.String()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""add dataset_expression in DagModel
Revision ID: ab34f260b71c
Revises: d75389605139
Create Date: 2024-03-07 19:54:38.316059
"""

import sqlalchemy as sa
from alembic import op
import sqlalchemy_jsonfield
from airflow.settings import json


# revision identifiers, used by Alembic.
revision = 'ab34f260b71c'
down_revision = 'd75389605139'
branch_labels = None
depends_on = None
airflow_version = '2.9.0'


def upgrade():
    """Apply Add dataset_expression to DagModel."""
    # Build the nullable JSON column first, then add it in batch mode so the
    # ALTER also works on SQLite (which lacks native ALTER TABLE support).
    dataset_expression_column = sa.Column(
        'dataset_expression',
        sqlalchemy_jsonfield.JSONField(json=json),
        nullable=True,
    )
    with op.batch_alter_table("dag") as batch_op:
        batch_op.add_column(dataset_expression_column)



def downgrade():
    """Unapply Add dataset_expression to DagModel."""
    # Drop in batch mode for SQLite compatibility.
    column_name = 'dataset_expression'
    with op.batch_alter_table("dag") as batch_op:
        batch_op.drop_column(column_name)
20 changes: 20 additions & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import jinja2
import pendulum
import re2
import sqlalchemy_jsonfield
from dateutil.relativedelta import relativedelta
from sqlalchemy import (
Boolean,
Expand Down Expand Up @@ -109,6 +110,7 @@
)
from airflow.secrets.local_filesystem import LocalFilesystemBackend
from airflow.security import permissions
from airflow.settings import json
from airflow.stats import Stats
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.datasets import DatasetOrTimeSchedule
Expand Down Expand Up @@ -3032,6 +3034,16 @@ def bulk_sync_to_db(
)
return cls.bulk_write_to_db(dags=dags, session=session)

def simplify_dataset_expression(self, dataset_expression) -> dict | None:
"""Simplifies a nested dataset expression into a 'any' or 'all' format with URIs."""
if dataset_expression is None:
return None
if dataset_expression.get("__type") == "dataset":
return dataset_expression["__var"]["uri"]

new_key = "any" if dataset_expression["__type"] == "dataset_any" else "all"
return {new_key: [self.simplify_dataset_expression(item) for item in dataset_expression["__var"]]}

@classmethod
@provide_session
def bulk_write_to_db(
Expand All @@ -3051,6 +3063,8 @@ def bulk_write_to_db(
if not dags:
return

from airflow.serialization.serialized_objects import BaseSerialization # Avoid circular import.

log.info("Sync %s DAGs", len(dags))
dag_by_ids = {dag.dag_id: dag for dag in dags}

Expand Down Expand Up @@ -3115,6 +3129,10 @@ def bulk_write_to_db(
)
orm_dag.schedule_interval = dag.schedule_interval
orm_dag.timetable_description = dag.timetable.description
orm_dag.dataset_expression = dag.simplify_dataset_expression(
BaseSerialization.serialize(dag.dataset_triggers)
)

orm_dag.processor_subdir = processor_subdir

last_automated_run: DagRun | None = latest_runs.get(dag.dag_id)
Expand Down Expand Up @@ -3565,6 +3583,8 @@ class DagModel(Base):
# Schedule interval stored as an Interval column (pickled timedelta/cron wrapper
# per Airflow's Interval type — confirm against airflow.utils.sqlalchemy).
schedule_interval = Column(Interval)
# Timetable/Schedule Interval description
timetable_description = Column(String(1000), nullable=True)
# Dataset expression based on dataset triggers; NULL when the DAG is not
# dataset-scheduled. Stored as JSON via sqlalchemy_jsonfield.
dataset_expression = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
# Tags for view filter
tags = relationship("DagTag", cascade="all, delete, delete-orphan", backref=backref("dag"))
# Dag owner links for DAGs view
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0c48aaf142c2032ba0dd01d3a85d542b7242dc4fb48a8172c947dd28ba62480a
0c48aaf142c2032ba0dd01d3a85d542b7242dc4fb48a8172c947dd28ba62480a

0 comments on commit b39e4d9

Please sign in to comment.