Skip to content

Commit

Permalink
Log instead of raise an Error for unregistered OperatorLinks (#11959)
Browse files Browse the repository at this point in the history
Currently, if someone uses OperatorLinks that are not registered,
it will break the UI when someone clicks on that DAG.

This commit will instead log an error in the Webserver logs so that
someone can still see the DAG in different Views (graph, tree, etc).

(cherry picked from commit 44f6e6f)
  • Loading branch information
kaxil committed Nov 18, 2020
1 parent d2c423a commit 5b65c17
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
3 changes: 2 additions & 1 deletion airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ def _deserialize_operator_extra_links(
elif _operator_link_class_path in registered_operator_link_classes:
single_op_link_class = registered_operator_link_classes[_operator_link_class_path]
else:
raise KeyError("Operator Link class %r not registered" % _operator_link_class_path)
log.error("Operator Link class %r not registered", _operator_link_class_path)
return {}

op_predefined_extra_link = cattr.structure(
data, single_op_link_class) # type: BaseOperatorLink
Expand Down
37 changes: 36 additions & 1 deletion tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, Connection, DagBag, TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
from airflow.operators.bash_operator import BashOperator
from airflow.serialization.json_schema import load_dag_schema_dict
from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
Expand Down Expand Up @@ -560,6 +560,41 @@ def test_extra_serialized_field_and_operator_links(self):
google_link_from_plugin = simple_task.get_extra_links(test_date, GoogleLink.name)
self.assertEqual("https://www.google.com", google_link_from_plugin)

@unittest.skipIf(six.PY2, 'self.assertLogs not available for Python 2')
def test_extra_operator_links_logs_error_for_non_registered_extra_links(self):
"""
Assert OperatorLinks not registered via Plugins and if it is not an inbuilt Operator Link,
it can still deserialize the DAG (does not error) but just logs an error
"""

class TaskStateLink(BaseOperatorLink):
"""OperatorLink not registered via Plugins nor a built-in OperatorLink"""
name = 'My Link'

def get_link(self, operator, dttm):
return 'https://www.google.com'

class MyOperator(BaseOperator):
"""Just a DummyOperator using above defined Extra Operator Link"""
operator_extra_links = [TaskStateLink()]

def execute(self, context):
pass

with DAG(dag_id='simple_dag', start_date=datetime(2019, 8, 1)) as dag:
MyOperator(task_id='blah')

serialized_dag = SerializedDAG.to_dict(dag)

with self.assertLogs("airflow.serialization.serialized_objects", level="ERROR") as log_output:
SerializedDAG.from_dict(serialized_dag)
received_logs = log_output.output[0]
expected_err_msg = (
"Operator Link class 'tests.serialization.test_dag_serialization.TaskStateLink' "
"not registered"
)
assert expected_err_msg in received_logs

def test_extra_serialized_field_and_multiple_operator_links(self):
"""
Assert extra field exists & OperatorLinks defined in Plugins and inbuilt Operator Links.
Expand Down

0 comments on commit 5b65c17

Please sign in to comment.