Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion airflow-core/tests/unit/always/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def example_not_excluded_dags(xfail_db_exception: bool = False):
example_dirs = [
"airflow-core/**/example_dags/example_*.py",
"tests/system/**/example_*.py",
"providers/**/example_*.py",
"providers/**/tests/system/**/example_*.py",
"providers/**/example_dags/example_*.py",
]

default_branch = os.environ.get("DEFAULT_BRANCH", "main")
Expand Down
1 change: 1 addition & 0 deletions airflow-core/tests/unit/always/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def test_providers_modules_should_have_tests(self):
"providers/common/compat/tests/unit/common/compat/standard/test_utils.py",
"providers/common/messaging/tests/unit/common/messaging/providers/test_base_provider.py",
"providers/common/messaging/tests/unit/common/messaging/providers/test_sqs.py",
"providers/edge3/tests/unit/edge3/cli/test_example_extended_sysinfo.py",
"providers/edge3/tests/unit/edge3/models/test_edge_job.py",
"providers/edge3/tests/unit/edge3/models/test_edge_logs.py",
"providers/edge3/tests/unit/edge3/models/test_edge_worker.py",
Expand Down
42 changes: 42 additions & 0 deletions providers/edge3/docs/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,45 @@ instance. The commands are:

Workers are identified by hostname. See the :doc:`cli-ref` for the full list of
arguments.

Worker Monitoring
-----------------

The workers send a regular heartbeat to the central site with their status and the status of the tasks running on them.
This information is stored in the database and can be used to monitor the workers and their tasks as well as is displayed
in the web UI.

Basic information that is provided by the workers includes:

- Time of being first online
- Time of last heartbeat
- Airflow version
- Edge provider version
- Python version
- Worker start time
- Concurrency (Jobs that the worker can run in parallel)
- Free concurrency (Jobs that the worker can run in parallel but are currently free)

In order to extend the basic information provided by the worker, you can implement a custom function and register it via the
``[edge]`` section property ``extended_system_info_function`` setting in your Airflow configuration.
The function needs to be implemented in Python asyncio and returns a dictionary with string keys and values that are JSON serializable.
The returned information will be merged with the basic system information and transported to the central site where it is stored
in the database and extend the worker's system information.

All numeric values (int and float) that are returned by the function will be populated to the telemetry subsystem (StatsD or OTel)
in the form ``edge_worker.{key}``. Note this requires to add the respective keys to the list of monitored metrics in your telemetry
configuration. This allows you to create custom metrics based on the information returned by the function.

In the returned dictionary there are two special keys that are used to display the health:

- ``status``: This is a numeric value that is used to determine the health status of the worker. It is expected to be one of the
logging levels defined in the logging module (e.g. logging.INFO, logging.WARNING, logging.ERROR). Based on this value the health
status of the worker is determined and displayed in the web UI.
- ``status_text``: This is a string value that is used to display additional information about the health status of the worker in
the web UI. It can be used to provide more context about the health status of the worker and overrides the default status text.

See https://github.com/apache/airflow/blob/main/providers/edge3/src/airflow/providers/edge3/cli/example_extended_sysinfo.py
for an example implementation of such a function.

.. image:: img/edge_sysinfo.png
:alt: Example of the extended system information provided by the worker on the UI plugin
Binary file added providers/edge3/docs/img/edge_sysinfo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 4 additions & 1 deletion providers/edge3/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-----------------+----------------------------------------------------------+
| Revision ID | Revises ID | Edge3 Version | Description |
+=========================+==================+=================+==========================================================+
| ``a09c3ee8e1d3`` (head) | ``8c275b6fbaa8`` | ``3.4.0`` | Add team_name column to edge_job and edge_worker tables. |
| ``c6b3c3d093fd`` (head) | ``a09c3ee8e1d3`` | ``3.5.0`` | Replace individual counters with extended JSON based |
| | | | sysinfo. |
+-------------------------+------------------+-----------------+----------------------------------------------------------+
| ``a09c3ee8e1d3`` | ``8c275b6fbaa8`` | ``3.4.0`` | Add team_name column to edge_job and edge_worker tables. |
+-------------------------+------------------+-----------------+----------------------------------------------------------+
| ``8c275b6fbaa8`` | ``b3c4d5e6f7a8`` | ``3.2.0`` | Fix migration file/ORM inconsistencies. |
+-------------------------+------------------+-----------------+----------------------------------------------------------+
Expand Down
16 changes: 16 additions & 0 deletions providers/edge3/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,19 @@ config:
type: string
default: ~
example: ~
extended_system_info_function:
description: |
The function to call to get extended system information for the worker.

The function must be async and return a ``dict[str, str | int | float | datetime]``.
The information will be sent to the central site with each heartbeat and can be used for monitoring
and debugging purposes. All int and float values will also be published to metric collection systems
like statsd or otel.

