Skip to content

Commit

Permalink
Add DagRunNote
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed Nov 23, 2022
1 parent 6f9a21b commit 078d37e
Show file tree
Hide file tree
Showing 8 changed files with 1,384 additions and 1,421 deletions.
9 changes: 8 additions & 1 deletion airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,13 @@ def set_dag_run_notes(*, dag_id: str, dag_run_id: str, session: Session = NEW_SE
except ValidationError as err:
raise BadRequest(detail=str(err))

dag_run.notes = new_value_for_notes or None
from flask_login import current_user

current_user_id = getattr(current_user, "id", None)
if dag_run.dag_run_note is None:
dag_run.notes = (new_value_for_notes, current_user_id)
else:
dag_run.dag_run_note.content = new_value_for_notes
dag_run.dag_run_note.user_id = current_user_id
session.commit()
return dagrun_schema.dump(dag_run)

This file was deleted.

This file was deleted.

55 changes: 54 additions & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
Boolean,
Column,
ForeignKey,
ForeignKeyConstraint,
Index,
Integer,
PickleType,
Expand All @@ -40,6 +41,7 @@
text,
)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import joinedload, relationship, synonym
from sqlalchemy.orm.session import Session
Expand All @@ -66,6 +68,7 @@
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, tuple_in_condition, with_row_locks
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import NOTSET, ArgNotSet, DagRunType
from airflow.www.fab_security.sqla.models import User

if TYPE_CHECKING:
from airflow.models.dag import DAG
Expand All @@ -85,6 +88,17 @@ class TISchedulingDecision(NamedTuple):
finished_tis: list[TI]


def _creator_note(val):
"""Custom creator for the ``note`` association proxy."""
# breakpoint()
if isinstance(val, str):
return DagRunNote(content=val)
elif isinstance(val, dict):
return DagRunNote(**val)
else:
return DagRunNote(*val)


class DagRun(Base, LoggingMixin):
"""
DagRun describes an instance of a Dag. It can be created
Expand All @@ -111,7 +125,6 @@ class DagRun(Base, LoggingMixin):
# When a scheduler last attempted to schedule TIs for this DagRun
last_scheduling_decision = Column(UtcDateTime)
dag_hash = Column(String(32))
notes = Column(String(1000).with_variant(Text(1000), "mysql"))
# Foreign key to LogTemplate. DagRun rows created prior to this column's
# existence have this set to NULL. Later rows automatically populate this on
# insert to point to the latest LogTemplate entry.
Expand Down Expand Up @@ -163,6 +176,8 @@ class DagRun(Base, LoggingMixin):
uselist=False,
viewonly=True,
)
dag_run_note = relationship("DagRunNote", back_populates="dag_run", uselist=False)
notes = association_proxy("dag_run_note", "content", creator=_creator_note)

DEFAULT_DAGRUNS_TO_EXAMINE = airflow_conf.getint(
"scheduler",
Expand Down Expand Up @@ -1295,3 +1310,41 @@ def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str:
stacklevel=2,
)
return self.get_log_template(session=session).filename


class DagRunNote(Base):
"""For storage of arbitrary notes concerning the dagrun instance."""

__tablename__ = "dag_run_note"

user_id = Column(Integer, nullable=True)
dag_run_id = Column(Integer, primary_key=True, nullable=False)
content = Column(String(1000).with_variant(Text(1000), "mysql"))
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)

dag_run = relationship("DagRun", back_populates="dag_run_note")

__table_args__ = (
ForeignKeyConstraint(
(dag_run_id,),
["dag_run.id"],
name="dag_run_note_dr_fkey",
ondelete="CASCADE",
),
ForeignKeyConstraint(
(user_id,),
[User.id],
name="dag_run_note_user_fkey",
),
)

def __init__(self, content, user_id=None):
self.content = content
self.user_id = user_id

def __repr__(self):
prefix = f"<{self.__class__.__name__}: {self.dag_id}.{self.dagrun_id} {self.run_id}"
if self.map_index != -1:
prefix += f" map_index={self.map_index}"
return prefix + ">"
4 changes: 1 addition & 3 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2691,20 +2691,18 @@ class TaskNote(Base):

__tablename__ = "task_note"

# id = Column(Integer, primary_key=True)
user_id = Column(Integer, nullable=True)
task_id = Column(StringID(), primary_key=True, nullable=False)
dag_id = Column(StringID(), primary_key=True, nullable=False)
run_id = Column(StringID(), primary_key=True, nullable=False)
map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))
content = Column(String(1000).with_variant(Text(1000), "mysql"))
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)

task_instance = relationship("TaskInstance", back_populates="task_note")

__table_args__ = (
Index("idx_task_notes_task_instance", dag_id, task_id, run_id, map_index),
ForeignKeyConstraint(
[dag_id, task_id, run_id, map_index],
[
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
9637f13dd2d27a13e1afe467eea0aa2d16c7c89e7f94134ecd454fc9bd4cbad0
e4edc339d44491c4c3a47502dd4bedce872b3c10e0bea1c71713408184501432
Loading

0 comments on commit 078d37e

Please sign in to comment.