Skip to content
Browse files
Mask sensitive values for not-yet-running TIs (#23807)
Alternative approach to #22754.  Resolves  #22738.
  • Loading branch information
dstandish committed May 21, 2022
1 parent 10a0d8e commit 2dc806367c3dc27df5db4b955d151e789fbc78b0
Showing 3 changed files with 19 additions and 17 deletions.
@@ -2085,7 +2085,10 @@ def get_prev_ds_nodash() -> Optional[str]:

def get_rendered_template_fields(self, session: Session = NEW_SESSION) -> None:
"""Fetch rendered template fields from DB"""
Update task with rendered template fields for presentation in UI.
If task has already run, will fetch from DB; otherwise will render.
from airflow.models.renderedtifields import RenderedTaskInstanceFields

rendered_task_instance_fields = RenderedTaskInstanceFields.get_templated_fields(self, session=session)
@@ -2096,25 +2099,20 @@ def get_rendered_template_fields(self, session: Session = NEW_SESSION) -> None:

# Task was never executed. Initialize RenderedTaskInstanceFields
# to render template and mask secrets. Set MASK_SECRETS_IN_LOGS
# to True to enable masking similar to task run.
original_value = settings.MASK_SECRETS_IN_LOGS
settings.MASK_SECRETS_IN_LOGS = True
rendered_task_instance = RenderedTaskInstanceFields(self)
rendered_fields = rendered_task_instance.rendered_fields
if rendered_fields:
for field_name, rendered_value in rendered_fields.items():
setattr(self.task, field_name, rendered_value)
# If we get here, either the task hasn't run or the RTIF record was purged.
from airflow.utils.log.secrets_masker import redact

for field_name in self.task.template_fields:
rendered_value = getattr(self.task, field_name)
setattr(self.task, field_name, redact(rendered_value, field_name))
except (TemplateAssertionError, UndefinedError) as e:
raise AirflowException(
"Webserver does not have access to User-defined Macros or Filters "
"when Dag Serialization is enabled. Hence for the task that have not yet "
"started running, please use 'airflow tasks render' for debugging the "
"rendering of template_fields."
) from e
settings.MASK_SECRETS_IN_LOGS = original_value

def get_rendered_k8s_spec(self, session=NEW_SESSION):
@@ -20,6 +20,7 @@
import re
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Union

from airflow import settings
from airflow.compat.functools import cache, cached_property

@@ -84,12 +85,9 @@ def mask_secret(secret: Union[str, dict, Iterable], name: Optional[str] = None)
If ``secret`` is a dict or a iterable (excluding str) then it will be
recursively walked and keys with sensitive names will be hidden.
# Delay import
from airflow import settings

# Filtering all log messages is not a free process, so we only do it when
# running tasks
if not settings.MASK_SECRETS_IN_LOGS or not secret:
if not secret:

_secrets_masker().add_mask(secret, name)
@@ -161,6 +159,9 @@ def _redact_exception_with_context(self, exception):

def filter(self, record) -> bool:
if settings.MASK_SECRETS_IN_LOGS is not True:
return True

if self.ALREADY_FILTERED_FLAG in record.__dict__:
# Filters are attached to multiple handlers and logs, keep a
# "private" flag that stops us needing to process it more than once
@@ -22,9 +22,12 @@

import pytest

from airflow import settings
from airflow.utils.log.secrets_masker import SecretsMasker, should_hide_value_for_key
from tests.test_utils.config import conf_vars

settings.MASK_SECRETS_IN_LOGS = True

p = "password"

0 comments on commit 2dc8063

Please sign in to comment.