Bring back edge worker metric compatibility with Airflow 3.2 #67328
AutomationDev85 wants to merge 1 commit into
@xBis7 this PR was made in relation to the cleanup in #63932, which it seems broke monitoring. If Airflow 3.2.x is used and Common-Compat is upgraded (which PR was this?), what is the best strategy to fix it? Bringing DualStatsManager back into Edge seems counterproductive to what you cleaned up; obviously there is some drift. Does tag handling need to be added somewhere in the Common-Compat provider?
Yes, adding back the DualStatsManager is counterproductive. In addition, we added a Stats shim so that imports won't be affected, but only the latest code has the dual stat emission logic in the Stats.X() calls.
Because the imports have stayed the same, I think in this case, it makes more sense to use a version check. The code should look like
    if AIRFLOW_V_3_3_PLUS:
        # Latest code: Stats.X() carries the dual stat emission logic itself.
        Stats.gauge(
            "edge_worker.status",
            sysinfo.get("status", logging.NOTSET),  # type: ignore
            tags={"worker_name": worker_name},
        )
        Stats.gauge("edge_worker.connected", int(connected), tags={"worker_name": worker_name})
        Stats.gauge("edge_worker.maintenance", int(maintenance), tags={"worker_name": worker_name})
        Stats.gauge("edge_worker.jobs_active", jobs_active, tags={"worker_name": worker_name})
        Stats.gauge("edge_worker.concurrency", concurrency, tags={"worker_name": worker_name})
        Stats.gauge("edge_worker.free_concurrency", free_concurrency, tags={"worker_name": worker_name})
        Stats.gauge(
            "edge_worker.num_queues",
            len(queues),
            tags={"worker_name": worker_name, "queues": ",".join(queues)},
        )
        for key in additional_keys:
            value = sysinfo.get(key)
            if isinstance(value, (int, float)):
                Stats.gauge(f"edge_worker.{key}", value, tags={"worker_name": worker_name})
    else:
        # Older versions: emit the legacy name (worker name in the metric name)
        # and the tagged name explicitly.
        Stats.gauge(f"edge_worker.status.{worker_name}", sysinfo.get("status", logging.NOTSET))  # type: ignore
        Stats.gauge(
            "edge_worker.status",
            sysinfo.get("status", logging.NOTSET),  # type: ignore
            tags={"worker_name": worker_name},
        )
        Stats.gauge(f"edge_worker.connected.{worker_name}", int(connected))
        Stats.gauge("edge_worker.connected", int(connected), tags={"worker_name": worker_name})
        Stats.gauge(f"edge_worker.maintenance.{worker_name}", int(maintenance))
        Stats.gauge("edge_worker.maintenance", int(maintenance), tags={"worker_name": worker_name})
        Stats.gauge(f"edge_worker.jobs_active.{worker_name}", jobs_active)
        Stats.gauge("edge_worker.jobs_active", jobs_active, tags={"worker_name": worker_name})
        Stats.gauge(f"edge_worker.concurrency.{worker_name}", concurrency)
        Stats.gauge("edge_worker.concurrency", concurrency, tags={"worker_name": worker_name})
        Stats.gauge(f"edge_worker.free_concurrency.{worker_name}", free_concurrency)
        Stats.gauge("edge_worker.free_concurrency", free_concurrency, tags={"worker_name": worker_name})
        Stats.gauge(f"edge_worker.num_queues.{worker_name}", len(queues))
        Stats.gauge(
            "edge_worker.num_queues",
            len(queues),
            tags={"worker_name": worker_name, "queues": ",".join(queues)},
        )
        for key in additional_keys:
            value = sysinfo.get(key)
            if isinstance(value, (int, float)):
                Stats.gauge(f"edge_worker.{key}.{worker_name}", value)
                Stats.gauge(f"edge_worker.{key}", value, tags={"worker_name": worker_name})

Since this issue wasn't caught by the back compat CI steps, either the unit tests don't check for both new and legacy metric names or the assertions don't run. Can you extend some of the unit tests? It will help us identify similar issues in the future.
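One possible shape for such a test is sketched below. It is only an illustration: `emit_worker_gauges` and its `legacy_names` flag are hypothetical stand-ins for the provider code, not names from the edge3 provider, and the real tests would patch whatever `Stats` object the provider imports. The point is that each test asserts the exact set of metric names, so a missing legacy name fails loudly.

```python
from unittest import mock


def emit_worker_gauges(stats, worker_name, connected, legacy_names):
    """Hypothetical stand-in for the provider code under test."""
    if legacy_names:
        # Legacy form: the worker name is part of the metric name.
        stats.gauge(f"edge_worker.connected.{worker_name}", int(connected))
    # Tagged form: the worker name travels as a tag.
    stats.gauge("edge_worker.connected", int(connected), tags={"worker_name": worker_name})


def test_emits_legacy_and_tagged_names():
    stats = mock.MagicMock()
    emit_worker_gauges(stats, "worker-1", True, legacy_names=True)
    stats.gauge.assert_has_calls(
        [
            mock.call("edge_worker.connected.worker-1", 1),
            mock.call("edge_worker.connected", 1, tags={"worker_name": "worker-1"}),
        ]
    )
    # Guard against extra or missing emissions.
    assert stats.gauge.call_count == 2


def test_emits_only_tagged_name_when_backend_dual_emits():
    stats = mock.MagicMock()
    emit_worker_gauges(stats, "worker-1", False, legacy_names=False)
    stats.gauge.assert_called_once_with(
        "edge_worker.connected", 0, tags={"worker_name": "worker-1"}
    )
```

Running both cases against every supported Airflow version in the back compat CI matrix would be one way to catch this kind of drift.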
            extra_tags={"worker_name": worker_name, "queues": ",".join(queues)},
        )
    else:
        Stats.gauge(
This includes the DualStatsManager logic under the hood. In versions prior to 3.2, where the DualStatsManager wasn't available, Stats.X() didn't have the dual stat emission logic. In that case, the else block will execute and you still won't get the legacy metrics.
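The version-dependent behaviour discussed in this thread hinges on a flag such as `AIRFLOW_V_3_3_PLUS`. Its actual definition is not shown here, so the following is only a minimal sketch of how such a flag could be derived from a version string. The helpers `parse_base_version` and `is_at_least` are hypothetical, and the sketch uses the standard library only; real code would more likely rely on `packaging.version`.

```python
def parse_base_version(version: str) -> tuple:
    """Return (major, minor, micro), ignoring suffixes such as 'rc1'."""
    parts = []
    for piece in version.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits or 0))
    # Pad short versions such as "3.3" to three components.
    while len(parts) < 3:
        parts.append(0)
    return parts[0], parts[1], parts[2]


def is_at_least(version: str, minimum: tuple) -> bool:
    """Compare numerically, so that '3.10.0' sorts after '3.3.0'."""
    return parse_base_version(version) >= minimum


# Hypothetical flag for an installation running Airflow 3.2.1.
AIRFLOW_V_3_3_PLUS = is_at_least("3.2.1", (3, 3, 0))
```

Comparing tuples of integers rather than raw strings avoids the classic mistake of treating "3.10.0" as older than "3.3.0".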
@AutomationDev85 A few things need addressing before review; see our Pull Request quality criteria.
No rush. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer (a real person) will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. Drafted-by: Claude Code (Opus 4.7); reviewed by @potiuk before posting
Overview
After switching to edge3 provider version 3.6.0, we observed that edge metrics were exposed without worker_name tags. During root-cause analysis, we found that metrics were no longer exported through DualStatsManager. As a result, StatsD mappings broke because the worker name was no longer included in the metric name.
In Airflow version 3.2.1, this behavior was handled by DualStatsManager. As we understand it, the new implementation that replaces DualStatsManager will address this, but it is planned for Airflow 3.3. Therefore, this change reintroduces the DualStatsManager check to keep edge worker metrics compatible with Airflow 3.2.
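To illustrate why name-based StatsD mappings break when the worker name moves out of the metric name, here is a minimal sketch. The regular expression is a hypothetical mapping rule, not our actual StatsD configuration; it extracts the worker name from the legacy metric name and finds nothing to extract once only the tagged name is emitted.

```python
import re
from typing import Optional

# Hypothetical mapping rule: edge_worker.<metric>.<worker_name>
LEGACY_MAPPING = re.compile(r"^edge_worker\.(?P<metric>[a-z_]+)\.(?P<worker_name>.+)$")


def extract_worker_name(metric_name: str) -> Optional[str]:
    """Return the worker name encoded in a legacy metric name, if any."""
    match = LEGACY_MAPPING.match(metric_name)
    return match.group("worker_name") if match else None


# Legacy name: the worker name is recoverable from the metric name.
legacy = extract_worker_name("edge_worker.connected.worker-1")

# Tagged-only name: the rule no longer matches, so the mapping yields nothing.
tagged_only = extract_worker_name("edge_worker.connected")
```

With the legacy name the rule recovers `worker-1`; with the tagged-only name it returns `None`, which mirrors the broken mappings described above.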
Details of change: