Skip to content
Permalink
Browse files
Fix retrieval of deprecated non-config values (#23723)
It turned out that deprecation of config values did not work as
intended. While deprecation worked fine when the value was specified
in configuration value it did not work when `run_as_user` was used.

In those cases the "as_dict" option was used to generate temporary
configuratin and this temporary configuration contained default value
for the new configuration value - for example it caused that
the generated temporary value contained:

```
[database]
sql_alchemy_conn=sqlite:///{AIRFLOW_HOME}/airflow.db
```

Even if the deprecated `core/sql_alchemy_conn` was set (and no
new `database/sql_alchemy_conn` was set at the same time.

This effectively rendered the old installation that did not convert
to the new "database" configuration not working for run_as_user, because
the tasks run with "run_as_user" used wrong, empty sqlite database
instaead of the one configured for Airflow.

Also during adding tests, it turned out that the mechanism was also
not working as intended before - in case `_CMD` or `_SECRET` were used
as environment variables rather than configuration. In those cases
both _CMD and _SECRET should be evaluated during as_dict() evaluation,
because the "run_as_user" might have not enough permission to run the
command or retrieve secret. The _cmd and _secret variables were only
evaluated during as_dict() when they were in the config file (note
that this only happens when include_cmd, include_env, include_secret
are set to True).

The changes implemented in this PR fix both problems:

* the _CMD and _SECRET env vars are evaluated during as_dict when the
  respective include_* is set
* the defaults are only set for the values that have deprecations
  in case the deprecations have no values set in either of the ways:
    * in config file
    * in env variable
    * in _cmd (via config file or env variable)
    * in _secret (via config file or env variable)

Fixes: #23679
  • Loading branch information
potiuk committed May 20, 2022
1 parent ce8ea66 commit 888bc2e233b1672a61433929e26b82210796fd71
Showing 8 changed files with 849 additions and 189 deletions.
@@ -36,7 +36,7 @@
},
"sensitive": {
"type": "boolean",
"description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}__CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
"description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}_CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
}
},
"required": [
@@ -59,6 +59,8 @@
ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]]
ConfigSourcesType = Dict[str, ConfigSectionSourcesType]

ENV_VAR_PREFIX = 'AIRFLOW__'


def _parse_sqlite_version(s: str) -> Tuple[int, ...]:
match = _SQLITE3_VERSION_PATTERN.match(s)
@@ -144,7 +146,7 @@ class AirflowConfigParser(ConfigParser):
"""Custom Airflow Configparser supporting defaults and deprecated options"""

# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}__cmd" pattern, the idea behind this
# following the "{section}__{name}_cmd" pattern, the idea behind this
# is to not store password on boxes in text files.
# These configs can also be fetched from Secrets backend
# following the "{section}__{name}__secret" pattern
@@ -435,10 +437,8 @@ def _create_future_warning(name: str, section: str, current_value: Any, new_valu
FutureWarning,
)

ENV_VAR_PREFIX = 'AIRFLOW__'

def _env_var_name(self, section: str, key: str) -> str:
return f'{self.ENV_VAR_PREFIX}{section.upper()}__{key.upper()}'
return f'{ENV_VAR_PREFIX}{section.upper()}__{key.upper()}'

def _get_env_var_option(self, section: str, key: str):
# must have format AIRFLOW__{SECTION}__{KEY} (note double underscore)
@@ -461,23 +461,53 @@ def _get_env_var_option(self, section: str, key: str):

def _get_cmd_option(self, section: str, key: str):
fallback_key = key + '_cmd'
# if this is a valid command key...
if (section, key) in self.sensitive_config_values:
if super().has_option(section, fallback_key):
command = super().get(section, fallback_key)
return run_command(command)
return None

