diff --git a/airflow/providers/openlineage/extractors/base.py b/airflow/providers/openlineage/extractors/base.py index ae6e267f20e51..3e18bb2399f21 100644 --- a/airflow/providers/openlineage/extractors/base.py +++ b/airflow/providers/openlineage/extractors/base.py @@ -82,7 +82,7 @@ def extract(self) -> OperatorLineage | None: self.operator.__class__.__module__ + "." + self.operator.__class__.__name__ ) if fully_qualified_class_name in self.disabled_operators: - self.log.warning( + self.log.debug( f"Skipping extraction for operator {self.operator.task_type} " "due to its presence in [openlineage] openlineage_disabled_for_operators." ) @@ -116,7 +116,7 @@ def _execute_extraction(self) -> OperatorLineage | None: ) return None except AttributeError: - self.log.warning( + self.log.debug( f"Operator {self.operator.task_type} does not have the " "get_openlineage_facets_on_start method." ) @@ -149,5 +149,5 @@ def _get_openlineage_facets(self, get_facets_method, *args) -> OperatorLineage | "This should not happen." ) except Exception: - self.log.exception("OpenLineage provider method failed to extract data from provider. ") + self.log.warning("OpenLineage provider method failed to extract data from provider. ") return None diff --git a/airflow/providers/openlineage/extractors/manager.py b/airflow/providers/openlineage/extractors/manager.py index 17a8923f5b21f..480ffc9f39981 100644 --- a/airflow/providers/openlineage/extractors/manager.py +++ b/airflow/providers/openlineage/extractors/manager.py @@ -101,7 +101,7 @@ def extract_metadata(self, dagrun, task, complete: bool = False, task_instance=N return task_metadata except Exception as e: - self.log.exception( + self.log.warning( "Failed to extract metadata using found extractor %s - %s %s", extractor, e, task_info ) else: @@ -157,7 +157,8 @@ def extract_inlets_and_outlets( inlets: list, outlets: list, ): - self.log.debug("Manually extracting lineage metadata from inlets and outlets") + if inlets or outlets: + self.log.debug("Manually extracting lineage metadata from inlets and outlets") for i in inlets: d = self.convert_to_ol_dataset(i) if d: @@ -193,5 +194,5 @@ def validate_task_metadata(self, task_metadata) -> OperatorLineage | None: job_facets=task_metadata.job_facets, ) except AttributeError: - self.log.error("Extractor returns non-valid metadata: %s", task_metadata) + self.log.warning("Extractor returns non-valid metadata: %s", task_metadata) return None diff --git a/airflow/providers/openlineage/extractors/python.py b/airflow/providers/openlineage/extractors/python.py index e3aa6ab33b359..56e4e19f1f25d 100644 --- a/airflow/providers/openlineage/extractors/python.py +++ b/airflow/providers/openlineage/extractors/python.py @@ -87,7 +87,7 @@ def get_source_code(self, callable: Callable) -> str | None: # Trying to extract source code of builtin_function_or_method return str(callable) except OSError: - self.log.exception( + self.log.warning( "Can't get source code facet of PythonOperator %s", self.operator.task_id, ) diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py index f5037a8559c8d..fabb14eaa3413 100644 --- a/airflow/providers/openlineage/plugins/adapter.py +++ b/airflow/providers/openlineage/plugins/adapter.py @@ -20,7 +20,6 @@ import uuid from typing import TYPE_CHECKING -import requests.exceptions import yaml from openlineage.client import OpenLineageClient, set_producer from openlineage.client.facet import ( @@ -115,8 +114,9 @@ def emit(self, event: RunEvent): redacted_event: RunEvent = self._redacter.redact(event, max_depth=20) # type: ignore[assignment] try: return self._client.emit(redacted_event) - except requests.exceptions.RequestException: - self.log.exception(f"Failed to emit OpenLineage event of id {event.run.runId}") + except Exception as e: + self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId) + self.log.debug("OpenLineage emission failure: %s", e) def start_task( self, diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 7d0be0e843f70..7810fcf257801 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -28,7 +28,7 @@ get_airflow_run_facet, get_custom_facets, get_job_name, - print_exception, + print_warning, ) from airflow.utils.timeout import timeout @@ -65,7 +65,7 @@ def on_task_instance_running( task = task_instance.task dag = task.dag - @print_exception + @print_warning(self.log) def on_running(): # that's a workaround to detect task running from deferred state # we return here because Airflow 2.3 needs task from deferred state @@ -117,7 +117,7 @@ def on_task_instance_success(self, previous_state, task_instance: TaskInstance, task.task_id, task_instance.execution_date, task_instance.try_number - 1 ) - @print_exception + @print_warning(self.log) def on_success(): task_metadata = self.extractor_manager.extract_metadata( dagrun, task, complete=True, task_instance=task_instance @@ -145,7 +145,7 @@ def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, s task.task_id, task_instance.execution_date, task_instance.try_number - 1 ) - @print_exception + @print_warning(self.log) def on_failure(): task_metadata = self.extractor_manager.extract_metadata( dagrun, task, complete=True, task_instance=task_instance @@ -194,14 +194,14 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str): @hookimpl def on_dag_run_success(self, dag_run: DagRun, msg: str): if not self.executor: - self.log.error("Executor have not started before `on_dag_run_success`") + self.log.debug("Executor have not started before `on_dag_run_success`") return self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg) @hookimpl def on_dag_run_failed(self, dag_run: DagRun, msg: str): if not self.executor: - self.log.error("Executor have not started before `on_dag_run_failed`") + self.log.debug("Executor have not started before `on_dag_run_failed`") return self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg) diff --git a/airflow/providers/openlineage/utils/sql.py b/airflow/providers/openlineage/utils/sql.py index b31d8da2408ab..f0bfbff6f5e63 100644 --- a/airflow/providers/openlineage/utils/sql.py +++ b/airflow/providers/openlineage/utils/sql.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -import logging from collections import defaultdict from contextlib import closing from enum import IntEnum @@ -34,9 +33,6 @@ from airflow.hooks.base import BaseHook -logger = logging.getLogger(__name__) - - class ColumnIndex(IntEnum): """Enumerates the indices of columns in information schema view.""" diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index 837c5eb615e9b..7c0ea6506ae04 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -397,15 +397,18 @@ def _is_name_redactable(name, redacted): return name not in redacted.skip_redact -def print_exception(f): - @wraps(f) - def wrapper(*args, **kwargs): - try: - return f(*args, **kwargs) - except Exception as e: - log.exception(e) - - return wrapper +def print_warning(log): + def decorator(f): + @wraps(f) + def wrapper(*args, **kwargs): + try: + return f(*args, **kwargs) + except Exception as e: + log.warning(e) + + return wrapper + + return decorator @cache