Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@
from airflow.providers.common.compat.sdk import AirflowException, timezone
from airflow.providers.common.compat.sqlalchemy.orm import mapped_column
from airflow.providers.edge3.models.edge_base import Base
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime

try:
from airflow.sdk.observability.stats import DualStatsManager
except ImportError:
DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2.1 compat

if TYPE_CHECKING:
from collections.abc import Sequence

Expand Down Expand Up @@ -180,9 +184,7 @@ def set_metrics(
"free_concurrency",
}

if AIRFLOW_V_3_2_PLUS:
from airflow.sdk.observability.stats import DualStatsManager

if DualStatsManager is not None:
try:
DualStatsManager.gauge(
"edge_worker.status",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
from fastapi import Body, Depends, status
from sqlalchemy import select, update

try:
from airflow.sdk.observability.stats import DualStatsManager
except ImportError:
DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat
from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.executors.workloads import ExecuteTask
from airflow.providers.common.compat.sdk import timezone
from airflow.providers.edge3.models.edge_job import EdgeJobModel
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
from airflow.providers.edge3.worker_api.datamodels import (
EdgeJobFetched,
Expand Down Expand Up @@ -88,9 +91,7 @@ def fetch(
session.commit()
# Edge worker does not backport emitted Airflow metrics, so export some metrics
tags = {"dag_id": job.dag_id, "task_id": job.task_id, "queue": job.queue}
if AIRFLOW_V_3_2_PLUS:
from airflow.sdk.observability.stats import DualStatsManager

if DualStatsManager is not None:
DualStatsManager.incr("edge_worker.ti.start", tags=tags)
else:
from airflow.providers.common.compat.sdk import Stats
Expand Down Expand Up @@ -149,9 +150,7 @@ def state(
"queue": job.queue,
"state": str(state),
}
if AIRFLOW_V_3_2_PLUS:
from airflow.sdk.observability.stats import DualStatsManager

if DualStatsManager is not None:
DualStatsManager.incr(
"edge_worker.ti.finish",
tags=tags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.providers.common.compat.sdk import timezone
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
from airflow.providers.edge3.worker_api.datamodels import (
WorkerQueueUpdateBody,
Expand All @@ -37,6 +36,11 @@
WorkerStateBody,
)

try:
from airflow.sdk.observability.stats import DualStatsManager
except ImportError:
DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat

worker_router = AirflowRouter(
tags=["Worker"],
prefix="/worker",
Expand Down Expand Up @@ -217,9 +221,7 @@ def set_state(
worker.sysinfo = body.sysinfo
worker.last_update = timezone.utcnow()
session.commit()
if AIRFLOW_V_3_2_PLUS:
from airflow.sdk.observability.stats import DualStatsManager

if DualStatsManager is not None:
DualStatsManager.incr(
"edge_worker.heartbeat_count",
1,
Expand Down
Loading