def _get_cmd_option_from_config_sources(
self, config_sources: ConfigSourcesType, section: str, key: str
) -> Optional[str]:
fallback_key = key + '_cmd'
if (section, key) in self.sensitive_config_values:
section_dict = config_sources.get(section)
if section_dict is not None:
command_value = section_dict.get(fallback_key)
if command_value is not None:
if isinstance(command_value, str):
command = command_value
else:
command = command_value[0]
return run_command(command)
return None

def _get_secret_option(self, section: str, key: str) -> Optional[str]:
"""Get Config option values from Secret Backend"""
fallback_key = key + '_secret'
# if this is a valid secret key...
if (section, key) in self.sensitive_config_values:
if super().has_option(section, fallback_key):
secrets_path = super().get(section, fallback_key)
return _get_config_value_from_secret_backend(secrets_path)
return None

def _get_secret_option_from_config_sources(
self, config_sources: ConfigSourcesType, section: str, key: str
) -> Optional[str]:
fallback_key = key + '_secret'
if (section, key) in self.sensitive_config_values:
section_dict = config_sources.get(section)
if section_dict is not None:
secrets_path_value = section_dict.get(fallback_key)
if secrets_path_value is not None:
if isinstance(secrets_path_value, str):
secrets_path = secrets_path_value
else:
secrets_path = secrets_path_value[0]
return _get_config_value_from_secret_backend(secrets_path)
return None

def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
value = self.get(section, key, **kwargs)
if value is None:
@@ -859,7 +889,16 @@ def as_dict(
('airflow.cfg', self),
]

self._replace_config_with_display_sources(config_sources, configs, display_source, raw)
self._replace_config_with_display_sources(
config_sources,
configs,
display_source,
raw,
self.deprecated_options,
include_cmds=include_cmds,
include_env=include_env,
include_secret=include_secret,
)

# add env vars and overwrite because they have priority
if include_env:
@@ -889,7 +928,7 @@ def _include_secrets(
raw: bool,
):
for (section, key) in self.sensitive_config_values:
value: Optional[str] = self._get_secret_option(section, key)
value: Optional[str] = self._get_secret_option_from_config_sources(config_sources, section, key)
if value:
if not display_sensitive:
value = '< hidden >'
@@ -910,17 +949,20 @@ def _include_commands(
raw: bool,
):
for (section, key) in self.sensitive_config_values:
opt = self._get_cmd_option(section, key)
opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
if not opt:
continue
opt_to_set: Union[str, Tuple[str, str], None] = opt
if not display_sensitive:
opt = '< hidden >'
opt_to_set = '< hidden >'
if display_source:
opt = (opt, 'cmd')
opt_to_set = (str(opt_to_set), 'cmd')
elif raw:
opt = opt.replace('%', '%%')
config_sources.setdefault(section, OrderedDict()).update({key: opt})
del config_sources[section][key + '_cmd']
opt_to_set = str(opt_to_set).replace('%', '%%')
if opt_to_set is not None:
dict_to_update: Dict[str, Union[str, Tuple[str, str]]] = {key: opt_to_set}
config_sources.setdefault(section, OrderedDict()).update(dict_to_update)
del config_sources[section][key + '_cmd']

def _include_envs(
self,
@@ -930,7 +972,7 @@ def _include_envs(
raw: bool,
):
for env_var in [
os_environment for os_environment in os.environ if os_environment.startswith(self.ENV_VAR_PREFIX)
os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
]:
try:
_, section, key = env_var.split('__', 2)
@@ -1010,13 +1052,82 @@ def _replace_config_with_display_sources(
configs: Iterable[Tuple[str, ConfigParser]],
display_source: bool,
raw: bool,
deprecated_options: Dict[Tuple[str, str], Tuple[str, str, str]],
include_env: bool,
include_cmds: bool,
include_secret: bool,
):
for (source_name, config) in configs:
for section in config.sections():
AirflowConfigParser._replace_section_config_with_display_sources(
config, config_sources, display_source, raw, section, source_name
config,
config_sources,
display_source,
raw,
section,
source_name,
deprecated_options,
configs,
include_env=include_env,
include_cmds=include_cmds,
include_secret=include_secret,
)

@staticmethod
def _deprecated_value_is_set_in_config(
deprecated_section: str,
deprecated_key: str,
configs: Iterable[Tuple[str, ConfigParser]],
) -> bool:
for config_type, config in configs:
if config_type == 'default':
continue
try:
deprecated_section_array = config.items(section=deprecated_section, raw=True)
for (key_candidate, _) in deprecated_section_array:
if key_candidate == deprecated_key:
return True
except NoSectionError:
pass
return False

@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
return (
os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}')
is not None
)

