Skip to content

Commit

Permalink
Allow email field to be templated (#35546)
Browse files Browse the repository at this point in the history
  • Loading branch information
gdavoian committed Nov 9, 2023
1 parent 3c5f5b5 commit addbd58
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
4 changes: 3 additions & 1 deletion airflow/serialization/serialized_objects.py
Expand Up @@ -878,10 +878,12 @@ def _serialize_node(cls, op: BaseOperator | MappedOperator, include_deps: bool)
# If not, store them as strings
# And raise an exception if the field is not templateable
forbidden_fields = set(inspect.signature(BaseOperator.__init__).parameters.keys())
# Though allow some of the BaseOperator fields to be templated anyway
forbidden_fields.difference_update({"email"})
if op.template_fields:
for template_field in op.template_fields:
if template_field in forbidden_fields:
raise AirflowException(f"Cannot template BaseOperator fields: {template_field}")
raise AirflowException(f"Cannot template BaseOperator field: {template_field!r}")
value = getattr(op, template_field, None)
if not cls._is_excluded(value, template_field, op):
serialize_op[template_field] = serialize_template_field(value)
Expand Down
29 changes: 21 additions & 8 deletions tests/serialization/test_dag_serialization.py
Expand Up @@ -2035,20 +2035,33 @@ def test_params_serialize_default(self):
assert param.description == "hello"
assert param.schema == {"type": "string"}

def test_not_templateable_fields_in_serialized_dag(
self,
):
@pytest.mark.db_test
def test_not_templateable_fields_in_serialized_dag(self):
"""
Test that when we use not templateable fields, an Airflow exception is raised.
Test that when we use not templateable fields, an Airflow exception is raised.
"""

class TestOperator(BaseOperator):
template_fields = ("execution_timeout",)
template_fields = (
"email", # templateable
"execution_timeout", # not templateable
)

def execute(self, context: Context):
pass

dag = DAG(dag_id="test_dag", start_date=datetime(2023, 11, 9))

dag = DAG("test_not_templateable_fields", start_date=datetime(2019, 8, 1))
with dag:
TestOperator(task_id="test", execution_timeout=timedelta(seconds=10))
with pytest.raises(AirflowException, match="Cannot template BaseOperator fields: execution_timeout"):
task = TestOperator(
task_id="test_task",
email="{{ ','.join(test_email_list) }}",
execution_timeout=timedelta(seconds=10),
)
task.render_template_fields(context={"test_email_list": ["foo@test.com", "bar@test.com"]})
assert task.email == "foo@test.com,bar@test.com"

with pytest.raises(AirflowException, match="Cannot template BaseOperator field: 'execution_timeout'"):
SerializedDAG.to_dict(dag)


Expand Down

0 comments on commit addbd58

Please sign in to comment.