Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6706] Lazy load operator extra links (#7327) #10318

Merged
merged 1 commit into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions airflow/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@

def _integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import operators_modules, register_inbuilt_operator_links
from airflow.plugins_manager import operators_modules
for operators_module in operators_modules:
sys.modules[operators_module.__name__] = operators_module
globals()[operators_module._name] = operators_module
Expand All @@ -121,5 +121,3 @@ def _integrate_plugins():
"import from 'airflow.operators.[plugin_module]' "
"instead. Support for direct imports will be dropped "
"entirely in Airflow 2.0.".format(i=operator_name))

register_inbuilt_operator_links()
29 changes: 1 addition & 28 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import logging
import os
import re
from typing import Any, Dict, List, Set, Type
from typing import Any, Dict, List, Type

import pkg_resources

Expand Down Expand Up @@ -114,33 +114,6 @@ def load_entrypoint_plugins(entry_points, airflow_plugins):
return airflow_plugins


def register_inbuilt_operator_links():
"""
Register all the Operators Links that are already defined for the operators
in the "airflow" project. Example: QDSLink (Operator Link for Qubole Operator)

This is required to populate the "allowed list" of allowed classes when deserializing operator links
"""
inbuilt_operator_links = set() # type: Set[Type]

try:
from airflow.contrib.operators.bigquery_operator import BigQueryConsoleLink, BigQueryConsoleIndexableLink # noqa E501 # pylint: disable=R0401,line-too-long
inbuilt_operator_links.update([BigQueryConsoleLink, BigQueryConsoleIndexableLink])
except ImportError:
pass

try:
from airflow.contrib.operators.qubole_operator import QDSLink # pylint: disable=R0401
inbuilt_operator_links.update([QDSLink])
except ImportError:
pass

registered_operator_link_classes.update({
"{}.{}".format(link.__module__, link.__name__): link
for link in inbuilt_operator_links
})


def is_valid_plugin(plugin_obj, existing_plugins):
"""
Check whether a potential object is a subclass of
Expand Down
27 changes: 20 additions & 7 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import enum
import logging
import six
from typing import TYPE_CHECKING, Optional, Union, Dict
from typing import TYPE_CHECKING, Optional, Union, Dict, List

import cattr
import pendulum
Expand All @@ -36,6 +36,7 @@
from airflow.serialization.helpers import serialize_template_field
from airflow.serialization.json_schema import Validator, load_dag_schema
from airflow.settings import json
from airflow.utils.module_loading import import_string
from airflow.www.utils import get_python_source

try:
Expand All @@ -49,6 +50,17 @@
log = logging.getLogger(__name__)


BUILTIN_OPERATOR_EXTRA_LINKS = [
"airflow.contrib.operators.bigquery_operator.BigQueryConsoleLink",
"airflow.contrib.operators.bigquery_operator.BigQueryConsoleIndexableLink",
"airflow.contrib.operators.qubole_operator.QDSLink",
# providers new paths
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink",
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink",
"airflow.providers.qubole.operators.qubole.QDSLink"
] # type: List[str]


class BaseSerialization:
"""BaseSerialization provides utils for serialization."""

Expand Down Expand Up @@ -459,15 +471,16 @@ def _deserialize_operator_extra_links(
# list(_operator_links_source.items())[0] =
# ('airflow.gcp.operators.bigquery.BigQueryConsoleIndexableLink', {'index': 0})

_operator_link_class, data = list(_operator_links_source.items())[0]

if _operator_link_class in registered_operator_link_classes:
single_op_link_class_name = registered_operator_link_classes[_operator_link_class]
_operator_link_class_path, data = list(_operator_links_source.items())[0]
if _operator_link_class_path in BUILTIN_OPERATOR_EXTRA_LINKS:
single_op_link_class = import_string(_operator_link_class_path)
elif _operator_link_class_path in registered_operator_link_classes:
single_op_link_class = registered_operator_link_classes[_operator_link_class_path]
else:
raise KeyError("Operator Link class %r not registered" % _operator_link_class)
raise KeyError("Operator Link class %r not registered" % _operator_link_class_path)

op_predefined_extra_link = cattr.structure(
data, single_op_link_class_name) # type: BaseOperatorLink
data, single_op_link_class) # type: BaseOperatorLink

op_predefined_extra_links.update(
{op_predefined_extra_link.name: op_predefined_extra_link}
Expand Down