diff --git a/airflow-core/tests/unit/always/test_example_dags.py b/airflow-core/tests/unit/always/test_example_dags.py index 995436b73bc42..0b84f6ae26ce5 100644 --- a/airflow-core/tests/unit/always/test_example_dags.py +++ b/airflow-core/tests/unit/always/test_example_dags.py @@ -118,7 +118,8 @@ def example_not_excluded_dags(xfail_db_exception: bool = False): example_dirs = [ "airflow-core/**/example_dags/example_*.py", "tests/system/**/example_*.py", - "providers/**/example_*.py", + "providers/**/tests/system/**/example_*.py", + "providers/**/example_dags/example_*.py", ] default_branch = os.environ.get("DEFAULT_BRANCH", "main") diff --git a/airflow-core/tests/unit/always/test_project_structure.py b/airflow-core/tests/unit/always/test_project_structure.py index 1169de1bd1754..13ef1ee8320b9 100644 --- a/airflow-core/tests/unit/always/test_project_structure.py +++ b/airflow-core/tests/unit/always/test_project_structure.py @@ -107,6 +107,7 @@ def test_providers_modules_should_have_tests(self): "providers/common/compat/tests/unit/common/compat/standard/test_utils.py", "providers/common/messaging/tests/unit/common/messaging/providers/test_base_provider.py", "providers/common/messaging/tests/unit/common/messaging/providers/test_sqs.py", + "providers/edge3/tests/unit/edge3/cli/test_example_extended_sysinfo.py", "providers/edge3/tests/unit/edge3/models/test_edge_job.py", "providers/edge3/tests/unit/edge3/models/test_edge_logs.py", "providers/edge3/tests/unit/edge3/models/test_edge_worker.py", diff --git a/providers/edge3/docs/deployment.rst b/providers/edge3/docs/deployment.rst index c843349c50ed9..d7e5695dacfd7 100644 --- a/providers/edge3/docs/deployment.rst +++ b/providers/edge3/docs/deployment.rst @@ -258,3 +258,45 @@ instance. The commands are: Workers are identified by hostname. See the :doc:`cli-ref` for the full list of arguments. 
+ +Worker Monitoring +----------------- + +The workers send a regular heartbeat to the central site with their status and the status of the tasks running on them. +This information is stored in the database, can be used to monitor the workers and their tasks, and is displayed +in the web UI. + +Basic information that is provided by the workers includes: + +- Time of being first online +- Time of last heartbeat +- Airflow version +- Edge provider version +- Python version +- Worker start time +- Concurrency (Jobs that the worker can run in parallel) +- Free concurrency (Parallel job slots of the worker that are currently free) + +In order to extend the basic information provided by the worker, you can implement a custom function and register it via the +``extended_system_info_function`` setting in the ``[edge]`` section of your Airflow configuration. +The function needs to be implemented as a Python asyncio coroutine (``async def``) and must return a dictionary with string keys and JSON serializable values. +The returned information will be merged with the basic system information and transported to the central site where it is stored +in the database and extends the worker's system information. + +All numeric values (int and float) that are returned by the function will be published to the telemetry subsystem (StatsD or OTel) +in the form ``edge_worker.{key}``. Note that this requires adding the respective keys to the list of monitored metrics in your telemetry +configuration. This allows you to create custom metrics based on the information returned by the function. + +In the returned dictionary there are two special keys that are used to display the health status: + +- ``status``: This is a numeric value that is used to determine the health status of the worker. It is expected to be one of the + logging levels defined in the logging module (e.g. logging.INFO, logging.WARNING, logging.ERROR).
Based on this value the health + status of the worker is determined and displayed in the web UI. +- ``status_text``: This is a string value that is used to display additional information about the health status of the worker in + the web UI. It can be used to provide more context about the health status of the worker and overrides the default status text. + +See https://github.com/apache/airflow/blob/main/providers/edge3/src/airflow/providers/edge3/cli/example_extended_sysinfo.py +for an example implementation of such a function. + +.. image:: img/edge_sysinfo.png + :alt: Example of the extended system information provided by the worker on the UI plugin diff --git a/providers/edge3/docs/img/edge_sysinfo.png b/providers/edge3/docs/img/edge_sysinfo.png new file mode 100644 index 0000000000000..ac308373922dd Binary files /dev/null and b/providers/edge3/docs/img/edge_sysinfo.png differ diff --git a/providers/edge3/docs/migrations-ref.rst b/providers/edge3/docs/migrations-ref.rst index f1b0c08573fd2..7d0846c0029a6 100644 --- a/providers/edge3/docs/migrations-ref.rst +++ b/providers/edge3/docs/migrations-ref.rst @@ -34,7 +34,10 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-----------------+----------------------------------------------------------+ | Revision ID | Revises ID | Edge3 Version | Description | +=========================+==================+=================+==========================================================+ -| ``a09c3ee8e1d3`` (head) | ``8c275b6fbaa8`` | ``3.4.0`` | Add team_name column to edge_job and edge_worker tables. | +| ``c6b3c3d093fd`` (head) | ``a09c3ee8e1d3`` | ``3.5.0`` | Replace individual counters with extended JSON based | +| | | | sysinfo. 
| ++-------------------------+------------------+-----------------+----------------------------------------------------------+ +| ``a09c3ee8e1d3`` | ``8c275b6fbaa8`` | ``3.4.0`` | Add team_name column to edge_job and edge_worker tables. | +-------------------------+------------------+-----------------+----------------------------------------------------------+ | ``8c275b6fbaa8`` | ``b3c4d5e6f7a8`` | ``3.2.0`` | Fix migration file/ORM inconsistencies. | +-------------------------+------------------+-----------------+----------------------------------------------------------+ diff --git a/providers/edge3/provider.yaml b/providers/edge3/provider.yaml index 1ba8b89067a4c..741f8b14d0317 100644 --- a/providers/edge3/provider.yaml +++ b/providers/edge3/provider.yaml @@ -172,3 +172,19 @@ config: type: string default: ~ example: ~ + extended_system_info_function: + description: | + The function to call to get extended system information for the worker. + + The function must be async and return a ``dict[str, str | int | float | datetime]``. + The information will be sent to the central site with each heartbeat and can be used for monitoring + and debugging purposes. All int and float values will also be published to metric collection systems + like statsd or otel. + + Function must be provided as a string with the full path to the function. See + https://github.com/apache/airflow/blob/main/providers/edge3/src/airflow/providers/edge3/cli/example_extended_sysinfo.py + for an example implementation. 
+ version_added: 3.5.0 + type: string + default: ~ + example: airflow.providers.edge3.cli.example_extended_sysinfo.get_example_extended_sysinfo diff --git a/providers/edge3/src/airflow/providers/edge3/cli/api_client.py b/providers/edge3/src/airflow/providers/edge3/cli/api_client.py index 1fd53245e19ec..b7ce2316bf61d 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/api_client.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/api_client.py @@ -144,7 +144,7 @@ async def worker_set_state( state: EdgeWorkerState, jobs_active: int, queues: list[str] | None, - sysinfo: dict, + sysinfo: dict[str, str | int | float | datetime], maintenance_comments: str | None = None, team_name: str | None = None, ) -> WorkerSetStateReturn: diff --git a/providers/edge3/src/airflow/providers/edge3/cli/example_extended_sysinfo.py b/providers/edge3/src/airflow/providers/edge3/cli/example_extended_sysinfo.py new file mode 100644 index 0000000000000..1651ebc692e6a --- /dev/null +++ b/providers/edge3/src/airflow/providers/edge3/cli/example_extended_sysinfo.py @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example of an extended sysinfo function that can be used in the Edge Worker. 
+ +To enable this set the airflow config [edge] extended_system_info_function to +airflow.providers.edge3.cli.example_extended_sysinfo.get_example_extended_sysinfo +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import shutil +import sys +from datetime import datetime + +import psutil + + +async def get_example_extended_sysinfo() -> dict[str, str | int | float | datetime]: + """Provide an example extended sysinfo function that can be used in the Edge Worker.""" + disk_usage, cpu_usage, loadavg = await asyncio.gather( + asyncio.to_thread(shutil.disk_usage, "/"), + asyncio.to_thread(psutil.cpu_percent, None), + asyncio.to_thread(os.getloadavg), + ) + disk_free_gb = round(disk_usage.free / (1024**3), 2) + + load_1 = loadavg[0] + + status = logging.INFO + status_text = "I am good, sun is shining 🌞" + if cpu_usage > 95 or disk_free_gb < 5: + status = logging.ERROR + status_text = "Critical condition!" + elif cpu_usage > 70 or disk_free_gb < 20: + status = logging.WARNING + status_text = "Warning condition!" 
+ + return { + "status": status, + "status_text": status_text, + "platform": sys.platform, + "disk_free_gb": disk_free_gb, + "cpu_usage": cpu_usage, + "sys_load": round(load_1, 2), + } diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py index 6ab2a9bc80770..2e4010e9dc778 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py @@ -22,6 +22,7 @@ import sys import traceback from asyncio import Task, create_task, get_running_loop, sleep +from collections.abc import Awaitable, Callable from datetime import datetime from functools import cached_property from http import HTTPStatus @@ -106,6 +107,8 @@ def __init__( self.daemon = daemon self.team_name = team_name + self.worker_start_time: datetime = datetime.now() + if TYPE_CHECKING: self.conf: ExecutorConf | AirflowConfigParser @@ -125,6 +128,20 @@ def __init__( self.push_logs = self.conf.getboolean("edge", "push_logs") self.push_log_chunk_size = self.conf.getint("edge", "push_log_chunk_size") + self.extended_sysinfo: Callable[[], Awaitable[dict[str, str | int | float | datetime]]] | None = None + extended_sysinfo_func_path = self.conf.get("edge", "extended_system_info_function", fallback=None) + if extended_sysinfo_func_path: + try: # also covers a malformed path without a dot (unpacking ValueError) + module_path, func_name = extended_sysinfo_func_path.rsplit(".", 1) + module = __import__(module_path, fromlist=[func_name]) + self.extended_sysinfo = getattr(module, func_name) + logger.info("Using extended sysinfo function: %s", extended_sysinfo_func_path) + except Exception: + logger.exception( + "Failed to import extended sysinfo function %s, skipping it.", + extended_sysinfo_func_path, + ) + @cached_property def _execution_api_server_url(self) -> str | None: """Get the execution api server url from config or environment.""" @@ -185,14 +202,24 @@ def shutdown_handler(self): os.setpgid(job.process.pid, 0) os.kill(job.process.pid, 
signal.SIGTERM) - def _get_sysinfo(self) -> dict: + async def _get_sysinfo(self) -> dict[str, str | int | float | datetime]: """Produce the sysinfo from worker to post to central site.""" - return { + sysinfo: dict[str, str | int | float | datetime] = { + "status": logging.INFO, "airflow_version": airflow_version, "edge_provider_version": edge_provider_version, + "python_version": sys.version, + "worker_start_time": self.worker_start_time, "concurrency": self.concurrency, "free_concurrency": self.free_concurrency, } + if self.extended_sysinfo: + try: + sysinfo.update(await self.extended_sysinfo()) + except Exception: + logger.exception("Failed to get extended sysinfo, skipping it.") + + return sysinfo def _get_state(self) -> EdgeWorkerState: """State of the Edge Worker.""" @@ -280,7 +307,11 @@ async def start(self): """Start the execution in a loop until terminated.""" try: await worker_register( - self.hostname, EdgeWorkerState.STARTING, self.queues, self._get_sysinfo(), self.team_name + self.hostname, + EdgeWorkerState.STARTING, + self.queues, + await self._get_sysinfo(), + self.team_name, ) except EdgeWorkerVersionException as e: logger.info("Version mismatch of Edge worker and Core. 
Shutting down worker.") @@ -309,12 +340,16 @@ async def start(self): logger.info("Quitting worker, signal being offline.") try: + sysinfo = await self._get_sysinfo() + sysinfo["status"] = logging.NOTSET + if "status_text" in sysinfo: + del sysinfo["status_text"] # Remove old status text if exists await worker_set_state( self.hostname, EdgeWorkerState.OFFLINE_MAINTENANCE if self.maintenance_mode else EdgeWorkerState.OFFLINE, 0, self.queues, - self._get_sysinfo(), + sysinfo, team_name=self.team_name, ) except EdgeWorkerVersionException: @@ -408,7 +443,7 @@ async def fetch_and_run_job(self) -> None: async def heartbeat(self, new_maintenance_comments: str | None = None) -> bool: """Report liveness state of worker to central site with stats.""" state = self._get_state() - sysinfo = self._get_sysinfo() + sysinfo = await self._get_sysinfo() worker_state_changed: bool = False try: worker_info = await worker_set_state( diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index be4ba3f915e29..d9b67f69c94b0 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -17,6 +17,7 @@ from __future__ import annotations +import logging from collections.abc import Sequence from copy import deepcopy from datetime import datetime, timedelta @@ -153,7 +154,7 @@ def _check_worker_liveness(self, session: Session) -> bool: """Reset worker state if heartbeat timed out.""" changed = False heartbeat_interval: int = self.conf.getint("edge", "heartbeat_interval") - lifeless_workers: Sequence[EdgeWorkerModel] = session.scalars( + lifeless_workers = session.scalars( select(EdgeWorkerModel) .with_for_update(skip_locked=True) .where( @@ -182,6 +183,12 @@ def _check_worker_liveness(self, session: Session) -> bool: ) else EdgeWorkerState.UNKNOWN ) + # Reset presented status + sysinfo = 
dict(worker.sysinfo or {}) # copy needed so SQLAlchemy detects the change in content + sysinfo["status"] = logging.NOTSET + sysinfo.pop("status_text", None) # Remove old status text if exists + worker.sysinfo = sysinfo + self.log.warning("Worker %s is lifeless. Setting state to %s", worker.worker_name, worker.state) reset_metrics(worker.worker_name) return changed @@ -189,7 +196,7 @@ def _check_worker_liveness(self, session: Session) -> bool: def _update_orphaned_jobs(self, session: Session) -> bool: """Update status ob jobs when workers die and don't update anymore.""" heartbeat_interval: int = self.conf.getint("scheduler", "task_instance_heartbeat_timeout") - lifeless_jobs: Sequence[EdgeJobModel] = session.scalars( + lifeless_jobs = session.scalars( select(EdgeJobModel) .with_for_update(skip_locked=True) .where( @@ -231,7 +238,7 @@ def _purge_jobs(self, session: Session) -> bool: purged_marker = False job_success_purge = self.conf.getint("edge", "job_success_purge") job_fail_purge = self.conf.getint("edge", "job_fail_purge") - jobs: Sequence[EdgeJobModel] = session.scalars( + jobs = session.scalars( select(EdgeJobModel) .with_for_update(skip_locked=True) .where( diff --git a/providers/edge3/src/airflow/providers/edge3/get_provider_info.py b/providers/edge3/src/airflow/providers/edge3/get_provider_info.py index 9e4c9a2bf9883..441cee6b65a13 100644 --- a/providers/edge3/src/airflow/providers/edge3/get_provider_info.py +++ b/providers/edge3/src/airflow/providers/edge3/get_provider_info.py @@ -109,6 +109,13 @@ def get_provider_info(): "default": None, "example": None, }, + "extended_system_info_function": { + "description": "The function to call to get extended system information for the worker.\n\nThe function must be async and return a ``dict[str, str | int | float | datetime]``.\nThe information will be sent to the central site with each heartbeat and can be used for monitoring\nand debugging purposes. 
All int and float values will also be published to metric collection systems\nlike statsd or otel.\n\nFunction must be provided as a string with the full path to the function. See\nhttps://github.com/apache/airflow/blob/main/providers/edge3/src/airflow/providers/edge3/cli/example_extended_sysinfo.py\nfor an example implementation.\n", + "version_added": "3.5.0", + "type": "string", + "default": None, + "example": "airflow.providers.edge3.cli.example_extended_sysinfo.get_example_extended_sysinfo", + }, }, } }, diff --git a/providers/edge3/src/airflow/providers/edge3/migrations/versions/0005_3_5_0_replace_individual_counters_with_.py b/providers/edge3/src/airflow/providers/edge3/migrations/versions/0005_3_5_0_replace_individual_counters_with_.py new file mode 100644 index 0000000000000..10eeef9efeaa2 --- /dev/null +++ b/providers/edge3/src/airflow/providers/edge3/migrations/versions/0005_3_5_0_replace_individual_counters_with_.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Replace individual counters with extended JSON based sysinfo. 
+ +Revision ID: c6b3c3d093fd +Revises: a09c3ee8e1d3 +Create Date: 2026-04-15 21:57:07.662359 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "c6b3c3d093fd" +down_revision = "a09c3ee8e1d3" +branch_labels = None +depends_on = None +edge3_version = "3.5.0" + + +def upgrade() -> None: + with op.batch_alter_table("edge_worker", schema=None) as batch_op: + # Cannot easily convert old sysinfo string to new JSON structure, just clear it; workers re-populate it on next heartbeat + batch_op.drop_column("sysinfo") + batch_op.add_column(sa.Column("sysinfo", sa.JSON(), nullable=True)) + batch_op.drop_column("jobs_failed") + batch_op.drop_column("jobs_taken") + batch_op.drop_column("jobs_success") + + +def downgrade() -> None: + with op.batch_alter_table("edge_worker", schema=None) as batch_op: + batch_op.add_column( + sa.Column("jobs_success", sa.INTEGER(), autoincrement=False, server_default="0", nullable=False) + ) + batch_op.add_column( + sa.Column("jobs_taken", sa.INTEGER(), autoincrement=False, server_default="0", nullable=False) + ) + batch_op.add_column( + sa.Column("jobs_failed", sa.INTEGER(), autoincrement=False, server_default="0", nullable=False) + ) + batch_op.drop_column("sysinfo") + batch_op.add_column(sa.Column("sysinfo", sa.VARCHAR(length=256), nullable=True)) diff --git a/providers/edge3/src/airflow/providers/edge3/models/db.py b/providers/edge3/src/airflow/providers/edge3/models/db.py index f564b1d117b93..a1cd1d42ee1c4 100644 --- a/providers/edge3/src/airflow/providers/edge3/models/db.py +++ b/providers/edge3/src/airflow/providers/edge3/models/db.py @@ -46,6 +46,7 @@ def _callable_accepts_use_migration_files(callable_: Any) -> bool: "3.0.0": "9d34dfc2de06", "3.2.0": "8c275b6fbaa8", "3.4.0": "a09c3ee8e1d3", + "3.5.0": "c6b3c3d093fd", } diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py index 
cf72300681c2a..5f2f7e999e5ad 100644 --- a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py +++ b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py @@ -17,23 +17,18 @@ from __future__ import annotations import ast -import json import logging from datetime import datetime from enum import Enum from typing import TYPE_CHECKING -from sqlalchemy import Integer, String, delete, select +from sqlalchemy import JSON, Integer, String, delete, select from sqlalchemy.orm import Mapped -from airflow.providers.common.compat.sdk import AirflowException, Stats, timezone -from airflow.providers.edge3.models.edge_base import Base - -try: - from airflow.sdk.observability.stats import DualStatsManager -except ImportError: - DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat +from airflow.providers.common.compat.sdk import AirflowException, timezone from airflow.providers.common.compat.sqlalchemy.orm import mapped_column +from airflow.providers.edge3.models.edge_base import Base +from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.providers_configuration_loader import providers_configuration_loaded from airflow.utils.session import NEW_SESSION, provide_session @@ -99,10 +94,7 @@ class EdgeWorkerModel(Base, LoggingMixin): first_online: Mapped[datetime | None] = mapped_column(UtcDateTime) last_update: Mapped[datetime | None] = mapped_column(UtcDateTime) jobs_active: Mapped[int] = mapped_column(Integer, default=0) - jobs_taken: Mapped[int] = mapped_column(Integer, default=0) - jobs_success: Mapped[int] = mapped_column(Integer, default=0) - jobs_failed: Mapped[int] = mapped_column(Integer, default=0) - sysinfo: Mapped[str | None] = mapped_column(String(256)) + sysinfo: Mapped[dict | None] = mapped_column(JSON, nullable=True) team_name: Mapped[str | None] = mapped_column(String(64), nullable=True) concurrency: Mapped[int | None] = 
mapped_column(Integer, nullable=True) @@ -125,10 +117,6 @@ def __init__( self.team_name = team_name super().__init__() - @property - def sysinfo_json(self) -> dict | None: - return json.loads(self.sysinfo) if self.sysinfo else None - @property def queues(self) -> list[str] | None: """Return list of queues which are stored in queues field.""" @@ -168,6 +156,7 @@ def set_metrics( concurrency: int, free_concurrency: int, queues: list[str] | None, + sysinfo: dict[str, str | int | float | datetime], ) -> None: """Set metric of edge worker.""" queues = queues if queues else [] @@ -181,8 +170,31 @@ def set_metrics( EdgeWorkerState.MAINTENANCE_EXIT, EdgeWorkerState.OFFLINE_MAINTENANCE, ) + additional_keys = set(sysinfo or ()) - { + "status", + "airflow_version", + "edge_provider_version", + "python_version", + "worker_start_time", + "concurrency", + "free_concurrency", + } + + if AIRFLOW_V_3_2_PLUS: + from airflow.sdk.observability.stats import DualStatsManager + + try: + DualStatsManager.gauge( + "edge_worker.status", + sysinfo.get("status", logging.NOTSET), # type: ignore + tags={}, + extra_tags={"worker_name": worker_name}, + ) + except ValueError: + logger.warning( + "Failed to set metric edge_worker.status. 
Mapping is missing in metrics_template.yaml" + ) - if DualStatsManager is not None: DualStatsManager.gauge( "edge_worker.connected", int(connected), @@ -224,7 +236,34 @@ def set_metrics( tags={}, extra_tags={"worker_name": worker_name, "queues": ",".join(queues)}, ) + + for key in additional_keys: + value = sysinfo.get(key) + if isinstance(value, (int, float)): + try: + DualStatsManager.gauge( + f"edge_worker.{key}", + value, + tags={}, + extra_tags={"worker_name": worker_name}, + ) + except ValueError as e: + logger.warning( + "Failed to set metric for key %s with value %s: %s", + key, + value, + e, + ) else: + from airflow.providers.common.compat.sdk import Stats + + Stats.gauge(f"edge_worker.status.{worker_name}", sysinfo.get("status", logging.NOTSET)) # type: ignore + Stats.gauge( + "edge_worker.status", + sysinfo.get("status", logging.NOTSET), # type: ignore + tags={"worker_name": worker_name}, + ) + Stats.gauge(f"edge_worker.connected.{worker_name}", int(connected)) Stats.gauge("edge_worker.connected", int(connected), tags={"worker_name": worker_name}) @@ -247,6 +286,12 @@ def set_metrics( tags={"worker_name": worker_name, "queues": ",".join(queues)}, ) + for key in additional_keys: + value = sysinfo.get(key) + if isinstance(value, (int, float)): + Stats.gauge(f"edge_worker.{key}.{worker_name}", value) + Stats.gauge(f"edge_worker.{key}", value, tags={"worker_name": worker_name}) + def reset_metrics(worker_name: str) -> None: """Reset metrics of worker.""" @@ -257,6 +302,9 @@ def reset_metrics(worker_name: str) -> None: concurrency=0, free_concurrency=-1, queues=None, + sysinfo={ + "status": logging.NOTSET, + }, ) diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/WorkerSysinfoBadge.tsx b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/WorkerSysinfoBadge.tsx new file mode 100644 index 0000000000000..639c9be78055a --- /dev/null +++ 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/WorkerSysinfoBadge.tsx @@ -0,0 +1,141 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Badge, type BadgeProps, HoverCard, List, Text } from "@chakra-ui/react"; +import * as React from "react"; +import TimeAgo from "react-timeago"; + +import { LuCheck } from "react-icons/lu"; +import { MdErrorOutline } from "react-icons/md"; +import { PiQuestion, PiWarning } from "react-icons/pi"; + +const status2Color = (status: number | undefined) => { + // logging.NOTSET == 0, keep 10 as boundary between NOTSET and INFO + if (status === undefined || status <= 10) { + return "gray"; + } + // logging.INFO == 20, keep 25 as boundary between INFO and WARNING + if (status <= 25) { + return "green"; + } + // logging.WARNING == 30, keep 35 as boundary between WARNING and ERROR + if (status <= 35) { + return "yellow"; + } + // all other assume is like logging.ERROR == 40 or higher + return "red"; +}; + +const status2Text = (status: number | undefined) => { + // same levels as above in status2Color + if (status === undefined || status <= 10) { + return "Unknown"; + } + // logging.INFO == 20, keep 25 as boundary between INFO and WARNING + if 
(status <= 25) { + return "Healthy"; + } + // logging.WARNING == 30, keep 35 as boundary between WARNING and ERROR + if (status <= 35) { + return "Warning"; + } + // all other assume is like logging.ERROR == 40 or higher + return "Error"; +}; + +const capitalize = (s: string) => s.charAt(0).toUpperCase() + s.slice(1).replaceAll("_", " "); + +const isDate = (value: string | number): boolean => { + if (typeof value === "number") { + return false; // numbers are not considered dates in this context + } + // Do not attempt to parse version strings that may look like dates but are not + if (/^\d+\.\d+\.\d+.*/.test(value)) { + return false; + } + // Check if the value is a string that can be parsed into a date + const date = Date.parse(value); + return !isNaN(date); +}; + +type IconProps = { + readonly status: number | undefined; +}; + +const WorkerSysinfoIcon = ({ status, ...rest }: IconProps) => { + // same levels as above in status2Color + if (status === undefined || status <= 10) { + return ; + } + if (status <= 25) { + return ; + } + if (status <= 35) { + return ; + } + return ; +}; + +export type Props = { + readonly sysinfo: { [key: string]: string | number; } + readonly first_online: string | null | undefined; + readonly last_heartbeat: string | null | undefined; +} & BadgeProps; + +export const WorkerSysinfoBadge = React.forwardRef( + ({ children, sysinfo, first_online, last_heartbeat, ...rest }, ref) => ( + + + + + + {sysinfo.status_text ?? status2Text(sysinfo.status as number | undefined)} + + {children} + + + + + + {sysinfo ? ( + + First online: {first_online ? : "N/A"} + Last heartbeat: {last_heartbeat ? : "N/A"} + {Object.entries(sysinfo).filter(([key]) => key !== "status" && key !== "status_text").map(([key, value]) => ( + + {capitalize(key)}: {isDate(value) ? 
: value} + + ))} + + ) : ( + "N/A" + )} + + + + ), +); diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/pages/WorkerPage.tsx b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/pages/WorkerPage.tsx index 23da0d86d8269..d96d41d9bb9a2 100644 --- a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/pages/WorkerPage.tsx +++ b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/pages/WorkerPage.tsx @@ -35,7 +35,6 @@ import { useUiServiceWorker } from "openapi/queries"; import type { EdgeWorkerState, Worker } from "openapi/requests/types.gen"; import { Link } from "react-router-dom"; import { LuExternalLink } from "react-icons/lu"; -import TimeAgo from "react-timeago"; import { BulkWorkerOperations } from "src/components/BulkWorkerOperations"; import { ErrorAlert } from "src/components/ErrorAlert"; @@ -45,6 +44,7 @@ import { WorkerStateBadge } from "src/components/WorkerStateBadge"; import { ScrollToAnchor, Select } from "src/components/ui"; import { workerStateOptions } from "src/constants"; import { autoRefreshInterval } from "src/utils"; +import { WorkerSysinfoBadge } from "src/components/WorkerSysinfoBadge"; export const WorkerPage = () => { const [workerNamePattern, setWorkerNamePattern] = useState(""); @@ -220,10 +220,8 @@ export const WorkerPage = () => { Worker Name State Queues - First Online - Last Heartbeat Active Jobs - System Information + System Status Operations @@ -259,12 +257,6 @@ export const WorkerPage = () => { "(all queues)" )} - - {worker.first_online ? : undefined} - - - {worker.last_heartbeat ? : undefined} - {worker.jobs_active !== undefined && worker.jobs_active > 0 ? ( @@ -275,17 +267,7 @@ export const WorkerPage = () => { )} - {worker.sysinfo ? 
( - - {Object.entries(worker.sysinfo).map(([key, value]) => ( - - {key}: {value} - - ))} - - ) : ( - "N/A" - )} + diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py b/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py index 1a523550545bf..aefc1af6119a9 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py @@ -148,11 +148,12 @@ class WorkerStateBody(WorkerQueuesBase): ), ] = None sysinfo: Annotated[ - dict[str, str | int], + dict[str, str | int | float | datetime], Field( description="System information of the worker.", examples=[ { + "status": 20, "concurrency": 4, "free_concurrency": 3, "airflow_version": "2.0.0", diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py index e05159c273148..e18fed366ac32 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py @@ -26,13 +26,9 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.executors.workloads import ExecuteTask -from airflow.providers.common.compat.sdk import Stats, timezone - -try: - from airflow.sdk.observability.stats import DualStatsManager -except ImportError: - DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat +from airflow.providers.common.compat.sdk import timezone from airflow.providers.edge3.models.edge_job import EdgeJobModel +from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest from airflow.providers.edge3.worker_api.datamodels import ( EdgeJobFetched, @@ -92,9 +88,13 @@ def fetch( session.commit() # Edge worker 
does not backport emitted Airflow metrics, so export some metrics tags = {"dag_id": job.dag_id, "task_id": job.task_id, "queue": job.queue} - if DualStatsManager is not None: + if AIRFLOW_V_3_2_PLUS: + from airflow.sdk.observability.stats import DualStatsManager + DualStatsManager.incr("edge_worker.ti.start", tags=tags) else: + from airflow.providers.common.compat.sdk import Stats + Stats.incr(f"edge_worker.ti.start.{job.queue}.{job.dag_id}.{job.task_id}", tags=tags) Stats.incr("edge_worker.ti.start", tags=tags) return EdgeJobFetched( @@ -149,12 +149,16 @@ def state( "queue": job.queue, "state": str(state), } - if DualStatsManager is not None: + if AIRFLOW_V_3_2_PLUS: + from airflow.sdk.observability.stats import DualStatsManager + DualStatsManager.incr( "edge_worker.ti.finish", tags=tags, ) else: + from airflow.providers.common.compat.sdk import Stats + Stats.incr( f"edge_worker.ti.finish.{job.queue}.{state}.{job.dag_id}.{job.task_id}", tags=tags, diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/ui.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/ui.py index 6796326014079..59a39f9b4cfc0 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/ui.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/ui.py @@ -86,7 +86,7 @@ def worker( queues=w.queues, state=w.state, jobs_active=w.jobs_active, - sysinfo=w.sysinfo_json or {}, + sysinfo=w.sysinfo or {}, maintenance_comments=w.maintenance_comment, first_online=w.first_online, last_heartbeat=w.last_update, diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py index b66be595c0c74..223c94afb735d 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py @@ -17,7 +17,7 @@ from __future__ import annotations -import json +from datetime import 
datetime from typing import Annotated from fastapi import Body, Depends, HTTPException, Path, status @@ -26,13 +26,9 @@ from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001 from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.providers.common.compat.sdk import Stats, timezone - -try: - from airflow.sdk.observability.stats import DualStatsManager -except ImportError: - DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat +from airflow.providers.common.compat.sdk import timezone from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics +from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest from airflow.providers.edge3.worker_api.datamodels import ( WorkerQueueUpdateBody, @@ -54,7 +50,7 @@ ) -def _assert_version(sysinfo: dict[str, str | int]) -> None: +def _assert_version(sysinfo: dict[str, str | int | float | datetime]) -> None: """Check if the Edge Worker version matches the central API site.""" from airflow import __version__ as airflow_version from airflow.providers.edge3 import __version__ as edge_provider_version @@ -98,6 +94,7 @@ def _assert_version(sysinfo: dict[str, str | int]) -> None: "jobs_active": 3, "queues": ["large_node", "wisconsin_site"], "sysinfo": { + "status": 20, "concurrency": 4, "airflow_version": "2.10.0", "edge_provider_version": "1.0.0", @@ -194,7 +191,7 @@ def register( worker.maintenance_comment, body.maintenance_comments ) worker.queues = body.queues - worker.sysinfo = json.dumps(body.sysinfo) + worker.sysinfo = body.sysinfo worker.last_update = timezone.utcnow() worker.team_name = body.team_name session.add(worker) @@ -217,10 +214,12 @@ def set_state( worker.maintenance_comment, body.maintenance_comments ) worker.jobs_active = 
body.jobs_active - worker.sysinfo = json.dumps(body.sysinfo) + worker.sysinfo = body.sysinfo worker.last_update = timezone.utcnow() session.commit() - if DualStatsManager is not None: + if AIRFLOW_V_3_2_PLUS: + from airflow.sdk.observability.stats import DualStatsManager + DualStatsManager.incr( "edge_worker.heartbeat_count", 1, @@ -229,15 +228,20 @@ def set_state( extra_tags={"worker_name": worker_name}, ) else: + from airflow.providers.common.compat.sdk import Stats + Stats.incr(f"edge_worker.heartbeat_count.{worker_name}", 1, 1) Stats.incr("edge_worker.heartbeat_count", 1, 1, tags={"worker_name": worker_name}) + concurrency: int = body.sysinfo.get("concurrency", -1) # type: ignore + free_concurrency: int = body.sysinfo.get("free_concurrency", -1) # type: ignore set_metrics( worker_name=worker_name, state=body.state, jobs_active=body.jobs_active, - concurrency=int(body.sysinfo.get("concurrency", -1)), - free_concurrency=int(body.sysinfo["free_concurrency"]), + concurrency=concurrency, + free_concurrency=free_concurrency, queues=worker.queues, + sysinfo=body.sysinfo, ) _assert_version(body.sysinfo) # Exception only after worker state is in the DB return WorkerSetStateReturn( diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml index e6babb7d3dca0..0cc94046e158e 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml @@ -375,6 +375,7 @@ paths: - large_node - wisconsin_site sysinfo: + status: 20 concurrency: 4 airflow_version: 2.10.0 edge_provider_version: 1.0.0 @@ -447,6 +448,7 @@ paths: - large_node - wisconsin_site sysinfo: + status: 20 concurrency: 4 airflow_version: 2.10.0 edge_provider_version: 1.0.0 @@ -1336,6 +1338,9 @@ components: anyOf: - type: string - type: integer + - type: number + - type: string + format: date-time 
type: object title: Sysinfo description: System information of the worker. @@ -1344,6 +1349,7 @@ components: concurrency: 4 edge_provider_version: 1.0.0 free_concurrency: 3 + status: 20 maintenance_comments: anyOf: - type: string @@ -1518,6 +1524,9 @@ components: anyOf: - type: string - type: integer + - type: number + - type: string + format: date-time type: object title: Sysinfo description: System information of the worker. @@ -1526,6 +1535,7 @@ components: concurrency: 4 edge_provider_version: 1.0.0 free_concurrency: 3 + status: 20 maintenance_comments: anyOf: - type: string diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py index d3b721c1e3213..dcfbe6614ef3d 100644 --- a/providers/edge3/tests/unit/edge3/cli/test_worker.py +++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py @@ -139,6 +139,20 @@ def worker_with_job(self, tmp_path: Path, mock_joblist: list[Job]) -> EdgeWorker EdgeWorker.jobs = mock_joblist return test_worker + @pytest.fixture + def worker_with_job_and_sysinfo(self, tmp_path: Path, mock_joblist: list[Job]) -> EdgeWorker: + with conf_vars( + { + ( + "edge", + "extended_system_info_function", + ): "airflow.providers.edge3.cli.example_extended_sysinfo.get_example_extended_sysinfo" + } + ): + test_worker = EdgeWorker(str(tmp_path / "mock.pid"), "mock", None, 8) + EdgeWorker.jobs = mock_joblist + return test_worker + @pytest.fixture def mock_edgeworker(self) -> EdgeWorkerModel: test_edgeworker = EdgeWorkerModel( @@ -500,13 +514,35 @@ def stop_running(): mock_loop.assert_called_once() assert mock_set_state.call_count == 1 - def test_get_sysinfo(self, worker_with_job: EdgeWorker): + @pytest.mark.asyncio + async def test_get_sysinfo(self, worker_with_job: EdgeWorker): concurrency = 8 worker_with_job.concurrency = concurrency - sysinfo = worker_with_job._get_sysinfo() + sysinfo = await worker_with_job._get_sysinfo() + assert "airflow_version" in sysinfo + assert 
"edge_provider_version" in sysinfo + assert "python_version" in sysinfo + assert "concurrency" in sysinfo + assert "worker_start_time" in sysinfo + assert sysinfo["worker_start_time"] == worker_with_job.worker_start_time + assert "status" in sysinfo + assert "status_text" not in sysinfo # is only defined if extended sysinfo provides this field + assert sysinfo["concurrency"] == concurrency + + @pytest.mark.asyncio + async def test_get_sysinfo_extended(self, worker_with_job_and_sysinfo: EdgeWorker): + concurrency = 42 + worker_with_job_and_sysinfo.concurrency = concurrency + sysinfo = await worker_with_job_and_sysinfo._get_sysinfo() assert "airflow_version" in sysinfo assert "edge_provider_version" in sysinfo + assert "python_version" in sysinfo assert "concurrency" in sysinfo + assert "worker_start_time" in sysinfo + assert sysinfo["worker_start_time"] == worker_with_job_and_sysinfo.worker_start_time + assert "status" in sysinfo + assert "status_text" in sysinfo + assert "disk_free_gb" in sysinfo assert sysinfo["concurrency"] == concurrency @pytest.mark.db_test diff --git a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py index 2ef41ece0a5b9..0b31797d528ad 100644 --- a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py +++ b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py @@ -16,6 +16,7 @@ # under the License. 
from __future__ import annotations +import logging import os from datetime import datetime, timedelta from unittest import mock @@ -25,8 +26,7 @@ import time_machine from sqlalchemy import delete, select -from airflow.models.taskinstancekey import TaskInstanceKey -from airflow.providers.common.compat.sdk import Stats, conf, timezone +from airflow.providers.common.compat.sdk import Stats, TaskInstanceKey, conf, timezone from airflow.providers.edge3.executors.edge_executor import EdgeExecutor from airflow.providers.edge3.models.edge_job import EdgeJobModel from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState @@ -240,15 +240,15 @@ def test_sync_active_worker(self): datetime(2023, 1, 1, 0, 59, 10, tzinfo=timezone.utc), ), ]: - session.add( - EdgeWorkerModel( - worker_name=worker_name, - state=state, - last_update=last_heartbeat, - queues="", - first_online=timezone.utcnow(), - ) + ewm = EdgeWorkerModel( + worker_name=worker_name, + state=state, + last_update=last_heartbeat, + queues="", + first_online=timezone.utcnow(), ) + ewm.sysinfo = {"status": logging.INFO, "status_text": "I am good, sun is shining 🌞"} + session.add(ewm) session.commit() with time_machine.travel(datetime(2023, 1, 1, 1, 0, 0, tzinfo=timezone.utc), tick=False): @@ -259,13 +259,19 @@ def test_sync_active_worker(self): for worker in session.scalars(select(EdgeWorkerModel)).all(): print(worker.worker_name) if "maintenance_" in worker.worker_name: - EdgeWorkerState.OFFLINE_MAINTENANCE + assert worker.state == EdgeWorkerState.OFFLINE_MAINTENANCE elif "offline_" in worker.worker_name: assert worker.state == EdgeWorkerState.OFFLINE elif "inactive_" in worker.worker_name: assert worker.state == EdgeWorkerState.UNKNOWN + assert worker.sysinfo + assert worker.sysinfo["status"] == logging.NOTSET + assert "status_text" not in worker.sysinfo else: assert worker.state == EdgeWorkerState.IDLE + assert worker.sysinfo + assert worker.sysinfo["status"] == logging.INFO + assert 
"status_text" in worker.sysinfo def test_revoke_task(self): """Test that revoke_task removes task from executor and database.""" diff --git a/providers/edge3/tests/unit/edge3/models/test_db.py b/providers/edge3/tests/unit/edge3/models/test_db.py index 3f852185a7df4..4c2b95a9fff83 100644 --- a/providers/edge3/tests/unit/edge3/models/test_db.py +++ b/providers/edge3/tests/unit/edge3/models/test_db.py @@ -20,6 +20,7 @@ from unittest import mock import pytest +import sqlalchemy as sa from airflow.utils.db_manager import RunDBManager @@ -240,6 +241,18 @@ def test_initdb_stamps_and_upgrades_when_tables_exist_without_version(self, sess mc = MigrationContext.configure(conn, opts={"render_as_batch": True}) ops = Operations(mc) ops.drop_column("edge_worker", "concurrency") + ops.add_column( + "edge_worker", + sa.Column("jobs_failed", sa.INTEGER(), autoincrement=False, default=0, nullable=False), + ) + ops.add_column( + "edge_worker", + sa.Column("jobs_taken", sa.INTEGER(), autoincrement=False, default=0, nullable=False), + ) + ops.add_column( + "edge_worker", + sa.Column("jobs_success", sa.INTEGER(), autoincrement=False, default=0, nullable=False), + ) # initdb() should detect tables exist, stamp to base, then upgrade manager.initdb() @@ -248,7 +261,7 @@ def test_initdb_stamps_and_upgrades_when_tables_exist_without_version(self, sess version = conn.execute(text("SELECT version_num FROM alembic_version_edge3")).scalar() columns = {col["name"] for col in inspect(conn).get_columns("edge_worker")} - assert version == "a09c3ee8e1d3" + assert version == "c6b3c3d093fd" assert "concurrency" in columns assert "team_name" in columns @@ -273,6 +286,18 @@ def test_migration_adds_concurrency_column(self, session): mc = MigrationContext.configure(conn, opts={"render_as_batch": True}) ops = Operations(mc) ops.drop_column("edge_worker", "concurrency") + ops.add_column( + "edge_worker", + sa.Column("jobs_failed", sa.INTEGER(), autoincrement=False, default=0, nullable=False), + ) + 
ops.add_column( + "edge_worker", + sa.Column("jobs_taken", sa.INTEGER(), autoincrement=False, default=0, nullable=False), + ) + ops.add_column( + "edge_worker", + sa.Column("jobs_success", sa.INTEGER(), autoincrement=False, default=0, nullable=False), + ) # Stamp to old revision (pre-concurrency) using alembic's own connection command.stamp(config, "9d34dfc2de06") diff --git a/providers/edge3/tests/unit/edge3/worker_api/routes/test_jobs.py b/providers/edge3/tests/unit/edge3/worker_api/routes/test_jobs.py index dde158ebc6e6e..bd87afe1226bb 100644 --- a/providers/edge3/tests/unit/edge3/worker_api/routes/test_jobs.py +++ b/providers/edge3/tests/unit/edge3/worker_api/routes/test_jobs.py @@ -29,22 +29,24 @@ from airflow.utils.session import create_session from airflow.utils.state import TaskInstanceState +from tests_common.test_utils.version_compat import AIRFLOW_V_3_2_PLUS + if TYPE_CHECKING: from sqlalchemy.orm import Session -try: - from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager # noqa: F401 +pytestmark = pytest.mark.db_test + +if AIRFLOW_V_3_2_PLUS: + from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager - stats_reference = "airflow.sdk._shared.observability.metrics.dual_stats_manager.DualStatsManager" + stats_reference = f"{DualStatsManager.__module__}.DualStatsManager" expected_call_count = 1 -except ImportError: +else: from airflow.providers.common.compat.sdk import Stats stats_reference = f"{Stats.__module__}.Stats" expected_call_count = 2 -pytestmark = pytest.mark.db_test - DAG_ID = "my_dag" TASK_ID = "my_task" diff --git a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py index 080b019948ef7..fef1c9d50256a 100644 --- a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py +++ b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py @@ -17,6 +17,7 @@ from __future__ import 
annotations from collections.abc import Sequence +from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING @@ -24,7 +25,9 @@ from fastapi import HTTPException from sqlalchemy import delete, select +from airflow import __version__ as airflow_version from airflow.providers.common.compat.sdk import timezone +from airflow.providers.edge3 import __version__ as edge_provider_version from airflow.providers.edge3.cli.worker import EdgeWorker from airflow.providers.edge3.models.edge_worker import ( EdgeWorkerModel, @@ -46,6 +49,16 @@ class TestWorkerApiRoutes: + MOCK_SYSINFO: dict[str, str | int | float | datetime] = { + "status": 20, + "airflow_version": airflow_version, + "edge_provider_version": edge_provider_version, + "python_version": "3.10.17 (main, Apr 9 2025, 04:03:39) [Clang 20.1.0 ]", + "worker_start_time": "2026-04-18T21:10:42.714344", + "concurrency": 8, + "free_concurrency": 8, + } + @pytest.fixture def cli_worker(self, tmp_path: Path) -> EdgeWorker: test_worker = EdgeWorker(str(tmp_path / "mock.pid"), "mock", None, 8) @@ -88,7 +101,7 @@ def test_register(self, session: Session, input_queues: list[str] | None, cli_wo state=EdgeWorkerState.STARTING, jobs_active=0, queues=input_queues, - sysinfo=cli_worker._get_sysinfo(), + sysinfo=self.MOCK_SYSINFO, ) register("test_worker", body, session) session.commit() @@ -107,7 +120,7 @@ def test_register_with_team_name(self, session: Session, cli_worker: EdgeWorker) state=EdgeWorkerState.STARTING, jobs_active=0, queues=["default"], - sysinfo=cli_worker._get_sysinfo(), + sysinfo=self.MOCK_SYSINFO, team_name="team_a", ) register("test_worker", body, session) @@ -137,7 +150,7 @@ def test_register_same_name_different_team_rejects_when_active( state=EdgeWorkerState.STARTING, jobs_active=0, queues=["default"], - sysinfo=cli_worker._get_sysinfo(), + sysinfo=self.MOCK_SYSINFO, team_name="team_b", ) with pytest.raises(HTTPException) as exc_info: @@ -163,7 +176,7 @@ def 
test_register_same_name_different_team_reuses_when_offline( state=EdgeWorkerState.STARTING, jobs_active=0, queues=["default"], - sysinfo=cli_worker._get_sysinfo(), + sysinfo=self.MOCK_SYSINFO, team_name="team_b", ) register("test_worker", body, session) @@ -206,7 +219,7 @@ def test_register_duplicate_worker( state=EdgeWorkerState.STARTING, jobs_active=0, queues=["default"], - sysinfo=cli_worker._get_sysinfo(), + sysinfo=self.MOCK_SYSINFO, ) if should_raise: @@ -315,7 +328,7 @@ def test_set_state(self, session: Session, cli_worker: EdgeWorker): state=EdgeWorkerState.RUNNING, jobs_active=1, queues=["default2"], - sysinfo=cli_worker._get_sysinfo(), + sysinfo=self.MOCK_SYSINFO, ) return_queues = set_state("test2_worker", body, session).queues @@ -342,7 +355,7 @@ def test_set_state_returns_concurrency(self, session: Session, cli_worker: EdgeW state=EdgeWorkerState.RUNNING, jobs_active=0, queues=["default"], - sysinfo=cli_worker._get_sysinfo(), + sysinfo=self.MOCK_SYSINFO, ) result = set_state("test2_worker", body, session) assert result.concurrency == 16 @@ -364,7 +377,7 @@ def test_set_state_returns_none_concurrency_when_not_overridden( state=EdgeWorkerState.RUNNING, jobs_active=0, queues=["default"], - sysinfo=cli_worker._get_sysinfo(), + sysinfo=self.MOCK_SYSINFO, ) result = set_state("test2_worker", body, session) assert result.concurrency is None diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index 17356bdcbb0be..e816a1a80d31b 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -474,6 +474,12 @@ metrics: legacy_name: "-" name_variables: ["event_type", "operator_name"] + - name: "edge_worker.status" + description: "Edge worker status (expressed as Python logging level)." 
+ type: "gauge" + legacy_name: "edge_worker.status.{worker_name}" + name_variables: ["worker_name"] + - name: "edge_worker.connected" description: "Edge worker in state connected." type: "gauge"