Avoid non-recommended usage of logging #37792

Merged 1 commit on Feb 29, 2024
2 changes: 1 addition & 1 deletion airflow/dag_processing/processor.py
@@ -177,7 +177,7 @@ def _handle_dag_file_processing():
# gets sent to logs and logs are sent to stdout, this leads to an infinite loop. This
# necessitates this conditional based on the value of DAG_PROCESSOR_LOG_TARGET.
with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
- StreamLogWriter(log, logging.WARN)
+ StreamLogWriter(log, logging.WARNING)
), Stats.timer() as timer:
_handle_dag_file_processing()
log.info("Processing %s took %.3f seconds", file_path, timer.duration)
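Context for this one-line change: `logging.WARN` is an undocumented backwards-compatibility alias of `logging.WARNING`; both resolve to the same numeric level, but only `WARNING` is part of the documented API. A minimal sketch of the equivalence:

```python
import logging

# WARN is a legacy alias kept for backwards compatibility; WARNING is the documented constant.
assert logging.WARN == logging.WARNING == 30

log = logging.getLogger(__name__)
log.log(logging.WARNING, "prefer the documented level name")
```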
6 changes: 2 additions & 4 deletions airflow/jobs/backfill_job_runner.py
@@ -681,11 +681,10 @@ def _per_task_process(key, ti: TaskInstance, session):
try:
session.commit()
except OperationalError:
- self.log.error(
+ self.log.exception(
"Failed to commit task state due to operational error. "
"The job will retry this operation so if your backfill succeeds, "
"you can safely ignore this message.",
- exc_info=True,
)
session.rollback()
if i == max_attempts - 1:
@@ -986,10 +985,9 @@ def _execute(self, session: Session = NEW_SESSION) -> None:
# state to failed.
self._set_unfinished_dag_runs_to_failed(ti_status.active_runs)
except OperationalError:
- self.log.error(
+ self.log.exception(
"Backfill job dead-locked. The job will retry the job so it is likely "
"to heal itself. If your backfill succeeds you can ignore this exception.",
- exc_info=True,
)
raise
finally:
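For reference, `Logger.exception(msg, *args)` is equivalent to `Logger.error(msg, *args, exc_info=True)` and is the idiomatic call inside an `except` block, so the explicit `exc_info=True` argument becomes redundant and is dropped. A small self-contained sketch (the logger name is illustrative, not from the PR):

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("backfill_example")  # illustrative logger name

try:
    raise RuntimeError("simulated deadlock")
except RuntimeError:
    # Both calls log at ERROR level and append the traceback of the active exception;
    # log.exception() is simply the recommended spelling inside an except block.
    log.error("Backfill job dead-locked, will retry", exc_info=True)
    log.exception("Backfill job dead-locked, will retry")
```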
8 changes: 6 additions & 2 deletions airflow/models/variable.py
@@ -242,11 +242,15 @@ def check_for_write_conflict(key: str) -> None:
try:
var_val = secrets_backend.get_variable(key=key)
if var_val is not None:
+ _backend_name = type(secrets_backend).__name__
log.warning(
"The variable {key} is defined in the {cls} secrets backend, which takes "
"The variable %s is defined in the %s secrets backend, which takes "
"precedence over reading from the database. The value in the database will be "
"updated, but to read it you have to delete the conflicting variable "
"from {cls}".format(key=key, cls=secrets_backend.__class__.__name__)
"from %s",
key,
_backend_name,
_backend_name,
)
return
except Exception:
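Background on this change: the `logging` module interpolates %-style arguments lazily, only when a record is actually handled, so passing the values separately avoids building the string for filtered-out levels and keeps the message template constant for log aggregation; eager `str.format()` calls inside logging statements are what the flake8-logging-format (`G`) rules flag. A short sketch with placeholder values:

```python
import logging

log = logging.getLogger(__name__)
key, backend = "my_var", "EnvironmentVariablesBackend"  # placeholder values for illustration

# Discouraged: the message is formatted eagerly, even if WARNING is filtered out.
log.warning("The variable {} is defined in the {} secrets backend".format(key, backend))

# Preferred: %-interpolation is deferred until the record is actually emitted.
log.warning("The variable %s is defined in the %s secrets backend", key, backend)
```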
10 changes: 5 additions & 5 deletions airflow/providers/amazon/aws/operators/sagemaker.py
@@ -349,13 +349,13 @@ def _extract_s3_dataset_identifiers(self, processing_inputs, processing_outputs)
for processing_input in processing_inputs:
inputs.append(self.path_to_s3_dataset(processing_input["S3Input"]["S3Uri"]))
except KeyError:
self.log.exception("Cannot find S3 input details", exc_info=True)
self.log.exception("Cannot find S3 input details")

try:
for processing_output in processing_outputs:
outputs.append(self.path_to_s3_dataset(processing_output["S3Output"]["S3Uri"]))
except KeyError:
self.log.exception("Cannot find S3 output details.", exc_info=True)
self.log.exception("Cannot find S3 output details.")
return inputs, outputs


@@ -777,15 +777,15 @@ def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
try:
model_package_arn = self.serialized_model["PrimaryContainer"]["ModelPackageName"]
except KeyError:
self.log.error("Cannot find Model Package Name.", exc_info=True)
self.log.exception("Cannot find Model Package Name.")

try:
transform_input = self.serialized_transform["TransformInput"]["DataSource"]["S3DataSource"][
"S3Uri"
]
transform_output = self.serialized_transform["TransformOutput"]["S3OutputPath"]
except KeyError:
self.log.error("Cannot find some required input/output details.", exc_info=True)
self.log.exception("Cannot find some required input/output details.")

inputs = []

@@ -813,7 +813,7 @@ def _get_model_data_urls(self, model_package_arn) -> list:
for container in model_containers:
model_data_urls.append(container["ModelDataUrl"])
except KeyError:
self.log.exception("Cannot retrieve model details.", exc_info=True)
self.log.exception("Cannot retrieve model details.")

return model_data_urls

@@ -431,10 +431,9 @@ def sync(self) -> None:
self.kube_scheduler.run_next(task)
self.task_publish_retries.pop(key, None)
except PodReconciliationError as e:
- self.log.error(
+ self.log.exception(
"Pod reconciliation failed, likely due to kubernetes library upgrade. "
"Try clearing the task to re-run.",
- exc_info=True,
)
self.fail(task[0], e)
except ApiException as e:
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/dataproc.py
@@ -835,7 +835,7 @@ def execute(self, context: Context) -> dict:
except AirflowException as ae_inner:
# We could get any number of failures here, including cluster not found and we
# can just ignore to ensure we surface the original cluster create failure
- self.log.error(ae_inner, exc_info=True)
+ self.log.exception(ae_inner)
finally:
raise ae

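Side note on this hunk: the exception object itself is passed as the log message, which works because `logging` applies `str()` to the message when the record is formatted, and `log.exception()` additionally attaches the traceback of the exception currently being handled. A minimal sketch:

```python
import logging

logging.basicConfig()
log = logging.getLogger(__name__)

try:
    raise ValueError("inner failure while deleting the cluster")  # illustrative error
except ValueError as ae_inner:
    # The exception object is str()-ed into the message; the active traceback is appended.
    log.exception(ae_inner)
```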
2 changes: 1 addition & 1 deletion airflow/utils/db_cleanup.py
@@ -49,7 +49,7 @@

from airflow.models import Base

- logger = logging.getLogger(__file__)
+ logger = logging.getLogger(__name__)

ARCHIVE_TABLE_PREFIX = "_airflow_deleted__"

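Context for this change: `logging.getLogger(__file__)` names the logger after a filesystem path, which varies between installations and sits outside the dotted logger hierarchy, while `__name__` yields the importable module name (here `airflow.utils.db_cleanup`), so the logger inherits handlers and levels configured on its parent packages. A quick illustration, assuming the module is imported as part of a package:

```python
import logging

# __file__ would give something like ".../site-packages/airflow/utils/db_cleanup.py".
# __name__ gives "airflow.utils.db_cleanup", which nests under the "airflow" logger.
logger = logging.getLogger(__name__)

# Configuration applied to the parent propagates down the dotted hierarchy.
logging.getLogger("airflow").setLevel(logging.INFO)
```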
2 changes: 1 addition & 1 deletion airflow/utils/log/task_context_logger.py
@@ -143,7 +143,7 @@ def warn(self, msg: str, *args, ti: TaskInstance):
:param msg: the message to relay to task context log
:param ti: the task instance
"""
- self._log(logging.WARN, msg, *args, ti=ti)
+ self._log(logging.WARNING, msg, *args, ti=ti)

def warning(self, msg: str, *args, ti: TaskInstance):
"""
10 changes: 8 additions & 2 deletions pyproject.toml
@@ -1303,8 +1303,14 @@ namespace-packages = ["airflow/providers"]
[tool.ruff.lint]
typing-modules = ["airflow.typing_compat"]
extend-select = [
+ # Enable entire ruff rule section
"I", # Missing required import (auto-fixable)
"UP", # Pyupgrade
"ISC", # Checks for implicit literal string concatenation (auto-fixable)
"TCH", # Rules around TYPE_CHECKING blocks
"G", # flake8-logging-format rules
"LOG", # flake8-logging rules, most of them autofixable
# Per rule enables
"RUF100", # Unused noqa (auto-fixable)
# We ignore more pydocstyle than we enable, so be more selective at what we enable
"D101",
@@ -1321,10 +1327,8 @@ extend-select = [
"D403",
"D412",
"D419",
"TCH", # Rules around TYPE_CHECKING blocks
"TID251", # Specific modules or module members that may not be imported or accessed
"TID253", # Ban certain modules from being imported at module level
"ISC", # Checks for implicit literal string concatenation (auto-fixable)
"B006", # Checks for uses of mutable objects as function argument defaults.
"PT001",
"PT003",
@@ -1337,6 +1341,8 @@
"PT027",
]
ignore = [
"G003", # Logging statement uses + (not fixed yet)
"G004", # Logging statement uses f-string (not fixed yet)
"D203",
"D212",
"D213",
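For context on the ruff configuration: `G` enables the flake8-logging-format rules and `LOG` the flake8-logging rules; `G003` (string concatenation with `+` inside a logging call) and `G004` (f-strings inside a logging call) are parked under `ignore` because those call sites are not cleaned up yet. A hedged sketch of the patterns these rule families target:

```python
import logging

log = logging.getLogger(__name__)  # the LOG rules prefer __name__ over e.g. getLogger(__file__)
user = "alice"                     # placeholder value for illustration

log.warning("Hello %s", user)            # fine: lazy %-interpolation, nothing to flag
log.warning("Hello {}".format(user))     # G001: str.format() inside a logging call (now enforced)
log.warning("Hello " + user)             # G003: "+" concatenation in a logging call (ignored for now)
log.warning(f"Hello {user}")             # G004: f-string in a logging call (ignored for now)
```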