Skip to content

Commit

Permalink
fix(google,log): Avoid log name overriding
Browse files Browse the repository at this point in the history
Avoid the use of the generic `name` attribute which is overrode by the
dict configurator.
  • Loading branch information
AlexisBRENON committed Mar 27, 2024
1 parent bf9bb72 commit 2618e13
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 7 deletions.
2 changes: 1 addition & 1 deletion airflow/config_templates/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@
"task": {
"class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
"formatter": "airflow",
"name": log_name,
"gcp_log_name": log_name,
"gcp_key_path": key_path,
}
}
Expand Down
21 changes: 17 additions & 4 deletions airflow/providers/google/cloud/log/stackdriver_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from __future__ import annotations

import logging
import warnings
from functools import cached_property
from typing import TYPE_CHECKING, Collection
from urllib.parse import urlencode
Expand All @@ -29,9 +30,11 @@
from google.cloud.logging_v2.services.logging_service_v2 import LoggingServiceV2Client
from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.utils.log.trigger_handler import ctx_indiv_trigger
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
from google.auth.credentials import Credentials
Expand Down Expand Up @@ -92,15 +95,25 @@ def __init__(
self,
gcp_key_path: str | None = None,
scopes: Collection[str] | None = _DEFAULT_SCOPESS,
name: str = DEFAULT_LOGGER_NAME,
name: str | ArgNotSet = NOTSET,
transport: type[Transport] = BackgroundThreadTransport,
resource: Resource = _GLOBAL_RESOURCE,
labels: dict[str, str] | None = None,
gcp_log_name: str = DEFAULT_LOGGER_NAME,
):
if name is not NOTSET:
warnings.warn(
"Param `name` is deprecated and will be removed in a future release. "
"Please use `gcp_log_name` instead. ",
RemovedInAirflow3Warning,
stacklevel=2,
)
gcp_log_name = str(name)

super().__init__()
self.gcp_key_path: str | None = gcp_key_path
self.scopes: Collection[str] | None = scopes
self.name: str = name
self.gcp_log_name: str = gcp_log_name
self.transport_type: type[Transport] = transport
self.resource: Resource = resource
self.labels: dict[str, str] | None = labels
Expand Down Expand Up @@ -140,7 +153,7 @@ def _transport(self) -> Transport:
"""Object responsible for sending data to Stackdriver."""
# The Transport object is badly defined (no init) but in the docs client/name as constructor
# arguments are a requirement for any class that derives from Transport class, hence ignore:
return self.transport_type(self._client, self.name) # type: ignore[call-arg]
return self.transport_type(self._client, self.gcp_log_name) # type: ignore[call-arg]

def _get_labels(self, task_instance=None):
if task_instance:
Expand Down Expand Up @@ -245,7 +258,7 @@ def escale_label_value(value: str) -> str:
_, project = self._credentials_and_project
log_filters = [
f"resource.type={escale_label_value(self.resource.type)}",
f'logName="projects/{project}/logs/{self.name}"',
f'logName="projects/{project}/logs/{self.gcp_log_name}"',
]

for key, value in self.resource.labels.items():
Expand Down
4 changes: 2 additions & 2 deletions docs/apache-airflow-providers-google/logging/stackdriver.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ example:
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = True
remote_base_log_folder = stackdriver://logs-name
remote_base_log_folder = stackdriver:///logs-name
All configuration options are in the ``[logging]`` section.

Expand All @@ -50,7 +50,7 @@ Turning this option off will result in data not being sent to Stackdriver.

The ``remote_base_log_folder`` option contains the URL that specifies the type of handler to be used.
For integration with Stackdriver, this option should start with ``stackdriver://``.
The path section of the URL specifies the name of the log e.g. ``stackdriver://airflow-tasks`` writes
The path section of the URL specifies the name of the log e.g. ``stackdriver:///airflow-tasks`` writes
logs under the name ``airflow-tasks``.

You can set ``google_key_path`` option in the ``[logging]`` section to specify the path to `the service
Expand Down
24 changes: 24 additions & 0 deletions newsfragments/38071.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
Rename the ``name`` attribute of the StackdriverTaskHandler to ``gcp_log_name`` to avoid name overriding by the the ``DictConfigurator``.

Airflow relies on the ``logging.config.dictConfig`` (`documentation <https://docs.python.org/3/library/logging.config.html>`_) method
to `setup the logging stack <https://github.com/apache/airflow/blob/a58441ca1b263cae61a5bb653e6839f0dd29b08e/airflow/logging_config.py#L69>`_.
However, during this setup, it iterates through the handlers and
`explicitly sets their name <https://github.com/python/cpython/blob/2a4cbf17af19a01d942f9579342f77c39fbd23c4/Lib/logging/config.py#L578>`_:

.. code-block:: python
:linenos:
for name in sorted(handlers):
try:
handler = self.configure_handler(handlers[name])
handler.name = name
handlers[name] = handler
So, before this fix:

#. You setup the remote logging through the environment variables ``AIRFLOW__LOGGING__REMOTE_LOGGING="true"`` and ``AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER="stackdriver://host/path"``.
#. Airflow instantiates a ``StackdriverTaskHandler`` with the name of ``"path"``
#. **BUT** the ``dictConfig`` call overrides the name of the handler with the key of the handlers configuration (i.e. `task <https://github.com/apache/airflow/blob/a58441ca1b263cae61a5bb653e6839f0dd29b08e/airflow/config_templates/airflow_local_settings.py#L350>`_).
#. Hence, the next calls to the ``emit`` method of the handler will generate logs to the wrong destination (``task`` instead of ``path``).

Changing the field, from ``name`` to ``gcp_log_name`` prevents the overriding from the dictConfig.
28 changes: 28 additions & 0 deletions tests/providers/google/cloud/log/test_stackdriver_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,34 @@ def test_should_pass_message_to_client(mock_client, mock_get_creds_and_project_i
mock_client.assert_called_once_with(credentials="creds", client_info=mock.ANY, project="project_id")


@pytest.mark.usefixtures("clean_stackdriver_handlers")
@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")
@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client")
def test_should_use_configured_log_name(mock_client, mock_get_creds_and_project_id):
mock_get_creds_and_project_id.return_value = ("creds", "project_id")

with mock.patch.dict(
"os.environ",
AIRFLOW__LOGGING__REMOTE_LOGGING="true",
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER="stackdriver://host/path",
):
import importlib
import logging

from airflow import settings
from airflow.config_templates import airflow_local_settings

importlib.reload(airflow_local_settings)
settings.configure_logging()

logger = logging.getLogger("airflow.task")
handler = logger.handlers[0]
assert isinstance(handler, StackdriverTaskHandler)
with mock.patch.object(handler, "transport_type") as transport_type_mock:
logger.error("foo")
transport_type_mock.assert_called_once_with(mock_client.return_value, "path")


@pytest.mark.db_test
class TestStackdriverLoggingHandlerTask:
DAG_ID = "dag_for_testing_stackdriver_file_task_handler"
Expand Down

0 comments on commit 2618e13

Please sign in to comment.