Skip to content

Commit

Permalink
Adjust log levels in OpenLineage provider (#34801)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
  • Loading branch information
JDarDagran committed Oct 11, 2023
1 parent e239dcf commit 73dd877
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 29 deletions.
6 changes: 3 additions & 3 deletions airflow/providers/openlineage/extractors/base.py
Expand Up @@ -82,7 +82,7 @@ def extract(self) -> OperatorLineage | None:
self.operator.__class__.__module__ + "." + self.operator.__class__.__name__
)
if fully_qualified_class_name in self.disabled_operators:
self.log.warning(
self.log.debug(
f"Skipping extraction for operator {self.operator.task_type} "
"due to its presence in [openlineage] openlineage_disabled_for_operators."
)
Expand Down Expand Up @@ -116,7 +116,7 @@ def _execute_extraction(self) -> OperatorLineage | None:
)
return None
except AttributeError:
self.log.warning(
self.log.debug(
f"Operator {self.operator.task_type} does not have the "
"get_openlineage_facets_on_start method."
)
Expand Down Expand Up @@ -149,5 +149,5 @@ def _get_openlineage_facets(self, get_facets_method, *args) -> OperatorLineage |
"This should not happen."
)
except Exception:
self.log.exception("OpenLineage provider method failed to extract data from provider. ")
self.log.warning("OpenLineage provider method failed to extract data from provider. ")
return None
7 changes: 4 additions & 3 deletions airflow/providers/openlineage/extractors/manager.py
Expand Up @@ -101,7 +101,7 @@ def extract_metadata(self, dagrun, task, complete: bool = False, task_instance=N
return task_metadata

except Exception as e:
self.log.exception(
self.log.warning(
"Failed to extract metadata using found extractor %s - %s %s", extractor, e, task_info
)
else:
Expand Down Expand Up @@ -157,7 +157,8 @@ def extract_inlets_and_outlets(
inlets: list,
outlets: list,
):
self.log.debug("Manually extracting lineage metadata from inlets and outlets")
if inlets or outlets:
self.log.debug("Manually extracting lineage metadata from inlets and outlets")
for i in inlets:
d = self.convert_to_ol_dataset(i)
if d:
Expand Down Expand Up @@ -193,5 +194,5 @@ def validate_task_metadata(self, task_metadata) -> OperatorLineage | None:
job_facets=task_metadata.job_facets,
)
except AttributeError:
self.log.error("Extractor returns non-valid metadata: %s", task_metadata)
self.log.warning("Extractor returns non-valid metadata: %s", task_metadata)
return None
2 changes: 1 addition & 1 deletion airflow/providers/openlineage/extractors/python.py
Expand Up @@ -87,7 +87,7 @@ def get_source_code(self, callable: Callable) -> str | None:
# Trying to extract source code of builtin_function_or_method
return str(callable)
except OSError:
self.log.exception(
self.log.warning(
"Can't get source code facet of PythonOperator %s",
self.operator.task_id,
)
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/openlineage/plugins/adapter.py
Expand Up @@ -20,7 +20,6 @@
import uuid
from typing import TYPE_CHECKING

import requests.exceptions
import yaml
from openlineage.client import OpenLineageClient, set_producer
from openlineage.client.facet import (
Expand Down Expand Up @@ -115,8 +114,9 @@ def emit(self, event: RunEvent):
redacted_event: RunEvent = self._redacter.redact(event, max_depth=20) # type: ignore[assignment]
try:
return self._client.emit(redacted_event)
except requests.exceptions.RequestException:
self.log.exception(f"Failed to emit OpenLineage event of id {event.run.runId}")
except Exception as e:
self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
self.log.debug("OpenLineage emission failure: %s", e)

def start_task(
self,
Expand Down
12 changes: 6 additions & 6 deletions airflow/providers/openlineage/plugins/listener.py
Expand Up @@ -28,7 +28,7 @@
get_airflow_run_facet,
get_custom_facets,
get_job_name,
print_exception,
print_warning,
)
from airflow.utils.timeout import timeout

Expand Down Expand Up @@ -65,7 +65,7 @@ def on_task_instance_running(
task = task_instance.task
dag = task.dag

@print_exception
@print_warning(self.log)
def on_running():
# that's a workaround to detect task running from deferred state
# we return here because Airflow 2.3 needs task from deferred state
Expand Down Expand Up @@ -117,7 +117,7 @@ def on_task_instance_success(self, previous_state, task_instance: TaskInstance,
task.task_id, task_instance.execution_date, task_instance.try_number - 1
)

@print_exception
@print_warning(self.log)
def on_success():
task_metadata = self.extractor_manager.extract_metadata(
dagrun, task, complete=True, task_instance=task_instance
Expand Down Expand Up @@ -145,7 +145,7 @@ def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, s
task.task_id, task_instance.execution_date, task_instance.try_number - 1
)

@print_exception
@print_warning(self.log)
def on_failure():
task_metadata = self.extractor_manager.extract_metadata(
dagrun, task, complete=True, task_instance=task_instance
Expand Down Expand Up @@ -194,14 +194,14 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str):
@hookimpl
def on_dag_run_success(self, dag_run: DagRun, msg: str):
if not self.executor:
self.log.error("Executor have not started before `on_dag_run_success`")
self.log.debug("Executor have not started before `on_dag_run_success`")
return
self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)

@hookimpl
def on_dag_run_failed(self, dag_run: DagRun, msg: str):
if not self.executor:
self.log.error("Executor have not started before `on_dag_run_failed`")
self.log.debug("Executor have not started before `on_dag_run_failed`")
return
self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)

Expand Down
4 changes: 0 additions & 4 deletions airflow/providers/openlineage/utils/sql.py
Expand Up @@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations

import logging
from collections import defaultdict
from contextlib import closing
from enum import IntEnum
Expand All @@ -34,9 +33,6 @@
from airflow.hooks.base import BaseHook


logger = logging.getLogger(__name__)


class ColumnIndex(IntEnum):
"""Enumerates the indices of columns in information schema view."""

Expand Down
21 changes: 12 additions & 9 deletions airflow/providers/openlineage/utils/utils.py
Expand Up @@ -397,15 +397,18 @@ def _is_name_redactable(name, redacted):
return name not in redacted.skip_redact


def print_exception(f):
@wraps(f)
def wrapper(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception as e:
log.exception(e)

return wrapper
def print_warning(log):
    """Build a decorator that logs, instead of raising, exceptions from the wrapped callable.

    :param log: Logger-like object exposing ``warning`` and ``debug`` methods.
    :return: A decorator. The decorated callable returns whatever the wrapped
        function returns, or ``None`` when the call raised an ``Exception``
        (the exception is swallowed after being logged).
    """

    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except Exception as e:
                # Name the callable and the exception type: logging the bare
                # exception object yields an empty line when it has no message.
                log.warning(
                    "%s failed with %s: %s",
                    getattr(f, "__name__", repr(f)),
                    type(e).__name__,
                    e,
                )
                # Keep the traceback available, but only at debug level so the
                # default log output stays short.
                log.debug("Traceback for the failure above:", exc_info=True)

        return wrapper

    return decorator


@cache
Expand Down

0 comments on commit 73dd877

Please sign in to comment.