Add the dataset_expression as part of DagModel and DAGDetailSchema.
Add the dataset_expression as part of DAGDetailSchema.

Add dataset_expression in DAGDetailSchema

Use dataset_triggers on the DAG to create dataset_expression on DagModel

Simplify

Fix dataset_expression default

Fix the test and add database migration

Add tests for dataset_expression
sunank200 committed Mar 7, 2024
1 parent d37d18e commit a2e1101
Showing 9 changed files with 454 additions and 183 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/dag_schema.py
@@ -99,6 +99,7 @@ class DAGDetailSchema(DAGSchema):
orientation = fields.String()
concurrency = fields.Method("get_concurrency") # TODO: Remove in Airflow 3.0
max_active_tasks = fields.Integer()
dataset_expression = fields.Dict(allow_none=True)
start_date = fields.DateTime()
dag_run_timeout = fields.Nested(TimeDeltaSchema, attribute="dagrun_timeout")
doc_md = fields.String()
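For context, the new field carries the serialized dataset condition that a DAG is scheduled on. A minimal sketch of a DAG that would populate it (the dag_id and URIs are illustrative, echoing the test fixtures later in this diff):

from datetime import datetime

from airflow.datasets import Dataset
from airflow.models.dag import DAG

# Sketch: a DAG scheduled on a dataset condition; its dataset triggers are
# what get serialized into the new dataset_expression field.
with DAG(
    dag_id="example_dataset_expression",  # illustrative name
    start_date=datetime(2020, 6, 19),
    schedule=Dataset("s3://dag1/output_1.txt")
    | (Dataset("s3://dag2/output_1.txt") & Dataset("s3://dag3/output_3.txt")),
):
    pass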
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""add dataset_expression in DagModel
Revision ID: ab34f260b71c
Revises: d75389605139
Create Date: 2024-03-07 19:54:38.316059
"""

import sqlalchemy as sa
import sqlalchemy_jsonfield
from alembic import op
from airflow.settings import json


# revision identifiers, used by Alembic.
revision = 'ab34f260b71c'
down_revision = 'd75389605139'
branch_labels = None
depends_on = None
airflow_version = '2.9.0'


def upgrade():
"""Apply Add dataset_expression to DagModel."""
with op.batch_alter_table("dag") as batch_op:
batch_op.add_column(sa.Column('dataset_expression', sqlalchemy_jsonfield.JSONField(json=json), nullable=True))



def downgrade():
"""Unapply Add dataset_expression to DagModel."""
with op.batch_alter_table("dag") as batch_op:
batch_op.drop_column('dataset_expression')
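
The migration is applied by airflow db migrate when upgrading to 2.9.0. A quick way to sanity-check that the column landed, as a sketch (the database URL is an assumption; substitute your Airflow metadata DB):

from sqlalchemy import create_engine, inspect

# Sketch: confirm the "dag" table gained the dataset_expression column.
# The SQLite URL is an assumption; point it at your Airflow metadata DB.
engine = create_engine("sqlite:////tmp/airflow.db")
columns = {col["name"] for col in inspect(engine).get_columns("dag")}
assert "dataset_expression" in columns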
8 changes: 8 additions & 0 deletions airflow/models/dag.py
@@ -55,6 +55,7 @@
import jinja2
import pendulum
import re2
import sqlalchemy_jsonfield
from dateutil.relativedelta import relativedelta
from sqlalchemy import (
Boolean,
@@ -109,6 +110,7 @@
)
from airflow.secrets.local_filesystem import LocalFilesystemBackend
from airflow.security import permissions
from airflow.settings import json
from airflow.stats import Stats
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.datasets import DatasetOrTimeSchedule
@@ -3051,6 +3053,8 @@ def bulk_write_to_db(
if not dags:
return

from airflow.serialization.serialized_objects import BaseSerialization # Avoid circular import.

log.info("Sync %s DAGs", len(dags))
dag_by_ids = {dag.dag_id: dag for dag in dags}

@@ -3115,6 +3119,8 @@ def bulk_write_to_db(
)
orm_dag.schedule_interval = dag.schedule_interval
orm_dag.timetable_description = dag.timetable.description
orm_dag.dataset_expression = BaseSerialization.serialize(dag.dataset_triggers) or None

orm_dag.processor_subdir = processor_subdir

last_automated_run: DagRun | None = latest_runs.get(dag.dag_id)
@@ -3565,6 +3571,8 @@ class DagModel(Base):
schedule_interval = Column(Interval)
# Timetable/Schedule Interval description
timetable_description = Column(String(1000), nullable=True)
# Dataset expression based on dataset triggers
dataset_expression = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
# Tags for view filter
tags = relationship("DagTag", cascade="all, delete, delete-orphan", backref=backref("dag"))
# Dag owner links for DAGs view
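To make the stored shape concrete: serializing a dataset condition with BaseSerialization yields the nested __type/__var structure that the tests below assert on. A sketch, assuming the 2.9 serializer accepts dataset conditions directly (as the bulk_write_to_db call above implies):

from airflow.datasets import Dataset
from airflow.serialization.serialized_objects import BaseSerialization

# Sketch: the structure that ends up in DagModel.dataset_expression.
condition = Dataset("s3://dag1/output_1.txt") | (
    Dataset("s3://dag2/output_1.txt") & Dataset("s3://dag3/output_3.txt")
)
print(BaseSerialization.serialize(condition))
# {"__type": "dataset_any", "__var": [{"__type": "dataset", ...},
#  {"__type": "dataset_all", "__var": [...]}]}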
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-7b5e83ee2a39b641fb1ff91be8582347e19d7f9c2c4aa6fed48097920c89f92f
+92ca2ffdcb35820e840e9514daaa5ca090a48deb51ceda10fdff46b303bea208
365 changes: 184 additions & 181 deletions docs/apache-airflow/img/airflow_erd.svg
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed when you run
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID                     | Revises ID        | Airflow Version   | Description                                                  |
+=================================+===================+===================+==============================================================+
-| ``d75389605139`` (head)        | ``1fd565369930``  | ``2.9.0``         | Add run_id to (Audit) log table                              |
+| ``ab34f260b71c`` (head)        | ``d75389605139``  | ``2.9.0``         | add dataset_expression in DagModel                           |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``d75389605139``               | ``1fd565369930``  | ``2.9.0``         | Add run_id to (Audit) log table                              |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``1fd565369930``                | ``88344c1d9134``  | ``2.9.0``         | Add rendered_map_index to TaskInstance.                      |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
112 changes: 112 additions & 0 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
@@ -144,6 +144,36 @@ def _create_dag_model_for_details_endpoint(self, dag_id, session=None):
)
session.add(dag_model)

@provide_session
def _create_dag_model_for_details_endpoint_with_dataset_expression(self, dag_id, session=None):
dag_model = DagModel(
dag_id=dag_id,
fileloc="/tmp/dag.py",
schedule_interval="2 2 * * *",
is_active=True,
is_paused=False,
dataset_expression={
"__type": "dataset_any",
"__var": [
{"__type": "dataset", "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag1/output_1.txt"}},
{
"__type": "dataset_all",
"__var": [
{
"__type": "dataset",
"__var": {"extra": {"hi": "bye"}, "uri": "s3://dag2/output_1.txt"},
},
{
"__type": "dataset",
"__var": {"extra": {"hi": "bye"}, "uri": "s3://dag3/output_3.txt"},
},
],
},
],
},
)
session.add(dag_model)

@provide_session
def _create_deactivated_dag(self, session=None):
dag_model = DagModel(
@@ -310,6 +340,84 @@ def test_should_respond_200(self, url_safe_serializer):
"concurrency": 16,
"dag_id": "test_dag",
"dag_run_timeout": None,
"dataset_expression": None,
"default_view": None,
"description": None,
"doc_md": "details",
"end_date": None,
"fileloc": "/tmp/dag.py",
"file_token": current_file_token,
"has_import_errors": False,
"has_task_concurrency_limits": True,
"is_active": True,
"is_paused": False,
"is_paused_upon_creation": None,
"is_subdag": False,
"last_expired": None,
"last_parsed": last_parsed,
"last_parsed_time": None,
"last_pickled": None,
"max_active_runs": 16,
"max_active_tasks": 16,
"next_dagrun": None,
"next_dagrun_create_after": None,
"next_dagrun_data_interval_end": None,
"next_dagrun_data_interval_start": None,
"orientation": "LR",
"owners": [],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"description": None,
"schema": {},
"value": 1,
}
},
"pickle_id": None,
"render_template_as_native_obj": False,
"root_dag_id": None,
"schedule_interval": {"__type": "CronExpression", "value": "2 2 * * *"},
"scheduler_lock": None,
"start_date": "2020-06-15T00:00:00+00:00",
"tags": [],
"template_searchpath": None,
"timetable_description": None,
"timezone": UTC_JSON_REPR,
}
assert response.json == expected

def test_should_respond_200_with_dataset_expression(self, url_safe_serializer):
self._create_dag_model_for_details_endpoint_with_dataset_expression(self.dag_id)
current_file_token = url_safe_serializer.dumps("/tmp/dag.py")
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/details", environ_overrides={"REMOTE_USER": "test"}
)
assert response.status_code == 200
last_parsed = response.json["last_parsed"]
expected = {
"catchup": True,
"concurrency": 16,
"dag_id": "test_dag",
"dag_run_timeout": None,
"dataset_expression": {
"__type": "dataset_any",
"__var": [
{"__type": "dataset", "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag1/output_1.txt"}},
{
"__type": "dataset_all",
"__var": [
{
"__type": "dataset",
"__var": {"extra": {"hi": "bye"}, "uri": "s3://dag2/output_1.txt"},
},
{
"__type": "dataset",
"__var": {"extra": {"hi": "bye"}, "uri": "s3://dag3/output_3.txt"},
},
],
},
],
},
"default_view": None,
"description": None,
"doc_md": "details",
@@ -368,6 +476,7 @@ def test_should_response_200_with_doc_md_none(self, url_safe_serializer):
"concurrency": 16,
"dag_id": "test_dag2",
"dag_run_timeout": None,
"dataset_expression": None,
"default_view": None,
"description": None,
"doc_md": None,
@@ -419,6 +528,7 @@ def test_should_response_200_for_null_start_date(self, url_safe_serializer):
"concurrency": 16,
"dag_id": "test_dag3",
"dag_run_timeout": None,
"dataset_expression": None,
"default_view": None,
"description": None,
"doc_md": None,
@@ -473,6 +583,7 @@ def test_should_respond_200_serialized(self, url_safe_serializer):
"concurrency": 16,
"dag_id": "test_dag",
"dag_run_timeout": None,
"dataset_expression": None,
"default_view": None,
"description": None,
"doc_md": "details",
@@ -534,6 +645,7 @@ def test_should_respond_200_serialized(self, url_safe_serializer):
"concurrency": 16,
"dag_id": "test_dag",
"dag_run_timeout": None,
"dataset_expression": None,
"default_view": None,
"description": None,
"doc_md": "details",
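Outside the test client, the same field is readable from the stable REST API's details endpoint; a usage sketch (the base URL and basic-auth credentials are assumptions for a local deployment):

import requests

# Sketch: fetch dataset_expression over the REST API.
# Base URL and credentials are assumptions for a local deployment.
response = requests.get(
    "http://localhost:8080/api/v1/dags/test_dag/details",
    auth=("admin", "admin"),
)
response.raise_for_status()
print(response.json()["dataset_expression"])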
57 changes: 57 additions & 0 deletions tests/api_connexion/schemas/test_dag_schema.py
@@ -27,6 +27,7 @@
DAGDetailSchema,
DAGSchema,
)
from airflow.datasets import Dataset
from airflow.models import DagModel, DagTag
from airflow.models.dag import DAG

@@ -197,3 +198,59 @@ def test_serialize_test_dag_detail_schema(url_safe_serializer):
obj = schema.dump(dag)
expected.update({"last_parsed": obj["last_parsed"]})
assert obj == expected


@pytest.mark.db_test
def test_serialize_test_dag_with_dataset_schedule_detail_schema(url_safe_serializer):
dataset1 = Dataset(uri="s3://bucket/obj1")
dataset2 = Dataset(uri="s3://bucket/obj2")
dag = DAG(
dag_id="test_dag",
start_date=datetime(2020, 6, 19),
doc_md="docs",
orientation="LR",
default_view="duration",
params={"foo": 1},
schedule=dataset1 & dataset2,
tags=["example1", "example2"],
)
schema = DAGDetailSchema()

expected = {
"catchup": True,
"concurrency": 16,
"max_active_tasks": 16,
"dag_id": "test_dag",
"dag_run_timeout": None,
"default_view": "duration",
"description": None,
"doc_md": "docs",
"fileloc": __file__,
"file_token": url_safe_serializer.dumps(__file__),
"is_active": None,
"is_paused": None,
"is_subdag": False,
"orientation": "LR",
"owners": [],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"value": 1,
"description": None,
"schema": {},
}
},
"schedule_interval": {"__type": "CronExpression", "value": "Dataset"},
"start_date": "2020-06-19T00:00:00+00:00",
"tags": [{"name": "example1"}, {"name": "example2"}],
"template_searchpath": None,
"timezone": UTC_JSON_REPR,
"max_active_runs": 16,
"pickle_id": None,
"end_date": None,
"is_paused_upon_creation": None,
"render_template_as_native_obj": False,
}
obj = schema.dump(dag)
expected.update({"last_parsed": obj["last_parsed"]})
assert obj == expected
37 changes: 37 additions & 0 deletions tests/models/test_dag.py
@@ -2878,6 +2878,43 @@ def test_dags_needing_dagruns_dataset_triggered_dag_info_queued_times(self, session):
assert first_queued_time == DEFAULT_DATE
assert last_queued_time == DEFAULT_DATE + timedelta(hours=1)

def test_dataset_expression(self, session):
dataset_expr = {
"__type": "dataset_any",
"__var": [
{"__type": "dataset", "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag1/output_1.txt"}},
{
"__type": "dataset_all",
"__var": [
{
"__type": "dataset",
"__var": {"extra": {"hi": "bye"}, "uri": "s3://dag2/output_1.txt"},
},
{
"__type": "dataset",
"__var": {"extra": {"hi": "bye"}, "uri": "s3://dag3/output_3.txt"},
},
],
},
],
}
dag_id = "test_dag_dataset_expression"
orm_dag = DagModel(
dag_id=dag_id,
dataset_expression=dataset_expr,
is_active=True,
is_paused=False,
next_dagrun=timezone.utcnow(),
next_dagrun_create_after=timezone.utcnow() + timedelta(days=1),
)
session.add(orm_dag)
session.commit()
retrieved_dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one()
assert retrieved_dag.dataset_expression == dataset_expr

session.rollback()
session.close()


class TestQueries:
def setup_method(self) -> None:
