Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Format all files (without excepions) by black #12091

Merged
merged 2 commits into from
Nov 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ repos:
rev: 20.8b1
hooks:
- id: black
exclude: .*kubernetes_pod\.py|.*google/common/hooks/base_google\.py$|^airflow/configuration.py$
args: [--config=./pyproject.toml]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.3.0
Expand Down
156 changes: 73 additions & 83 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@

# show Airflow's deprecation warnings
if not sys.warnoptions:
warnings.filterwarnings(
action='default', category=DeprecationWarning, module='airflow')
warnings.filterwarnings(
action='default', category=PendingDeprecationWarning, module='airflow')
warnings.filterwarnings(action='default', category=DeprecationWarning, module='airflow')
warnings.filterwarnings(action='default', category=PendingDeprecationWarning, module='airflow')


def expand_env_var(env_var):
Expand All @@ -70,17 +68,14 @@ def expand_env_var(env_var):
def run_command(command):
"""Runs command and returns stdout"""
process = subprocess.Popen(
shlex.split(command),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True)
output, stderr = [stream.decode(sys.getdefaultencoding(), 'ignore')
for stream in process.communicate()]
shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
)
output, stderr = [stream.decode(sys.getdefaultencoding(), 'ignore') for stream in process.communicate()]

if process.returncode != 0:
raise AirflowConfigException(
"Cannot execute {}. Error code is: {}. Output: {}, Stderr: {}"
.format(command, process.returncode, output, stderr)
f"Cannot execute {command}. Error code is: {process.returncode}. "
f"Output: {output}, Stderr: {stderr}"
)

return output
Expand Down Expand Up @@ -183,9 +178,9 @@ class AirflowConfigParser(ConfigParser): # pylint: disable=too-many-ancestors
'email_backend': (
re.compile(r'^airflow\.contrib\.utils\.sendgrid\.send_email$'),
r'airflow.providers.sendgrid.utils.emailer.send_email',
'2.0'
'2.0',
),
}
},
}

# This method transforms option names on every read, get, or set operation.
Expand Down Expand Up @@ -213,14 +208,14 @@ def _validate(self):
current_value = self.get(section, name, fallback=None)
if self._using_old_value(old, current_value):
new_value = re.sub(old, new, current_value)
self._update_env_var(
section=section, name=name, new_value=new_value)
self._update_env_var(section=section, name=name, new_value=new_value)
self._create_future_warning(
name=name,
section=section,
current_value=current_value,
new_value=new_value,
version=version)
version=version,
)

self.is_validated = True

