Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow email field to be templated #35546

Merged
merged 4 commits into from Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow/serialization/serialized_objects.py
Expand Up @@ -878,10 +878,12 @@ def _serialize_node(cls, op: BaseOperator | MappedOperator, include_deps: bool)
# If not, store them as strings
# And raise an exception if the field is not templateable
forbidden_fields = set(inspect.signature(BaseOperator.__init__).parameters.keys())
# Though allow some of the BaseOperator fields to be templated anyway
forbidden_fields.difference_update({"email"})
if op.template_fields:
for template_field in op.template_fields:
if template_field in forbidden_fields:
raise AirflowException(f"Cannot template BaseOperator fields: {template_field}")
raise AirflowException(f"Cannot template BaseOperator field: {template_field!r}")
value = getattr(op, template_field, None)
if not cls._is_excluded(value, template_field, op):
serialize_op[template_field] = serialize_template_field(value)
Expand Down
10 changes: 4 additions & 6 deletions tests/serialization/test_dag_serialization.py
Expand Up @@ -2035,20 +2035,18 @@ def test_params_serialize_default(self):
assert param.description == "hello"
assert param.schema == {"type": "string"}

def test_not_templateable_fields_in_serialized_dag(
self,
):
def test_not_templateable_fields_in_serialized_dag(self):
"""
Test that when we use not templateable fields, an Airflow exception is raised.
Test that when we use not templateable fields, an Airflow exception is raised.
"""

class TestOperator(BaseOperator):
template_fields = ("execution_timeout",)
template_fields = ("email", "execution_timeout")

dag = DAG("test_not_templateable_fields", start_date=datetime(2019, 8, 1))
with dag:
TestOperator(task_id="test", execution_timeout=timedelta(seconds=10))
with pytest.raises(AirflowException, match="Cannot template BaseOperator fields: execution_timeout"):
with pytest.raises(AirflowException, match="Cannot template BaseOperator field: 'execution_timeout'"):
SerializedDAG.to_dict(dag)
gdavoian marked this conversation as resolved.
Show resolved Hide resolved


Expand Down