Validate executor_config keys in BaseOperator#53459
Validate executor_config keys in BaseOperator#53459HsiuChuanHsu wants to merge 2 commits intoapache:mainfrom
Conversation
Validate executor_config in BaseOperator to ensure it only contains 'pod_override' or 'pod_template_file' keys. This prevents invalid configurations from being set during initialization or attribute updates. - Add validate_executor_config method to check for valid keys - Call validation in __setattr__ when executor_config is modified - Call validation during BaseOperator initialization
- Introduce tests to validate executor_config in BaseOperator - Test valid keys (pod_override, pod_template_file) are accepted - Test invalid keys raise RuntimeError with appropriate message - Test error message includes DAG ID when DAG is provided
b3d3998 to
8e6e3b9
Compare
| def validate_executor_config(self, executor_config: dict | None) -> None: | ||
| """ | ||
| Validate the executor_config to ensure it only contains 'pod_override' or 'pod_template_file' keys. | ||
|
|
||
| :param executor_config: The executor_config dictionary to validate. | ||
| :raises AirflowException: If executor_config contains keys other than 'pod_override' or 'pod_template_file'. | ||
| """ | ||
| if not executor_config or not isinstance(executor_config, dict): | ||
| return | ||
|
|
||
| valid_keys = {"pod_override", "pod_template_file"} | ||
| invalid_keys = set(executor_config.keys()) - valid_keys | ||
| if invalid_keys: | ||
| error_msg = ( | ||
| f"Invalid executor_config keys for task '{self.task_id}'" | ||
| f"{' in DAG ' + self.dag.dag_id if self.has_dag() else ''}: {sorted(invalid_keys)}. " | ||
| f"Only 'pod_override' and 'pod_template_file' are allowed." | ||
| ) | ||
| self.log.error(error_msg) | ||
| raise AirflowException(error_msg) |
There was a problem hiding this comment.
@o-nikolas does this stand true form ECS executor too?
I see one of the tests doing this and I wanted to check if thats the case:
tags_exec_config = [{"key": "FOO", "value": "BAR"}]
workload.ti.executor_config = {"tags": tags_exec_config}
There was a problem hiding this comment.
Whoa, whoa, whoa, thanks for tagging me @amoghrajesh we do not want to do this.
We have many executors other than KubernetesExecutor, so at the base class level we absolutely should not assume/assert anything about the executor_config.
For example the ECS executor treats the executor_config as kwargs for the ECS run_task api call, to allow users to set overrides for the parameters that API accepts.
If we want to do any validation it's going to have to be a lot more complicated to try detect the executor being used, which feels like a lot of work and I expect it to be a little brittle. You could also do validation at task queued time, which would at least bring it a bit earlier.
o-nikolas
left a comment
There was a problem hiding this comment.
Adding a blocker so that this change does not get merged.
| def validate_executor_config(self, executor_config: dict | None) -> None: | ||
| """ | ||
| Validate the executor_config to ensure it only contains 'pod_override' or 'pod_template_file' keys. | ||
|
|
||
| :param executor_config: The executor_config dictionary to validate. | ||
| :raises AirflowException: If executor_config contains keys other than 'pod_override' or 'pod_template_file'. | ||
| """ | ||
| if not executor_config or not isinstance(executor_config, dict): | ||
| return | ||
|
|
||
| valid_keys = {"pod_override", "pod_template_file"} | ||
| invalid_keys = set(executor_config.keys()) - valid_keys | ||
| if invalid_keys: | ||
| error_msg = ( | ||
| f"Invalid executor_config keys for task '{self.task_id}'" | ||
| f"{' in DAG ' + self.dag.dag_id if self.has_dag() else ''}: {sorted(invalid_keys)}. " | ||
| f"Only 'pod_override' and 'pod_template_file' are allowed." | ||
| ) | ||
| self.log.error(error_msg) | ||
| raise AirflowException(error_msg) |
There was a problem hiding this comment.
Whoa, whoa, whoa, thanks for tagging me @amoghrajesh we do not want to do this.
We have many executors other than KubernetesExecutor, so at the base class level we absolutely should not assume/assert anything about the executor_config.
For example the ECS executor treats the executor_config as kwargs for the ECS run_task api call, to allow users to set overrides for the parameters that API accepts.
If we want to do any validation it's going to have to be a lot more complicated to try detect the executor being used, which feels like a lot of work and I expect it to be a little brittle. You could also do validation at task queued time, which would at least bring it a bit earlier.
kaxil
left a comment
There was a problem hiding this comment.
Yup, wrong change. This will not allow anyone to use Custom Executors which might accept different executor_config
|
Hi @amoghrajesh, @o-nikolas, @kaxil, Thanks again for all the help and pointers! |
Description
Closes: #47702 by adding validation for the
executor_configdictionary in theBaseOperatorclass oftask-sdk/src/airflow/sdk/bases/operator.py. The change ensures that only the allowed keys (pod_overrideandpod_template_file) are used inexecutor_config. If invalid keys are present, anAirflowExceptionis raised with a descriptive error message, preventing silent failures and providing clear feedback to users.Changes:
Additional Notes:
The error would be displayed in the

import-errorssection of the Airflow UI.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.