Clean up f-strings in logging calls #23597

Merged
merged 3 commits on May 23, 2022
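The pattern applied throughout this PR is the standard lazy-logging idiom: instead of rendering the message with an f-string before calling the logger, the format string and its arguments are passed separately, so interpolation only happens if a handler actually emits the record. A minimal sketch of the before/after shapes, using hypothetical values in place of the real Airflow objects:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

dag_id = "example_dag"  # hypothetical value standing in for dag.dag_id

# Before: the f-string is evaluated even when INFO is disabled for this logger.
logger.info(f"DAG {dag_id} is missing and will be deactivated.")

# After: the logging framework interpolates the arguments only when the
# record is actually emitted.
logger.info("DAG %s is missing and will be deactivated.", dag_id)

# Multiple placeholders work the same way, as in the providers_manager.py hunk.
logger.warning(
    "The package %s is not compatible with this version of Airflow. "
    "The package has version %s but the minimum supported version "
    "of the package is %s",
    "apache-airflow-providers-example",  # hypothetical package name
    "1.0.0",
    "2.0.0",
)
```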
2 changes: 1 addition & 1 deletion airflow/dag_processing/manager.py
@@ -509,7 +509,7 @@ def _deactivate_stale_dags(self, session=None):
dag.fileloc in last_parsed
and (dag.last_parsed_time + self._processor_timeout) < last_parsed[dag.fileloc]
):
self.log.info(f"DAG {dag.dag_id} is missing and will be deactivated.")
self.log.info("DAG %s is missing and will be deactivated.", dag.dag_id)
to_deactivate.add(dag.dag_id)

if to_deactivate:
5 changes: 3 additions & 2 deletions airflow/kubernetes/pod_launcher_deprecated.py
@@ -186,8 +186,9 @@ def parse_log_line(self, line: str) -> Tuple[Optional[str], str]:
split_at = line.find(' ')
if split_at == -1:
self.log.error(
f"Error parsing timestamp (no timestamp in message: '{line}'). "
"Will continue execution but won't update timestamp"
"Error parsing timestamp (no timestamp in message: %r). "
"Will continue execution but won't update timestamp",
line,
)
return None, line
timestamp = line[:split_at]
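Both parse_log_line hunks (this one and the matching one in pod_manager.py further down) also switch the message to %r, which formats the offending line with repr() so quoting and any control characters are visible in the log. A small sketch, with a hypothetical log line:

```python
import logging

logging.basicConfig(level=logging.ERROR)
log = logging.getLogger(__name__)

line = "no timestamp here\t"  # hypothetical pod log line with a trailing tab

# %r renders repr(line), so the emitted message shows the quotes and the
# escaped tab: ... no timestamp in message: 'no timestamp here\t' ...
log.error(
    "Error parsing timestamp (no timestamp in message: %r). "
    "Will continue execution but won't update timestamp",
    line,
)
```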
4 changes: 2 additions & 2 deletions airflow/models/connection.py
@@ -208,8 +208,8 @@ def get_uri(self) -> str:
"""Return connection in URI format"""
if '_' in self.conn_type:
self.log.warning(
f"Connection schemes (type: {str(self.conn_type)}) "
f"shall not contain '_' according to RFC3986."
"Connection schemes (type: %s) shall not contain '_' according to RFC3986.",
self.conn_type,
)

uri = f"{str(self.conn_type).lower().replace('_', '-')}://"
4 changes: 2 additions & 2 deletions airflow/models/dagrun.py
@@ -789,8 +789,8 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis: Lis
true_delay = first_start_date - data_interval_end
if true_delay.total_seconds() > 0:
Stats.timing(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay)
except Exception as e:
self.log.warning(f'Failed to record first_task_scheduling_delay metric:\n{e}')
except Exception:
self.log.warning('Failed to record first_task_scheduling_delay metric:', exc_info=True)

def _emit_duration_stats_for_finished_state(self):
if self.state == State.RUNNING:
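Besides dropping the f-string, the dagrun.py hunk stops interpolating the exception into the message and lets the logging module attach the traceback via exc_info=True. A brief illustration of the difference, using a hypothetical failing helper:

```python
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

def record_metric():
    raise RuntimeError("stats backend unavailable")  # hypothetical failure

try:
    record_metric()
except Exception as e:
    # Old style: only str(e) appears in the log; the traceback is lost.
    logger.warning(f"Failed to record first_task_scheduling_delay metric:\n{e}")

try:
    record_metric()
except Exception:
    # New style: the handler appends the full traceback to the message.
    logger.warning("Failed to record first_task_scheduling_delay metric:", exc_info=True)
```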
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/operators/ecs.py
@@ -375,7 +375,7 @@ def _start_task(self, context):

self.arn = response['tasks'][0]['taskArn']
self.ecs_task_id = self.arn.split("/")[-1]
self.log.info(f"ECS task ID is: {self.ecs_task_id}")
self.log.info("ECS task ID is: %s", self.ecs_task_id)

if self.reattach:
# Save the task ARN in XCom to be able to reattach it if needed
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/redshift_data.py
@@ -136,12 +136,12 @@ def wait_for_results(self, statement_id):
elif status == 'FAILED' or status == 'ABORTED':
raise ValueError(f"Statement {statement_id!r} terminated with status {status}.")
else:
self.log.info(f"Query {status}")
self.log.info("Query %s", status)
sleep(self.poll_interval)

def execute(self, context: 'Context') -> None:
"""Execute a statement against Amazon Redshift"""
self.log.info(f"Executing statement: {self.sql}")
self.log.info("Executing statement: %s", self.sql)

self.statement_id = self.execute_query()

2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/operators/redshift_sql.py
@@ -73,6 +73,6 @@ def get_hook(self) -> RedshiftSQLHook:

def execute(self, context: 'Context') -> None:
"""Execute a statement against Amazon Redshift"""
self.log.info(f"Executing statement: {self.sql}")
self.log.info("Executing statement: %s", self.sql)
hook = self.get_hook()
hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/operators/sagemaker.py
@@ -693,4 +693,4 @@ def __init__(self, *, config, aws_conn_id: str = DEFAULT_CONN_ID, **kwargs):
def execute(self, context: 'Context') -> Any:
sagemaker_hook = SageMakerHook(aws_conn_id=self.aws_conn_id)
sagemaker_hook.delete_model(model_name=self.config['ModelName'])
self.log.info(f"Model {self.config['ModelName']} deleted Successfully.")
self.log.info("Model %s deleted successfully.", self.config['ModelName'])
6 changes: 3 additions & 3 deletions airflow/providers/amazon/aws/transfers/ftp_to_s3.py
@@ -108,15 +108,15 @@ def __upload_to_s3_from_ftp(self, remote_filename, s3_file_key):
gzip=self.gzip,
acl_policy=self.acl_policy,
)
self.log.info(f'File upload to {s3_file_key}')
self.log.info('File upload to %s', s3_file_key)

def execute(self, context: 'Context'):
self.ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
self.s3_hook = S3Hook(self.aws_conn_id)

if self.ftp_filenames:
if isinstance(self.ftp_filenames, str):
self.log.info(f'Getting files in {self.ftp_path}')
self.log.info('Getting files in %s', self.ftp_path)

list_dir = self.ftp_hook.list_directory(
path=self.ftp_path,
@@ -129,7 +129,7 @@ def execute(self, context: 'Context'):
files = list(filter(lambda f: ftp_filename in f, list_dir))

for file in files:
self.log.info(f'Moving file {file}')
self.log.info('Moving file %s', file)

if self.s3_filenames and isinstance(self.s3_filenames, str):
filename = file.replace(self.ftp_filenames, self.s3_filenames)
@@ -127,6 +127,6 @@ def execute(self, context: 'Context') -> str:
)

s3_uri = f"s3://{self.s3_bucket_name}/{self.s3_key}"
self.log.info(f"Salesforce data uploaded to S3 at {s3_uri}.")
self.log.info("Salesforce data uploaded to S3 at %s.", s3_uri)

return s3_uri
4 changes: 2 additions & 2 deletions airflow/providers/arangodb/sensors/arangodb.py
@@ -47,8 +47,8 @@ def __init__(self, *, query: str, arangodb_conn_id: str = "arangodb_default", **
self.query = query

def poke(self, context: 'Context') -> bool:
self.log.info(f"Sensor running following query: {self.query}")
self.log.info("Sensor running the following query: %s", self.query)
hook = ArangoDBHook(self.arangodb_conn_id)
records = hook.query(self.query, count=True).count()
self.log.info(f"Total Records found: {records}")
self.log.info("Total records found: %d", records)
return 0 != records
2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -228,7 +228,7 @@ def create_custom_object(
)
self.log.warning("Deleted SparkApplication with the same name.")
except client.rest.ApiException:
self.log.info(f"SparkApp {body_dict['metadata']['name']} not found.")
self.log.info("SparkApp %s not found.", body_dict['metadata']['name'])

try:
response = api.create_namespaced_custom_object(
5 changes: 3 additions & 2 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -285,8 +285,9 @@ def parse_log_line(self, line: str) -> Tuple[Optional[DateTime], str]:
split_at = line.find(' ')
if split_at == -1:
self.log.error(
f"Error parsing timestamp (no timestamp in message '${line}'). "
"Will continue execution but won't update timestamp"
"Error parsing timestamp (no timestamp in message %r). "
"Will continue execution but won't update timestamp",
line,
)
return None, line
timestamp = line[:split_at]
@@ -557,7 +557,7 @@ def execute(self, context: 'Context'):
impersonation_chain=self.impersonation_chain,
)

self.log.info(f"Removing a DeployedModel {self.deployed_model_id}")
self.log.info("Removing a DeployedModel %s", self.deployed_model_id)
operation = hook.undeploy_model(
project_id=self.project_id,
region=self.region,
5 changes: 3 additions & 2 deletions airflow/providers/google/cloud/sensors/dataproc.py
@@ -79,8 +79,9 @@ def poke(self, context: "Context") -> bool:
job_id=self.dataproc_job_id, region=self.region, project_id=self.project_id
)
except ServerError as err:
self.log.info(f"DURATION RUN: {self._duration()}")
if self._duration() > self.wait_timeout:
duration = self._duration()
self.log.info("DURATION RUN: %f", duration)
if duration > self.wait_timeout:
raise AirflowException(
f"Timeout: dataproc job {self.dataproc_job_id} "
f"is not ready after {self.wait_timeout}s"
5 changes: 4 additions & 1 deletion airflow/providers/google/suite/transfers/sql_to_sheets.py
@@ -17,6 +17,7 @@


import datetime
import logging
import numbers
from contextlib import closing
from typing import Any, Iterable, Mapping, Optional, Sequence, Union
@@ -120,7 +121,9 @@ def execute(self, context: Any) -> None:
impersonation_chain=self.impersonation_chain,
)

self.log.info(f"Uploading data to https://docs.google.com/spreadsheets/d/{self.spreadsheet_id}")
if self.log.isEnabledFor(logging.INFO):
url = f"https://docs.google.com/spreadsheets/d/{self.spreadsheet_id}"
self.log.info("Uploading data to %s", url)

sheet_hook.update_values(
spreadsheet_id=self.spreadsheet_id,
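The sql_to_sheets.py hunk keeps an f-string but guards it with isEnabledFor, so the spreadsheet URL is only built when an INFO record would actually be emitted. The same guard outside Airflow, with a hypothetical spreadsheet ID:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

spreadsheet_id = "1AbCdEfG"  # hypothetical ID

if logger.isEnabledFor(logging.INFO):
    # Only pay for building the URL when the record will be emitted.
    url = f"https://docs.google.com/spreadsheets/d/{spreadsheet_id}"
    logger.info("Uploading data to %s", url)
```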
4 changes: 2 additions & 2 deletions airflow/providers/microsoft/azure/hooks/data_factory.py
@@ -637,13 +637,13 @@ def get_pipeline_run_status(
:param factory_name: The factory name.
:return: The status of the pipeline run.
"""
self.log.info(f"Getting the status of run ID {run_id}.")
self.log.info("Getting the status of run ID %s.", run_id)
pipeline_run_status = self.get_pipeline_run(
run_id=run_id,
factory_name=factory_name,
resource_group_name=resource_group_name,
).status
self.log.info(f"Current status of pipeline run {run_id}: {pipeline_run_status}")
self.log.info("Current status of pipeline run %s: %s", run_id, pipeline_run_status)

return pipeline_run_status

8 changes: 4 additions & 4 deletions airflow/providers/microsoft/azure/operators/data_factory.py
@@ -156,7 +156,7 @@ def __init__(

def execute(self, context: "Context") -> None:
self.hook = AzureDataFactoryHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
self.log.info(f"Executing the {self.pipeline_name} pipeline.")
self.log.info("Executing the %s pipeline.", self.pipeline_name)
response = self.hook.run_pipeline(
pipeline_name=self.pipeline_name,
resource_group_name=self.resource_group_name,
@@ -174,7 +174,7 @@ def execute(self, context: "Context") -> None:
context["ti"].xcom_push(key="run_id", value=self.run_id)

if self.wait_for_termination:
self.log.info(f"Waiting for pipeline run {self.run_id} to terminate.")
self.log.info("Waiting for pipeline run %s to terminate.", self.run_id)

if self.hook.wait_for_pipeline_run_status(
run_id=self.run_id,
@@ -184,7 +184,7 @@ def execute(self, context: "Context") -> None:
resource_group_name=self.resource_group_name,
factory_name=self.factory_name,
):
self.log.info(f"Pipeline run {self.run_id} has completed successfully.")
self.log.info("Pipeline run %s has completed successfully.", self.run_id)
else:
raise AzureDataFactoryPipelineRunException(
f"Pipeline run {self.run_id} has failed or has been cancelled."
@@ -207,6 +207,6 @@ def on_kill(self) -> None:
resource_group_name=self.resource_group_name,
factory_name=self.factory_name,
):
self.log.info(f"Pipeline run {self.run_id} has been cancelled successfully.")
self.log.info("Pipeline run %s has been cancelled successfully.", self.run_id)
else:
raise AzureDataFactoryPipelineRunException(f"Pipeline run {self.run_id} was not cancelled.")
9 changes: 6 additions & 3 deletions airflow/providers_manager.py
@@ -377,9 +377,12 @@ def _verify_all_providers_all_compatible(self):
if min_version:
if packaging_version.parse(min_version) > packaging_version.parse(info.version):
log.warning(
f"The package {provider_id} is not compatible with this version of Airflow. "
f"The package has version {info.version} but the minimum supported version "
f"of the package is {min_version}"
"The package %s is not compatible with this version of Airflow. "
"The package has version %s but the minimum supported version "
"of the package is %s",
provider_id,
info.version,
min_version,
)

@provider_info_cache("hooks")
2 changes: 1 addition & 1 deletion airflow/settings.py
@@ -293,7 +293,7 @@ def configure_orm(disable_connection_pool=False):
data = result.fetchone()[0]
if data != 1:
log.critical("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.")
log.critical(f"The database {engine.url.database} has it disabled.")
log.critical("The database %s has it disabled.", engine.url.database)
log.critical("This will cause random deadlocks, Refusing to start.")
log.critical(
"See https://airflow.apache.org/docs/apache-airflow/stable/howto/"