Resolve G004: Logging statement uses f-string (#37873)
* Resolve G004: Logging statement uses f-string

* Remove redundant whitespace in test case logging
Taragolis committed Mar 6, 2024
1 parent ec220a8 commit 3f52790
Showing 31 changed files with 82 additions and 59 deletions.
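The rewrite applied across all of the files below follows one pattern: G004 flags f-strings used as the logging message, because the string is interpolated eagerly, before the logging framework decides whether the record will be emitted, and because the resulting message template then differs on every call. A minimal sketch of the before and after, using illustrative port and hostname values rather than anything from the commit:

import logging

log = logging.getLogger(__name__)
port, hostname = 8080, "localhost"  # illustrative values

# Flagged by G004: the f-string is rendered before log.info() runs, so the
# formatting cost is paid even when INFO-level records are discarded.
log.info(f"Starting the Internal API server on port {port} and host {hostname}.")

# Preferred lazy %-style: the format string stays constant and interpolation
# is deferred until a handler actually emits the record.
log.info("Starting the Internal API server on port %s and host %s.", port, hostname)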
2 changes: 1 addition & 1 deletion airflow/cli/commands/internal_api_command.py
@@ -70,7 +70,7 @@ def internal_api(args):
worker_timeout = args.worker_timeout

if args.debug:
log.info(f"Starting the Internal API server on port {args.port} and host {args.hostname}.")
log.info("Starting the Internal API server on port %s and host %s.", args.port, args.hostname)
app = create_app(testing=conf.getboolean("core", "unit_test_mode"))
app.run(
debug=True, # nosec
6 changes: 4 additions & 2 deletions airflow/configuration.py
@@ -259,8 +259,10 @@ def _update_defaults_from_string(self, config_string: str):
if not self.is_template(section, key) and "{" in value:
errors = True
log.error(
f"The {section}.{key} value {value} read from string contains "
"variable. This is not supported"
"The %s.%s value %s read from string contains variable. This is not supported",
section,
key,
value,
)
self._default_values.set(section, key, value)
if errors:
8 changes: 5 additions & 3 deletions airflow/operators/python.py
@@ -936,9 +936,11 @@ def _get_airflow_version_from_target_env(self) -> str | None:
if self.expect_airflow:
self.log.warning("When checking for Airflow installed in virtual environment got %s", e)
self.log.warning(
f"This means that Airflow is not properly installed by "
f"{self.python}. Airflow context keys will not be available. "
f"Please Install Airflow {airflow_version} in your environment to access them."
"This means that Airflow is not properly installed by %s. "
"Airflow context keys will not be available. "
"Please Install Airflow %s in your environment to access them.",
self.python,
airflow_version,
)
return None
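Where a long f-string was split over several lines, the replacement relies on implicit concatenation of adjacent string literals to form a single format string, with the positional arguments listed afterwards. A sketch with illustrative values; note that the number of placeholders must match the number of arguments, otherwise the mismatch surfaces as a logging error at emit time rather than at the call site:

import logging

log = logging.getLogger(__name__)
python, airflow_version = "/usr/bin/python3", "2.8.2"  # illustrative values

# Adjacent string literals are concatenated at compile time into one format
# string; the arguments follow as a separate, ordered list.
log.warning(
    "This means that Airflow is not properly installed by %s. "
    "Airflow context keys will not be available. "
    "Please Install Airflow %s in your environment to access them.",
    python,
    airflow_version,
)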

2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
@@ -208,7 +208,7 @@ def sync(self):
if error_code in INVALID_CREDENTIALS_EXCEPTIONS:
self.IS_BOTO_CONNECTION_HEALTHY = False
self.log.warning(
f"AWS credentials are either missing or expired: {error}.\nRetrying connection"
"AWS credentials are either missing or expired: %s.\nRetrying connection", error
)

except Exception:
6 changes: 3 additions & 3 deletions airflow/providers/amazon/aws/hooks/batch_client.py
@@ -438,7 +438,7 @@ def get_job_awslogs_info(self, job_id: str) -> dict[str, str] | None:
return None
if len(all_info) > 1:
self.log.warning(
f"AWS Batch job ({job_id}) has more than one log stream, only returning the first one."
"AWS Batch job (%s) has more than one log stream, only returning the first one.", job_id
)
return all_info[0]

@@ -474,15 +474,15 @@ def get_job_all_awslogs_info(self, job_id: str) -> list[dict[str, str]]:
# If the user selected another logDriver than "awslogs", then CloudWatch logging is disabled.
if any(c.get("logDriver", "awslogs") != "awslogs" for c in log_configs):
self.log.warning(
f"AWS Batch job ({job_id}) uses non-aws log drivers. AWS CloudWatch logging disabled."
"AWS Batch job (%s) uses non-aws log drivers. AWS CloudWatch logging disabled.", job_id
)
return []

if not stream_names:
# If this method is called very early after starting the AWS Batch job,
# there is a possibility that the AWS CloudWatch Stream Name would not exist yet.
# This can also happen in case of misconfiguration.
self.log.warning(f"AWS Batch job ({job_id}) doesn't have any AWS CloudWatch Stream.")
self.log.warning("AWS Batch job (%s) doesn't have any AWS CloudWatch Stream.", job_id)
return []

# Try to get user-defined log configuration options
5 changes: 3 additions & 2 deletions airflow/providers/amazon/aws/hooks/glue.py
@@ -245,8 +245,9 @@ def display_logs_from(log_group: str, continuation_token: str | None) -> str | N
if e.response["Error"]["Code"] == "ResourceNotFoundException":
# we land here when the log groups/streams don't exist yet
self.log.warning(
"No new Glue driver logs so far.\nIf this persists, check the CloudWatch dashboard "
f"at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home"
"No new Glue driver logs so far.\n"
"If this persists, check the CloudWatch dashboard at: %r.",
f"https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home",
)
else:
raise
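Several of the rewrites here use %r rather than %s: %r formats the argument with repr(), so strings come out quoted and empty or odd values stand out in the log output. The f-string that builds the dashboard URL survives because it is an ordinary argument, not the message template that the lint rule targets. A small sketch with an illustrative region name:

import logging

log = logging.getLogger(__name__)
conn_region_name = "eu-west-1"  # illustrative region

# %r renders the argument with repr(), so the URL appears quoted in the
# emitted record; the f-string here is an argument, not the message.
log.warning(
    "No new Glue driver logs so far.\n"
    "If this persists, check the CloudWatch dashboard at: %r.",
    f"https://{conn_region_name}.console.aws.amazon.com/cloudwatch/home",
)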
6 changes: 4 additions & 2 deletions airflow/providers/amazon/aws/transfers/azure_blob_to_s3.py
@@ -120,8 +120,10 @@ def execute(self, context: Context) -> list[str]:
)

self.log.info(
f"Getting list of the files in Container: {self.container_name}; "
f"Prefix: {self.prefix}; Delimiter: {self.delimiter};"
"Getting list of the files in Container: %r; Prefix: %r; Delimiter: %r.",
self.container_name,
self.prefix,
self.delimiter,
)

files = wasb_hook.get_blobs_list_recursive(
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/triggers/eks.py
@@ -214,7 +214,7 @@ async def delete_any_fargate_profiles(self, client) -> None:
)
self.log.info("All Fargate profiles deleted")
else:
self.log.info(f"No Fargate profiles associated with cluster {self.cluster_name}")
self.log.info("No Fargate profiles associated with cluster %s", self.cluster_name)


class EksCreateFargateProfileTrigger(AwsBaseWaiterTrigger):
2 changes: 1 addition & 1 deletion airflow/providers/apache/beam/triggers/beam.py
@@ -227,7 +227,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
)
is_running = bool([job async for job in jobs if job.name == self.job_name])
except Exception as e:
self.log.exception(f"Exception occurred while requesting jobs with name {self.job_name}")
self.log.exception("Exception occurred while requesting jobs with name %s", self.job_name)
yield TriggerEvent({"status": "error", "message": str(e)})
return
if is_running:
7 changes: 5 additions & 2 deletions airflow/providers/apache/kafka/operators/produce.py
@@ -30,10 +30,13 @@

def acked(err, msg):
if err is not None:
local_logger.error(f"Failed to deliver message: {err}")
local_logger.error("Failed to deliver message: %s", err)
else:
local_logger.info(
f"Produced record to topic {msg.topic()} partition [{msg.partition()}] @ offset {msg.offset()}"
"Produced record to topic %s, partition [%s] @ offset %s",
msg.topic(),
msg.partition(),
msg.offset(),
)


9 changes: 6 additions & 3 deletions airflow/providers/fab/auth_manager/decorators/auth.py
@@ -93,11 +93,14 @@ def decorated(*args, **kwargs):

if len(unique_dag_ids) > 1:
log.warning(
f"There are different dag_ids passed in the request: {unique_dag_ids}. Returning 403."
"There are different dag_ids passed in the request: %s. Returning 403.", unique_dag_ids
)
log.warning(
f"kwargs: {dag_id_kwargs}, args: {dag_id_args}, "
f"form: {dag_id_form}, json: {dag_id_json}"
"kwargs: %s, args: %s, form: %s, json: %s",
dag_id_kwargs,
dag_id_args,
dag_id_form,
dag_id_json,
)
return (
render_template(
4 changes: 3 additions & 1 deletion airflow/providers/ftp/operators/ftp.py
@@ -156,7 +156,9 @@ def get_openlineage_facets_on_start(self):
local_host = socket.gethostbyname(local_host)
except Exception as e:
self.log.warning(
f"Failed to resolve local hostname. Using the hostname got by socket.gethostbyname() without resolution. {e}",
"Failed to resolve local hostname. "
"Using the hostname got by socket.gethostbyname() without resolution. %s",
e,
exc_info=True,
)
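For the exception-handling call sites in this commit only the message moves to lazy %-formatting; the traceback handling is unchanged. exc_info=True attaches the active traceback to a WARNING record, and self.log.exception(...) (as used in the Beam and Cloud Batch triggers above) is the equivalent shorthand at ERROR level. A brief sketch with an illustrative, non-resolvable hostname:

import logging
import socket

log = logging.getLogger(__name__)

try:
    local_host = socket.gethostbyname("does-not-resolve.invalid")  # illustrative hostname
except OSError as e:
    # The exception is interpolated lazily via %s; exc_info=True appends the
    # full traceback to the warning record.
    log.warning(
        "Failed to resolve local hostname. "
        "Using the hostname got by socket.gethostbyname() without resolution. %s",
        e,
        exc_info=True,
    )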

2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/compute_ssh.py
@@ -281,7 +281,7 @@ def get_conn(self) -> paramiko.SSHClient:
if retry == self.max_retries:
raise AirflowException("Maximum retries exceeded. Aborting operation.")
delay = random.randint(0, max_delay)
self.log.info(f"Failed establish SSH connection, waiting {delay} seconds to retry...")
self.log.info("Failed establish SSH connection, waiting %s seconds to retry...", delay)
time.sleep(delay)
if not sshclient:
raise AirflowException("Unable to establish SSH connection.")
@@ -467,7 +467,7 @@ def _find_max_value_in_column(self):
impersonation_chain=self.impersonation_chain,
)
if self.max_id_key:
self.log.info(f"Selecting the MAX value from BigQuery column '{self.max_id_key}'...")
self.log.info("Selecting the MAX value from BigQuery column %r...", self.max_id_key)
select_command = (
f"SELECT MAX({self.max_id_key}) AS max_value "
f"FROM {self.destination_project_dataset_table}"
7 changes: 2 additions & 5 deletions airflow/providers/google/cloud/transfers/s3_to_gcs.py
@@ -319,14 +319,11 @@ def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3H
body[TRANSFER_SPEC][OBJECT_CONDITIONS][INCLUDE_PREFIXES] = files_chunk
job = transfer_hook.create_transfer_job(body=body)

s = "s" if len(files_chunk) > 1 else ""
self.log.info(f"Submitted job {job['name']} to transfer {len(files_chunk)} file{s}")
self.log.info("Submitted job %s to transfer %s file(s).", job["name"], len(files_chunk))
job_names.append(job["name"])

if len(files) > chunk_size:
js = "s" if len(job_names) > 1 else ""
fs = "s" if len(files) > 1 else ""
self.log.info(f"Overall submitted {len(job_names)} job{js} to transfer {len(files)} file{fs}")
self.log.info("Overall submitted %s job(s) to transfer %s file(s).", len(job_names), len(files))

return job_names
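A note on the simplification above: the old code built a different message for singular and plural counts, so the format string changed from call to call. Writing "file(s)" and "job(s)" keeps a single constant template without passing the conditional suffix as yet another argument.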

5 changes: 3 additions & 2 deletions airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -153,9 +153,10 @@ def __init__(
def execute(self, context: Context):
if self.partition_columns:
self.log.info(
f"Found partition columns: {','.join(self.partition_columns)}. "
"Found partition columns: %s. "
"Assuming the SQL statement is properly sorted by these columns in "
"ascending or descending order."
"ascending or descending order.",
",".join(self.partition_columns),
)

self.log.info("Executing query")
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/triggers/cloud_batch.py
@@ -140,7 +140,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"status": "error", "message": str(e)})
return

self.log.exception(f"Job with name [{self.job_name}] timed out")
self.log.exception("Job with name [%s] timed out", self.job_name)
yield TriggerEvent(
{
"job_name": self.job_name,
@@ -105,8 +105,9 @@ def execute(
impersonation_chain=self.impersonation_chain,
)
self.log.info(
"Requesting list of Google Analytics accounts. "
f"Page size: {self.page_size}, page token: {self.page_token}"
"Requesting list of Google Analytics accounts. Page size: %s, page token: %s",
self.page_size,
self.page_token,
)
accounts = hook.list_accounts(
page_size=self.page_size,
@@ -157,7 +157,7 @@ def poke(self, context: Context) -> bool:

response = hook.get_report(query_id=self.query_id, report_id=self.report_id)
status = response.get("metadata", {}).get("status", {}).get("state")
self.log.info(f"STATUS OF THE REPORT {self.report_id} FOR QUERY {self.query_id}: {status}")
self.log.info("STATUS OF THE REPORT %s FOR QUERY %s: %s", self.report_id, self.query_id, status)
if response and status in ["DONE", "FAILED"]:
return True
return False
14 changes: 8 additions & 6 deletions airflow/providers/openlineage/extractors/base.py
@@ -87,8 +87,9 @@ def _execute_extraction(self) -> OperatorLineage | None:
def extract(self) -> OperatorLineage | None:
if self._is_operator_disabled:
self.log.debug(
f"Skipping extraction for operator {self.operator.task_type} "
"due to its presence in [openlineage] openlineage_disabled_for_operators."
"Skipping extraction for operator %s "
"due to its presence in [openlineage] openlineage_disabled_for_operators.",
self.operator.task_type,
)
return None
return self._execute_extraction()
@@ -121,16 +122,17 @@ def _execute_extraction(self) -> OperatorLineage | None:
return None
except AttributeError:
self.log.debug(
f"Operator {self.operator.task_type} does not have the "
"get_openlineage_facets_on_start method."
"Operator %s does not have the get_openlineage_facets_on_start method.",
self.operator.task_type,
)
return None

def extract_on_complete(self, task_instance) -> OperatorLineage | None:
if self._is_operator_disabled:
self.log.debug(
f"Skipping extraction for operator {self.operator.task_type} "
"due to its presence in [openlineage] openlineage_disabled_for_operators."
"Skipping extraction for operator %s "
"due to its presence in [openlineage] openlineage_disabled_for_operators.",
self.operator.task_type,
)
return None
if task_instance.state == TaskInstanceState.FAILED:
6 changes: 4 additions & 2 deletions airflow/providers/openlineage/plugins/listener.py
@@ -61,8 +61,10 @@ def on_task_instance_running(
):
if not hasattr(task_instance, "task"):
self.log.warning(
f"No task set for TI object task_id: {task_instance.task_id} - "
f"dag_id: {task_instance.dag_id} - run_id {task_instance.run_id}"
"No task set for TI object task_id: %s - dag_id: %s - run_id %s",
task_instance.task_id,
task_instance.dag_id,
task_instance.run_id,
)
return

7 changes: 5 additions & 2 deletions airflow/providers/sftp/operators/sftp.py
@@ -210,7 +210,9 @@ def get_openlineage_facets_on_start(self):
local_host = socket.gethostbyname(local_host)
except Exception as e:
self.log.warning(
f"Failed to resolve local hostname. Using the hostname got by socket.gethostbyname() without resolution. {e}",
"Failed to resolve local hostname. "
"Using the hostname got by socket.gethostbyname() without resolution. %s",
e,
exc_info=True,
)

@@ -225,7 +227,8 @@ def get_openlineage_facets_on_start(self):
remote_host = socket.gethostbyname(remote_host)
except OSError as e:
self.log.warning(
f"Failed to resolve remote hostname. Using the provided hostname without resolution. {e}",
"Failed to resolve remote hostname. Using the provided hostname without resolution. %s",
e,
exc_info=True,
)

4 changes: 2 additions & 2 deletions airflow/providers/snowflake/transfers/copy_into_snowflake.py
@@ -254,8 +254,8 @@ def get_openlineage_facets_on_complete(self, task_instance):
run_facets = {}
if extraction_error_files:
self.log.debug(
f"Unable to extract Dataset namespace and name "
f"for the following files: `{extraction_error_files}`."
"Unable to extract Dataset namespace and name for the following files: `%s`.",
extraction_error_files,
)
run_facets["extractionError"] = ExtractionErrorRunFacet(
totalTasks=len(query_results),
2 changes: 1 addition & 1 deletion airflow/providers_manager.py
@@ -657,7 +657,7 @@ def _discover_all_airflow_builtin_providers_from_local_sources(self) -> None:
seen.add(path)
self._add_provider_info_from_local_source_files_on_path(path)
except Exception as e:
log.warning(f"Error when loading 'provider.yaml' files from {path} airflow sources: {e}")
log.warning("Error when loading 'provider.yaml' files from %s airflow sources: %s", path, e)

def _add_provider_info_from_local_source_files_on_path(self, path) -> None:
"""
2 changes: 1 addition & 1 deletion airflow/triggers/temporal.py
@@ -63,7 +63,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
for step in 3600, 60, 10:
seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
while seconds_remaining > 2 * step:
self.log.info(f"{int(seconds_remaining)} seconds remaining; sleeping {step} seconds")
self.log.info("%d seconds remaining; sleeping %s seconds", seconds_remaining, step)
await asyncio.sleep(step)
seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
# Sleep a second at a time otherwise
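A small detail in the change above: with %-style formatting, %d truncates the float returned by total_seconds() the same way int() would, so the explicit int(...) wrapper from the old f-string is no longer needed. Illustrative sketch:

import logging

log = logging.getLogger(__name__)
seconds_remaining = 119.6  # illustrative value; total_seconds() returns a float
step = 10

# Emits "119 seconds remaining; sleeping 10 seconds".
log.info("%d seconds remaining; sleeping %s seconds", seconds_remaining, step)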
9 changes: 6 additions & 3 deletions airflow/www/auth.py
@@ -222,11 +222,14 @@ def decorated(*args, **kwargs):

if len(unique_dag_ids) > 1:
log.warning(
f"There are different dag_ids passed in the request: {unique_dag_ids}. Returning 403."
"There are different dag_ids passed in the request: %s. Returning 403.", unique_dag_ids
)
log.warning(
f"kwargs: {dag_id_kwargs}, args: {dag_id_args}, "
f"form: {dag_id_form}, json: {dag_id_json}"
"kwargs: %s, args: %s, form: %s, json: %s",
dag_id_kwargs,
dag_id_args,
dag_id_form,
dag_id_json,
)
return (
render_template(
2 changes: 1 addition & 1 deletion airflow/www/views.py
@@ -4896,7 +4896,7 @@ def varimport(self, session):
if action_on_existing == "fail" and existing_keys:
failed_repr = ", ".join(repr(k) for k in sorted(existing_keys))
flash(f"Failed. The variables with these keys: {failed_repr} already exists.")
logger.error(f"Failed. The variables with these keys: {failed_repr} already exists.")
logger.error("Failed. The variables with these keys: %s already exists.", failed_repr)
self.update_redirect()
return redirect(self.get_redirect())
skipped = set()
1 change: 0 additions & 1 deletion pyproject.toml
@@ -1339,7 +1339,6 @@ extend-select = [
"B006", # Checks for uses of mutable objects as function argument defaults.
]
ignore = [
"G004", # Logging statement uses f-string (not fixed yet)
"D203",
"D212",
"D213",
4 changes: 2 additions & 2 deletions tests/system/providers/amazon/aws/example_sagemaker.py
@@ -458,8 -458,8 @@ def delete_docker_image(image_name):
if docker_build.returncode != 0:
logger.error(
"Failed to delete local docker image. "
"Run 'docker images' to see if you need to clean it yourself.\n"
f"error message: {stderr}"
"Run 'docker images' to see if you need to clean it yourself.\nerror message: %s",
stderr,
)


@@ -125,15 +125,15 @@ def producer_function():
def consumer_function(message, prefix=None):
key = json.loads(message.key())
value = json.loads(message.value())
consumer_logger.info(f"{prefix} {message.topic()} @ {message.offset()}; {key} : {value}")
consumer_logger.info("%s %s @ %s; %s : %s", prefix, message.topic(), message.offset(), key, value)
return


def consumer_function_batch(messages, prefix=None):
for message in messages:
key = json.loads(message.key())
value = json.loads(message.value())
consumer_logger.info(f"{prefix} {message.topic()} @ {message.offset()}; {key} : {value}")
consumer_logger.info("%s %s @ %s; %s : %s", prefix, message.topic(), message.offset(), key, value)
return

