Skip to content

Commit

Permalink
Create UndefinedJinjaVariablesRule (Resolves apache#11144)
Browse files Browse the repository at this point in the history
Adding a rule to check for undefined jinja variables when upgrading to Airflow2.0
Logic - Use a DagBag to pull all dags and iterate over
every dag. For every dag the task will be rendered using
an updated Jinja Environment using - jinja2.DebugUndefined
This will render the template leaving undefined variables
as they were. Using regex we can extract the variables
and present possible error cases when upgrading.
  • Loading branch information
ashmeet13 committed Oct 8, 2020
1 parent 2900f13 commit 0ee69ad
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 0 deletions.
121 changes: 121 additions & 0 deletions airflow/upgrade/rules/undefined_jinja_varaibles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import

import re

import jinja2
import six

from airflow import conf
from airflow.models import DagBag, TaskInstance
from airflow.upgrade.rules.base_rule import BaseRule
from airflow.utils import timezone


class UndefinedJinjaVariablesRule(BaseRule):

title = "Jinja Template Variables cannot be undefined"

description = """\
Jinja Templates have been updated to the following rule - jinja2.StrictUndefined
With this change a task will fail if it recieves any undefined variables.
"""

def _check_rendered_content(self, rendered_content):
"""Replicates the logic in BaseOperator.render_template() to
cover all the cases needed to be checked.
"""
if isinstance(rendered_content, six.string_types):
return set(re.findall(r"{{(.*?)}}", rendered_content))

elif isinstance(rendered_content, (tuple, list, set)):
debug_error_messages = set()
for element in rendered_content:
debug_error_messages.union(self._check_rendered_content(element))
return debug_error_messages

elif isinstance(rendered_content, dict):
debug_error_messages = set()
for key, value in rendered_content.items():
debug_error_messages.union(self._check_rendered_content(str(value)))
return debug_error_messages

def _render_task_content(self, task, content, context):
completed_rendering = False
errors_while_rendering = []
while not completed_rendering:
# Catch errors such as {{ object.element }} where
# object is not defined
try:
renderend_content = task.render_template(content, context)
completed_rendering = True
except Exception as e:
undefined_variable = re.sub(" is undefined", "", str(e))
undefined_variable = re.sub("'", "", undefined_variable)
context[undefined_variable] = dict()
message = "Could not find the object '{}'".format(undefined_variable)
errors_while_rendering.append(message)
return renderend_content, errors_while_rendering

def _task_level_(self, task):
messages = {}
task_instance = TaskInstance(task=task, execution_date=timezone.utcnow())
context = task_instance.get_template_context()
for attr_name in task.template_fields:
content = getattr(task, attr_name)
if content:
rendered_content, errors_while_rendering = self._render_task_content(
task, content, context
)
debug_error_messages = list(
self._check_rendered_content(rendered_content)
)
messages[attr_name] = errors_while_rendering + debug_error_messages

return messages

def _dag_level_(self, dag):
dag.template_undefined = jinja2.DebugUndefined
tasks = dag.tasks
messages = {}
for task in tasks:
error_messages = self._task_level_(task)
messages[task.task_id] = error_messages
return messages

def check(self, dagbag=None):
if not dagbag:
dag_folder = conf.get("core", "dags_folder")
dagbag = DagBag(dag_folder)
dags = dagbag.dags
messages = []
for dag_id, dag in dags.items():
dag_messages = self._dag_level_(dag)

for task_id, task_messages in dag_messages.items():
for attr_name, error_messages in task_messages.items():
for error_message in error_messages:
message = (
"Possible UndefinedJinjaVariable -> DAG: {}, Task: {}, "
"Attribute: {}, Error: {}".format(
dag_id, task_id, attr_name, error_message.strip()
)
)
messages.append(message)
return messages
111 changes: 111 additions & 0 deletions tests/upgrade/rules/test_undefined_jinja_varaibles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from tempfile import mkdtemp
from unittest import TestCase

from airflow import DAG
from airflow.models import DagBag
from airflow.operators.bash_operator import BashOperator
from airflow.upgrade.rules.undefined_jinja_varaibles import \
UndefinedJinjaVariablesRule
from tests.models import DEFAULT_DATE


class TestConnTypeIsNotNullableRule(TestCase):
@classmethod
def setUpClass(cls):
cls.empty_dir = mkdtemp()

def setUp(self):

self.invalid_dag = DAG(
dag_id="test-undefined-jinja-variables", start_date=DEFAULT_DATE
)
self.valid_dag = DAG(
dag_id="test-defined-jinja-variables", start_date=DEFAULT_DATE
)

template_command = """
{% for i in range(5) %}
echo "{{ params.defined_variable }}"
echo "{{ execution_date.today }}"
echo "{{ execution_date.invalid_element }}"
echo "{{ params.undefined_variable }}"
echo "{{ foo }}"
{% endfor %}
"""

BashOperator(
task_id="templated_string",
depends_on_past=False,
bash_command=template_command,
env={"undefined_object": "{{ undefined_object.element }}"},
params={"defined_variable": "defined_value"},
dag=self.invalid_dag,
)

BashOperator(
task_id="templated_string",
depends_on_past=False,
bash_command="echo",
env={"defined_object": "{{ params.element }}"},
params={
"element": "defined_value",
},
dag=self.valid_dag,
)

def test_invalid_check(self):
dagbag = DagBag(dag_folder=self.empty_dir, include_examples=False)
dagbag.dags[self.invalid_dag.dag_id] = self.invalid_dag
rule = UndefinedJinjaVariablesRule()

assert isinstance(rule.description, str)
assert isinstance(rule.title, str)

messages = rule.check(dagbag)

expected_messages = [
"Possible UndefinedJinjaVariable -> DAG: test-undefined-jinja-variables, "
"Task: templated_string, Attribute: env, Error: Could not find the "
"object 'undefined_object",
"Possible UndefinedJinjaVariable -> DAG: test-undefined-jinja-variables, "
"Task: templated_string, Attribute: bash_command, Error: no such element: "
"dict object['undefined_variable']",
"Possible UndefinedJinjaVariable -> DAG: test-undefined-jinja-variables, "
"Task: templated_string, Attribute: bash_command, Error: no such element: "
"pendulum.pendulum.Pendulum object['invalid_element']",
"Possible UndefinedJinjaVariable -> DAG: test-undefined-jinja-variables, "
"Task: templated_string, Attribute: bash_command, Error: foo",
]

assert [m for m in messages if m in expected_messages], len(messages) == len(
expected_messages
)

def test_valid_check(self):
dagbag = DagBag(dag_folder=self.empty_dir, include_examples=False)
dagbag.dags[self.valid_dag.dag_id] = self.valid_dag
rule = UndefinedJinjaVariablesRule()

assert isinstance(rule.description, str)
assert isinstance(rule.title, str)

messages = rule.check(dagbag)

assert len(messages) == 0

0 comments on commit 0ee69ad

Please sign in to comment.