[AIRFLOW-3068] Remove deprecated imports
Fokko Driesprong authored and r39132 committed Sep 16, 2018
1 parent 7194c81 commit 0e5eee8
Showing 9 changed files with 1 addition and 325 deletions.
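In short, the commit drops the package-level import shims and keeps only the explicit per-module imports. A before/after sketch for orientation (the import paths are taken from the mappings in the files below; the snippet itself is illustrative and not part of the diff):

    # Deprecated, shim-backed style this commit removes support for (it previously
    # resolved through AirflowImporter and emitted a DeprecationWarning):
    #
    #     from airflow.operators import BashOperator
    #     from airflow.contrib.hooks import FTPHook

    # Explicit per-module imports that remain supported:
    from airflow.operators.bash_operator import BashOperator  # noqa: F401
    from airflow.contrib.hooks.ftp_hook import FTPHook  # noqa: F401
    from airflow.models import BaseOperator  # noqa: F401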
13 changes: 0 additions & 13 deletions airflow/__init__.py
@@ -79,16 +79,3 @@ class AirflowViewPlugin(BaseView):
class AirflowMacroPlugin(object):
    def __init__(self, namespace):
        self.namespace = namespace


from airflow import operators # noqa: E402
from airflow import sensors # noqa: E402
from airflow import hooks # noqa: E402
from airflow import executors # noqa: E402
from airflow import macros # noqa: E402

operators._integrate_plugins()
sensors._integrate_plugins() # noqa: E402
hooks._integrate_plugins()
executors._integrate_plugins()
macros._integrate_plugins()
51 changes: 0 additions & 51 deletions airflow/contrib/hooks/__init__.py
@@ -17,54 +17,3 @@
# specific language governing permissions and limitations
# under the License.
#


# Contrib hooks are not imported by default. They should be accessed
# directly: from airflow.contrib.hooks.hook_module import Hook


import sys
import os

# ------------------------------------------------------------------------
#
# #TODO #FIXME Airflow 2.0
#
# Old import machinery below.
#
# This is deprecated but should be kept until Airflow 2.0
# for compatibility.
#
# ------------------------------------------------------------------------
_hooks = {
    'docker_hook': ['DockerHook'],
    'ftp_hook': ['FTPHook'],
    'ftps_hook': ['FTPSHook'],
    'vertica_hook': ['VerticaHook'],
    'ssh_hook': ['SSHHook'],
    'winrm_hook': ['WinRMHook'],
    'sftp_hook': ['SFTPHook'],
    'bigquery_hook': ['BigQueryHook'],
    'qubole_hook': ['QuboleHook'],
    'gcs_hook': ['GoogleCloudStorageHook'],
    'datastore_hook': ['DatastoreHook'],
    'gcp_cloudml_hook': ['CloudMLHook'],
    'redshift_hook': ['RedshiftHook'],
    'gcp_dataproc_hook': ['DataProcHook'],
    'gcp_dataflow_hook': ['DataFlowHook'],
    'spark_submit_operator': ['SparkSubmitOperator'],
    'cloudant_hook': ['CloudantHook'],
    'fs_hook': ['FSHook'],
    'wasb_hook': ['WasbHook'],
    'gcp_pubsub_hook': ['PubSubHook'],
    'jenkins_hook': ['JenkinsHook'],
    'aws_dynamodb_hook': ['AwsDynamoDBHook'],
    'azure_data_lake_hook': ['AzureDataLakeHook'],
    'azure_fileshare_hook': ['AzureFileShareHook'],
}


if not os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
    from airflow.utils.helpers import AirflowImporter

    airflow_importer = AirflowImporter(sys.modules[__name__], _hooks)
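As an aside, not part of the diff: the _hooks table above was never imported eagerly. AirflowImporter (removed from airflow/utils/helpers.py further down in this diff) flipped it from a module-to-classes mapping into a class-to-module lookup, so that an attribute access could find which submodule to lazy-load. A minimal sketch of that flip, reusing two entries from the table above:

    # Two entries copied from the removed _hooks mapping above.
    _hooks = {
        'ftp_hook': ['FTPHook'],
        'wasb_hook': ['WasbHook'],
    }

    # The inversion performed by AirflowImporter._build_attribute_modules:
    # {module: [ClassName, ...]}  ->  {ClassName: module}
    attribute_modules = {
        attribute: module
        for module, attributes in _hooks.items()
        for attribute in attributes
    }

    assert attribute_modules == {'FTPHook': 'ftp_hook', 'WasbHook': 'wasb_hook'}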
34 changes: 0 additions & 34 deletions airflow/contrib/operators/__init__.py
@@ -17,37 +17,3 @@
# specific language governing permissions and limitations
# under the License.
#


# Contrib operators are not imported by default. They should be accessed
# directly: from airflow.contrib.operators.operator_module import Operator


import sys
import os

# ------------------------------------------------------------------------
#
# #TODO #FIXME Airflow 2.0
#
# Old import machinery below.
#
# This is deprecated but should be kept until Airflow 2.0
# for compatibility.
#
# ------------------------------------------------------------------------
_operators = {
    'ssh_operator': ['SSHOperator'],
    'winrm_operator': ['WinRMOperator'],
    'vertica_operator': ['VerticaOperator'],
    'vertica_to_hive': ['VerticaToHiveTransfer'],
    'qubole_operator': ['QuboleOperator'],
    'spark_submit_operator': ['SparkSubmitOperator'],
    'file_to_wasb': ['FileToWasbOperator'],
    'fs_operator': ['FileSensor'],
    'hive_to_dynamodb': ['HiveToDynamoDBTransferOperator']
}

if not os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
    from airflow.utils.helpers import AirflowImporter
    airflow_importer = AirflowImporter(sys.modules[__name__], _operators)
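One behavioural detail of the AIRFLOW_USE_NEW_IMPORTS gate that disappears with this commit: os.environ.get returns a string, so any non-empty value disabled the shim. The test scripts at the bottom of this diff exported the value 1 for that purpose, and those exports are removed here as well. A small illustration of the gate's behaviour (an aside, not part of the diff):

    import os

    # Any non-empty value is truthy, so even '0' or 'false' would have
    # switched the old import machinery off.
    os.environ['AIRFLOW_USE_NEW_IMPORTS'] = '1'

    use_old_import_machinery = not os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False)
    assert use_old_import_machinery is False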
2 changes: 1 addition & 1 deletion airflow/contrib/operators/mlengine_operator.py
@@ -19,7 +19,7 @@

from airflow.contrib.hooks.gcp_mlengine_hook import MLEngineHook
from airflow.exceptions import AirflowException
-from airflow.operators import BaseOperator
+from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.log.logging_mixin import LoggingMixin

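The change above swaps the import of BaseOperator over to its canonical home, airflow.models. A minimal, hypothetical operator written against that import (only the import paths come from the diff; EchoOperator itself is invented for illustration):

    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults


    class EchoOperator(BaseOperator):
        """Hypothetical example operator; not part of this commit."""

        @apply_defaults
        def __init__(self, message, *args, **kwargs):
            super(EchoOperator, self).__init__(*args, **kwargs)
            self.message = message

        def execute(self, context):
            self.log.info(self.message)
            return self.message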
74 changes: 0 additions & 74 deletions airflow/operators/__init__.py
@@ -16,77 +16,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import sys
import os
from airflow.models import BaseOperator # noqa: F401

# ------------------------------------------------------------------------
#
# #TODO #FIXME Airflow 2.0
#
# Old import machinery below.
#
# This is deprecated but should be kept until Airflow 2.0
# for compatibility.
#
# ------------------------------------------------------------------------

# Imports operators dynamically while keeping the package API clean,
# abstracting the underlying modules

_operators = {
    'bash_operator': ['BashOperator'],
    'check_operator': [
        'CheckOperator',
        'ValueCheckOperator',
        'IntervalCheckOperator',
    ],
    'python_operator': [
        'PythonOperator',
        'BranchPythonOperator',
        'ShortCircuitOperator',
    ],
    'hive_operator': ['HiveOperator'],
    'pig_operator': ['PigOperator'],
    'presto_check_operator': [
        'PrestoCheckOperator',
        'PrestoValueCheckOperator',
        'PrestoIntervalCheckOperator',
    ],
    'dagrun_operator': ['TriggerDagRunOperator'],
    'dummy_operator': ['DummyOperator'],
    'email_operator': ['EmailOperator'],
    'hive_to_samba_operator': ['Hive2SambaOperator'],
    'latest_only_operator': ['LatestOnlyOperator'],
    'mysql_operator': ['MySqlOperator'],
    'sqlite_operator': ['SqliteOperator'],
    'mysql_to_hive': ['MySqlToHiveTransfer'],
    'postgres_operator': ['PostgresOperator'],
    'subdag_operator': ['SubDagOperator'],
    'hive_stats_operator': ['HiveStatsCollectionOperator'],
    's3_to_hive_operator': ['S3ToHiveTransfer'],
    'hive_to_mysql': ['HiveToMySqlTransfer'],
    'presto_to_mysql': ['PrestoToMySqlTransfer'],
    's3_file_transform_operator': ['S3FileTransformOperator'],
    'http_operator': ['SimpleHttpOperator'],
    'hive_to_druid': ['HiveToDruidTransfer'],
    'jdbc_operator': ['JdbcOperator'],
    'mssql_operator': ['MsSqlOperator'],
    'mssql_to_hive': ['MsSqlToHiveTransfer'],
    'slack_operator': ['SlackAPIOperator', 'SlackAPIPostOperator'],
    'generic_transfer': ['GenericTransfer'],
    'oracle_operator': ['OracleOperator']
}

if not os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
    from airflow.utils.helpers import AirflowImporter
    airflow_importer = AirflowImporter(sys.modules[__name__], _operators)


def _integrate_plugins():
    """Integrate plugins to the context"""
    from airflow.plugins_manager import operators_modules
    for operators_module in operators_modules:
        sys.modules[operators_module.__name__] = operators_module
        globals()[operators_module._name] = operators_module
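The removed _integrate_plugins helper did two things per plugin-provided module: register it under sys.modules and expose it as an attribute of this package. A self-contained sketch of those two steps, using a hypothetical stand-in for a module built by airflow.plugins_manager (the removed loop reads a _name attribute from each module, mirrored here):

    import sys
    import types

    # Hypothetical stand-in for a plugin-provided operators module.
    plugin_module = types.ModuleType('airflow.operators.my_plugin')
    plugin_module._name = 'my_plugin'

    # The same two registration steps performed by the removed loop.
    sys.modules[plugin_module.__name__] = plugin_module
    globals()[plugin_module._name] = plugin_module

    assert 'airflow.operators.my_plugin' in sys.modules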
30 changes: 0 additions & 30 deletions airflow/sensors/__init__.py
@@ -17,33 +17,3 @@
# specific language governing permissions and limitations
# under the License.
#
import sys
import os

_sensors = {
    'base_sensor_operator': ['BaseSensorOperator'],
    'external_task_sensor': ['ExternalTaskSensor'],
    'hdfs_sensor': ['HdfsSensor'],
    'hive_partition_sensor': ['HivePartitionSensor'],
    'http_sensor': ['HttpSensor'],
    'metastore_partition_sensor': ['MetastorePartitionSensor'],
    'named_hive_partition_sensor': ['NamedHivePartitionSensor'],
    's3_key_sensor': ['S3KeySensor'],
    's3_prefix_sensor': ['S3PrefixSensor'],
    'sql_sensor': ['SqlSensor'],
    'time_delta_sensor': ['TimeDeltaSensor'],
    'time_sensor': ['TimeSensor'],
    'web_hdfs_sensor': ['WebHdfsSensor']
}

if not os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
    from airflow.utils.helpers import AirflowImporter
    airflow_importer = AirflowImporter(sys.modules[__name__], _sensors)


def _integrate_plugins():
    """Integrate plugins to the context"""
    from airflow.plugins_manager import sensors_modules
    for sensors_module in sensors_modules:
        sys.modules[sensors_module.__name__] = sensors_module
        globals()[sensors_module._name] = sensors_module
115 changes: 0 additions & 115 deletions airflow/utils/helpers.py
@@ -259,118 +259,3 @@ def parse_template_string(template_string):
        return None, Template(template_string)
    else:
        return template_string, None


class AirflowImporter(object):
    """
    Importer that dynamically loads a class and module from its parent. This
    allows Airflow to support ``from airflow.operators import BashOperator``
    even though BashOperator is actually in
    ``airflow.operators.bash_operator``.

    The importer also takes over for the parent_module by wrapping it. This is
    required to support attribute-based usage:

    .. code:: python

        from airflow import operators
        operators.BashOperator(...)
    """

    def __init__(self, parent_module, module_attributes):
        """
        :param parent_module: The string package name of the parent module. For
            example, 'airflow.operators'
        :type parent_module: str
        :param module_attributes: The file to class mappings for all importable
            classes.
        :type module_attributes: str
        """
        self._parent_module = parent_module
        self._attribute_modules = self._build_attribute_modules(module_attributes)
        self._loaded_modules = {}

        # Wrap the module so we can take over __getattr__.
        sys.modules[parent_module.__name__] = self

    @staticmethod
    def _build_attribute_modules(module_attributes):
        """
        Flips and flattens the module_attributes dictionary from:

            module => [Attribute, ...]

        To:

            Attribute => module

        This is useful so that we can find the module to use, given an
        attribute.
        """
        attribute_modules = {}

        for module, attributes in list(module_attributes.items()):
            for attribute in attributes:
                attribute_modules[attribute] = module

        return attribute_modules

    def _load_attribute(self, attribute):
        """
        Load the class attribute if it hasn't been loaded yet, and return it.
        """
        module = self._attribute_modules.get(attribute, False)

        if not module:
            # This shouldn't happen. The check happens in find_modules, too.
            raise ImportError(attribute)
        elif module not in self._loaded_modules:
            # Note that it's very important to only load a given module once.
            # If they are loaded more than once, the memory reference to the
            # class objects changes, and Python thinks that an object of type
            # Foo that was declared before Foo's module was reloaded is no
            # longer the same type as Foo after it's reloaded.
            path = os.path.realpath(self._parent_module.__file__)
            folder = os.path.dirname(path)
            f, filename, description = imp.find_module(module, [folder])
            self._loaded_modules[module] = imp.load_module(module, f, filename, description)

            # This functionality is deprecated, and AirflowImporter should be
            # removed in 2.0.
            warnings.warn(
                "Importing '{i}' directly from '{m}' has been "
                "deprecated. Please import from "
                "'{m}.[operator_module]' instead. Support for direct "
                "imports will be dropped entirely in Airflow 2.0.".format(
                    i=attribute, m=self._parent_module.__name__),
                DeprecationWarning)

        loaded_module = self._loaded_modules[module]

        return getattr(loaded_module, attribute)

    def __getattr__(self, attribute):
        """
        Get an attribute from the wrapped module. If the attribute doesn't
        exist, try and import it as a class from a submodule.

        This is a Python trick that allows the class to pretend it's a module,
        so that attribute-based usage works:

            from airflow import operators
            operators.BashOperator(...)

        It also allows normal from imports to work:

            from airflow.operators.bash_operator import BashOperator
        """
        if hasattr(self._parent_module, attribute):
            # Always default to the parent module if the attribute exists.
            return getattr(self._parent_module, attribute)
        elif attribute in self._attribute_modules:
            # Try and import the attribute if it's got a module defined.
            loaded_attribute = self._load_attribute(attribute)
            setattr(self, attribute, loaded_attribute)
            return loaded_attribute

        raise AttributeError
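The core of the removed class is the sys.modules substitution its docstring describes: a module's entry is replaced with an object whose __getattr__ resolves attributes lazily, so the object can pretend to be the module. A stripped-down, self-contained sketch of just that trick (the names here are illustrative, not Airflow APIs):

    import sys
    import types


    class ModuleWrapper(object):
        """Pretends to be a module; unknown attributes fall back to the real one."""

        def __init__(self, parent_module):
            self._parent_module = parent_module

        def __getattr__(self, attribute):
            # Only called when normal lookup on the wrapper fails.
            return getattr(self._parent_module, attribute)


    real = types.ModuleType('demo_pkg')
    real.answer = 42

    # Take over the module's entry, as AirflowImporter did for airflow.operators,
    # airflow.hooks, airflow.sensors, etc.
    sys.modules['demo_pkg'] = ModuleWrapper(real)

    import demo_pkg  # noqa: E402  (returns the wrapper already in sys.modules)

    print(demo_pkg.answer)  # 42, resolved through ModuleWrapper.__getattr__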
4 changes: 0 additions & 4 deletions run_unit_tests.sh
@@ -27,9 +27,6 @@ export AIRFLOW__CORE__UNIT_TEST_MODE=True
# configuration test
export AIRFLOW__TESTSECTION__TESTKEY=testvalue

# use Airflow 2.0-style imports
export AIRFLOW_USE_NEW_IMPORTS=1

# add test/contrib to PYTHONPATH
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
export PYTHONPATH=$PYTHONPATH:${DIR}/tests/test_utils
@@ -75,4 +72,3 @@ nosetests $nose_args

# To run individual tests:
# nosetests tests.core:CoreTest.test_scheduler_job

3 changes: 0 additions & 3 deletions scripts/ci/5-run-tests.sh
@@ -48,9 +48,6 @@ export AIRFLOW__CORE__UNIT_TEST_MODE=True
# configuration test
export AIRFLOW__TESTSECTION__TESTKEY=testvalue

# use Airflow 2.0-style imports
export AIRFLOW_USE_NEW_IMPORTS=1

# any argument received is overriding the default nose execution arguments:
nose_args=$@

