Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
from sqlalchemy import BigInteger, Column, Index, String, and_
from sqlalchemy.orm import Session, backref, foreign, relationship
from sqlalchemy.sql import exists
from sqlalchemy.sql.expression import func

from airflow.models.base import ID_LEN, Base
from airflow.models.dag import DAG, DagModel
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.serialization.serialized_objects import DagDependency, SerializedDAG
from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json
from airflow.utils import timezone
from airflow.utils.session import provide_session
Expand Down Expand Up @@ -272,6 +273,17 @@ def get_last_updated_datetime(cls, dag_id: str, session: Session = None) -> date
"""
return session.query(cls.last_updated).filter(cls.dag_id == dag_id).scalar()

@classmethod
@provide_session
def get_max_last_updated_datetime(cls, session: Session = None) -> datetime:
"""
Get the maximum date when any DAG was last updated in serialized_dag table

:param session: ORM Session
:type session: Session
"""
return session.query(func.max(cls.last_updated)).scalar()

@classmethod
@provide_session
def get_latest_version_hash(cls, dag_id: str, session: Session = None) -> str:
Expand All @@ -286,3 +298,26 @@ def get_latest_version_hash(cls, dag_id: str, session: Session = None) -> str:
:rtype: str
"""
return session.query(cls.dag_hash).filter(cls.dag_id == dag_id).scalar()

@classmethod
@provide_session
def get_dag_dependencies(cls, session: Session = None) -> Dict[str, List['DagDependency']]:
"""
Get the dependencies between DAGs

:param session: ORM Session
:type session: Session
"""
dependencies = {}

if session.bind.dialect.name in ["sqlite", "mysql"]:
for row in session.query(cls.dag_id, func.json_extract(cls.data, "$.dag.dag_dependencies")).all():
dependencies[row[0]] = [DagDependency(**d) for d in json.loads(row[1])]

else:
for row in session.query(
cls.dag_id, func.json_extract_path(cls.data, "dag", "dag_dependencies")
).all():
dependencies[row[0]] = [DagDependency(**d) for d in row[1]]

return dependencies
1 change: 1 addition & 0 deletions airflow/security/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
RESOURCE_DOCS = "Documentation"
RESOURCE_CONFIG = "Configurations"
RESOURCE_CONNECTION = "Connections"
RESOURCE_DAG_DEPENDENCIES = "DAG Dependencies"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Paging @jhtimmins -- need you permissions hat thoughts here :)

