Skip to content

Commit

Permalink
Render list items in rendered fields view (#32042)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: clemens.valiente <clemens.valiente@grabtaxi.com>
  • Loading branch information
cvaliente and clemens.valiente committed Jul 3, 2023
1 parent 42eb93b commit 2b5d4e3
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 14 deletions.
30 changes: 23 additions & 7 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import json
import textwrap
import time
from typing import TYPE_CHECKING, Any, Sequence
from typing import TYPE_CHECKING, Any, Callable, Sequence
from urllib.parse import urlencode

from flask import request, url_for
Expand All @@ -36,6 +36,7 @@
from pendulum.datetime import DateTime
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
from pygments.lexer import Lexer
from sqlalchemy import delete, func, types
from sqlalchemy.ext.associationproxy import AssociationProxy
from sqlalchemy.sql import Select
Expand Down Expand Up @@ -549,20 +550,35 @@ def pygment_html_render(s, lexer=lexers.TextLexer):
return highlight(s, lexer(), HtmlFormatter(linenos=True))


def render(obj, lexer):
def render(obj: Any, lexer: Lexer, handler: Callable[[Any], str] | None = None):
"""Render a given Python object with a given Pygments lexer."""
out = ""
if isinstance(obj, str):
out = Markup(pygment_html_render(obj, lexer))
return Markup(pygment_html_render(obj, lexer))

elif isinstance(obj, (tuple, list)):
out = ""
for i, text_to_render in enumerate(obj):
if lexer is lexers.PythonLexer:
text_to_render = repr(text_to_render)
out += Markup("<div>List item #{}</div>").format(i)
out += Markup("<div>" + pygment_html_render(text_to_render, lexer) + "</div>")
return out

elif isinstance(obj, dict):
out = ""
for k, v in obj.items():
if lexer is lexers.PythonLexer:
v = repr(v)
out += Markup('<div>Dict item "{}"</div>').format(k)
out += Markup("<div>" + pygment_html_render(v, lexer) + "</div>")
return out
return out

elif handler is not None and obj is not None:
return Markup(pygment_html_render(handler(obj), lexer))

else:
# Return empty string otherwise
return ""


def json_render(obj, lexer):
Expand Down Expand Up @@ -603,8 +619,8 @@ def get_attr_renderer():
"mysql": lambda x: render(x, lexers.MySqlLexer),
"postgresql": lambda x: render(x, lexers.PostgresLexer),
"powershell": lambda x: render(x, lexers.PowerShellLexer),
"py": lambda x: render(get_python_source(x), lexers.PythonLexer),
"python_callable": lambda x: render(get_python_source(x), lexers.PythonLexer),
"py": lambda x: render(x, lexers.PythonLexer, get_python_source),
"python_callable": lambda x: render(x, lexers.PythonLexer, get_python_source),
"rst": lambda x: render(x, lexers.RstLexer),
"sql": lambda x: render(x, lexers.SqlLexer),
"tsql": lambda x: render(x, lexers.TransactSqlLexer),
Expand Down
6 changes: 1 addition & 5 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1484,11 +1484,7 @@ def rendered_templates(self, session):
content = getattr(task, template_field)
renderer = task.template_fields_renderers.get(template_field, template_field)
if renderer in renderers:
if isinstance(content, (dict, list)):
json_content = json.dumps(content, sort_keys=True, indent=4)
html_dict[template_field] = renderers[renderer](json_content)
else:
html_dict[template_field] = renderers[renderer](content)
html_dict[template_field] = renderers[renderer](content)
else:
html_dict[template_field] = Markup("<pre><code>{}</pre></code>").format(pformat(content))

Expand Down
78 changes: 76 additions & 2 deletions tests/www/views/test_views_rendered.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
from markupsafe import escape

from airflow.models import DAG, RenderedTaskInstanceFields, Variable
from airflow.models.baseoperator import BaseOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone
from airflow.utils.session import create_session
Expand Down Expand Up @@ -64,6 +66,39 @@ def task2(dag):
)


@pytest.fixture()
def task3(dag):
class TestOperator(BaseOperator):
template_fields = ("sql",)

def __init__(self, *, sql, **kwargs):
super().__init__(**kwargs)
self.sql = sql

def execute(self, context):
pass

return TestOperator(
task_id="task3",
sql=["SELECT 1;", "SELECT 2;"],
dag=dag,
)


@pytest.fixture()
def task4(dag):
def func(*op_args):
pass

return PythonOperator(
task_id="task4",
python_callable=func,
op_args=["{{ task_instance_key_str }}_args"],
op_kwargs={"0": "{{ task_instance_key_str }}_kwargs"},
dag=dag,
)


@pytest.fixture()
def task_secret(dag):
return BashOperator(
Expand All @@ -85,15 +120,15 @@ def init_blank_db():


@pytest.fixture(autouse=True)
def reset_db(dag, task1, task2, task_secret):
def reset_db(dag, task1, task2, task3, task4, task_secret):
yield
clear_db_dags()
clear_db_runs()
clear_rendered_ti_fields()


@pytest.fixture()
def create_dag_run(dag, task1, task2, task_secret):
def create_dag_run(dag, task1, task2, task3, task4, task_secret):
def _create_dag_run(*, execution_date, session):
dag_run = dag.create_dagrun(
state=DagRunState.RUNNING,
Expand All @@ -108,6 +143,10 @@ def _create_dag_run(*, execution_date, session):
ti2.state = TaskInstanceState.SCHEDULED
ti3 = dag_run.get_task_instance(task_secret.task_id, session=session)
ti3.state = TaskInstanceState.QUEUED
ti4 = dag_run.get_task_instance(task3.task_id, session=session)
ti4.state = TaskInstanceState.SUCCESS
ti5 = dag_run.get_task_instance(task4.task_id, session=session)
ti5.state = TaskInstanceState.SUCCESS
session.flush()
return dag_run

Expand Down Expand Up @@ -290,3 +329,38 @@ def test_rendered_task_detail_env_secret(patch_app, admin_client, request, env,
if request.node.callspec.id.endswith("-tpld-var"):
Variable.delete("plain_var")
Variable.delete("secret_var")


@pytest.mark.usefixtures("patch_app")
def test_rendered_template_view_for_list_template_field_args(admin_client, create_dag_run, task3):
"""
Test that the Rendered View can show a list of syntax-highlighted SQL statements
"""
assert task3.sql == ["SELECT 1;", "SELECT 2;"]

with create_session() as session:
create_dag_run(execution_date=DEFAULT_DATE, session=session)

url = f"rendered-templates?task_id=task3&dag_id=testdag&execution_date={quote_plus(str(DEFAULT_DATE))}"

resp = admin_client.get(url, follow_redirects=True)
check_content_in_response("List item #0", resp)
check_content_in_response("List item #1", resp)


@pytest.mark.usefixtures("patch_app")
def test_rendered_template_view_for_op_args(admin_client, create_dag_run, task4):
"""
Test that the Rendered View can show rendered values in op_args and op_kwargs
"""
assert task4.op_args == ["{{ task_instance_key_str }}_args"]
assert list(task4.op_kwargs.values()) == ["{{ task_instance_key_str }}_kwargs"]

with create_session() as session:
create_dag_run(execution_date=DEFAULT_DATE, session=session)

url = f"rendered-templates?task_id=task4&dag_id=testdag&execution_date={quote_plus(str(DEFAULT_DATE))}"

resp = admin_client.get(url, follow_redirects=True)
check_content_in_response("testdag__task4__20200301_args", resp)
check_content_in_response("testdag__task4__20200301_kwargs", resp)

0 comments on commit 2b5d4e3

Please sign in to comment.