Function must be provided as a string with the full path to the function. See
https://github.com/apache/airflow/blob/main/providers/edge3/src/airflow/providers/edge3/cli/example_extended_sysinfo.py
for an example implementation.
version_added: 3.5.0
type: string
default: ~
example: airflow.providers.edge3.cli.example_extended_sysinfo.get_example_extended_sysinfo
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async def worker_set_state(
state: EdgeWorkerState,
jobs_active: int,
queues: list[str] | None,
sysinfo: dict,
sysinfo: dict[str, str | int | float | datetime],
maintenance_comments: str | None = None,
team_name: str | None = None,
) -> WorkerSetStateReturn:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example of an extended sysinfo function that can be used in the Edge Worker.

To enable this set the airflow config [edge] extended_system_info_function to
airflow.providers.edge3.cli.example_extended_sysinfo.get_example_extended_sysinfo
"""

from __future__ import annotations

import asyncio
import logging
import os
import shutil
import sys
from datetime import datetime

import psutil


async def get_example_extended_sysinfo() -> dict[str, str | int | float | datetime]:
"""Provide an example extended sysinfo function that can be used in the Edge Worker."""
disk_usage, cpu_usage, loadavg = await asyncio.gather(
asyncio.to_thread(shutil.disk_usage, "/"),
asyncio.to_thread(psutil.cpu_percent, None),
asyncio.to_thread(os.getloadavg),
)
disk_free_gb = round(disk_usage.free / (1024**3), 2)

load_1 = loadavg[0]

status = logging.INFO
status_text = "I am good, sun is shining 🌞"
if cpu_usage > 95 or disk_free_gb < 5:
status = logging.ERROR
status_text = "Critical condition!"
elif cpu_usage > 70 or disk_free_gb < 20:
status = logging.WARNING
status_text = "Warning condition!"

return {
"status": status,
"status_text": status_text,
"platform": sys.platform,
"disk_free_gb": disk_free_gb,
"cpu_usage": cpu_usage,
"sys_load": round(load_1, 2),
}
45 changes: 40 additions & 5 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import sys
import traceback
from asyncio import Task, create_task, get_running_loop, sleep
from collections.abc import Awaitable, Callable
from datetime import datetime
from functools import cached_property
from http import HTTPStatus
Expand Down Expand Up @@ -106,6 +107,8 @@ def __init__(
self.daemon = daemon
self.team_name = team_name

self.worker_start_time: datetime = datetime.now()

if TYPE_CHECKING:
self.conf: ExecutorConf | AirflowConfigParser

Expand All @@ -125,6 +128,20 @@ def __init__(
self.push_logs = self.conf.getboolean("edge", "push_logs")
self.push_log_chunk_size = self.conf.getint("edge", "push_log_chunk_size")

self.extended_sysinfo: Callable[[], Awaitable[dict[str, str | int | float | datetime]]] | None = None
extended_sysinfo_func_path = self.conf.get("edge", "extended_system_info_function", fallback=None)
if extended_sysinfo_func_path:
module_path, func_name = extended_sysinfo_func_path.rsplit(".", 1)
try:
module = __import__(module_path, fromlist=[func_name])
self.extended_sysinfo = getattr(module, func_name)
logger.info("Using extended sysinfo function: %s", extended_sysinfo_func_path)
except Exception:
logger.exception(
"Failed to import extended sysinfo function %s, skipping it.",
extended_sysinfo_func_path,
)

@cached_property
def _execution_api_server_url(self) -> str | None:
"""Get the execution api server url from config or environment."""
Expand Down Expand Up @@ -185,14 +202,24 @@ def shutdown_handler(self):
os.setpgid(job.process.pid, 0)
os.kill(job.process.pid, signal.SIGTERM)

def _get_sysinfo(self) -> dict:
async def _get_sysinfo(self) -> dict[str, str | int | float | datetime]:
"""Produce the sysinfo from worker to post to central site."""
return {
sysinfo: dict[str, str | int | float | datetime] = {
"status": logging.INFO,
"airflow_version": airflow_version,
"edge_provider_version": edge_provider_version,
"python_version": sys.version,
"worker_start_time": self.worker_start_time,
"concurrency": self.concurrency,
"free_concurrency": self.free_concurrency,
}
if self.extended_sysinfo:
try:
sysinfo.update(await self.extended_sysinfo())
except Exception:
logger.exception("Failed to get extended sysinfo, skipping it.")

return sysinfo

def _get_state(self) -> EdgeWorkerState:
"""State of the Edge Worker."""
Expand Down Expand Up @@ -280,7 +307,11 @@ async def start(self):
"""Start the execution in a loop until terminated."""
try:
await worker_register(
self.hostname, EdgeWorkerState.STARTING, self.queues, self._get_sysinfo(), self.team_name
self.hostname,
EdgeWorkerState.STARTING,
self.queues,
await self._get_sysinfo(),
self.team_name,
)
except EdgeWorkerVersionException as e:
logger.info("Version mismatch of Edge worker and Core. Shutting down worker.")
Expand Down Expand Up @@ -309,12 +340,16 @@ async def start(self):

logger.info("Quitting worker, signal being offline.")
try:
sysinfo = await self._get_sysinfo()
sysinfo["status"] = logging.NOTSET
if "status_text" in sysinfo:
del sysinfo["status_text"] # Remove old status text if exists
await worker_set_state(
self.hostname,
EdgeWorkerState.OFFLINE_MAINTENANCE if self.maintenance_mode else EdgeWorkerState.OFFLINE,
0,
self.queues,
self._get_sysinfo(),
sysinfo,
team_name=self.team_name,
)
except EdgeWorkerVersionException:
Expand Down Expand Up @@ -408,7 +443,7 @@ async def fetch_and_run_job(self) -> None:
async def heartbeat(self, new_maintenance_comments: str | None = None) -> bool:
"""Report liveness state of worker to central site with stats."""
state = self._get_state()
sysinfo = self._get_sysinfo()
sysinfo = await self._get_sysinfo()
worker_state_changed: bool = False
try:
worker_info = await worker_set_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

import logging
from collections.abc import Sequence
from copy import deepcopy
from datetime import datetime, timedelta
Expand Down Expand Up @@ -153,7 +154,7 @@ def _check_worker_liveness(self, session: Session) -> bool:
"""Reset worker state if heartbeat timed out."""
changed = False
heartbeat_interval: int = self.conf.getint("edge", "heartbeat_interval")
lifeless_workers: Sequence[EdgeWorkerModel] = session.scalars(
lifeless_workers = session.scalars(
select(EdgeWorkerModel)
.with_for_update(skip_locked=True)
.where(
Expand Down Expand Up @@ -182,14 +183,20 @@ def _check_worker_liveness(self, session: Session) -> bool:
)
else EdgeWorkerState.UNKNOWN
)
# Reset presented status
sysinfo = dict(worker.sysinfo or {}) # copy needed to have alembic detect change in content
sysinfo["status"] = logging.NOTSET
sysinfo.pop("status_text", None) # Remove old status text if exists
worker.sysinfo = sysinfo
self.log.warning("Worker %s is lifeless. Setting state to %s", worker.worker_name, worker.state)
reset_metrics(worker.worker_name)

return changed

def _update_orphaned_jobs(self, session: Session) -> bool:
"""Update status ob jobs when workers die and don't update anymore."""
heartbeat_interval: int = self.conf.getint("scheduler", "task_instance_heartbeat_timeout")
lifeless_jobs: Sequence[EdgeJobModel] = session.scalars(
lifeless_jobs = session.scalars(
Comment thread
jscheffl marked this conversation as resolved.
select(EdgeJobModel)
.with_for_update(skip_locked=True)
.where(
Expand Down Expand Up @@ -231,7 +238,7 @@ def _purge_jobs(self, session: Session) -> bool:
purged_marker = False
job_success_purge = self.conf.getint("edge", "job_success_purge")
job_fail_purge = self.conf.getint("edge", "job_fail_purge")
jobs: Sequence[EdgeJobModel] = session.scalars(
jobs = session.scalars(
select(EdgeJobModel)
.with_for_update(skip_locked=True)
.where(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ def get_provider_info():
"default": None,
"example": None,
},
"extended_system_info_function": {
"description": "The function to call to get extended system information for the worker.\n\nThe function must be async and return a ``dict[str, str | int | float | datetime]``.\nThe information will be sent to the central site with each heartbeat and can be used for monitoring\nand debugging purposes. All int and float values will also be published to metric collection systems\nlike statsd or otel.\n\nFunction must be provided as a string with the full path to the function. See\nhttps://github.com/apache/airflow/blob/main/providers/edge3/src/airflow/providers/edge3/cli/example_extended_sysinfo.py\nfor an example implementation.\n",
"version_added": "3.5.0",
"type": "string",
"default": None,
"example": "airflow.providers.edge3.cli.example_extended_sysinfo.get_example_extended_sysinfo",
},
},
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Replace individual counters with extended JSON based sysinfo.

Revision ID: c6b3c3d093fd
Revises: a09c3ee8e1d3
Create Date: 2026-04-15 21:57:07.662359
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "c6b3c3d093fd"
down_revision = "a09c3ee8e1d3"
branch_labels = None
depends_on = None
edge3_version = "3.5.0"


def upgrade() -> None:
with op.batch_alter_table("edge_worker", schema=None) as batch_op:
# Can not easuly convert old sysinfo string to new JSON structure, just clear and re-populate by workers on next heartbeat
batch_op.drop_column("sysinfo")
batch_op.add_column(sa.Column("sysinfo", sa.JSON(), nullable=True))
batch_op.drop_column("jobs_failed")
batch_op.drop_column("jobs_taken")
batch_op.drop_column("jobs_success")


def downgrade() -> None:
with op.batch_alter_table("edge_worker", schema=None) as batch_op:
batch_op.add_column(
sa.Column("jobs_success", sa.INTEGER(), autoincrement=False, default=0, nullable=False)
)
batch_op.add_column(
sa.Column("jobs_taken", sa.INTEGER(), autoincrement=False, default=0, nullable=False)
)
batch_op.add_column(
sa.Column("jobs_failed", sa.INTEGER(), autoincrement=False, default=0, nullable=False)
)
batch_op.drop_column("sysinfo")
batch_op.add_column(sa.Column("sysinfo", sa.VARCHAR(length=256), nullable=True))
Loading
Loading