Skip to content
Permalink
Browse files
Bump pre-commit hook versions (#22887)
  • Loading branch information
mik-laj committed May 3, 2022
1 parent 5b1ab96 commit 2d109401b3566aef613501691d18cf7e4c776cd2
Showing 22 changed files with 65 additions and 97 deletions.
@@ -39,7 +39,7 @@ repos:
- "--maxlevel"
- "2"
- repo: https://github.com/Lucas-C/pre-commit-hooks
rev: v1.1.10
rev: v1.1.13
hooks:
- id: forbid-tabs
name: Fail if tabs are used in the project
@@ -146,6 +146,7 @@ repos:
- --fuzzy-match-generates-todo
files: >
\.cfg$|\.conf$|\.ini$|\.ldif$|\.properties$|\.readthedocs$|\.service$|\.tf$|Dockerfile.*$
# Keep version of black in sync wit blackend-docs and pre-commit-hook-names
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
@@ -161,7 +162,7 @@ repos:
alias: black
additional_dependencies: [black==22.3.0]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.1.0
rev: v4.2.0
hooks:
- id: check-merge-conflict
name: Check that merge conflicts are not being committed
@@ -203,7 +204,7 @@ repos:
pass_filenames: true
# TODO: Bump to Python 3.8 when support for Python 3.7 is dropped in Airflow.
- repo: https://github.com/asottile/pyupgrade
rev: v2.31.0
rev: v2.32.0
hooks:
- id: pyupgrade
name: Upgrade Python code automatically
@@ -264,7 +265,7 @@ repos:
^airflow/_vendor/
additional_dependencies: ['flake8>=4.0.1']
- repo: https://github.com/ikamensh/flynt
rev: '0.69'
rev: '0.76'
hooks:
- id: flynt
name: Run flynt string format converter for Python
@@ -546,7 +547,7 @@ repos:
- id: run-shellcheck
name: Check Shell scripts syntax correctness
language: docker_image
entry: koalaman/shellcheck:v0.7.2 -x -a
entry: koalaman/shellcheck:v0.8.0 -x -a
files: ^breeze-legacy$|^breeze-complete$|\.sh$|^hooks/build$|^hooks/push$|\.bash$
exclude: ^dev/breeze/autocomplete/.*$
- id: lint-css
@@ -270,12 +270,8 @@ def connections_add(args):
or urlunparse(
(
new_conn.conn_type,
'{login}:{password}@{host}:{port}'.format(
login=new_conn.login or '',
password='******' if new_conn.password else '',
host=new_conn.host or '',
port=new_conn.port or '',
),
f"{new_conn.login or ''}:{'******' if new_conn.password else ''}"
f"@{new_conn.host or ''}:{new_conn.port or ''}",
new_conn.schema or '',
'',
'',
@@ -36,9 +36,7 @@ def should_run(**kwargs):
:rtype: str
"""
print(
'------------- exec dttm = {} and minute = {}'.format(
kwargs['execution_date'], kwargs['execution_date'].minute
)
f"------------- exec dttm = {kwargs['execution_date']} and minute = {kwargs['execution_date'].minute}"
)
if kwargs['execution_date'].minute % 2 == 0:
return "empty_task_1"
@@ -115,14 +115,9 @@ def _validate_pool(self, session):
pool = session.query(Pool).filter(Pool.slots == 1).filter(Pool.pool == self.pool).first()
if pool and any(t.pool == self.pool for t in self.subdag.tasks):
raise AirflowException(
'SubDagOperator {sd} and subdag task{plural} {t} both '
'use pool {p}, but the pool only has 1 slot. The '
'subdag tasks will never run.'.format(
sd=self.task_id,
plural=len(conflicts) > 1,
t=', '.join(t.task_id for t in conflicts),
p=self.pool,
)
f"SubDagOperator {self.task_id} and subdag task{'s' if len(conflicts) > 1 else ''} "
f"{', '.join(t.task_id for t in conflicts)} both use pool {self.pool}, "
f"but the pool only has 1 slot. The subdag tasks will never run."
)

def _get_dagrun(self, execution_date):
@@ -467,9 +467,8 @@ def _check_success_task(self) -> None:
# https://docs.aws.amazon.com/AmazonECS/latest/developerguide/stopped-task-errors.html
if re.match(r'Host EC2 \(instance .+?\) (stopped|terminated)\.', task.get('stoppedReason', '')):
raise AirflowException(
'The task was stopped because the host instance terminated: {}'.format(
task.get('stoppedReason', '')
)
f"The task was stopped because the host instance terminated:"
f" {task.get('stoppedReason', '')}"
)
containers = task['containers']
for container in containers:
@@ -488,9 +487,8 @@ def _check_success_task(self) -> None:
raise AirflowException(f'This task is still pending {task}')
elif 'error' in container.get('reason', '').lower():
raise AirflowException(
'This containers encounter an error during launching : {}'.format(
container.get('reason', '').lower()
)
f"This containers encounter an error during launching: "
f"{container.get('reason', '').lower()}"
)

def get_hook(self) -> AwsBaseHook:
@@ -258,8 +258,9 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
cluster_status = response['Cluster']['Status']
state_change_reason = cluster_status.get('StateChangeReason')
if state_change_reason:
return 'for code: {} with message {}'.format(
state_change_reason.get('Code', 'No code'), state_change_reason.get('Message', 'Unknown')
return (
f"for code: {state_change_reason.get('Code', 'No code')} "
f"with message {state_change_reason.get('Message', 'Unknown')}"
)
return None

@@ -338,7 +339,8 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
"""
fail_details = response['Step']['Status'].get('FailureDetails')
if fail_details:
return 'for reason {} with message {} and log file {}'.format(
fail_details.get('Reason'), fail_details.get('Message'), fail_details.get('LogFile')
return (
f"for reason {fail_details.get('Reason')} "
f"with message {fail_details.get('Message')} and log file {fail_details.get('LogFile')}"
)
return None
@@ -76,8 +76,9 @@ def __init__(
) -> None:
if 'col_blacklist' in kwargs:
warnings.warn(
'col_blacklist kwarg passed to {c} (task_id: {t}) is deprecated, please rename it to '
'excluded_columns instead'.format(c=self.__class__.__name__, t=kwargs.get('task_id')),
f"col_blacklist kwarg passed to {self.__class__.__name__} "
f"(task_id: {kwargs.get('task_id')}) is deprecated, "
f"please rename it to excluded_columns instead",
category=FutureWarning,
stacklevel=2,
)
@@ -32,12 +32,10 @@ def failure_callback(context):
:param context: The context of the executed task.
"""
message = (
'AIRFLOW TASK FAILURE TIPS:\n'
'DAG: {}\n'
'TASKS: {}\n'
'Reason: {}\n'.format(
context['task_instance'].dag_id, context['task_instance'].task_id, context['exception']
)
f"AIRFLOW TASK FAILURE TIPS:\n"
f"DAG: {context['task_instance'].dag_id}\n"
f"TASKS: {context['task_instance'].task_id}\n"
f"Reason: {context['exception']}\n"
)
return DingdingOperator(
task_id='dingding_success_callback',
@@ -495,8 +495,7 @@ def operations_contain_expected_statuses(

if len(NEGATIVE_STATUSES - current_statuses) != len(NEGATIVE_STATUSES):
raise AirflowException(
'An unexpected operation status was encountered. Expected: {}'.format(
", ".join(expected_statuses_set)
)
f"An unexpected operation status was encountered. "
f"Expected: {', '.join(expected_statuses_set)}"
)
return False
@@ -68,11 +68,9 @@ def get_uri(self, conn: Connection):
based on SSL or other InfluxDB host requirements
"""
return '{scheme}://{host}:{port}'.format(
scheme='https' if conn.schema is None else f'{conn.schema}',
host=conn.host,
port='7687' if conn.port is None else f'{conn.port}',
)
conn_scheme = 'https' if conn.schema is None else conn.schema
conn_port = 7687 if conn.port is None else conn.port
return f"{conn_scheme}://{conn.host}:{conn_port}"

def get_conn(self) -> InfluxDBClient:
"""
@@ -212,7 +212,7 @@ def execute(self, context: "Context") -> int:
for conn_id, account_name, share_name, mount_path, read_only in self.volumes:
hook = AzureContainerVolumeHook(conn_id)

mount_name = "mount-%d" % len(volumes)
mount_name = f"mount-{len(volumes)}"
volumes.append(hook.get_file_volume(mount_name, share_name, account_name, read_only))
volume_mounts.append(VolumeMount(name=mount_name, mount_path=mount_path, read_only=read_only))

@@ -60,13 +60,9 @@ def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None:
srv = self.extras.pop('srv', False)
scheme = 'mongodb+srv' if srv else 'mongodb'

self.uri = '{scheme}://{creds}{host}{port}/{database}'.format(
scheme=scheme,
creds=f'{self.connection.login}:{self.connection.password}@' if self.connection.login else '',
host=self.connection.host,
port='' if self.connection.port is None else f':{self.connection.port}',
database=self.connection.schema,
)
creds = f'{self.connection.login}:{self.connection.password}@' if self.connection.login else ''
port = '' if self.connection.port is None else f':{self.connection.port}'
self.uri = f'{scheme}://{creds}{self.connection.host}{port}/{self.connection.schema}'

def __enter__(self):
return self
@@ -91,12 +91,7 @@ def get_uri(self, conn: Connection) -> str:
elif trusted_ca:
encryption_scheme = '+s'

return '{scheme}{encryption_scheme}://{host}:{port}'.format(
scheme=scheme,
encryption_scheme=encryption_scheme,
host=conn.host,
port='7687' if conn.port is None else f'{conn.port}',
)
return f"{scheme}{encryption_scheme}://{conn.host}:{7687 if conn.port is None else conn.port}"

def run(self, query) -> Result:
"""
@@ -222,7 +222,7 @@ def get_changes(verbose: bool, previous_release: str, current_release: str) -> L
change_strings = subprocess.check_output(
get_git_log_command(verbose, from_commit=previous_release, to_commit=current_release),
cwd=SOURCE_DIR_PATH,
universal_newlines=True,
text=True,
)
return [get_change_from_line(line) for line in change_strings.split("\n")]

@@ -166,7 +166,7 @@ def get_changes(
verbose, from_commit=previous_release, to_commit=current_release, is_helm_chart=is_helm_chart
),
cwd=SOURCE_DIR_PATH,
universal_newlines=True,
text=True,
)
return [get_change_from_line(line) for line in change_strings.split("\n")]

@@ -1318,7 +1318,7 @@ def get_all_changes_for_package(
changes = subprocess.check_output(
get_git_log_command(verbose, HEAD_OF_HTTPS_REMOTE, current_tag_no_suffix),
cwd=source_provider_package_path,
universal_newlines=True,
text=True,
)
if changes:
provider_details = get_provider_details(provider_package_id)
@@ -1332,7 +1332,7 @@ def get_all_changes_for_package(
changes_since_last_doc_only_check = subprocess.check_output(
get_git_log_command(verbose, HEAD_OF_HTTPS_REMOTE, last_doc_only_hash),
cwd=source_provider_package_path,
universal_newlines=True,
text=True,
)
if not changes_since_last_doc_only_check:
console.print()
@@ -1385,7 +1385,7 @@ def get_all_changes_for_package(
changes = subprocess.check_output(
get_git_log_command(verbose, next_version_tag, version_tag),
cwd=source_provider_package_path,
universal_newlines=True,
text=True,
)
changes_table_for_version, array_of_changes_for_version = convert_git_changes_to_table(
current_version, changes, base_url="https://github.com/apache/airflow/commit/", markdown=False
@@ -1397,7 +1397,7 @@ def get_all_changes_for_package(
changes = subprocess.check_output(
get_git_log_command(verbose, next_version_tag),
cwd=source_provider_package_path,
universal_newlines=True,
text=True,
)
changes_table_for_version, array_of_changes_for_version = convert_git_changes_to_table(
current_version, changes, base_url="https://github.com/apache/airflow/commit/", markdown=False
@@ -35,7 +35,7 @@ function md5sum::calculate_file_md5sum {
echo "${md5sum}" > "${md5sum_file_new}"
local ret_code=0
if [[ ! -f "${md5sum_file}" ]]; then
verbosity::print_info "Missing md5sum for ${file#${AIRFLOW_SOURCES}} (${md5sum_file#${AIRFLOW_SOURCES}})"
verbosity::print_info "Missing md5sum for ${file#"${AIRFLOW_SOURCES}"} (${md5sum_file#"${AIRFLOW_SOURCES}"})"
ret_code=1
else
diff "${md5sum_file_new}" "${md5sum_file}" >/dev/null
@@ -917,8 +917,7 @@ def test_process_error_event_for_raise_if_not_410(self):
self.events.append({"type": "ERROR", "object": self.pod, "raw_object": raw_object})
with self.assertRaises(AirflowException) as e:
self._run()
assert str(e.exception) == 'Kubernetes failure for {} with code {} and message: {}'.format(
raw_object['reason'],
raw_object['code'],
raw_object['message'],
assert str(e.exception) == (
f"Kubernetes failure for {raw_object['reason']} "
f"with code {raw_object['code']} and message: {raw_object['message']}"
)
@@ -437,19 +437,13 @@ def validate_dags(self, expected_parent_dag, actual_found_dags, actual_dagbag, s

for dag_id in expected_dag_ids:
actual_dagbag.log.info(f'validating {dag_id}')
assert (
dag_id in actual_found_dag_ids
) == should_be_found, 'dag "{}" should {}have been found after processing dag "{}"'.format(
dag_id,
'' if should_be_found else 'not ',
expected_parent_dag.dag_id,
assert (dag_id in actual_found_dag_ids) == should_be_found, (
f"dag \"{dag_id}\" should {'' if should_be_found else 'not '}"
f"have been found after processing dag \"{expected_parent_dag.dag_id}\""
)
assert (
dag_id in actual_dagbag.dags
) == should_be_found, 'dag "{}" should {}be in dagbag.dags after processing dag "{}"'.format(
dag_id,
'' if should_be_found else 'not ',
expected_parent_dag.dag_id,
assert (dag_id in actual_dagbag.dags) == should_be_found, (
f"dag \"{dag_id}\" should {'' if should_be_found else 'not '}"
f"be in dagbag.dags after processing dag \"{expected_parent_dag.dag_id}\""
)

def test_load_subdags(self):
@@ -528,11 +528,8 @@ def test_secondary_training_status_changed_false(self):
def test_secondary_training_status_message_status_changed(self):
now = datetime.now(tzlocal())
SECONDARY_STATUS_DESCRIPTION_1['LastModifiedTime'] = now
expected = '{} {} - {}'.format(
datetime.utcfromtimestamp(time.mktime(now.timetuple())).strftime('%Y-%m-%d %H:%M:%S'),
status,
message,
)
expected_time = datetime.utcfromtimestamp(time.mktime(now.timetuple())).strftime('%Y-%m-%d %H:%M:%S')
expected = f"{expected_time} {status} - {message}"
assert (
secondary_training_status_message(SECONDARY_STATUS_DESCRIPTION_1, SECONDARY_STATUS_DESCRIPTION_2)
== expected
@@ -52,8 +52,9 @@ def state_from_response(response):
def failure_message_from_response(response):
change_reason = response['SomeKey'].get('StateChangeReason')
if change_reason:
return 'for code: {} with message {}'.format(
change_reason.get('Code', EMPTY_CODE), change_reason.get('Message', 'Unknown')
return (
f"for code: {change_reason.get('Code', EMPTY_CODE)} "
f"with message {change_reason.get('Message', 'Unknown')}"
)
return None

@@ -125,8 +125,8 @@ def test_external_task_sensor_failed_states_as_success(self):
with pytest.raises(AirflowException) as ctx:
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert (
'INFO:airflow.task.operators:Poking for tasks [\'time_sensor_check\']'
' in dag unit_test_dag on %s ... ' % DEFAULT_DATE.isoformat() in cm.output
f'INFO:airflow.task.operators:Poking for tasks [\'time_sensor_check\'] '
f'in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... ' in cm.output
)
assert (
str(ctx.value) == "Some of the external tasks "
@@ -191,9 +191,9 @@ def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self):
with pytest.raises(AirflowException) as ctx:
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert (
'INFO:airflow.task.operators:Poking for tasks '
'[\'time_sensor_check\', \'time_sensor_check_alternate\'] '
'in dag unit_test_dag on %s ... ' % DEFAULT_DATE.isoformat() in cm.output
f'INFO:airflow.task.operators:Poking for tasks '
f'[\'time_sensor_check\', \'time_sensor_check_alternate\'] '
f'in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... ' in cm.output
)
assert (
str(ctx.value) == "Some of the external tasks "

0 comments on commit 2d10940

Please sign in to comment.