Skip to content

Commit

Permalink
Fix secrets rendered in UI when task is not executed. (#22754)
Browse files Browse the repository at this point in the history
(cherry picked from commit ce8ea66)
  • Loading branch information
tirkarthi authored and ephraimbuddy committed May 21, 2022
1 parent 622b5e2 commit 1cbb0ad
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
14 changes: 13 additions & 1 deletion airflow/models/taskinstance.py
Expand Up @@ -2162,15 +2162,27 @@ def get_rendered_template_fields(self, session: Session = NEW_SESSION) -> None:
for field_name, rendered_value in rendered_task_instance_fields.items():
setattr(self.task, field_name, rendered_value)
return

try:
self.render_templates()
# Task was never executed. Initialize RenderedTaskInstanceFields
# to render template and mask secrets. Set MASK_SECRETS_IN_LOGS
# to True to enable masking similar to task run.
original_value = settings.MASK_SECRETS_IN_LOGS
settings.MASK_SECRETS_IN_LOGS = True
rendered_task_instance = RenderedTaskInstanceFields(self)
rendered_fields = rendered_task_instance.rendered_fields
if rendered_fields:
for field_name, rendered_value in rendered_fields.items():
setattr(self.task, field_name, rendered_value)
except (TemplateAssertionError, UndefinedError) as e:
raise AirflowException(
"Webserver does not have access to User-defined Macros or Filters "
"when Dag Serialization is enabled. Hence for the task that have not yet "
"started running, please use 'airflow tasks render' for debugging the "
"rendering of template_fields."
) from e
finally:
settings.MASK_SECRETS_IN_LOGS = original_value

@provide_session
def get_rendered_k8s_spec(self, session=NEW_SESSION):
Expand Down
43 changes: 40 additions & 3 deletions tests/www/views/test_views_rendered.py
Expand Up @@ -20,7 +20,7 @@

import pytest

from airflow.models import DAG, RenderedTaskInstanceFields
from airflow.models import DAG, RenderedTaskInstanceFields, Variable
from airflow.operators.bash import BashOperator
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone
Expand Down Expand Up @@ -61,6 +61,15 @@ def task2(dag):
)


@pytest.fixture()
def task_secret(dag):
return BashOperator(
task_id='task_secret',
bash_command='echo {{ var.value.my_secret }} && echo {{ var.value.spam }}',
dag=dag,
)


@pytest.fixture(scope="module", autouse=True)
def init_blank_db():
"""Make sure there are no runs before we test anything.
Expand All @@ -73,15 +82,15 @@ def init_blank_db():


@pytest.fixture(autouse=True)
def reset_db(dag, task1, task2):
def reset_db(dag, task1, task2, task_secret):
yield
clear_db_dags()
clear_db_runs()
clear_rendered_ti_fields()


@pytest.fixture()
def create_dag_run(dag, task1, task2):
def create_dag_run(dag, task1, task2, task_secret):
def _create_dag_run(*, execution_date, session):
dag_run = dag.create_dagrun(
state=DagRunState.RUNNING,
Expand All @@ -94,6 +103,8 @@ def _create_dag_run(*, execution_date, session):
ti1.state = TaskInstanceState.SUCCESS
ti2 = dag_run.get_task_instance(task2.task_id, session=session)
ti2.state = TaskInstanceState.SCHEDULED
ti3 = dag_run.get_task_instance(task_secret.task_id, session=session)
ti3.state = TaskInstanceState.QUEUED
session.flush()
return dag_run

Expand Down Expand Up @@ -168,3 +179,29 @@ def test_user_defined_filter_and_macros_raise_error(admin_client, create_dag_run
# MarkupSafe changed the exception detail from 'no filter named' to
# 'No filter named' in 2.0 (I think), so we normalize for comparison.
assert "originalerror: no filter named 'hello'" in resp_html.lower()


@pytest.mark.usefixtures("patch_app")
def test_rendered_template_secret(admin_client, create_dag_run, task_secret):
"""Test that the Rendered View masks values retrieved from secret variables."""
Variable.set("my_secret", "foo")
Variable.set("spam", "egg")

assert task_secret.bash_command == 'echo {{ var.value.my_secret }} && echo {{ var.value.spam }}'

with create_session() as session:
dag_run = create_dag_run(execution_date=DEFAULT_DATE, session=session)
ti = dag_run.get_task_instance(task_secret.task_id, session=session)
assert ti is not None, "task instance not found"
ti.refresh_from_task(task_secret)
assert ti.state == TaskInstanceState.QUEUED

date = quote_plus(str(DEFAULT_DATE))
url = f'rendered-templates?task_id=task_secret&dag_id=testdag&execution_date={date}'

resp = admin_client.get(url, follow_redirects=True)
check_content_in_response(
'echo</span> *** <span class="o">&amp;&amp;</span> <span class="nb">echo</span> egg', resp
)
ti.refresh_from_task(task_secret)
assert ti.state == TaskInstanceState.QUEUED

0 comments on commit 1cbb0ad

Please sign in to comment.