Skip to content

Commit

Permalink
Refactor: Remove useless str() calls (#33629)
Browse files Browse the repository at this point in the history
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
  • Loading branch information
eumiro and uranusjr committed Aug 24, 2023
1 parent 1953648 commit 85acbb4
Show file tree
Hide file tree
Showing 51 changed files with 91 additions and 97 deletions.
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/get_code.py
Expand Up @@ -37,5 +37,5 @@ def get_code(dag_id: str) -> str:
try:
return DagCode.get_code_by_fileloc(dag.fileloc)
except (OSError, DagCodeNotFound) as exception:
error_message = f"Error {str(exception)} while reading Dag id {dag_id} Code"
error_message = f"Error {exception} while reading Dag id {dag_id} Code"
raise AirflowException(error_message, exception)
36 changes: 18 additions & 18 deletions airflow/auth/managers/fab/security_manager/modules/db.py
Expand Up @@ -86,7 +86,7 @@ def create_db(self):
if self.count_users() == 0 and self.auth_role_public != self.auth_role_admin:
log.warning(const.LOGMSG_WAR_SEC_NO_USER)
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_CREATE_DB.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_CREATE_DB.format(e))
exit(1)

"""
Expand All @@ -106,7 +106,7 @@ def update_role(self, role_id, name: str) -> Role | None:
self.get_session.commit()
log.info(const.LOGMSG_INF_SEC_UPD_ROLE.format(role))
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_UPD_ROLE.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_UPD_ROLE.format(e))
self.get_session.rollback()
return None
return role
Expand All @@ -123,7 +123,7 @@ def add_role(self, name: str) -> Role:
log.info(const.LOGMSG_INF_SEC_ADD_ROLE.format(name))
return role
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_ROLE.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_ROLE.format(e))
self.get_session.rollback()
return role

Expand Down Expand Up @@ -190,7 +190,7 @@ def add_user(
log.info(const.LOGMSG_INF_SEC_ADD_USER.format(username))
return user
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_USER.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_USER.format(e))
self.get_session.rollback()
return False

Expand Down Expand Up @@ -226,7 +226,7 @@ def add_register_user(self, username, first_name, last_name, email, password="",
self.get_session.commit()
return register_user
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_REGISTER_USER.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_REGISTER_USER.format(e))
self.get_session.rollback()
return None

Expand Down Expand Up @@ -269,7 +269,7 @@ def update_user(self, user):
self.get_session.commit()
log.info(const.LOGMSG_INF_SEC_UPD_USER.format(user))
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_UPD_USER.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_UPD_USER.format(e))
self.get_session.rollback()
return False

Expand All @@ -284,7 +284,7 @@ def del_register_user(self, register_user):
self.get_session.commit()
return True
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_DEL_REGISTER_USER.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_DEL_REGISTER_USER.format(e))
self.get_session.rollback()
return False

Expand Down Expand Up @@ -322,7 +322,7 @@ def create_action(self, name):
self.get_session.commit()
return action
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_PERMISSION.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_PERMISSION.format(e))
self.get_session.rollback()
return action

Expand All @@ -349,7 +349,7 @@ def delete_action(self, name: str) -> bool:
self.get_session.commit()
return True
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(e))
self.get_session.rollback()
return False

Expand Down Expand Up @@ -383,7 +383,7 @@ def create_resource(self, name) -> Resource:
self.get_session.commit()
return resource
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_VIEWMENU.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_VIEWMENU.format(e))
self.get_session.rollback()
return resource

Expand Down Expand Up @@ -419,7 +419,7 @@ def delete_resource(self, name: str) -> bool:
self.get_session.commit()
return True
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(e))
self.get_session.rollback()
return False

Expand Down Expand Up @@ -481,10 +481,10 @@ def create_permission(self, action_name, resource_name) -> Permission | None:
try:
self.get_session.add(perm)
self.get_session.commit()
log.info(const.LOGMSG_INF_SEC_ADD_PERMVIEW.format(str(perm)))
log.info(const.LOGMSG_INF_SEC_ADD_PERMVIEW.format(perm))
return perm
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_PERMVIEW.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_PERMVIEW.format(e))
self.get_session.rollback()
return None

Expand Down Expand Up @@ -518,7 +518,7 @@ def delete_permission(self, action_name: str, resource_name: str) -> None:
self.delete_action(perm.action.name)
log.info(const.LOGMSG_INF_SEC_DEL_PERMVIEW.format(action_name, resource_name))
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_DEL_PERMVIEW.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_DEL_PERMVIEW.format(e))
self.get_session.rollback()

def add_permission_to_role(self, role: Role, permission: Permission | None) -> None:
Expand All @@ -534,9 +534,9 @@ def add_permission_to_role(self, role: Role, permission: Permission | None) -> N
role.permissions.append(permission)
self.get_session.merge(role)
self.get_session.commit()
log.info(const.LOGMSG_INF_SEC_ADD_PERMROLE.format(str(permission), role.name))
log.info(const.LOGMSG_INF_SEC_ADD_PERMROLE.format(permission, role.name))
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_PERMROLE.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_PERMROLE.format(e))
self.get_session.rollback()

def remove_permission_from_role(self, role: Role, permission: Permission) -> None:
Expand All @@ -551,7 +551,7 @@ def remove_permission_from_role(self, role: Role, permission: Permission) -> Non
role.permissions.remove(permission)
self.get_session.merge(role)
self.get_session.commit()
log.info(const.LOGMSG_INF_SEC_DEL_PERMROLE.format(str(permission), role.name))
log.info(const.LOGMSG_INF_SEC_DEL_PERMROLE.format(permission, role.name))
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_DEL_PERMROLE.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_DEL_PERMROLE.format(e))
self.get_session.rollback()
2 changes: 1 addition & 1 deletion airflow/cli/commands/task_command.py
Expand Up @@ -548,7 +548,7 @@ def task_states_for_dag_run(args, session: Session = NEW_SESSION) -> None:
select(DagRun).where(DagRun.execution_date == execution_date, DagRun.dag_id == args.dag_id)
)
except (ParserError, TypeError) as err:
raise AirflowException(f"Error parsing the supplied execution_date. Error: {str(err)}")
raise AirflowException(f"Error parsing the supplied execution_date. Error: {err}")

if dag_run is None:
raise DagRunNotFound(
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_bash_operator.py
Expand Up @@ -50,7 +50,7 @@

for i in range(3):
task = BashOperator(
task_id="runme_" + str(i),
task_id=f"runme_{i}",
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
)
task >> run_this
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_short_circuit_decorator.py
Expand Up @@ -32,8 +32,8 @@ def example_short_circuit_decorator():
def check_condition(condition):
return condition

ds_true = [EmptyOperator(task_id="true_" + str(i)) for i in [1, 2]]
ds_false = [EmptyOperator(task_id="false_" + str(i)) for i in [1, 2]]
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_short_circuit_operator.py
Expand Up @@ -42,8 +42,8 @@
python_callable=lambda: False,
)

ds_true = [EmptyOperator(task_id="true_" + str(i)) for i in [1, 2]]
ds_false = [EmptyOperator(task_id="false_" + str(i)) for i in [1, 2]]
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

chain(cond_true, *ds_true)
chain(cond_false, *ds_false)
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
Expand Up @@ -308,7 +308,7 @@ def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None
try:
self.running.remove(key)
except KeyError:
self.log.debug("Could not find key: %s", str(key))
self.log.debug("Could not find key: %s", key)
self.event_buffer[key] = state, info

def fail(self, key: TaskInstanceKey, info=None) -> None:
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/debug_executor.py
Expand Up @@ -92,7 +92,7 @@ def _run_task(self, ti: TaskInstance) -> bool:
except Exception as e:
ti.set_state(TaskInstanceState.FAILED)
self.change_state(key, TaskInstanceState.FAILED)
self.log.exception("Failed to execute task: %s.", str(e))
self.log.exception("Failed to execute task: %s.", e)
return False

def queue_task_instance(
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/local_executor.py
Expand Up @@ -99,7 +99,7 @@ def _execute_work_in_subprocess(self, command: CommandType) -> TaskInstanceState
subprocess.check_call(command, close_fds=True)
return TaskInstanceState.SUCCESS
except subprocess.CalledProcessError as e:
self.log.error("Failed to execute task %s.", str(e))
self.log.error("Failed to execute task %s.", e)
return TaskInstanceState.FAILED

def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState:
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/sequential_executor.py
Expand Up @@ -78,7 +78,7 @@ def sync(self) -> None:
self.change_state(key, TaskInstanceState.SUCCESS)
except subprocess.CalledProcessError as e:
self.change_state(key, TaskInstanceState.FAILED)
self.log.error("Failed to execute task %s.", str(e))
self.log.error("Failed to execute task %s.", e)

self.commands_to_run = []

Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/job.py
Expand Up @@ -153,7 +153,7 @@ def kill(self, session: Session = NEW_SESSION) -> NoReturn:
try:
self.on_kill()
except Exception as e:
self.log.error("on_kill() method failed: %s", str(e))
self.log.error("on_kill() method failed: %s", e)
session.merge(job)
session.commit()
raise AirflowException("Job shut down externally.")
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/local_task_job_runner.py
Expand Up @@ -285,7 +285,7 @@ def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
else:
dagrun_timeout = None
if dagrun_timeout and execution_time > dagrun_timeout:
self.log.warning("DagRun timed out after %s.", str(execution_time))
self.log.warning("DagRun timed out after %s.", execution_time)

# potential race condition, the _run_raw_task commits `success` or other state
# but task_runner does not exit right away due to slow process shutdown or any other reasons
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job_runner.py
Expand Up @@ -899,7 +899,7 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
if callback_to_run:
self._send_dag_callbacks_to_processor(dag, callback_to_run)
except Exception as e: # should not fail the scheduler
self.log.exception("Failed to update dag run state for paused dags due to %s", str(e))
self.log.exception("Failed to update dag run state for paused dags due to %s", e)

def _run_scheduler_loop(self) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/policies.py
Expand Up @@ -171,7 +171,7 @@ def _make_shim_fn(name, desired_sig, target):
#
codestr = textwrap.dedent(
f"""
def {name}_name_mismatch_shim{str(desired_sig)}:
def {name}_name_mismatch_shim{desired_sig}:
return __target({' ,'.join(desired_sig.parameters)})
"""
)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/hooks/dynamodb.py
Expand Up @@ -68,4 +68,4 @@ def write_batch_data(self, items: Iterable) -> bool:
batch.put_item(Item=item)
return True
except Exception as general_error:
raise AirflowException(f"Failed to insert items in dynamodb, error: {str(general_error)}")
raise AirflowException(f"Failed to insert items in dynamodb, error: {general_error}")
Expand Up @@ -99,7 +99,7 @@ def _read(self, task_instance, try_number, metadata=None):
except Exception as e:
log = (
f"*** Unable to read remote logs from Cloudwatch (log_group: {self.log_group}, log_stream: "
f"{stream_name})\n*** {str(e)}\n\n"
f"{stream_name})\n*** {e}\n\n"
)
self.log.error(log)
local_log, metadata = super()._read(task_instance, try_number, metadata)
Expand Down
4 changes: 1 addition & 3 deletions airflow/providers/amazon/aws/sensors/sqs.py
Expand Up @@ -202,9 +202,7 @@ def poke(self, context: Context):
response = self.hook.conn.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries)

if "Successful" not in response:
raise AirflowException(
"Delete SQS Messages failed " + str(response) + " for messages " + str(messages)
)
raise AirflowException(f"Delete SQS Messages failed {response} for messages {messages}")
if not len(message_batch):
return False

Expand Down
4 changes: 1 addition & 3 deletions airflow/providers/amazon/aws/triggers/sqs.py
Expand Up @@ -149,9 +149,7 @@ async def poke(self, client: Any):
response = await client.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries)

if "Successful" not in response:
raise AirflowException(
f"Delete SQS Messages failed {str(response)} for messages {str(messages)}"
)
raise AirflowException(f"Delete SQS Messages failed {response} for messages {messages}")

return message_batch

Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/apache/livy/triggers/livy.py
Expand Up @@ -101,7 +101,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
{
"status": "error",
"batch_id": self._batch_id,
"response": f"Batch {self._batch_id} did not succeed with {str(exc)}",
"response": f"Batch {self._batch_id} did not succeed with {exc}",
"log_lines": None,
}
)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/arangodb/hooks/arangodb.py
Expand Up @@ -108,7 +108,7 @@ def query(self, query, **kwargs) -> Cursor:
f"Failed to execute AQLQuery, error connecting to database: {self.database}"
)
except AQLQueryExecuteError as error:
raise AirflowException(f"Failed to execute AQLQuery, error: {str(error)}")
raise AirflowException(f"Failed to execute AQLQuery, error: {error}")

def create_collection(self, name):
if not self.db_conn.has_collection(name):
Expand Down
Expand Up @@ -516,7 +516,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li
if log:
messages.append("Found logs through kube API")
except Exception as e:
messages.append(f"Reading from k8s pod logs failed: {str(e)}")
messages.append(f"Reading from k8s pod logs failed: {e}")
return messages, ["\n".join(log)]

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
Expand Down
8 changes: 3 additions & 5 deletions airflow/providers/dbt/cloud/hooks/dbt.py
Expand Up @@ -254,7 +254,7 @@ async def get_job_status(
Valid values are "trigger", "job", "repository", and "environment".
"""
try:
self.log.info("Getting the status of job run %s.", str(run_id))
self.log.info("Getting the status of job run %s.", run_id)
response = await self.get_job_details(
run_id, account_id=account_id, include_related=include_related
)
Expand Down Expand Up @@ -490,14 +490,12 @@ def get_job_run_status(self, run_id: int, account_id: int | None = None) -> int:
:param account_id: Optional. The ID of a dbt Cloud account.
:return: The status of a dbt Cloud job run.
"""
self.log.info("Getting the status of job run %s.", str(run_id))
self.log.info("Getting the status of job run %s.", run_id)

job_run = self.get_job_run(account_id=account_id, run_id=run_id)
job_run_status = job_run.json()["data"]["status"]

self.log.info(
"Current status of job run %s: %s", str(run_id), DbtCloudJobRunStatus(job_run_status).name
)
self.log.info("Current status of job run %s: %s", run_id, DbtCloudJobRunStatus(job_run_status).name)

return job_run_status

Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/ftp/sensors/ftp.py
Expand Up @@ -73,10 +73,10 @@ def poke(self, context: Context) -> bool:
self.log.info("Poking for %s", self.path)
try:
mod_time = hook.get_mod_time(self.path)
self.log.info("Found File %s last modified: %s", str(self.path), str(mod_time))
self.log.info("Found File %s last modified: %s", self.path, mod_time)

except ftplib.error_perm as e:
self.log.error("Ftp error encountered: %s", str(e))
self.log.error("Ftp error encountered: %s", e)
error_code = self._get_error_code(e)
if (error_code != 550) and (
self.fail_on_transient_errors or (error_code not in self.transient_errors)
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/github/operators/github.py
Expand Up @@ -74,6 +74,6 @@ def execute(self, context: Context) -> Any:
return github_result

except GithubException as github_error:
raise AirflowException(f"Failed to execute GithubOperator, error: {str(github_error)}")
raise AirflowException(f"Failed to execute GithubOperator, error: {github_error}")
except Exception as e:
raise AirflowException(f"GitHub operator error: {str(e)}")
raise AirflowException(f"GitHub operator error: {e}")
4 changes: 2 additions & 2 deletions airflow/providers/github/sensors/github.py
Expand Up @@ -136,9 +136,9 @@ def tag_checker(self, repo: Any) -> bool | None:
result = self.tag_name in all_tags

except GithubException as github_error: # type: ignore[misc]
raise AirflowException(f"Failed to execute GithubSensor, error: {str(github_error)}")
raise AirflowException(f"Failed to execute GithubSensor, error: {github_error}")
except Exception as e:
raise AirflowException(f"GitHub operator error: {str(e)}")
raise AirflowException(f"GitHub operator error: {e}")

if result is True:
self.log.info("Tag %s exists in %s repository, Success.", self.tag_name, self.repository_name)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/log/gcs_task_handler.py
Expand Up @@ -247,7 +247,7 @@ def gcs_write(self, log, remote_log_location) -> bool:
pass
else:
log += self._add_message(
f"Error checking for previous log; if exists, may be overwritten: {str(e)}"
f"Error checking for previous log; if exists, may be overwritten: {e}"
)
self.log.warning("Error checking for previous log: %s", e)
try:
Expand Down

0 comments on commit 85acbb4

Please sign in to comment.