Skip to content

Commit

Permalink
Extend task context logging support for remote logging using Elastics…
Browse files Browse the repository at this point in the history
…earch (#32977)

* Extend task context logging support for remote logging using Elasticsearch

With the addition of task context logging feature in PR #32646,
this PR extends the feature to Elasticsearch when is it set as
remote logging store. Here, backward compatibility is ensured for
older versions of Airflow that do not have the feature included
in Airflow Core.

* update ensure_ti

---------

Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com>
  • Loading branch information
2 people authored and ephraimbuddy committed Nov 26, 2023
1 parent 546e55a commit 6a64883
Showing 1 changed file with 41 additions and 5 deletions.
46 changes: 41 additions & 5 deletions airflow/providers/elasticsearch/log/es_task_handler.py
Expand Up @@ -34,7 +34,7 @@
from elasticsearch.exceptions import NotFoundError

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.models.dagrun import DagRun
from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter
from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit
Expand All @@ -46,7 +46,8 @@
if TYPE_CHECKING:
from datetime import datetime

from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey


LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""}
# Elasticsearch hosted log type
Expand Down Expand Up @@ -84,6 +85,32 @@ def get_es_kwargs_from_config() -> dict[str, Any]:
return kwargs_dict


def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
"""Given TI | TIKey, return a TI object.
Will raise exception if no TI is found in the database.
"""
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey

if not isinstance(ti, TaskInstanceKey):
return ti
val = (
session.query(TaskInstance)
.filter(
TaskInstance.task_id == ti.task_id,
TaskInstance.dag_id == ti.dag_id,
TaskInstance.run_id == ti.run_id,
TaskInstance.map_index == ti.map_index,
)
.one_or_none()
)
if isinstance(val, TaskInstance):
val._try_number = ti.try_number
return val
else:
raise AirflowException(f"Could not find TaskInstance for {ti}")


class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
"""
ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch.
Expand Down Expand Up @@ -182,8 +209,12 @@ def format_url(host: str) -> str:

return host

def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
def _render_log_id(self, ti: TaskInstance | TaskInstanceKey, try_number: int) -> str:
from airflow.models.taskinstance import TaskInstanceKey

with create_session() as session:
if isinstance(ti, TaskInstanceKey):
ti = _ensure_ti(ti, session)
dag_run = ti.get_dagrun(session=session)
if USE_PER_RUN_LOG_ID:
log_id_template = dag_run.get_log_template(session=session).elasticsearch_id
Expand Down Expand Up @@ -377,11 +408,13 @@ def emit(self, record):
setattr(record, self.offset_field, int(time.time() * (10**9)))
self.handler.emit(record)

def set_context(self, ti: TaskInstance, **kwargs) -> None:
def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None:
"""
Provide task_instance context to airflow task handler.
:param ti: task instance object
:param identifier: if set, identifies the Airflow component which is relaying logs from
exceptional scenarios related to the task instance
"""
is_trigger_log_context = getattr(ti, "is_trigger_log_context", None)
is_ti_raw = getattr(ti, "raw", None)
Expand Down Expand Up @@ -410,7 +443,10 @@ def set_context(self, ti: TaskInstance, **kwargs) -> None:
self.handler.setLevel(self.level)
self.handler.setFormatter(self.formatter)
else:
super().set_context(ti)
if getattr(self, "supports_task_context_logging", False):
super().set_context(ti, identifier=identifier)
else:
super().set_context(ti)
self.context_set = True

def close(self) -> None:
Expand Down

0 comments on commit 6a64883

Please sign in to comment.