Commit abef61f: Replace repr() with proper formatting (#33520)

eumiro committed Aug 20, 2023
1 parent a91ee7a commit abef61f

Showing 14 changed files with 31 additions and 38 deletions.
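Note for reviewers: the `!r` conversion in an f-string applies repr() to the value, so the rewrites below are behavior-preserving. A minimal standalone sketch (the URL value is made up for illustration):

# "!r" in an f-string is equivalent to the old repr() concatenation.
url = "postgresql://airflow:***@localhost/airflow"  # hypothetical value

old_style = "DB: " + repr(url)
new_style = f"DB: {url!r}"

assert old_style == new_style
print(new_style)  # DB: 'postgresql://airflow:***@localhost/airflow'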
12 changes: 6 additions & 6 deletions airflow/cli/commands/db_command.py
@@ -45,15 +45,15 @@ def initdb(args):
         "airflow connections create-default-connections to create the default connections",
         DeprecationWarning,
     )
-    print("DB: " + repr(settings.engine.url))
+    print(f"DB: {settings.engine.url!r}")
     db.initdb()
     print("Initialization done")


 @providers_configuration_loaded
 def resetdb(args):
     """Reset the metadata database."""
-    print("DB: " + repr(settings.engine.url))
+    print(f"DB: {settings.engine.url!r}")
     if not (args.yes or input("This will drop existing tables if they exist. Proceed? (y/n)").upper() == "Y"):
         raise SystemExit("Cancelled")
     db.resetdb(skip_init=args.skip_init)
@@ -69,7 +69,7 @@ def upgradedb(args):
 @providers_configuration_loaded
 def migratedb(args):
     """Migrates the metadata database."""
-    print("DB: " + repr(settings.engine.url))
+    print(f"DB: {settings.engine.url!r}")
     if args.to_revision and args.to_version:
         raise SystemExit("Cannot supply both `--to-revision` and `--to-version`.")
     if args.from_version and args.from_revision:
@@ -97,7 +97,7 @@ def migratedb(args):
         to_revision = args.to_revision

     if not args.show_sql_only:
-        print("Performing upgrade to the metadata database " + repr(settings.engine.url))
+        print(f"Performing upgrade to the metadata database {settings.engine.url!r}")
     else:
         print("Generating sql for upgrade -- upgrade commands will *not* be submitted.")

@@ -139,7 +139,7 @@ def downgrade(args):
     elif args.to_revision:
         to_revision = args.to_revision
     if not args.show_sql_only:
-        print("Performing downgrade with database " + repr(settings.engine.url))
+        print(f"Performing downgrade with database {settings.engine.url!r}")
     else:
         print("Generating sql for downgrade -- downgrade commands will *not* be submitted.")

@@ -170,7 +170,7 @@ def check_migrations(args):
 def shell(args):
     """Run a shell that allows to access metadata database."""
     url = settings.engine.url
-    print("DB: " + repr(url))
+    print(f"DB: {url!r}")

     if url.get_backend_name() == "mysql":
         with NamedTemporaryFile(suffix="my.cnf") as f:
2 changes: 1 addition & 1 deletion airflow/cli/commands/variable_command.py
@@ -90,7 +90,7 @@ def variables_import(args):
         try:
             Variable.set(k, v, serialize_json=not isinstance(v, str))
         except Exception as e:
-            print(f"Variable import failed: {repr(e)}")
+            print(f"Variable import failed: {e!r}")
             fail_count += 1
         else:
             suc_count += 1
4 changes: 2 additions & 2 deletions airflow/jobs/backfill_job_runner.py
@@ -1036,8 +1036,8 @@ def query(result, items):

         reset_tis = helpers.reduce_in_chunks(query, tis_to_reset, [], self.job.max_tis_per_query)

-        task_instance_str = "\n\t".join(repr(x) for x in reset_tis)
+        task_instance_str = "\n".join(f"\t{x!r}" for x in reset_tis)
         session.flush()

-        self.log.info("Reset the following %s TaskInstances:\n\t%s", len(reset_tis), task_instance_str)
+        self.log.info("Reset the following %s TaskInstances:\n%s", len(reset_tis), task_instance_str)
         return len(reset_tis)
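Reviewer note: this hunk (and the scheduler ones below) also moves the tab from the join separator and the log format string into each joined line. A quick sketch with stand-in values, showing the rendered text is unchanged for non-empty lists:

# Stand-in strings; the real items are TaskInstance objects.
reset_tis = ["ti1", "ti2"]

old = "Reset:\n\t" + "\n\t".join(repr(x) for x in reset_tis)
new = "Reset:\n" + "\n".join(f"\t{x!r}" for x in reset_tis)

assert old == new  # identical output; the tab now lives with each line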
10 changes: 4 additions & 6 deletions airflow/jobs/scheduler_job_runner.py
@@ -410,10 +410,8 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
                 break

             # Put one task instance on each line
-            task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
-            self.log.info(
-                "%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str
-            )
+            task_instance_str = "\n".join(f"\t{x!r}" for x in task_instances_to_examine)
+            self.log.info("%s tasks up for execution:\n%s", len(task_instances_to_examine), task_instance_str)

             for task_instance in task_instances_to_examine:
                 pool_name = task_instance.pool
@@ -591,8 +589,8 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
         Stats.gauge("scheduler.tasks.executable", len(executable_tis))

         if len(executable_tis) > 0:
-            task_instance_str = "\n\t".join(repr(x) for x in executable_tis)
-            self.log.info("Setting the following tasks to queued state:\n\t%s", task_instance_str)
+            task_instance_str = "\n".join(f"\t{x!r}" for x in executable_tis)
+            self.log.info("Setting the following tasks to queued state:\n%s", task_instance_str)

             # set TIs to queued state
             filter_for_tis = TI.filter_for_tis(executable_tis)
4 changes: 2 additions & 2 deletions airflow/providers/apache/hive/hooks/hive.py
@@ -923,8 +923,8 @@ def _get_results(
                 description = cur.description
                 if previous_description and previous_description != description:
                     message = f"""The statements are producing different descriptions:
-                                  Current: {repr(description)}
-                                  Previous: {repr(previous_description)}"""
+                                  Current: {description!r}
+                                  Previous: {previous_description!r}"""
                     raise ValueError(message)
                 elif not previous_description:
                     previous_description = description
2 changes: 1 addition & 1 deletion airflow/providers/daskexecutor/executors/dask_executor.py
@@ -113,7 +113,7 @@ def _process_future(self, future: Future) -> None:
         if future.done():
             key = self.futures[future]
             if future.exception():
-                self.log.error("Failed to execute task: %s", repr(future.exception()))
+                self.log.error("Failed to execute task: %r", future.exception())
                 self.fail(key)
             elif future.cancelled():
                 self.log.error("Failed to execute task")
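Reviewer note: besides dropping repr(), this change switches to lazy %-formatting, so repr() only runs if the record is actually emitted. A small self-contained sketch of that behavior (logger name is illustrative):

import logging

logging.basicConfig(level=logging.ERROR)
log = logging.getLogger("sketch")

class Expensive:
    def __repr__(self) -> str:
        print("repr() called")  # printed only when a record is emitted
        return "<Expensive>"

log.error("Failed to execute task: %r", Expensive())  # emitted: repr() runs
log.debug("Failed to execute task: %r", Expensive())  # filtered out: repr() never runs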
2 changes: 1 addition & 1 deletion airflow/providers/docker/operators/docker.py
@@ -408,7 +408,7 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[
             )
         elif result["StatusCode"] != 0:
             joined_log_lines = "\n".join(log_lines)
-            raise AirflowException(f"Docker container failed: {repr(result)} lines {joined_log_lines}")
+            raise AirflowException(f"Docker container failed: {result!r} lines {joined_log_lines}")

         if self.retrieve_output:
             return self._attempt_to_retrieve_result()
2 changes: 1 addition & 1 deletion airflow/providers/docker/operators/docker_swarm.py
@@ -161,7 +161,7 @@ def _run_service(self) -> None:
         if self.service and self._service_status() != "complete":
             if self.auto_remove == "success":
                 self.cli.remove_service(self.service["ID"])
-            raise AirflowException("Service did not complete: " + repr(self.service))
+            raise AirflowException(f"Service did not complete: {self.service!r}")
         elif self.auto_remove == "success":
             if not self.service:
                 raise Exception("The 'service' should be initialized before!")
2 changes: 1 addition & 1 deletion airflow/providers/grpc/operators/grpc.py
@@ -91,6 +91,6 @@ def execute(self, context: Context) -> None:

     def _handle_response(self, response: Any, context: Context) -> None:
         if self.log_response:
-            self.log.info(repr(response))
+            self.log.info("%r", response)
         if self.response_callback:
             self.response_callback(response, context)
9 changes: 2 additions & 7 deletions airflow/providers/openlineage/utils/utils.py
@@ -380,13 +380,8 @@ def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int
                 return item
             else:
                 return super()._redact(item, name, depth, max_depth)
-        except Exception as e:
-            log.warning(
-                "Unable to redact %s. Error was: %s: %s",
-                repr(item),
-                type(e).__name__,
-                str(e),
-            )
+        except Exception as exc:
+            log.warning("Unable to redact %r. Error was: %s: %s", item, type(exc).__name__, exc)
             return item


10 changes: 5 additions & 5 deletions airflow/utils/log/secrets_masker.py
@@ -277,13 +277,13 @@ def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int
         # I think this should never happen, but it does not hurt to leave it just in case
         # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
         # but it caused infinite recursion, so we need to cast it to str first.
-        except Exception as e:
+        except Exception as exc:
             log.warning(
-                "Unable to redact %s, please report this via <https://github.com/apache/airflow/issues>. "
+                "Unable to redact %r, please report this via <https://github.com/apache/airflow/issues>. "
                 "Error was: %s: %s",
-                repr(item),
-                type(e).__name__,
-                str(e),
+                item,
+                type(exc).__name__,
+                exc,
             )
             return item

2 changes: 1 addition & 1 deletion airflow/utils/sqlalchemy.py
@@ -69,7 +69,7 @@ def process_bind_param(self, value, dialect):
         if not isinstance(value, datetime.datetime):
             if value is None:
                 return None
-            raise TypeError("expected datetime.datetime, not " + repr(value))
+            raise TypeError(f"expected datetime.datetime, not {value!r}")
         elif value.tzinfo is None:
             raise ValueError("naive datetime is disallowed")
         elif dialect.name == "mysql":
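Reviewer note: `!r` in exception messages keeps quoting and escaping, which makes type mix-ups obvious in the error text. A small sketch mirroring the TypeError above (function name is illustrative):

import datetime

def check(value):
    # Same shape as process_bind_param above.
    if not isinstance(value, datetime.datetime):
        raise TypeError(f"expected datetime.datetime, not {value!r}")
    return value

try:
    check("2023-08-20 00:00:00")  # a string, not a datetime
except TypeError as exc:
    print(exc)  # expected datetime.datetime, not '2023-08-20 00:00:00'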
4 changes: 2 additions & 2 deletions airflow/www/views.py
@@ -5137,8 +5137,8 @@ def varimport(self):
         for k, v in variable_dict.items():
             try:
                 models.Variable.set(k, v, serialize_json=not isinstance(v, str))
-            except Exception as e:
-                logging.info("Variable import failed: %s", repr(e))
+            except Exception as exc:
+                logging.info("Variable import failed: %r", exc)
                 fail_count += 1
             else:
                 suc_count += 1
4 changes: 2 additions & 2 deletions tests/providers/grpc/operators/test_grpc.py
@@ -77,8 +77,8 @@ def test_execute_with_log(self, mock_hook):
         mock_hook.assert_called_once_with("grpc_default", interceptors=None, custom_connection_func=None)
         mocked_hook.run.assert_called_once_with(StubClass, "stream_call", data={}, streaming=False)
         mock_info.assert_any_call("Calling gRPC service")
-        mock_info.assert_any_call("'value1'")
-        mock_info.assert_any_call("'value2'")
+        mock_info.assert_any_call("%r", "value1")
+        mock_info.assert_any_call("%r", "value2")

     @mock.patch("airflow.providers.grpc.operators.grpc.GrpcHook")
     def test_execute_with_callback(self, mock_hook):
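Reviewer note: the test now asserts on the lazy-logging call signature, i.e. the format string plus the raw argument, instead of a pre-rendered string. A standalone sketch of the assertion pattern:

from unittest import mock

log = mock.Mock()
log.info("%r", "value1")  # lazy style: format string + raw argument

log.info.assert_any_call("%r", "value1")  # passes
# log.info.assert_any_call("'value1'")    # would fail: nothing was pre-rendered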