@staticmethod
def _deprecated_command_is_set_in_config(
deprecated_section: str, deprecated_key: str, configs: Iterable[Tuple[str, ConfigParser]]
) -> bool:
return AirflowConfigParser._deprecated_value_is_set_in_config(
deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs
)

@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
return (
os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD')
is not None
)

@staticmethod
def _deprecated_secret_is_set_in_config(
deprecated_section: str, deprecated_key: str, configs: Iterable[Tuple[str, ConfigParser]]
) -> bool:
return AirflowConfigParser._deprecated_value_is_set_in_config(
deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs
)

@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
return (
os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET')
is not None
)

@staticmethod
def _replace_section_config_with_display_sources(
config: ConfigParser,
@@ -1025,9 +1136,46 @@ def _replace_section_config_with_display_sources(
raw: bool,
section: str,
source_name: str,
deprecated_options: Dict[Tuple[str, str], Tuple[str, str, str]],
configs: Iterable[Tuple[str, ConfigParser]],
include_env: bool,
include_cmds: bool,
include_secret: bool,
):
sect = config_sources.setdefault(section, OrderedDict())
for (k, val) in config.items(section=section, raw=raw):
deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
if deprecated_section and deprecated_key:
if source_name == 'default':
# If deprecated entry has some non-default value set for any of the sources requested,
# We should NOT set default for the new entry (because it will override anything
# coming from the deprecated ones)
if AirflowConfigParser._deprecated_value_is_set_in_config(
deprecated_section, deprecated_key, configs
):
continue
if include_env and AirflowConfigParser._deprecated_variable_is_set(
deprecated_section, deprecated_key
):
continue
if include_cmds and (
AirflowConfigParser._deprecated_variable_command_is_set(
deprecated_section, deprecated_key
)
or AirflowConfigParser._deprecated_command_is_set_in_config(
deprecated_section, deprecated_key, configs
)
):
continue
if include_secret and (
AirflowConfigParser._deprecated_variable_secret_is_set(
deprecated_section, deprecated_key
)
or AirflowConfigParser._deprecated_secret_is_set_in_config(
deprecated_section, deprecated_key, configs
)
):
continue
if display_source:
sect[k] = (val, source_name)
else:
@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# This is the template for Airflow's unit test configuration. When Airflow runs
# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
# If it doesn't exist, Airflow uses this template to generate it by replacing
# variables in curly braces with their global values from configuration.py.

# Users should not modify this file; they should customize the generated
# unittests.cfg instead.
[core]
sql_alchemy_conn = mysql://
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# This is the template for Airflow's unit test configuration. When Airflow runs
# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
# If it doesn't exist, Airflow uses this template to generate it by replacing
# variables in curly braces with their global values from configuration.py.

# Users should not modify this file; they should customize the generated
# unittests.cfg instead.

[core]
sql_alchemy_conn_cmd = echo -n "postgresql://"
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# This is the template for Airflow's unit test configuration. When Airflow runs
# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
# If it doesn't exist, Airflow uses this template to generate it by replacing
# variables in curly braces with their global values from configuration.py.

# Users should not modify this file; they should customize the generated
# unittests.cfg instead.

[core]
sql_alchemy_conn_secret = secret_path

0 comments on commit 888bc2e

Please sign in to comment.