Skip to content

Commit

Permalink
KubernetesPodTrigger: add exception stack trace in TriggerEvent (#35716)
Browse files Browse the repository at this point in the history
  • Loading branch information
functicons committed Dec 16, 2023
1 parent 63e97ab commit a398d9d
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 1 deletion.
7 changes: 6 additions & 1 deletion airflow/providers/cncf/kubernetes/operators/pod.py
Expand Up @@ -672,6 +672,7 @@ def invoke_defer_method(self):
)

def execute_complete(self, context: Context, event: dict, **kwargs):
self.log.debug("Triggered with event: %s", event)
pod = None
try:
pod = self.hook.get_pod(
Expand All @@ -682,7 +683,11 @@ def execute_complete(self, context: Context, event: dict, **kwargs):
# fetch some logs when pod is failed
if self.get_logs:
self.write_logs(pod)
raise AirflowException(event["message"])
if "stack_trace" in event:
message = f"{event['message']}\n{event['stack_trace']}"
else:
message = event["message"]
raise AirflowException(message)
elif event["status"] == "success":
# fetch some logs when pod is executed successfully
if self.get_logs:
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/cncf/kubernetes/triggers/pod.py
Expand Up @@ -18,6 +18,7 @@

import asyncio
import datetime
import traceback
import warnings
from asyncio import CancelledError
from enum import Enum
Expand Down Expand Up @@ -231,6 +232,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"namespace": self.pod_namespace,
"status": "error",
"message": str(e),
"stack_trace": traceback.format_exc(),
}
)

Expand Down
2 changes: 2 additions & 0 deletions tests/providers/cncf/kubernetes/triggers/test_pod.py
Expand Up @@ -181,12 +181,14 @@ async def test_logging_in_trigger_when_exception_should_execute_successfully(

generator = trigger.run()
actual = await generator.asend(None)
actual_stack_trace = actual.payload.pop("stack_trace")
assert (
TriggerEvent(
{"name": POD_NAME, "namespace": NAMESPACE, "status": "error", "message": "Test exception"}
)
== actual
)
assert actual_stack_trace.startswith("Traceback (most recent call last):")

@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_PATH}.define_container_state")
Expand Down
Expand Up @@ -199,12 +199,14 @@ async def test_logging_in_trigger_when_exception_should_execute_successfully(

generator = trigger.run()
actual = await generator.asend(None)
actual_stack_trace = actual.payload.pop("stack_trace")
assert (
TriggerEvent(
{"name": POD_NAME, "namespace": NAMESPACE, "status": "error", "message": "Test exception"}
)
== actual
)
assert actual_stack_trace.startswith("Traceback (most recent call last):")

@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")
Expand Down

0 comments on commit a398d9d

Please sign in to comment.