Skip to content

Commit

Permalink
Lazy load operator extra links (apache#7327) (apache#10318)
Browse files Browse the repository at this point in the history
Co-authored-by: Kamil Breguła <mik-laj@users.noreply.github.com>
Backported from apache#7327
cherry-picked from b180e4b
  • Loading branch information
turbaszek authored and Chris Fei committed Mar 5, 2021
1 parent 5bc93ec commit 17dd3e6
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 38 deletions.
4 changes: 1 addition & 3 deletions airflow/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@

def _integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import operators_modules, register_inbuilt_operator_links
from airflow.plugins_manager import operators_modules
for operators_module in operators_modules:
sys.modules[operators_module.__name__] = operators_module
globals()[operators_module._name] = operators_module
Expand All @@ -121,5 +121,3 @@ def _integrate_plugins():
"import from 'airflow.operators.[plugin_module]' "
"instead. Support for direct imports will be dropped "
"entirely in Airflow 2.0.".format(i=operator_name))

register_inbuilt_operator_links()
29 changes: 1 addition & 28 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import logging
import os
import re
from typing import Any, Dict, List, Set, Type
from typing import Any, Dict, List, Type

import pkg_resources

Expand Down Expand Up @@ -114,33 +114,6 @@ def load_entrypoint_plugins(entry_points, airflow_plugins):
return airflow_plugins


def register_inbuilt_operator_links():
    """
    Add the operator link classes shipped inside the "airflow" project to
    ``registered_operator_link_classes``.

    Each class is stored under its dotted path (``<module>.<class name>``),
    e.g. QDSLink, the operator link of the Qubole operator. This populates
    the "allowed list" of classes consulted when deserializing operator links.

    An operator module that cannot be imported is skipped without error.
    """
    builtin_links = []  # type: List[Type]

    try:
        from airflow.contrib.operators.bigquery_operator import BigQueryConsoleLink, BigQueryConsoleIndexableLink  # noqa E501 # pylint: disable=R0401,line-too-long
    except ImportError:
        # Best effort: the BigQuery operator module is not importable here.
        pass
    else:
        builtin_links += [BigQueryConsoleLink, BigQueryConsoleIndexableLink]

    try:
        from airflow.contrib.operators.qubole_operator import QDSLink  # pylint: disable=R0401
    except ImportError:
        # Best effort: the Qubole operator module is not importable here.
        pass
    else:
        builtin_links.append(QDSLink)

    # Build the complete mapping first, then publish it in a single update.
    links_by_path = {
        "{}.{}".format(link_class.__module__, link_class.__name__): link_class
        for link_class in builtin_links
    }
    registered_operator_link_classes.update(links_by_path)


def is_valid_plugin(plugin_obj, existing_plugins):
"""
Check whether a potential object is a subclass of
Expand Down
27 changes: 20 additions & 7 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import enum
import logging
import six
from typing import TYPE_CHECKING, Optional, Union, Dict
from typing import TYPE_CHECKING, Optional, Union, Dict, List

import cattr
import pendulum
Expand All @@ -36,6 +36,7 @@
from airflow.serialization.helpers import serialize_template_field
from airflow.serialization.json_schema import Validator, load_dag_schema
from airflow.settings import json
from airflow.utils.module_loading import import_string
from airflow.www.utils import get_python_source

try:
Expand All @@ -49,6 +50,17 @@
log = logging.getLogger(__name__)


# Dotted paths of the operator extra link classes that ship with Airflow.
# When a serialized operator link names one of these paths, the class is
# imported on demand with ``import_string`` during deserialization.
BUILTIN_OPERATOR_EXTRA_LINKS = [
    # airflow.contrib locations
    "airflow.contrib.operators.bigquery_operator.BigQueryConsoleLink",
    "airflow.contrib.operators.bigquery_operator.BigQueryConsoleIndexableLink",
    "airflow.contrib.operators.qubole_operator.QDSLink",
    # the same link classes under their new airflow.providers paths
    "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink",
    "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink",
    "airflow.providers.qubole.operators.qubole.QDSLink",
]  # type: List[str]


class BaseSerialization:
"""BaseSerialization provides utils for serialization."""

Expand Down Expand Up @@ -459,15 +471,16 @@ def _deserialize_operator_extra_links(
# list(_operator_links_source.items())[0] =
# ('airflow.gcp.operators.bigquery.BigQueryConsoleIndexableLink', {'index': 0})

_operator_link_class, data = list(_operator_links_source.items())[0]

if _operator_link_class in registered_operator_link_classes:
single_op_link_class_name = registered_operator_link_classes[_operator_link_class]
_operator_link_class_path, data = list(_operator_links_source.items())[0]
if _operator_link_class_path in BUILTIN_OPERATOR_EXTRA_LINKS:
single_op_link_class = import_string(_operator_link_class_path)
elif _operator_link_class_path in registered_operator_link_classes:
single_op_link_class = registered_operator_link_classes[_operator_link_class_path]
else:
raise KeyError("Operator Link class %r not registered" % _operator_link_class)
raise KeyError("Operator Link class %r not registered" % _operator_link_class_path)

op_predefined_extra_link = cattr.structure(
data, single_op_link_class_name) # type: BaseOperatorLink
data, single_op_link_class) # type: BaseOperatorLink

op_predefined_extra_links.update(
{op_predefined_extra_link.name: op_predefined_extra_link}
Expand Down

0 comments on commit 17dd3e6

Please sign in to comment.