Add dataset event timestamp to dataset dag run queue #26376

Closed
72 changes: 48 additions & 24 deletions airflow/datasets/manager.py
@@ -28,6 +28,8 @@
 from airflow.utils.log.logging_mixin import LoggingMixin

 if TYPE_CHECKING:
+    from datetime import datetime
+
     from airflow.models.taskinstance import TaskInstance

@@ -53,22 +55,30 @@ def register_dataset_change(
         if not dataset_model:
             self.log.warning("DatasetModel %s not found", dataset_model)
             return
-        session.add(
-            DatasetEvent(
-                dataset_id=dataset_model.id,
-                source_task_id=task_instance.task_id,
-                source_dag_id=task_instance.dag_id,
-                source_run_id=task_instance.run_id,
-                source_map_index=task_instance.map_index,
-                extra=extra,
-            )
+        dataset_event = DatasetEvent(
+            dataset_id=dataset_model.id,
+            source_task_id=task_instance.task_id,
+            source_dag_id=task_instance.dag_id,
+            source_run_id=task_instance.run_id,
+            source_map_index=task_instance.map_index,
+            extra=extra,
         )
+        session.add(dataset_event)
         session.flush()
-        if dataset_model.consuming_dags:
-            self._queue_dagruns(dataset_model, session)
+
+        downstream_dag_ids = [x.dag_id for x in dataset_model.consuming_dags]
+        if downstream_dag_ids:
+            self._queue_dagruns(
+                dag_ids=downstream_dag_ids,
+                dataset_id=dataset_model.id,
+                event_timestamp=dataset_event.timestamp,
+                session=session,
+            )
         session.flush()

-    def _queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
+    def _queue_dagruns(
+        self, *, dag_ids: list[str], dataset_id: int, event_timestamp: datetime, session: Session
+    ) -> None:
         # Possible race condition: if multiple dags or multiple (usually
         # mapped) tasks update the same dataset, this can fail with a unique
         # constraint violation.
@@ -79,31 +89,45 @@ def _queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
         # where `ti.state` is changed.

         if session.bind.dialect.name == "postgresql":
-            return self._postgres_queue_dagruns(dataset, session)
-        return self._slow_path_queue_dagruns(dataset, session)
+            return self._postgres_queue_dagruns(
+                dag_ids=dag_ids,
+                dataset_id=dataset_id,
+                event_timestamp=event_timestamp,
+                session=session,
+            )
+        return self._slow_path_queue_dagruns(
+            dag_ids=dag_ids,
+            dataset_id=dataset_id,
+            event_timestamp=event_timestamp,
+            session=session,
+        )

-    def _slow_path_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
-        consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags]
-        self.log.debug("consuming dag ids %s", consuming_dag_ids)
+    def _slow_path_queue_dagruns(self, *, dag_ids, dataset_id, event_timestamp, session: Session) -> None:
+        self.log.debug("consuming dag ids %s", dag_ids)

         # Don't error whole transaction when a single RunQueue item conflicts.
         # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
-        for dag_id in consuming_dag_ids:
-            item = DatasetDagRunQueue(target_dag_id=dag_id, dataset_id=dataset.id)
+        for dag_id in dag_ids:
+            item = DatasetDagRunQueue(
+                target_dag_id=dag_id,
+                dataset_id=dataset_id,
+                event_timestamp=event_timestamp,
+            )
             try:
                 with session.begin_nested():
                     session.merge(item)
             except exc.IntegrityError:
                 self.log.debug("Skipping record %s", item, exc_info=True)

-    def _postgres_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
+    def _postgres_queue_dagruns(self, *, dag_ids, dataset_id, event_timestamp, session: Session) -> None:
         from sqlalchemy.dialects.postgresql import insert

-        stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset.id).on_conflict_do_nothing()
-        session.execute(
-            stmt,
-            [{'target_dag_id': target_dag.dag_id} for target_dag in dataset.consuming_dags],
+        stmt = (
+            insert(DatasetDagRunQueue)
+            .values(dataset_id=dataset_id, event_timestamp=event_timestamp)
+            .on_conflict_do_nothing()
         )
+        session.execute(stmt, [{'target_dag_id': x} for x in dag_ids])


 def resolve_dataset_manager() -> DatasetManager:
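The slow path above relies on one SAVEPOINT per queue row, so a single duplicate cannot abort the surrounding transaction. Here is a minimal, self-contained sketch of that pattern, with a hypothetical `Queue` model standing in for `DatasetDagRunQueue`, an in-memory SQLite database, and `add()` + `flush()` in place of `merge()` to force the conflict:

```python
from sqlalchemy import Column, Integer, String, create_engine, exc
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Queue(Base):  # stand-in for DatasetDagRunQueue
    __tablename__ = "queue"
    dataset_id = Column(Integer, primary_key=True)
    target_dag_id = Column(String, primary_key=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    for dag_id in ["a", "a", "b"]:  # the second "a" violates the composite PK
        try:
            with session.begin_nested():  # SAVEPOINT per item
                session.add(Queue(dataset_id=1, target_dag_id=dag_id))
                session.flush()  # surfaces the IntegrityError inside the savepoint
        except exc.IntegrityError:
            pass  # duplicate (dataset_id, target_dag_id): skip it, keep the rest
    session.commit()  # "a" and "b" land; only the duplicate was rolled back
```

On PostgreSQL the same deduplication happens in a single statement instead: the `insert(...).on_conflict_do_nothing()` construct in `_postgres_queue_dagruns` compiles to `INSERT ... ON CONFLICT DO NOTHING`, executed with one parameter set per target dag id.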
30 changes: 16 additions & 14 deletions airflow/jobs/scheduler_job.py
@@ -1087,15 +1087,14 @@ def _create_dag_runs_dataset_triggered(
"""For DAGs that are triggered by datasets, create dag runs."""
# Bulk Fetch DagRuns with dag_id and execution_date same
# as DagModel.dag_id and DagModel.next_dagrun
# This list is used to verify if the DagRun already exist so that we don't attempt to create
# duplicate dag runs
exec_dates = {
dag_id: timezone.coerce_datetime(last_time)
for dag_id, (_, last_time) in dataset_triggered_dag_info.items()
# This list is used to verify if the DagRun already exists
# so that we don't attempt to create duplicate dag runs
dag_event_timestamps = {
dag_id: last_event_time for dag_id, (_, last_event_time) in dataset_triggered_dag_info.items()
}
existing_dagruns: set[tuple[str, timezone.DateTime]] = set(
existing_dagruns: set[tuple[str, datetime]] = set(
session.query(DagRun.dag_id, DagRun.execution_date).filter(
tuple_in_condition((DagRun.dag_id, DagRun.execution_date), exec_dates.items())
tuple_in_condition((DagRun.dag_id, DagRun.execution_date), dag_event_timestamps.items())
)
)

@@ -1122,22 +1121,22 @@
             # we need to set dag.next_dagrun_info if the Dag Run already exists or if we
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
-            exec_date = exec_dates[dag.dag_id]
-            if (dag.dag_id, exec_date) not in existing_dagruns:
+            last_event_timestamp = dag_event_timestamps[dag.dag_id]
+            if (dag.dag_id, last_event_timestamp) not in existing_dagruns:

                 previous_dag_run = (
                     session.query(DagRun)
                     .filter(
                         DagRun.dag_id == dag.dag_id,
-                        DagRun.execution_date < exec_date,
+                        DagRun.execution_date < last_event_timestamp,
                         DagRun.run_type == DagRunType.DATASET_TRIGGERED,
                     )
                     .order_by(DagRun.execution_date.desc())
                     .first()
                 )
                 dataset_event_filters = [
                     DagScheduleDatasetReference.dag_id == dag.dag_id,
-                    DatasetEvent.timestamp <= exec_date,
+                    DatasetEvent.timestamp <= last_event_timestamp,
                 ]
                 if previous_dag_run:
                     dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date)
@@ -1153,10 +1152,13 @@
                     .all()
                 )

-                data_interval = dag.timetable.data_interval_for_events(exec_date, dataset_events)
+                data_interval = dag.timetable.data_interval_for_events(
+                    last_event_timestamp,  # type: ignore
+                    dataset_events,
+                )
                 run_id = dag.timetable.generate_run_id(
                     run_type=DagRunType.DATASET_TRIGGERED,
-                    logical_date=exec_date,
+                    logical_date=last_event_timestamp,  # type: ignore
Review comment (Member):
    Hmm, what is wrong with the type of this?

Reply (@dstandish, Contributor, Author, Sep 23, 2022):
    It thinks it wants DateTime, but what we actually get when we read
    timestamp values from the database is datetime. I think we've encountered
    this recently, and it was a bit of a rabbit hole trying to fix it... but
    fundamentally I think that those timetable functions that say DateTime are
    more often / more likely / maybe always getting datetime.

Reply (Member):
    Hopefully SQLAlchemy will improve this in 2.0. (IIRC they have a plan to
    provide better typing support for ORM models.)

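To make the mismatch concrete, a small sketch (not Airflow code; it assumes the `pendulum` library, whose `DateTime` is what the timetable signatures annotate):

```python
import datetime

import pendulum

annotated = pendulum.datetime(2022, 9, 21, tz="UTC")  # a pendulum.DateTime
from_db = datetime.datetime(2022, 9, 21, tzinfo=datetime.timezone.utc)  # what SQLAlchemy returns

assert isinstance(annotated, datetime.datetime)    # DateTime subclasses stdlib datetime
assert not isinstance(from_db, pendulum.DateTime)  # ...but not the other way around

# A timestamp loaded from the database works fine at runtime wherever a
# DateTime is expected, yet fails static type checking, hence `# type: ignore`.
```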
                     data_interval=data_interval,
                     session=session,
                     events=dataset_events,
@@ -1165,7 +1167,7 @@
                 dag_run = dag.create_dagrun(
                     run_id=run_id,
                     run_type=DagRunType.DATASET_TRIGGERED,
-                    execution_date=exec_date,
+                    execution_date=last_event_timestamp,
                     data_interval=data_interval,
                     state=DagRunState.QUEUED,
                     external_trigger=False,
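As an aside, the bulk existence check at the top of this file's diff reduces to one tuple-IN query. A self-contained sketch with a toy `Run` model (Airflow's `tuple_in_condition` helper wraps `sqlalchemy.tuple_` like this, plus a fallback for backends without tuple support, such as MSSQL):

```python
import datetime

from sqlalchemy import Column, DateTime, String, create_engine, tuple_
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Run(Base):  # stand-in for DagRun
    __tablename__ = "run"
    dag_id = Column(String, primary_key=True)
    execution_date = Column(DateTime, primary_key=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

ts = datetime.datetime(2022, 9, 21, 12, 0)
with Session(engine) as session:
    session.add(Run(dag_id="dag_a", execution_date=ts))
    session.commit()

    wanted = {"dag_a": ts, "dag_b": ts}  # dag_id -> latest dataset event timestamp
    existing = set(
        session.query(Run.dag_id, Run.execution_date).filter(
            tuple_(Run.dag_id, Run.execution_date).in_(list(wanted.items()))
        )
    )
    assert ("dag_a", ts) in existing      # a run already exists: skip creation
    assert ("dag_b", ts) not in existing  # no run yet: create one
```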
50 changes: 50 additions & 0 deletions (new file: Alembic migration for revision 2b72b0fd20ef)
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add event timestamp to DDRQ

Revision ID: 2b72b0fd20ef
Revises: ecb43d2a1842
Create Date: 2022-09-21 21:28:23.896961

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

from airflow.migrations.db_types import TIMESTAMP

# revision identifiers, used by Alembic.
revision = '2b72b0fd20ef'
down_revision = 'ecb43d2a1842'
branch_labels = None
depends_on = None
airflow_version = '2.4.2'


def upgrade():
"""Apply Add event timestamp to DDRQ"""
op.add_column('dataset_dag_run_queue', sa.Column('event_timestamp', TIMESTAMP, nullable=False))


def downgrade():
"""Unapply Add event timestamp to DDRQ"""
with op.batch_alter_table('dataset_dag_run_queue', schema=None) as batch_op:
batch_op.drop_column('event_timestamp')
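Two details worth flagging in this migration: the upgrade adds the column with a plain `op.add_column`, while the downgrade wraps `drop_column` in `batch_alter_table`, presumably because SQLite cannot drop a column with a plain `ALTER TABLE`, so Alembic's batch mode recreates the table there. And the column arrives as `nullable=False` with no server default, which assumes `dataset_dag_run_queue` is effectively empty (or that the backend fills an implicit default) at upgrade time.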
2 changes: 1 addition & 1 deletion (existing Alembic migration for revision ee8d93fcc81e)
@@ -33,7 +33,7 @@

 # revision identifiers, used by Alembic.
 revision = 'ee8d93fcc81e'
-down_revision = 'ecb43d2a1842'
+down_revision = '2b72b0fd20ef'
 branch_labels = None
 depends_on = None
 airflow_version = '2.5.0'
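Together with the new file above, this one-line change splices the new revision into the Alembic chain:

ecb43d2a1842 -> 2b72b0fd20ef (new, airflow_version 2.4.2) -> ee8d93fcc81e (2.5.0)

which is also why the new migration's `down_revision` points at `ecb43d2a1842` rather than at `ee8d93fcc81e`.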
1 change: 1 addition & 0 deletions airflow/models/dataset.py
@@ -204,6 +204,7 @@ class DatasetDagRunQueue(Base):

     dataset_id = Column(Integer, primary_key=True, nullable=False)
     target_dag_id = Column(StringID(), primary_key=True, nullable=False)
+    event_timestamp = Column(UtcDateTime, nullable=False)
     created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)

     __tablename__ = "dataset_dag_run_queue"
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-5b101dceaef5d9343cddbfab0db6087b646d031eb8a1c3157f79e6cd811e14f4
+25a09e524c0b06c3762da8f08e4850822f9ed40d5d400a634963678b365f3642