Skip to content

Commit

Permalink
Merge pull request #83 from epoch8/fix-duplicate-help
Browse files Browse the repository at this point in the history
Fix for duplicated help entries
  • Loading branch information
elephantum committed Jun 9, 2021
2 parents 3c628f3 + 08e4515 commit a878afa
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 74 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ tests/data
dist/
build/
*.egg-info
.venv/
.mypy_cache/
pythonenv*/
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
# Changelog
All notable changes to this project will be documented in this file.

## 1.4.2

- Fix for duplicated #HELP entries

## 1.4.0 + 1.4.1

- Add support for Airflow 2.0 [#90](https://github.com/epoch8/airflow-exporter/pull/90) by @dimon222

## 1.3.2

- Remove 'hostname' from airflow_task_status by @cansjt; see https://github.com/epoch8/airflow-exporter/issues/77 for details
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ Labels:
* `task_id`
* `owner`
* `status`
* `hostname`

Value: number of tasks in specific status.

Expand Down
238 changes: 166 additions & 72 deletions airflow_exporter/prometheus_exporter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import typing
from typing import List, Tuple, Optional, Generator, NamedTuple, Dict

from dataclasses import dataclass
from contextlib import contextmanager
import itertools

from sqlalchemy import func
from sqlalchemy import text

Expand All @@ -12,41 +19,85 @@

# Importing base classes that we need to derive
from prometheus_client import generate_latest, REGISTRY
from prometheus_client.core import GaugeMetricFamily
from prometheus_client.core import GaugeMetricFamily, Metric
from prometheus_client.samples import Sample

import itertools

@dataclass
class DagStatusInfo:
    """Aggregated count of DagRuns in one state for a single DAG."""
    dag_id: str  # DAG identifier (DagRun.dag_id)
    status: str  # DagRun state, e.g. 'running' / 'success' (DagRun.state)
    cnt: int     # number of DagRuns of this DAG in this state
    owner: str   # owner string as stored on DagModel.owners

def get_dag_state_info():
def get_dag_status_info() -> List[DagStatusInfo]:
'''get dag info
:return dag_info
'''
dag_status_query = Session.query(
DagRun.dag_id, DagRun.state, func.count(DagRun.state).label('count')
dag_status_query = Session.query( # pylint: disable=no-member
DagRun.dag_id, DagRun.state, func.count(DagRun.state).label('cnt')
).group_by(DagRun.dag_id, DagRun.state).subquery()

return Session.query(
dag_status_query.c.dag_id, dag_status_query.c.state, dag_status_query.c.count,
sql_res = Session.query( # pylint: disable=no-member
dag_status_query.c.dag_id, dag_status_query.c.state, dag_status_query.c.cnt,
DagModel.owners
).join(DagModel, DagModel.dag_id == dag_status_query.c.dag_id).all()

res = [
DagStatusInfo(
dag_id = i.dag_id,
status = i.state,
cnt = i.cnt,
owner = i.owners
)
for i in sql_res
]

return res

def get_task_state_info():

@dataclass
class TaskStatusInfo:
    """Aggregated count of TaskInstances in one state for a single task."""
    dag_id: str   # DAG identifier (TaskInstance.dag_id)
    task_id: str  # task identifier within the DAG (TaskInstance.task_id)
    status: str   # TaskInstance state; 'none' substituted when state is NULL
    cnt: int      # number of TaskInstances in this state
    owner: str    # owner string as stored on DagModel.owners

def get_task_status_info() -> List[TaskStatusInfo]:
'''get task info
:return task_info
'''
task_status_query = Session.query(
task_status_query = Session.query( # pylint: disable=no-member
TaskInstance.dag_id, TaskInstance.task_id,
TaskInstance.state, func.count(TaskInstance.dag_id).label('value')
TaskInstance.state, func.count(TaskInstance.dag_id).label('cnt')
).group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state).subquery()

return Session.query(
sql_res = Session.query( # pylint: disable=no-member
task_status_query.c.dag_id, task_status_query.c.task_id,
task_status_query.c.state, task_status_query.c.value, DagModel.owners
task_status_query.c.state, task_status_query.c.cnt, DagModel.owners
).join(DagModel, DagModel.dag_id == task_status_query.c.dag_id).order_by(task_status_query.c.dag_id).all()


def get_dag_duration_info():
res = [
TaskStatusInfo(
dag_id = i.dag_id,
task_id = i.task_id,
status = i.state or 'none',
cnt = i.cnt,
owner = i.owners
)
for i in sql_res
]

return res

@dataclass
class DagDurationInfo:
    """Maximum duration of the currently running DagRuns of one DAG."""
    dag_id: str      # DAG identifier (DagRun.dag_id)
    duration: float  # duration in seconds; caller converts per-DB-driver

def get_dag_duration_info() -> List[DagDurationInfo]:
'''get duration of currently running DagRuns
:return dag_info
'''
Expand All @@ -59,7 +110,7 @@ def get_dag_duration_info():
}
duration = durations.get(driver, durations['default'])

return Session.query(
sql_res = Session.query( # pylint: disable=no-member
DagRun.dag_id,
func.max(duration).label('duration')
).group_by(
Expand All @@ -68,21 +119,41 @@ def get_dag_duration_info():
DagRun.state == State.RUNNING
).all()

res = []

def get_dag_labels(dag_id):
for i in sql_res:
if driver == 'mysqldb' or driver == 'pysqlite':
dag_duration = i.duration
else:
dag_duration = i.duration.seconds

res.append(DagDurationInfo(
dag_id = i.dag_id,
duration = dag_duration
))

return res


def get_dag_labels(dag_id: str) -> Dict[str, str]:
# reuse airflow webserver dagbag
dag = current_app.dag_bag.get_dag(dag_id)

if dag is None:
return [], []
return dict()

labels = dag.params.get('labels')
labels = dag.params.get('labels', {})
labels = labels.get('__var', {})

if labels is None:
return [], []
return labels

labels = {k:v for k,v in labels.items() if not k.startswith('__')}
return list(labels.keys()), list(labels.values())

def _add_gauge_metric(metric, labels, value):
    """Attach one sample to an existing metric family.

    Bypasses GaugeMetricFamily.add_metric() so that *labels* can be
    passed as a dict (label name -> value) instead of an ordered list,
    letting per-DAG custom labels vary between samples of one family.
    """
    sample = Sample(metric.name, labels, value, None)
    metric.samples.append(sample)


class MetricsCollector(object):
Expand All @@ -91,54 +162,80 @@ class MetricsCollector(object):
def describe(self):
return []

def collect(self):
def collect(self) -> Generator[Metric, None, None]:
'''collect metrics'''

# Task metrics
# Each *MetricFamily generates two lines of comments in /metrics, try to minimize noise
# by creating new group for each dag
task_info = get_task_state_info()
for dag_id, tasks in itertools.groupby(task_info, lambda x: x.dag_id):
k, v = get_dag_labels(dag_id)

t_state = GaugeMetricFamily(
'airflow_task_status',
'Shows the number of task starts with this status',
labels=['dag_id', 'task_id', 'owner', 'status'] + k
)
for task in tasks:
t_state.add_metric([task.dag_id, task.task_id, task.owners, task.state or 'none'] + v, task.value)
# Dag Metrics and collect all labels
dag_info = get_dag_status_info()

yield t_state
dag_status_metric = GaugeMetricFamily(
'airflow_dag_status',
'Shows the number of dag starts with this status',
labels=['dag_id', 'owner', 'status']
)

# Dag Metrics
dag_info = get_dag_state_info()
for dag in dag_info:
k, v = get_dag_labels(dag.dag_id)

d_state = GaugeMetricFamily(
'airflow_dag_status',
'Shows the number of dag starts with this status',
labels=['dag_id', 'owner', 'status'] + k
labels = get_dag_labels(dag.dag_id)

_add_gauge_metric(
dag_status_metric,
{
'dag_id': dag.dag_id,
'owner': dag.owner,
'status': dag.status,
**labels
},
dag.cnt,
)
d_state.add_metric([dag.dag_id, dag.owners, dag.state] + v, dag.count)
yield d_state

yield dag_status_metric

# DagRun metrics
driver = Session.bind.driver # pylint: disable=no-member
for dag in get_dag_duration_info():
k, v = get_dag_labels(dag.dag_id)

dag_duration = GaugeMetricFamily(
'airflow_dag_run_duration',
'Maximum duration of currently running dag_runs for each DAG in seconds',
labels=['dag_id'] + k
dag_duration_metric = GaugeMetricFamily(
'airflow_dag_run_duration',
'Maximum duration of currently running dag_runs for each DAG in seconds',
labels=['dag_id']
)
for dag_duration in get_dag_duration_info():
labels = get_dag_labels(dag_duration.dag_id)

_add_gauge_metric(
dag_duration_metric,
{
'dag_id': dag_duration.dag_id,
**labels
},
dag_duration.duration
)
if driver == 'mysqldb' or driver == 'pysqlite':
dag_duration.add_metric([dag.dag_id] + v, dag.duration)
else:
dag_duration.add_metric([dag.dag_id] + v, dag.duration.seconds)
yield dag_duration

yield dag_duration_metric

# Task metrics
# Each *MetricFamily generates two lines of comments in /metrics, try to minimize noise
# by creating new group for each dag
task_status_metric = GaugeMetricFamily(
'airflow_task_status',
'Shows the number of task starts with this status',
labels=['dag_id', 'task_id', 'owner', 'status']
)

for dag_id, tasks in itertools.groupby(get_task_status_info(), lambda x: x.dag_id):
labels = get_dag_labels(dag_id)

for task in tasks:
_add_gauge_metric(
task_status_metric,
{
'dag_id': task.dag_id,
'task_id': task.task_id,
'owner': task.owner,
'status': task.status,
**labels
},
task.cnt
)

yield task_status_metric


REGISTRY.register(MetricsCollector())
Expand All @@ -157,19 +254,16 @@ def list(self):
"category": "Admin"
}

www_views = []
www_rbac_views = [RBACmetricsView]


class AirflowPrometheusPlugins(AirflowPlugin):
'''plugin for show metrics'''
name = "airflow_prometheus_plugin"
operators = []
hooks = []
executors = []
macros = []
admin_views = www_views
flask_blueprints = []
menu_links = []
appbuilder_views = www_rbac_views
appbuilder_menu_items = []
operators = [] # type: ignore
hooks = [] # type: ignore
executors = [] # type: ignore
macros = [] # type: ignore
admin_views = [] # type: ignore
flask_blueprints = [] # type: ignore
menu_links = [] # type: ignore
appbuilder_views = [RBACmetricsView]
appbuilder_menu_items = [] # type: ignore
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
keywords="airflow plugin prometheus exporter metrics",
url="https://github.com/epoch8/airflow-exporter",
packages=["airflow_exporter"],
setup_requires=['setuptools_scm'],
setup_requires=["setuptools_scm"],
install_requires=[
"apache-airflow>=2.0.0",
"prometheus_client>=0.4.2",
Expand Down

0 comments on commit a878afa

Please sign in to comment.