Avoid non-recommended usage of logging (apache#37792)
Taragolis authored and utkarsharma2 committed Apr 22, 2024
1 parent 5b4b22e commit 23b6749
Showing 9 changed files with 26 additions and 19 deletions.
2 changes: 1 addition & 1 deletion airflow/dag_processing/processor.py
@@ -177,7 +177,7 @@ def _handle_dag_file_processing():
         # gets sent to logs and logs are sent to stdout, this leads to an infinite loop. This
         # necessitates this conditional based on the value of DAG_PROCESSOR_LOG_TARGET.
         with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
-            StreamLogWriter(log, logging.WARN)
+            StreamLogWriter(log, logging.WARNING)
         ), Stats.timer() as timer:
             _handle_dag_file_processing()
         log.info("Processing %s took %.3f seconds", file_path, timer.duration)
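For context, logging.WARN is a legacy alias of logging.WARNING; both resolve to the same numeric level (30), but only WARNING is the documented constant. A minimal sketch of the equivalence, using an illustrative logger name:

import logging

# WARN is kept only for backwards compatibility; WARNING is the documented constant.
assert logging.WARN == logging.WARNING == 30

log = logging.getLogger("example")  # illustrative name
log.warning("prefer logging.WARNING (and logger.warning) over the WARN/warn aliases")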
6 changes: 2 additions & 4 deletions airflow/jobs/backfill_job_runner.py
@@ -681,11 +681,10 @@ def _per_task_process(key, ti: TaskInstance, session):
             try:
                 session.commit()
             except OperationalError:
-                self.log.error(
+                self.log.exception(
                     "Failed to commit task state due to operational error. "
                     "The job will retry this operation so if your backfill succeeds, "
                     "you can safely ignore this message.",
-                    exc_info=True,
                 )
                 session.rollback()
                 if i == max_attempts - 1:
@@ -986,10 +985,9 @@ def _execute(self, session: Session = NEW_SESSION) -> None:
             # state to failed.
             self._set_unfinished_dag_runs_to_failed(ti_status.active_runs)
         except OperationalError:
-            self.log.error(
+            self.log.exception(
                 "Backfill job dead-locked. The job will retry the job so it is likely "
                 "to heal itself. If your backfill succeeds you can ignore this exception.",
-                exc_info=True,
             )
             raise
         finally:
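Switching from self.log.error(..., exc_info=True) to self.log.exception(...) does not change the output: logger.exception() is shorthand for logger.error() with exc_info=True and is intended to be called from inside an exception handler, which is exactly the pattern here. A small sketch of the equivalence, with an illustrative logger name:

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("backfill_example")  # illustrative name, not Airflow's logger

try:
    raise RuntimeError("simulated operational error")
except RuntimeError:
    # Both calls emit an ERROR-level record with the current traceback attached.
    log.error("commit failed, will retry", exc_info=True)
    log.exception("commit failed, will retry")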
8 changes: 6 additions & 2 deletions airflow/models/variable.py
@@ -242,11 +242,15 @@ def check_for_write_conflict(key: str) -> None:
         try:
             var_val = secrets_backend.get_variable(key=key)
             if var_val is not None:
+                _backend_name = type(secrets_backend).__name__
                 log.warning(
-                    "The variable {key} is defined in the {cls} secrets backend, which takes "
+                    "The variable %s is defined in the %s secrets backend, which takes "
                     "precedence over reading from the database. The value in the database will be "
                     "updated, but to read it you have to delete the conflicting variable "
-                    "from {cls}".format(key=key, cls=secrets_backend.__class__.__name__)
+                    "from %s",
+                    key,
+                    _backend_name,
+                    _backend_name,
                 )
                 return
         except Exception:
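This hunk replaces eager str.format() with the logging module's lazy %-style interpolation: the message is only rendered if the record is actually emitted, and the raw arguments stay available on the LogRecord. A minimal sketch of the difference, with illustrative values:

import logging

log = logging.getLogger("variable_example")  # illustrative name
key, backend = "my_var", "EnvironmentVariablesBackend"  # illustrative values

# Eager: the full string is built even if WARNING records are filtered out.
log.warning("The variable {} is defined in the {} secrets backend".format(key, backend))

# Lazy: interpolation happens only when the record is actually handled.
log.warning("The variable %s is defined in the %s secrets backend", key, backend)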
10 changes: 5 additions & 5 deletions airflow/providers/amazon/aws/operators/sagemaker.py
@@ -349,13 +349,13 @@ def _extract_s3_dataset_identifiers(self, processing_inputs, processing_outputs)
             for processing_input in processing_inputs:
                 inputs.append(self.path_to_s3_dataset(processing_input["S3Input"]["S3Uri"]))
         except KeyError:
-            self.log.exception("Cannot find S3 input details", exc_info=True)
+            self.log.exception("Cannot find S3 input details")

         try:
             for processing_output in processing_outputs:
                 outputs.append(self.path_to_s3_dataset(processing_output["S3Output"]["S3Uri"]))
         except KeyError:
-            self.log.exception("Cannot find S3 output details.", exc_info=True)
+            self.log.exception("Cannot find S3 output details.")
         return inputs, outputs

@@ -777,15 +777,15 @@ def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
         try:
             model_package_arn = self.serialized_model["PrimaryContainer"]["ModelPackageName"]
         except KeyError:
-            self.log.error("Cannot find Model Package Name.", exc_info=True)
+            self.log.exception("Cannot find Model Package Name.")

         try:
             transform_input = self.serialized_transform["TransformInput"]["DataSource"]["S3DataSource"][
                 "S3Uri"
             ]
             transform_output = self.serialized_transform["TransformOutput"]["S3OutputPath"]
         except KeyError:
-            self.log.error("Cannot find some required input/output details.", exc_info=True)
+            self.log.exception("Cannot find some required input/output details.")

         inputs = []

@@ -813,7 +813,7 @@ def _get_model_data_urls(self, model_package_arn) -> list:
             for container in model_containers:
                 model_data_urls.append(container["ModelDataUrl"])
         except KeyError:
-            self.log.exception("Cannot retrieve model details.", exc_info=True)
+            self.log.exception("Cannot retrieve model details.")

         return model_data_urls

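Several hunks above drop a redundant keyword: logger.exception() already records the active exception, so passing exc_info=True on top of it adds nothing. A tiny sketch, with an illustrative logger name:

import logging

log = logging.getLogger("sagemaker_example")  # illustrative name

try:
    {}["S3Input"]  # simulated missing key
except KeyError:
    # exception() implies exc_info=True, so spelling it out again is just noise.
    log.exception("Cannot find S3 input details")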
@@ -431,10 +431,9 @@ def sync(self) -> None:
                 self.kube_scheduler.run_next(task)
                 self.task_publish_retries.pop(key, None)
             except PodReconciliationError as e:
-                self.log.error(
+                self.log.exception(
                     "Pod reconciliation failed, likely due to kubernetes library upgrade. "
                     "Try clearing the task to re-run.",
-                    exc_info=True,
                 )
                 self.fail(task[0], e)
             except ApiException as e:
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/dataproc.py
@@ -835,7 +835,7 @@ def execute(self, context: Context) -> dict:
                 except AirflowException as ae_inner:
                     # We could get any number of failures here, including cluster not found and we
                     # can just ignore to ensure we surface the original cluster create failure
-                    self.log.error(ae_inner, exc_info=True)
+                    self.log.exception(ae_inner)
                 finally:
                     raise ae

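Here the message argument is the exception object itself; logging calls str() on whatever message it is given, so swapping error(..., exc_info=True) for exception(...) keeps the same text and still attaches the traceback. A small sketch with illustrative names:

import logging

log = logging.getLogger("dataproc_example")  # illustrative name

try:
    raise ValueError("cluster not found")  # simulated inner failure
except ValueError as inner:
    # str(inner) becomes the log message; exception() appends the current traceback.
    log.exception(inner)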
2 changes: 1 addition & 1 deletion airflow/utils/db_cleanup.py
@@ -49,7 +49,7 @@

 from airflow.models import Base

-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)

 ARCHIVE_TABLE_PREFIX = "_airflow_deleted__"

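logging.getLogger(__file__) names the logger after a filesystem path, which falls outside the dotted hierarchy that logging configuration keys on; __name__ yields the importable module name instead. A quick sketch of the difference (the path shown is illustrative):

import logging

# In a module imported as airflow.utils.db_cleanup:
#   __file__ -> "/opt/airflow/airflow/utils/db_cleanup.py"  (a path, not part of the logger tree)
#   __name__ -> "airflow.utils.db_cleanup"                  (a child of the "airflow" logger)
logger = logging.getLogger(__name__)

# Settings applied to the parent propagate to __name__-based child loggers:
logging.getLogger("airflow").setLevel(logging.DEBUG)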
2 changes: 1 addition & 1 deletion airflow/utils/log/task_context_logger.py
@@ -143,7 +143,7 @@ def warn(self, msg: str, *args, ti: TaskInstance):
         :param msg: the message to relay to task context log
         :param ti: the task instance
         """
-        self._log(logging.WARN, msg, *args, ti=ti)
+        self._log(logging.WARNING, msg, *args, ti=ti)

     def warning(self, msg: str, *args, ti: TaskInstance):
         """
10 changes: 8 additions & 2 deletions pyproject.toml
@@ -1303,8 +1303,14 @@ namespace-packages = ["airflow/providers"]
 [tool.ruff.lint]
 typing-modules = ["airflow.typing_compat"]
 extend-select = [
+    # Enable entire ruff rule section
     "I", # Missing required import (auto-fixable)
     "UP", # Pyupgrade
+    "ISC", # Checks for implicit literal string concatenation (auto-fixable)
+    "TCH", # Rules around TYPE_CHECKING blocks
+    "G", # flake8-logging-format rules
+    "LOG", # flake8-logging rules, most of them autofixable
+    # Per rule enables
     "RUF100", # Unused noqa (auto-fixable)
     # We ignore more pydocstyle than we enable, so be more selective at what we enable
     "D101",
@@ -1321,10 +1327,8 @@ extend-select = [
     "D403",
     "D412",
     "D419",
-    "TCH", # Rules around TYPE_CHECKING blocks
     "TID251", # Specific modules or module members that may not be imported or accessed
     "TID253", # Ban certain modules from being imported at module level
-    "ISC", # Checks for implicit literal string concatenation (auto-fixable)
     "B006", # Checks for uses of mutable objects as function argument defaults.
     "PT001",
     "PT003",
@@ -1337,6 +1341,8 @@ extend-select = [
     "PT027",
 ]
 ignore = [
+    "G003", # Logging statement uses + (not fixed yet)
+    "G004", # Logging statement uses f-string (not fixed yet)
     "D203",
     "D212",
     "D213",
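As a rough illustration of what the newly enabled rule families catch, the sketch below shows logging calls that ruff's flake8-logging-format (G) and flake8-logging (LOG) rules are meant to flag, next to the preferred forms; the exact rule codes and messages come from ruff's documentation and may vary slightly between versions:

import logging

name = "example_dag"  # illustrative value

logger = logging.getLogger(__file__)  # flagged by the LOG rules: prefer __name__
logger = logging.getLogger(__name__)  # preferred

logger.warning("Processing {}".format(name))  # G001: str.format in a logging call
logger.warning("Processing " + name)          # G003: concatenation (kept in the ignore list above)
logger.warning(f"Processing {name}")          # G004: f-string (kept in the ignore list above)
logger.warning("Processing %s", name)         # preferred: lazy %-style arguments

try:
    raise RuntimeError("boom")
except RuntimeError:
    logger.error("failed", exc_info=True)  # flagged: use logger.exception instead
    logger.exception("failed")             # preferred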