Expand All @@ -229,21 +224,27 @@ def _validate_config_dependencies(self):
Validate that config values aren't invalid given other config values
or system-level limitations and requirements.
"""
if (
self.get("core", "executor") not in ('DebugExecutor', 'SequentialExecutor') and
"sqlite" in self.get('core', 'sql_alchemy_conn')):
is_executor_without_sqlite_support = self.get("core", "executor") not in (
'DebugExecutor',
'SequentialExecutor',
)
is_sqlite = "sqlite" in self.get('core', 'sql_alchemy_conn')
if is_executor_without_sqlite_support and is_sqlite:
raise AirflowConfigException(
"error: cannot use sqlite with the {}".format(
self.get('core', 'executor')))
"error: cannot use sqlite with the {}".format(self.get('core', 'executor'))
)

if self.has_option('core', 'mp_start_method'):
mp_start_method = self.get('core', 'mp_start_method')
start_method_options = multiprocessing.get_all_start_methods()

if mp_start_method not in start_method_options:
raise AirflowConfigException(
"mp_start_method should not be " + mp_start_method +
". Possible values are " + ", ".join(start_method_options))
"mp_start_method should not be "
+ mp_start_method
+ ". Possible values are "
+ ", ".join(start_method_options)
)

def _using_old_value(self, old, current_value): # noqa
return old.search(current_value) is not None
Expand All @@ -264,7 +265,7 @@ def _create_future_warning(name, section, current_value, new_value, version):
'Airflow {version}.'.format(
name=name, section=section, current_value=current_value, new_value=new_value, version=version
),
FutureWarning
FutureWarning,
)

@staticmethod
Expand Down Expand Up @@ -336,17 +337,12 @@ def get(self, section, key, **kwargs):
def _get_option_from_default_config(self, section, key, **kwargs):
# ...then the default config
if self.airflow_defaults.has_option(section, key) or 'fallback' in kwargs:
return expand_env_var(
self.airflow_defaults.get(section, key, **kwargs))
return expand_env_var(self.airflow_defaults.get(section, key, **kwargs))

else:
log.warning(
"section/key [%s/%s] not found in config", section, key
)
log.warning("section/key [%s/%s] not found in config", section, key)

raise AirflowConfigException(
"section/key [{section}/{key}] not found "
"in config".format(section=section, key=key))
raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")

def _get_option_from_secrets(self, deprecated_key, deprecated_section, key, section):
# ...then from secret backends
Expand Down Expand Up @@ -377,16 +373,11 @@ def _get_option_from_config_file(self, deprecated_key, deprecated_section, key,
if super().has_option(section, key):
# Use the parent's methods to get the actual config here to be able to
# separate the config from default config.
return expand_env_var(
super().get(section, key, **kwargs))
return expand_env_var(super().get(section, key, **kwargs))
if deprecated_section:
if super().has_option(deprecated_section, deprecated_key):
self._warn_deprecate(section, key, deprecated_section, deprecated_key)
return expand_env_var(super().get(
deprecated_section,
deprecated_key,
**kwargs
))
return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
return None

def _get_environment_variables(self, deprecated_key, deprecated_section, key, section):
Expand Down Expand Up @@ -543,8 +534,13 @@ def write(self, fp, space_around_delimiters=True):
self._write_section(fp, section, self.getsection(section).items(), delimiter)

def as_dict(
self, display_source=False, display_sensitive=False, raw=False,
include_env=True, include_cmds=True, include_secret=True
self,
display_source=False,
display_sensitive=False,
raw=False,
include_env=True,
include_cmds=True,
include_secret=True,
) -> Dict[str, Dict[str, str]]:
"""
Returns the current configuration as an OrderedDict of OrderedDicts.
Expand Down Expand Up @@ -624,8 +620,9 @@ def _include_commands(self, config_sources, display_sensitive, display_source, r
del config_sources[section][key + '_cmd']

def _include_envs(self, config_sources, display_sensitive, display_source, raw):
for env_var in [os_environment
for os_environment in os.environ if os_environment.startswith('AIRFLOW__')]:
for env_var in [
os_environment for os_environment in os.environ if os_environment.startswith('AIRFLOW__')
]:
try:
_, section, key = env_var.split('__', 2)
opt = self._get_env_var_option(section, key)
Expand All @@ -651,13 +648,14 @@ def _include_envs(self, config_sources, display_sensitive, display_source, raw):
def _replace_config_with_display_sources(config_sources, configs, display_source, raw):
for (source_name, config) in configs:
for section in config.sections():
AirflowConfigParser.\
_replace_section_config_with_display_sources(
config, config_sources, display_source, raw, section, source_name)
AirflowConfigParser._replace_section_config_with_display_sources(
config, config_sources, display_source, raw, section, source_name
)

@staticmethod
def _replace_section_config_with_display_sources(config, config_sources, display_source, raw, section,
source_name):
def _replace_section_config_with_display_sources(
config, config_sources, display_source, raw, section, source_name
):
sect = config_sources.setdefault(section, OrderedDict())
for (k, val) in config.items(section=section, raw=raw):
if display_source:
Expand Down Expand Up @@ -730,19 +728,17 @@ def get_airflow_config(airflow_home):
# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
'tests',
'dags')
os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'tests', 'dags'
)
if os.path.exists(_TEST_DAGS_FOLDER):
TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')

# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
'tests',
'plugins')
os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'tests', 'plugins'
)
if os.path.exists(_TEST_PLUGINS_FOLDER):
TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
Expand Down Expand Up @@ -777,20 +773,14 @@ def get_airflow_test_config(airflow_home):

SECRET_KEY = b64encode(os.urandom(16)).decode('utf-8')

TEMPLATE_START = (
'# ----------------------- TEMPLATE BEGINS HERE -----------------------')
TEMPLATE_START = '# ----------------------- TEMPLATE BEGINS HERE -----------------------'
if not os.path.isfile(TEST_CONFIG_FILE):
log.info(
'Creating new Airflow config file for unit tests in: %s', TEST_CONFIG_FILE
)
log.info('Creating new Airflow config file for unit tests in: %s', TEST_CONFIG_FILE)
with open(TEST_CONFIG_FILE, 'w') as file:
cfg = parameterized_config(TEST_CONFIG)
file.write(cfg.split(TEMPLATE_START)[-1].strip())
if not os.path.isfile(AIRFLOW_CONFIG):
log.info(
'Creating new Airflow config file in: %s',
AIRFLOW_CONFIG
)
log.info('Creating new Airflow config file in: %s', AIRFLOW_CONFIG)
with open(AIRFLOW_CONFIG, 'w') as file:
cfg = parameterized_config(DEFAULT_CONFIG)
cfg = cfg.split(TEMPLATE_START)[-1].strip()
Expand Down Expand Up @@ -835,110 +825,110 @@ def get_airflow_test_config(airflow_home):


# Historical convenience functions to access config entries
def load_test_config(): # noqa: D103
def load_test_config(): # noqa: D103
"""Historical load_test_config"""
warnings.warn(
"Accessing configuration method 'load_test_config' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.load_test_config'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
conf.load_test_config()


def get(*args, **kwargs): # noqa: D103
def get(*args, **kwargs): # noqa: D103
"""Historical get"""
warnings.warn(
"Accessing configuration method 'get' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.get'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.get(*args, **kwargs)


def getboolean(*args, **kwargs): # noqa: D103
def getboolean(*args, **kwargs): # noqa: D103
"""Historical getboolean"""
warnings.warn(
"Accessing configuration method 'getboolean' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getboolean'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.getboolean(*args, **kwargs)


def getfloat(*args, **kwargs): # noqa: D103
def getfloat(*args, **kwargs): # noqa: D103
"""Historical getfloat"""
warnings.warn(
"Accessing configuration method 'getfloat' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getfloat'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.getfloat(*args, **kwargs)


def getint(*args, **kwargs): # noqa: D103
def getint(*args, **kwargs): # noqa: D103
"""Historical getint"""
warnings.warn(
"Accessing configuration method 'getint' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getint'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.getint(*args, **kwargs)


def getsection(*args, **kwargs): # noqa: D103
def getsection(*args, **kwargs): # noqa: D103
"""Historical getsection"""
warnings.warn(
"Accessing configuration method 'getsection' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getsection'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.getint(*args, **kwargs)


def has_option(*args, **kwargs): # noqa: D103
def has_option(*args, **kwargs): # noqa: D103
"""Historical has_option"""
warnings.warn(
"Accessing configuration method 'has_option' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.has_option'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.has_option(*args, **kwargs)


def remove_option(*args, **kwargs): # noqa: D103
def remove_option(*args, **kwargs): # noqa: D103
"""Historical remove_option"""
warnings.warn(
"Accessing configuration method 'remove_option' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.remove_option'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.remove_option(*args, **kwargs)


def as_dict(*args, **kwargs): # noqa: D103
def as_dict(*args, **kwargs): # noqa: D103
"""Historical as_dict"""
warnings.warn(
"Accessing configuration method 'as_dict' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.as_dict'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.as_dict(*args, **kwargs)

Expand All @@ -950,7 +940,7 @@ def set(*args, **kwargs): # noqa pylint: disable=redefined-builtin
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.set'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.set(*args, **kwargs)

Expand Down