RESOURCE_DAG_CODE = "DAG Code"
RESOURCE_DAG_RUN = "DAG Runs"
RESOURCE_IMPORT_ERROR = "ImportError"
Expand Down
9 changes: 8 additions & 1 deletion airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@
"maxProperties": 1
}
},
"dag_dependencies": {
"type": "array",
"items": {
"type": "object"
}
},
"dag": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -103,7 +109,8 @@
{ "type": "null" },
{ "$ref": "#/definitions/task_group" }
]},
"edge_info": { "$ref": "#/definitions/edge_info" }
"edge_info": { "$ref": "#/definitions/edge_info" },
"dag_dependencies": { "$ref": "#/definitions/dag_dependencies" }
},
"required": [
"_dag_id",
Expand Down
57 changes: 55 additions & 2 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import datetime
import enum
import logging
from dataclasses import dataclass
Comment thread
ashb marked this conversation as resolved.
from inspect import Parameter, signature
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Union

Expand Down Expand Up @@ -293,8 +294,7 @@ def _deserialize(cls, encoded_var: Any) -> Any: # pylint: disable=too-many-retu
elif type_ == DAT.SET:
return {cls._deserialize(v) for v in var}
elif type_ == DAT.TUPLE:
# pylint: disable=consider-using-generator
return tuple([cls._deserialize(v) for v in var])
return tuple(cls._deserialize(v) for v in var)
else:
raise TypeError(f'Invalid type {type_!s} in deserialization.')

Expand Down Expand Up @@ -335,6 +335,30 @@ def _value_is_hardcoded_default(cls, attrname: str, value: Any, instance: Any) -
return False


class DependencyDetector:
Comment thread
ashb marked this conversation as resolved.
"""Detects dependencies between DAGs."""

@staticmethod
def detect_task_dependencies(task: BaseOperator) -> Optional['DagDependency']:
"""Detects dependencies caused by tasks"""
if task.task_type == "TriggerDagRunOperator":
return DagDependency(
source=task.dag_id,
target=getattr(task, "trigger_dag_id"),
dependency_type="trigger",
dependency_id=task.task_id,
)
elif task.task_type == "ExternalTaskSensor":
return DagDependency(
source=getattr(task, "external_dag_id"),
target=task.dag_id,
dependency_type="sensor",
dependency_id=task.task_id,
)

return None


class SerializedBaseOperator(BaseOperator, BaseSerialization):
"""A JSON serializable representation of operator.

Expand All @@ -350,6 +374,8 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
if v.default is not v.empty
}

dependency_detector = DependencyDetector

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# task_type is used by UI to display the correct class type, because UI only
Expand Down Expand Up @@ -494,6 +520,11 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:

return op

@classmethod
def detect_dependencies(cls, op: BaseOperator) -> Optional['DagDependency']:
"""Detects between DAG dependencies for the operator."""
return cls.dependency_detector.detect_task_dependencies(op)

@classmethod
def _is_excluded(cls, var: Any, attrname: str, op: BaseOperator):
if var is not None and op.has_dag() and attrname.endswith("_date"):
Expand Down Expand Up @@ -648,6 +679,11 @@ def serialize_dag(cls, dag: DAG) -> dict:
serialize_dag = cls.serialize_to_json(dag, cls._decorated_fields)

serialize_dag["tasks"] = [cls._serialize(task) for _, task in dag.task_dict.items()]
serialize_dag["dag_dependencies"] = [
vars(t)
for t in (SerializedBaseOperator.detect_dependencies(task) for task in dag.task_dict.values())
if t is not None
]
serialize_dag['_task_group'] = SerializedTaskGroup.serialize_task_group(dag.task_group)

# Edge info in the JSON exactly matches our internal structure
Expand Down Expand Up @@ -812,3 +848,20 @@ def deserialize_task_group(
group.upstream_task_ids = set(cls._deserialize(encoded_group["upstream_task_ids"]))
group.downstream_task_ids = set(cls._deserialize(encoded_group["downstream_task_ids"]))
return group


@dataclass
class DagDependency:
"""Dataclass for representing dependencies between DAGs.
These are calculated during serialization and attached to serialized DAGs.
"""

source: str
target: str
dependency_type: str
dependency_id: str

@property
def node_id(self):
"""Node ID for graph rendering"""
return f"{self.dependency_type}:{self.source}:{self.target}:{self.dependency_id}"
5 changes: 5 additions & 0 deletions airflow/www/extensions/init_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ def init_appbuilder_views(app):
appbuilder.add_view(
views.XComModelView, permissions.RESOURCE_XCOM, category=permissions.RESOURCE_ADMIN_MENU
)
appbuilder.add_view(
views.DagDependenciesView,
permissions.RESOURCE_DAG_DEPENDENCIES,
category=permissions.RESOURCE_BROWSE_MENU,
)
# add_view_no_menu to change item position.
# I added link in extensions.init_appbuilder_links.init_appbuilder_links
appbuilder.add_view_no_menu(views.RedocView)
Expand Down
1 change: 1 addition & 0 deletions airflow/www/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin): # pylint: disable=
VIEWER_PERMISSIONS = [
(permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
Expand Down
27 changes: 27 additions & 0 deletions airflow/www/static/css/graph.css
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,30 @@ g.node.removed rect {
background-color: #f0f0f0;
cursor: move;
}

.legend-item.dag {
float: left;
background-color: #e8f7e4;
}

.legend-item.trigger {
float: left;
background-color: #ffefeb;
}

.legend-item.sensor {
float: left;
background-color: #e6f1f2;
}

g.node.dag rect {
fill: #e8f7e4;
}

g.node.trigger rect {
fill: #ffefeb;
}

g.node.sensor rect {
fill: #e6f1f2;
}
Loading