diff --git a/airflow/cli/commands/db_command.py b/airflow/cli/commands/db_command.py
index dad068547c23e..be7dff62d74a3 100644
--- a/airflow/cli/commands/db_command.py
+++ b/airflow/cli/commands/db_command.py
@@ -45,7 +45,7 @@ def initdb(args):
         "airflow connections create-default-connections to create the default connections",
         DeprecationWarning,
     )
-    print("DB: " + repr(settings.engine.url))
+    print(f"DB: {settings.engine.url!r}")
     db.initdb()
     print("Initialization done")
 
@@ -53,7 +53,7 @@ def initdb(args):
 @providers_configuration_loaded
 def resetdb(args):
     """Reset the metadata database."""
-    print("DB: " + repr(settings.engine.url))
+    print(f"DB: {settings.engine.url!r}")
     if not (args.yes or input("This will drop existing tables if they exist. Proceed? (y/n)").upper() == "Y"):
         raise SystemExit("Cancelled")
     db.resetdb(skip_init=args.skip_init)
@@ -69,7 +69,7 @@ def upgradedb(args):
 @providers_configuration_loaded
 def migratedb(args):
     """Migrates the metadata database."""
-    print("DB: " + repr(settings.engine.url))
+    print(f"DB: {settings.engine.url!r}")
     if args.to_revision and args.to_version:
         raise SystemExit("Cannot supply both `--to-revision` and `--to-version`.")
     if args.from_version and args.from_revision:
@@ -97,7 +97,7 @@ def migratedb(args):
         to_revision = args.to_revision
 
     if not args.show_sql_only:
-        print("Performing upgrade to the metadata database " + repr(settings.engine.url))
+        print(f"Performing upgrade to the metadata database {settings.engine.url!r}")
     else:
         print("Generating sql for upgrade -- upgrade commands will *not* be submitted.")
 
@@ -139,7 +139,7 @@ def downgrade(args):
     elif args.to_revision:
         to_revision = args.to_revision
     if not args.show_sql_only:
-        print("Performing downgrade with database " + repr(settings.engine.url))
+        print(f"Performing downgrade with database {settings.engine.url!r}")
     else:
         print("Generating sql for downgrade -- downgrade commands will *not* be submitted.")
 
@@ -170,7 +170,7 @@ def check_migrations(args):
 def shell(args):
     """Run a shell that allows to access metadata database."""
     url = settings.engine.url
-    print("DB: " + repr(url))
+    print(f"DB: {url!r}")
 
     if url.get_backend_name() == "mysql":
         with NamedTemporaryFile(suffix="my.cnf") as f:
diff --git a/airflow/cli/commands/variable_command.py b/airflow/cli/commands/variable_command.py
index eb3d353e41b17..db0e92de282c1 100644
--- a/airflow/cli/commands/variable_command.py
+++ b/airflow/cli/commands/variable_command.py
@@ -90,7 +90,7 @@ def variables_import(args):
         try:
             Variable.set(k, v, serialize_json=not isinstance(v, str))
         except Exception as e:
-            print(f"Variable import failed: {repr(e)}")
+            print(f"Variable import failed: {e!r}")
             fail_count += 1
         else:
             suc_count += 1
diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index 2508ba69ab3b7..0e39fb0e0246f 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -1036,8 +1036,8 @@ def query(result, items):
         reset_tis = helpers.reduce_in_chunks(query, tis_to_reset, [], self.job.max_tis_per_query)
 
-        task_instance_str = "\n\t".join(repr(x) for x in reset_tis)
+        task_instance_str = "\n".join(f"\t{x!r}" for x in reset_tis)
         session.flush()
 
-        self.log.info("Reset the following %s TaskInstances:\n\t%s", len(reset_tis), task_instance_str)
+        self.log.info("Reset the following %s TaskInstances:\n%s", len(reset_tis), task_instance_str)
 
         return len(reset_tis)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index ffcaf4f1a384e..947fec0a7b4b3 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -410,10 +410,8 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
                 break
 
             # Put one task instance on each line
-            task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
-            self.log.info(
-                "%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str
-            )
+            task_instance_str = "\n".join(f"\t{x!r}" for x in task_instances_to_examine)
+            self.log.info("%s tasks up for execution:\n%s", len(task_instances_to_examine), task_instance_str)
 
             for task_instance in task_instances_to_examine:
                 pool_name = task_instance.pool
@@ -591,8 +589,8 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
         Stats.gauge("scheduler.tasks.executable", len(executable_tis))
 
         if len(executable_tis) > 0:
-            task_instance_str = "\n\t".join(repr(x) for x in executable_tis)
-            self.log.info("Setting the following tasks to queued state:\n\t%s", task_instance_str)
+            task_instance_str = "\n".join(f"\t{x!r}" for x in executable_tis)
+            self.log.info("Setting the following tasks to queued state:\n%s", task_instance_str)
 
             # set TIs to queued state
             filter_for_tis = TI.filter_for_tis(executable_tis)
diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py
index a57429f830ef1..71a60cd6d3ff2 100644
--- a/airflow/providers/apache/hive/hooks/hive.py
+++ b/airflow/providers/apache/hive/hooks/hive.py
@@ -923,8 +923,8 @@ def _get_results(
             description = cur.description
             if previous_description and previous_description != description:
                 message = f"""The statements are producing different descriptions:
-                Current: {repr(description)}
-                Previous: {repr(previous_description)}"""
+                Current: {description!r}
+                Previous: {previous_description!r}"""
                 raise ValueError(message)
             elif not previous_description:
                 previous_description = description
diff --git a/airflow/providers/daskexecutor/executors/dask_executor.py b/airflow/providers/daskexecutor/executors/dask_executor.py
index 1998e7c7df126..3e93f0161fc99 100644
--- a/airflow/providers/daskexecutor/executors/dask_executor.py
+++ b/airflow/providers/daskexecutor/executors/dask_executor.py
@@ -113,7 +113,7 @@ def _process_future(self, future: Future) -> None:
         if future.done():
             key = self.futures[future]
             if future.exception():
-                self.log.error("Failed to execute task: %s", repr(future.exception()))
+                self.log.error("Failed to execute task: %r", future.exception())
                 self.fail(key)
             elif future.cancelled():
                 self.log.error("Failed to execute task")
diff --git a/airflow/providers/docker/operators/docker.py b/airflow/providers/docker/operators/docker.py
index 7683530a4cfc7..70fc299421c78 100644
--- a/airflow/providers/docker/operators/docker.py
+++ b/airflow/providers/docker/operators/docker.py
@@ -408,7 +408,7 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[
             )
         elif result["StatusCode"] != 0:
             joined_log_lines = "\n".join(log_lines)
-            raise AirflowException(f"Docker container failed: {repr(result)} lines {joined_log_lines}")
+            raise AirflowException(f"Docker container failed: {result!r} lines {joined_log_lines}")
         if self.retrieve_output:
             return self._attempt_to_retrieve_result()
 
diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py
index 4ee5872239ad7..9a44a3174988c 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -161,7 +161,7 @@ def _run_service(self) -> None:
         if self.service and self._service_status() != "complete":
             if self.auto_remove == "success":
                 self.cli.remove_service(self.service["ID"])
-            raise AirflowException("Service did not complete: " + repr(self.service))
+            raise AirflowException(f"Service did not complete: {self.service!r}")
         elif self.auto_remove == "success":
             if not self.service:
                 raise Exception("The 'service' should be initialized before!")
diff --git a/airflow/providers/grpc/operators/grpc.py b/airflow/providers/grpc/operators/grpc.py
index 9e7d77894cff6..294bf8584b3c4 100644
--- a/airflow/providers/grpc/operators/grpc.py
+++ b/airflow/providers/grpc/operators/grpc.py
@@ -91,6 +91,6 @@ def execute(self, context: Context) -> None:
 
     def _handle_response(self, response: Any, context: Context) -> None:
         if self.log_response:
-            self.log.info(repr(response))
+            self.log.info("%r", response)
         if self.response_callback:
             self.response_callback(response, context)
diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py
index 3ff7610c4f1e7..07484d994ded2 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -380,13 +380,8 @@ def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int
                     return item
                 else:
                     return super()._redact(item, name, depth, max_depth)
-        except Exception as e:
-            log.warning(
-                "Unable to redact %s. Error was: %s: %s",
-                repr(item),
-                type(e).__name__,
-                str(e),
-            )
+        except Exception as exc:
+            log.warning("Unable to redact %r. Error was: %s: %s", item, type(exc).__name__, exc)
         return item
 
 
diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index efd612c04da2e..a0f0da847c1b5 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -277,13 +277,13 @@ def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int
         # I think this should never happen, but it does not hurt to leave it just in case
         # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
         # but it caused infinite recursion, so we need to cast it to str first.
-        except Exception as e:
+        except Exception as exc:
             log.warning(
-                "Unable to redact %s, please report this via <https://github.com/apache/airflow/issues>. "
+                "Unable to redact %r, please report this via <https://github.com/apache/airflow/issues>. "
                 "Error was: %s: %s",
-                repr(item),
-                type(e).__name__,
-                str(e),
+                item,
+                type(exc).__name__,
+                exc,
             )
             return item
 
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index b4b726ca327ab..f97570b94c429 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -69,7 +69,7 @@ def process_bind_param(self, value, dialect):
         if not isinstance(value, datetime.datetime):
             if value is None:
                 return None
-            raise TypeError("expected datetime.datetime, not " + repr(value))
+            raise TypeError(f"expected datetime.datetime, not {value!r}")
         elif value.tzinfo is None:
             raise ValueError("naive datetime is disallowed")
         elif dialect.name == "mysql":
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 05a88115dd76b..951ed496da646 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -5137,8 +5137,8 @@ def varimport(self):
         for k, v in variable_dict.items():
             try:
                 models.Variable.set(k, v, serialize_json=not isinstance(v, str))
-            except Exception as e:
-                logging.info("Variable import failed: %s", repr(e))
+            except Exception as exc:
+                logging.info("Variable import failed: %r", exc)
                 fail_count += 1
             else:
                 suc_count += 1
diff --git a/tests/providers/grpc/operators/test_grpc.py b/tests/providers/grpc/operators/test_grpc.py
index 62540bbfeae22..053866037652b 100644
--- a/tests/providers/grpc/operators/test_grpc.py
+++ b/tests/providers/grpc/operators/test_grpc.py
@@ -77,8 +77,8 @@ def test_execute_with_log(self, mock_hook):
         mock_hook.assert_called_once_with("grpc_default", interceptors=None, custom_connection_func=None)
         mocked_hook.run.assert_called_once_with(StubClass, "stream_call", data={}, streaming=False)
         mock_info.assert_any_call("Calling gRPC service")
-        mock_info.assert_any_call("'value1'")
-        mock_info.assert_any_call("'value2'")
+        mock_info.assert_any_call("%r", "value1")
+        mock_info.assert_any_call("%r", "value2")
 
     @mock.patch("airflow.providers.grpc.operators.grpc.GrpcHook")
     def test_execute_with_callback(self, mock_hook):