diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py index 5f2f7e999e5ad..a294cd4b44a0f 100644 --- a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py +++ b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py @@ -28,12 +28,16 @@ from airflow.providers.common.compat.sdk import AirflowException, timezone from airflow.providers.common.compat.sqlalchemy.orm import mapped_column from airflow.providers.edge3.models.edge_base import Base -from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.providers_configuration_loader import providers_configuration_loaded from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import UtcDateTime +try: + from airflow.sdk.observability.stats import DualStatsManager +except ImportError: + DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2.1 compat + if TYPE_CHECKING: from collections.abc import Sequence @@ -180,9 +184,7 @@ def set_metrics( "free_concurrency", } - if AIRFLOW_V_3_2_PLUS: - from airflow.sdk.observability.stats import DualStatsManager - + if DualStatsManager is not None: try: DualStatsManager.gauge( "edge_worker.status", diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py index e18fed366ac32..0ecf99520246a 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py @@ -22,13 +22,16 @@ from fastapi import Body, Depends, status from sqlalchemy import select, update +try: + from airflow.sdk.observability.stats import DualStatsManager +except ImportError: + DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001 from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.executors.workloads import ExecuteTask from airflow.providers.common.compat.sdk import timezone from airflow.providers.edge3.models.edge_job import EdgeJobModel -from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest from airflow.providers.edge3.worker_api.datamodels import ( EdgeJobFetched, @@ -88,9 +91,7 @@ def fetch( session.commit() # Edge worker does not backport emitted Airflow metrics, so export some metrics tags = {"dag_id": job.dag_id, "task_id": job.task_id, "queue": job.queue} - if AIRFLOW_V_3_2_PLUS: - from airflow.sdk.observability.stats import DualStatsManager - + if DualStatsManager is not None: DualStatsManager.incr("edge_worker.ti.start", tags=tags) else: from airflow.providers.common.compat.sdk import Stats @@ -149,9 +150,7 @@ def state( "queue": job.queue, "state": str(state), } - if AIRFLOW_V_3_2_PLUS: - from airflow.sdk.observability.stats import DualStatsManager - + if DualStatsManager is not None: DualStatsManager.incr( "edge_worker.ti.finish", tags=tags, diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py index 223c94afb735d..81d5496360591 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py @@ -28,7 +28,6 @@ from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.providers.common.compat.sdk import timezone from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics -from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest from airflow.providers.edge3.worker_api.datamodels import ( WorkerQueueUpdateBody, @@ -37,6 +36,11 @@ WorkerStateBody, ) +try: + from airflow.sdk.observability.stats import DualStatsManager +except ImportError: + DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat + worker_router = AirflowRouter( tags=["Worker"], prefix="/worker", @@ -217,9 +221,7 @@ def set_state( worker.sysinfo = body.sysinfo worker.last_update = timezone.utcnow() session.commit() - if AIRFLOW_V_3_2_PLUS: - from airflow.sdk.observability.stats import DualStatsManager - + if DualStatsManager is not None: DualStatsManager.incr( "edge_worker.heartbeat_count", 1,