Add task parameter to set custom logger name #34964

Merged: 26 commits (Nov 3, 2023)

Commits:
7a0f1c4  feat: Implement customizable logger name  (Joffreybvn, Oct 13, 2023)
5d8eb9e  feat: Add tests  (Joffreybvn, Oct 13, 2023)
1bef39a  feat: Implement logger config for Hooks  (Joffreybvn, Oct 16, 2023)
db3a91e  feat: Move logger_name assignment into LoggingMixin  (Joffreybvn, Oct 16, 2023)
ec04689  fix: Apply static-checks formatting  (Joffreybvn, Oct 16, 2023)
445adb3  feat: Implement customizable logger name  (Joffreybvn, Oct 13, 2023)
7beb1a3  fix: Apply static-checks formatting  (Joffreybvn, Oct 16, 2023)
ac8a104  feat: Make logger_name a child logger of 'airflow.tasks' for Operator…  (Joffreybvn, Oct 18, 2023)
8bc0466  fix: Refactor tests and typing for Hooks logger name  (Joffreybvn, Oct 18, 2023)
dfa7789  fix: Add partial init for logger name  (Joffreybvn, Oct 18, 2023)
67497cf  feat: Add documentation about custom logger name  (Joffreybvn, Oct 18, 2023)
9d9b351  fix: Correctly format code in logging documentation  (Joffreybvn, Oct 19, 2023)
a9e8540  fix: Correctly capture docker hooks in test  (Joffreybvn, Oct 19, 2023)
3dc0b81  feat: Implement customizable logger name  (Joffreybvn, Oct 13, 2023)
dc3c7eb  fix: Apply static-checks formatting  (Joffreybvn, Oct 16, 2023)
a80e84a  feat: Make logger_name a child logger of 'airflow.tasks' for Operator…  (Joffreybvn, Oct 18, 2023)
90e4ae7  fix: Refactor tests and typing for Hooks logger name  (Joffreybvn, Oct 18, 2023)
fc7bcaa  fix: Rename `_parent_logger` to `_parent_logger_name`  (Joffreybvn, Oct 20, 2023)
a7bbcea  fix: Simplify logger name creation + rename `_parent_logger` to `_log…  (Joffreybvn, Oct 22, 2023)
13ba890  fix: Correctly explain default logger in docstring  (Joffreybvn, Oct 23, 2023)
46e776b  feat: Implement back support for pickling logger  (Joffreybvn, Oct 23, 2023)
002fbae  fix: store logger presence as bool instead of name  (Joffreybvn, Oct 24, 2023)
5c35270  feat: Set back getstate/setstate to original + make `_log` None at se…  (Joffreybvn, Oct 24, 2023)
b5c8422  fix: Correctly use pydantic's deep_update  (Joffreybvn, Oct 24, 2023)
74195a8  fix: Remove _log only if it exists  (Joffreybvn, Oct 24, 2023)
ded2f91  fix: Remove `_log` None creation in setstate  (Joffreybvn, Oct 24, 2023)
10 changes: 10 additions & 0 deletions airflow/hooks/base.py
@@ -41,8 +41,18 @@ class BaseHook(LoggingMixin):
object that can handle the connection and interaction to specific
instances of these systems, and expose consistent methods to interact
with them.

:param logger_name: Name of the logger used by the Hook to emit logs.
If set to `None` (default), the logger name will fall back to
`airflow.task.hooks.{class.__module__}.{class.__name__}` (e.g. DbApiHook will have
*airflow.task.hooks.airflow.providers.common.sql.hooks.sql.DbApiHook* as logger).
"""

def __init__(self, logger_name: str | None = None):
super().__init__()
self._log_config_logger_name = "airflow.task.hooks"
self._logger_name = logger_name

@classmethod
def get_connections(cls, conn_id: str) -> list[Connection]:
"""
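For illustration (not part of the diff), a minimal sketch of how the new parameter resolves a hook's logger name, mirroring the tests added in this PR:

from airflow.hooks.base import BaseHook

# Default: module-qualified class name under the "airflow.task.hooks" parent logger.
hook = BaseHook()
assert hook.log.name == "airflow.task.hooks.airflow.hooks.base.BaseHook"

# Custom: the given name is appended to the parent logger name.
hook = BaseHook(logger_name="airflow.custom.logger")
assert hook.log.name == "airflow.task.hooks.airflow.custom.logger"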
15 changes: 11 additions & 4 deletions airflow/models/baseoperator.py
@@ -260,6 +260,7 @@ def partial(
doc_json: str | None | ArgNotSet = NOTSET,
doc_yaml: str | None | ArgNotSet = NOTSET,
doc_rst: str | None | ArgNotSet = NOTSET,
logger_name: str | None | ArgNotSet = NOTSET,
**kwargs,
) -> OperatorPartial:
from airflow.models.dag import DagContext
@@ -323,6 +324,7 @@ def partial(
"doc_md": doc_md,
"doc_rst": doc_rst,
"doc_yaml": doc_yaml,
"logger_name": logger_name,
}

# Inject DAG-level default args into args provided to this function.
@@ -651,6 +653,10 @@ class derived from this one results in the creation of a task object,
that is visible in Task Instance details View in the Webserver
:param doc_yaml: Add documentation (in YAML format) or notes to your Task objects
that is visible in Task Instance details View in the Webserver
:param logger_name: Name of the logger used by the Operator to emit logs.
If set to `None` (default), the logger name will fall back to
`airflow.task.operators.{class.__module__}.{class.__name__}` (e.g. SimpleHttpOperator will have
*airflow.task.operators.airflow.providers.http.operators.http.SimpleHttpOperator* as logger).
"""

# Implementing Operator.
@@ -670,7 +676,6 @@ class derived from this one results in the creation of a task object,
"user_defined_macros",
"user_defined_filters",
"params",
"_log",
)

# each operator should override this class attr for shallow copy attrs.
@@ -781,6 +786,7 @@ def __init__(
doc_json: str | None = None,
doc_yaml: str | None = None,
doc_rst: str | None = None,
logger_name: str | None = None,
**kwargs,
):
from airflow.models.dag import DagContext
@@ -931,7 +937,8 @@ def __init__(
if dag:
self.dag = dag

self._log = logging.getLogger("airflow.task.operators")
self._log_config_logger_name = "airflow.task.operators"
self._logger_name = logger_name

# Lineage
self.inlets: list = []
@@ -1220,13 +1227,13 @@ def __deepcopy__(self, memo):

def __getstate__(self):
state = dict(self.__dict__)
del state["_log"]
if self._log:
del state["_log"]

return state

def __setstate__(self, state):
self.__dict__ = state
self._log = logging.getLogger("airflow.task.operators")

def render_template_fields(
self,
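For illustration (not part of the diff), a sketch of the operator-side behavior these changes enable, including the pickling round-trip that the `__getstate__`/`__setstate__` changes support, mirroring the tests added in this PR:

import pickle
from functools import partial

from airflow.operators.python import PythonOperator

op = PythonOperator(task_id="task", python_callable=partial(int, 2), logger_name="custom.logger")
assert op.log.name == "airflow.task.operators.custom.logger"

# __getstate__ drops the cached `_log`; the logger is lazily re-created on first
# access after unpickling, with the same custom name.
restored = pickle.loads(pickle.dumps(op))
assert restored.log.name == "airflow.task.operators.custom.logger"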
2 changes: 2 additions & 0 deletions airflow/serialization/schema.json
@@ -253,6 +253,8 @@
"doc_json": { "type": "string" },
"doc_yaml": { "type": "string" },
"doc_rst": { "type": "string" },
"_logger_name": { "type": "string" },
"_log_config_logger_name": { "type": "string" },
"_is_mapped": { "const": true, "$comment": "only present when True" },
"expand_input": { "type": "object" },
"partial_kwargs": { "type": "object" }
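For illustration (not part of the diff), a hypothetical serialized-task fragment showing where the new schema fields land. The values here are invented; per the serialization tests further below, `_log_config_logger_name` is serialized for every operator, while `_logger_name` presumably appears only when a custom name was set:

serialized_task = {
    "task_id": "custom_task",
    "_log_config_logger_name": "airflow.task.operators",
    "_logger_name": "sql.big_query",
}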
38 changes: 36 additions & 2 deletions airflow/utils/log/logging_mixin.py
@@ -69,13 +69,47 @@ class LoggingMixin:

_log: logging.Logger | None = None

# Parent logger used by this class. It should match one of the loggers defined in the
# `logging_config_class`. By default, this attribute is used to create the final name of the logger, and
# will prefix the `_logger_name` with a separating dot.
_log_config_logger_name: str | None = None

_logger_name: str | None = None

def __init__(self, context=None):
self._set_context(context)

@staticmethod
def _get_log(obj: Any, clazz: type[_T]) -> Logger:
def _create_logger_name(
logged_class: type[_T],
log_config_logger_name: str | None = None,
class_logger_name: str | None = None,
) -> str:
"""Generate a logger name for the given `logged_class`.

By default, this function returns the `class_logger_name` as the logger name. If it is not provided,
`{class.__module__}.{class.__name__}` is returned instead. When a `log_config_logger_name` is
provided, it will prefix the logger name with a separating dot.
"""
logger_name: str = (
class_logger_name
if class_logger_name is not None
else f"{logged_class.__module__}.{logged_class.__name__}"
)

if log_config_logger_name:
return f"{log_config_logger_name}.{logger_name}" if logger_name else log_config_logger_name
return logger_name

@classmethod
def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger:
if obj._log is None:
obj._log = logging.getLogger(f"{clazz.__module__}.{clazz.__name__}")
logger_name: str = cls._create_logger_name(
logged_class=clazz,
log_config_logger_name=obj._log_config_logger_name,
class_logger_name=obj._logger_name,
)
obj._log = logging.getLogger(logger_name)
return obj._log

@classmethod
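For illustration (not part of the diff), the three resolution cases of `_create_logger_name`, as exercised by the tests in this PR:

from airflow.hooks.base import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin

# No custom name: module-qualified class name, prefixed with the config logger name.
assert (
    LoggingMixin._create_logger_name(BaseHook, "airflow.task.hooks", None)
    == "airflow.task.hooks.airflow.hooks.base.BaseHook"
)

# Custom name: appended to the config logger name.
assert (
    LoggingMixin._create_logger_name(BaseHook, "airflow.task.hooks", "my.logger")
    == "airflow.task.hooks.my.logger"
)

# Empty string: falls back to the config logger name alone.
assert LoggingMixin._create_logger_name(BaseHook, "airflow.task.hooks", "") == "airflow.task.hooks"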
@@ -19,16 +19,18 @@
.. _write-logs-advanced:

Advanced logging configuration
------------------------------
==============================

Not all configuration options are available from the ``airflow.cfg`` file. The config file describes
how to configure logging for tasks, because the logs generated by tasks are not only logged in separate
files by default but also have to be accessible via the webserver.

By default, standard Airflow component logs are written to the ``$AIRFLOW_HOME/logs`` directory, but you
can also customize and configure them as you want by overriding the Python logger configuration, which can
be configured by providing a custom logging configuration object. You can also create and use logging configuration
for specific operators and tasks.

Some configuration options require that the logging config class be overwritten. You can do this by copying the default
configuration of Airflow and modifying it to suit your needs. The default configuration can be seen in the
`airflow_local_settings.py template <https://github.com/apache/airflow/blob/|airflow-version|/airflow/config_templates/airflow_local_settings.py>`_
and you can see the loggers and handlers used there. Except the custom loggers and handlers configurable there
@@ -38,6 +40,9 @@ that Python objects log to loggers that follow naming convention of ``<package>.<module_name>``.
You can read more about standard python logging classes (Loggers, Handlers, Formatters) in the
`Python logging documentation <https://docs.python.org/library/logging.html>`_.

Create a custom logging class
-----------------------------

Configuring your logging classes can be done via the ``logging_config_class`` option in ``airflow.cfg`` file.
This configuration should specify the import path to a configuration compatible with
:func:`logging.config.dictConfig`. If your file is a standard import location, then you should set a
@@ -89,3 +94,64 @@ See :doc:`../modules_management` for details on how Python and Airflow manage modules.
.. note::

You can override the way both standard logs of the components and "task" logs are handled.
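To make the ``logging_config_class`` option concrete, here is a minimal sketch of such a module. The path ``my_company/log_config.py`` is a placeholder, and the tweak assumes the default config keeps its ``airflow.processor`` logger:

.. code-block:: python

    # my_company/log_config.py
    # Referenced from airflow.cfg:  [logging] logging_config_class = my_company.log_config.LOGGING_CONFIG
    from copy import deepcopy

    from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

    # Start from Airflow's defaults and adjust only what you need,
    # e.g. raise the verbosity of one of the default loggers.
    LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
    LOGGING_CONFIG["loggers"]["airflow.processor"]["level"] = "DEBUG"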


Custom logger for Operators, Hooks and Tasks
--------------------------------------------

You can create custom logging handlers and apply them to specific Operators, Hooks and tasks. By default, the Operator
and Hook loggers are children of the ``airflow.task`` logger: they follow the naming conventions
``airflow.task.operators.<package>.<module_name>`` and ``airflow.task.hooks.<package>.<module_name>``, respectively.
After :doc:`creating a custom logging class </administration-and-deployment/logging-monitoring/advanced-logging-configuration>`,
you can assign specific loggers to them.

Example of custom logging for the ``SQLExecuteQueryOperator`` and the ``HttpHook``:

.. code-block:: python

    from copy import deepcopy
    from pydantic.utils import deep_update

    from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

    LOGGING_CONFIG = deep_update(
        deepcopy(DEFAULT_LOGGING_CONFIG),
        {
            "loggers": {
                "airflow.task.operators.airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator": {
                    "handlers": ["task"],
                    "level": "DEBUG",
                    "propagate": True,
                },
                "airflow.task.hooks.airflow.providers.http.hooks.http.HttpHook": {
                    "handlers": ["task"],
                    "level": "WARNING",
                    "propagate": False,
                },
            }
        },
    )

Review thread on the ``pydantic`` import in this example:

Contributor: Seems like it is unused. And why do we need pydantic here?

Contributor (author): As a user, I want to add custom loggers and their related handlers to the ``LOGGING_CONFIG`` dictionary while keeping everything already defined there (without erasing it). This logging config dict is a deeply nested structure. Thus, to have a concise and versatile example, I use ``deep_update``, which merges two dicts in depth (unlike ``dict.update()``, which merges only at the root level). Unless you prefer an example without it?

Member: LGTM. @Taragolis ?


You can also assign a custom logger name to a DAG's task with the ``logger_name`` attribute. This can be useful when multiple tasks
use the same Operator, but you want to disable logging for some of them.

Example of custom logger name:

.. code-block:: python

    # In your DAG file
    SQLExecuteQueryOperator(..., logger_name="sql.big_query")

    # In your custom `log_config.py`
    LOGGING_CONFIG = deep_update(
        deepcopy(DEFAULT_LOGGING_CONFIG),
        {
            "loggers": {
                "airflow.task.operators.sql.big_query": {
                    "handlers": ["task"],
                    "level": "WARNING",
                    "propagate": True,
                },
            }
        },
    )
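As a quick sketch of the runtime effect (assuming the configuration above has been applied), records below the configured level are filtered out for that task's logger:

.. code-block:: python

    import logging

    logger = logging.getLogger("airflow.task.operators.sql.big_query")
    logger.info("dropped: below the WARNING level configured for this logger")
    logger.warning("emitted: handled by the 'task' handler")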
@@ -121,7 +121,8 @@ Advanced configuration
----------------------

You can configure :doc:`advanced features </administration-and-deployment/logging-monitoring/advanced-logging-configuration>`
- including adding your own custom task log handlers (but also log handlers for all airflow components), and creating
custom log handlers per operator, hook, and task.

.. _serving-worker-trigger-logs:

34 changes: 34 additions & 0 deletions tests/hooks/test_base.py
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.hooks.base import BaseHook


class TestBaseHook:
def test_hook_has_default_logger_name(self):
hook = BaseHook()
assert hook.log.name == "airflow.task.hooks.airflow.hooks.base.BaseHook"

def test_custom_logger_name_is_correctly_set(self):
hook = BaseHook(logger_name="airflow.custom.logger")
assert hook.log.name == "airflow.task.hooks.airflow.custom.logger"

def test_empty_string_as_logger_name(self):
hook = BaseHook(logger_name="")
assert hook.log.name == "airflow.task.hooks"
27 changes: 27 additions & 0 deletions tests/operators/test_python.py
@@ -20,12 +20,14 @@
import copy
import logging
import os
import pickle
import re
import sys
import tempfile
import warnings
from collections import namedtuple
from datetime import date, datetime, timedelta
from functools import partial
from subprocess import CalledProcessError
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Generator
@@ -309,6 +311,31 @@ def func():

assert python_operator.template_ext == ["test_ext"]

def test_python_operator_has_default_logger_name(self):
python_operator = PythonOperator(task_id="task", python_callable=partial(int, 2))

logger_name: str = "airflow.task.operators.airflow.operators.python.PythonOperator"
assert python_operator.log.name == logger_name

def test_custom_logger_name_is_correctly_set(self):
"""
Ensure the custom logger name is correctly set when the Operator is created,
and when its state is resumed via __setstate__.
"""
logger_name: str = "airflow.task.operators.custom.logger"

python_operator = PythonOperator(
task_id="task", python_callable=partial(int, 2), logger_name="custom.logger"
)
assert python_operator.log.name == logger_name

setstate_operator = pickle.loads(pickle.dumps(python_operator))
assert setstate_operator.log.name == logger_name

def test_custom_logger_name_can_be_empty_string(self):
python_operator = PythonOperator(task_id="task", python_callable=partial(int, 2), logger_name="")
assert python_operator.log.name == "airflow.task.operators"


class TestBranchOperator(BasePythonTest):
opcls = BranchPythonOperator
2 changes: 1 addition & 1 deletion tests/providers/docker/hooks/test_docker.py
@@ -36,7 +36,7 @@
TEST_CONN = {"host": "some.docker.registry.com", "login": "some_user", "password": "some_p4$$w0rd"}
MOCK_CONNECTION_NOT_EXIST_MSG = "Testing connection not exists"
MOCK_CONNECTION_NOT_EXISTS_EX = AirflowNotFoundException(MOCK_CONNECTION_NOT_EXIST_MSG)
HOOK_LOGGER_NAME = "airflow.providers.docker.hooks.docker.DockerHook"
HOOK_LOGGER_NAME = "airflow.task.hooks.airflow.providers.docker.hooks.docker.DockerHook"
(potiuk marked this conversation as resolved.)


@pytest.fixture
5 changes: 4 additions & 1 deletion tests/serialization/test_dag_serialization.py
@@ -184,6 +184,7 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None:  # type: ignore
},
},
"doc_md": "### Task Tutorial Documentation",
"_log_config_logger_name": "airflow.task.operators",
},
{
"task_id": "custom_task",
@@ -206,6 +207,7 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None:  # type: ignore
"is_setup": False,
"is_teardown": False,
"on_failure_fail_dagrun": False,
"_log_config_logger_name": "airflow.task.operators",
},
],
"schedule_interval": {"__type": "timedelta", "__var": 86400.0},
@@ -1208,7 +1210,8 @@ def test_no_new_fields_added_to_base_operator(self):
base_operator = BaseOperator(task_id="10")
fields = {k: v for (k, v) in vars(base_operator).items() if k in BaseOperator.get_serialized_fields()}
assert fields == {
"_log": base_operator.log,
"_logger_name": None,
"_log_config_logger_name": "airflow.task.operators",
"_post_execute_hook": None,
"_pre_execute_hook": None,
"depends_on_past": False,