Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A simple plugin system for Airflow #32

Merged
merged 1 commit into from
Jun 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
__version__ = "1.0.1"

"""
Authentication is implemented using flask_login and different environments can
implement their own login mechanisms by providing an `airflow_login` module
in their PYTHONPATH. airflow_login should be based off the
`airflow.www.login`
"""
__version__ = "1.0.1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe minor bump in the version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I bump as part of the process when I decide to upload to pypi


import logging
from airflow.configuration import conf
from airflow.models import DAG
from flask.ext.admin import BaseView


from airflow import default_login as login
if conf.getboolean('webserver', 'AUTHENTICATE'):
Expand All @@ -20,3 +22,19 @@
"authenticate is set to True in airflow.cfg, "
"but airflow_login failed to import")


class AirflowViewPlugin(BaseView):
pass

class AirflowMacroPlugin(object):
def __init__(self, namespace):
self.namespace = namespace

from airflow import operators
from airflow import hooks
from airflow import executors
from airflow import macros

operators.integrate_plugins()
hooks.integrate_plugins()
macros.integrate_plugins()
4 changes: 4 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class AirflowConfigException(Exception):
'unit_test_mode': False,
'parallelism': 32,
'load_examples': True,
'plugins_folder': None,
},
'webserver': {
'base_url': 'http://localhost:8080',
Expand Down Expand Up @@ -64,6 +65,9 @@ class AirflowConfigException(Exception):
# environment
load_examples = True

# Where your Airflow plugins are stored
{AIRFLOW_HOME}/plugins


[webserver]
# The base url of your website as airflow cannot guess what domain or
Expand Down
11 changes: 10 additions & 1 deletion airflow/executors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.sequential_executor import SequentialExecutor
Expand All @@ -15,6 +16,14 @@
elif _EXECUTOR == 'SequentialExecutor':
DEFAULT_EXECUTOR = SequentialExecutor()
else:
raise AirflowException("Executor {0} not supported.".format(_EXECUTOR))
# Loading plugins
from airflow.plugins_manager import get_plugins
executor_plugins = {}
for _plugin in get_plugins(BaseExecutor):
globals()[_plugin.__name__] = _plugin
if _EXECUTOR in globals():
DEFAULT_EXECUTOR = globals()[_EXECUTOR]
else:
raise AirflowException("Executor {0} not supported.".format(_EXECUTOR))

logging.info("Using executor " + _EXECUTOR)
8 changes: 8 additions & 0 deletions airflow/hooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
abstracting the underlying modules
'''
from airflow.utils import import_module_attrs as _import_module_attrs
from airflow.hooks.base_hook import BaseHook as _BaseHook

_hooks = {
'hive_hooks': [
Expand All @@ -20,3 +21,10 @@
}

_import_module_attrs(globals(), _hooks)


def integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import get_plugins
for _plugin in get_plugins(_BaseHook):
globals()[_plugin.__name__] = _plugin
1 change: 1 addition & 0 deletions airflow/hooks/sqlite_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def get_records(self, sql):
def get_pandas_df(self, sql):
"""
Executes the sql and returns a pandas dataframe

>>> h = SqliteHook()
>>> sql = "SELECT * FROM test_table WHERE i=1 LIMIT 1;"
>>> h.get_pandas_df(sql)
Expand Down
8 changes: 8 additions & 0 deletions airflow/macros/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,11 @@ def ds_format(ds, input_format, output_format):
'2015-01-05'
"""
return datetime.strptime(ds, input_format).strftime(output_format)


def integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import get_plugins
from airflow import AirflowMacroPlugin
for _plugin in get_plugins(AirflowMacroPlugin, expect_class=False):
globals()[_plugin.namespace] = _plugin
7 changes: 7 additions & 0 deletions airflow/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
abstracting the underlying modules
'''
from airflow.utils import import_module_attrs as _import_module_attrs
from airflow.models import BaseOperator as _BaseOperator

_operators = {
'bash_operator': ['BashOperator'],
Expand Down Expand Up @@ -37,3 +38,9 @@
}

_import_module_attrs(globals(), _operators)

def integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import get_plugins
for _plugin in get_plugins(_BaseOperator):
globals()[_plugin.__name__] = _plugin
58 changes: 58 additions & 0 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import imp
import inspect
import logging
import os

from airflow.configuration import conf


plugins_folder = conf.get('core', 'plugins_folder')
if not plugins_folder:
plugins_folder = conf.get('core', 'airflow_home') + '/plugins'
plugins_folder = os.path.expanduser(plugins_folder)

plugin_modules = []
# Crawl through the plugins folder to find pluggable_classes
templates_dirs = []

for root, dirs, files in os.walk(plugins_folder):
if os.path.basename(root) == 'templates':
templates_dirs.append(root)
for f in files:
try:
filepath = os.path.join(root, f)
if not os.path.isfile(filepath):
continue
mod_name, file_ext = os.path.splitext(
os.path.split(filepath)[-1])
if file_ext != '.py':
continue
m = imp.load_source(mod_name, filepath)
plugin_modules.append(m)
except Exception() as e:
logging.exception(e)
logging.error('Failed to import plugin ' + filepath)


def register_templates_folders(app):
from jinja2 import ChoiceLoader, FileSystemLoader
new_loaders = [FileSystemLoader(s) for s in templates_dirs]
app.jinja_env.loader = ChoiceLoader([app.jinja_env.loader] + new_loaders)


def get_plugins(baseclass, expect_class=True):
"""
Set expect_class=False if you want instances of baseclass
"""
# Late Imports to aoid circular imort
plugins = []
for m in plugin_modules:
for obj in m.__dict__.values():
if ((
expect_class and inspect.isclass(obj) and
issubclass(obj, baseclass) and
obj is not baseclass)
or
(not expect_class and isinstance(obj, baseclass))):
plugins.append(obj)
return plugins
24 changes: 24 additions & 0 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@
logout_user = login.logout_user


from airflow import default_login as login
if conf.getboolean('webserver', 'AUTHENTICATE'):
try:
# Environment specific login
import airflow_login as login
except ImportError:
logging.error(
"authenticate is set to True in airflow.cfg, "
"but airflow_login failed to import")
login_required = login.login_required
current_user = login.current_user
logout_user = login.logout_user

AUTHENTICATE = conf.getboolean('webserver', 'AUTHENTICATE')
if AUTHENTICATE is False:
login_required = lambda x: x
Expand Down Expand Up @@ -1762,3 +1775,14 @@ class PoolModelView(SuperUserMixin, ModelView):
named_filter_urls = True
mv = PoolModelView(models.Pool, Session, name="Pools", category="Admin")
admin.add_view(mv)



def integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import get_plugins, register_templates_folders
from airflow import AirflowViewPlugin
for view in get_plugins(AirflowViewPlugin, expect_class=False):
admin.add_view(view)
register_templates_folders(app)
integrate_plugins()
3 changes: 2 additions & 1 deletion docs/concepts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ accessible and modifiable through the UI.


.. code:: python

from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("foo", deser_json=True)
bar = Variable.get("bar", deser_json=True)

The second call assumes ``json`` content and will be deserialized into
``bar``. Note that ``Variable`` is a sqlalchemy model and can be used
Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ Content
profiling
cli
scheduler
plugins
code
101 changes: 101 additions & 0 deletions docs/plugins.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
Plugins
=======

Airflow has a simple plugin manager built-in that can integrate external
features at its core by simply dropping files in your
``$AIRFLOW_HOME/plugins`` folder.

The python modules in the ``plugins`` folder get imported,
and **hooks**, **operators**, **macros**, **executors** and web **views**
get integrated to Airflow's main collections and become available for use.

Objects
-------

* Classes derived from ``BaseOperator`` land in ``airflow.operators``
* Classes derived from ``BaseHook`` land in ``airflow.hooks``
* Classes derived from ``BaseExecutor`` land ``airflow.executors``
* object created from a class derived from ``airflow.PluginView`` get integrated in the Flask app
* object created from ``AirflowMacroPlugin(namespace="foo")`` land in ``airflow.macros.foo``
* Files located in subfolders named ``templates`` folders become available when rendering pages
* (upcoming) CLI subcommands


What for?
---------

Airflow offers a generic toolbox for working with data. Different
organizations have different stacks and different needs. Using Airflow
plugins can be a way for companies to customize their Airflow installation
to reflect their ecosystem.

Plugins can be used as an easy way to write, share and activate new sets of
features.

There's also a need for a set of more complex application to interact with
different flavors of data and metadata.

Examples:

* A set of tools to parse Hive logs and expose Hive metadata (CPU /IO / phases/ skew /...)
* An anomaly detection framework, allowing people to collect metrics, set thresholds and alerts
* An auditing tool, helping understand who accesses what
* A config-driven SLA monitoring tool, allowing you to set monitored tables and at what time
they should land, alert people and exposes visualization of outages
* ...


Why build on top Airflow?
-------------------------

Airflow has many components that can be reused when building an application:

* A web server you can use to render your views
* A metadata database to store your models
* Access to your database, and knowledge of how to connect to them
* An array of workers that can allow your application to push workload to
* Airflow is deployed, you can just piggy back on it's deployment logistics
* Basic charting capabilities, underlying libraries and abstractions


Example
-------

The code bellow defines a plugin that injects a set of dummy object
definitions in Airflow.

.. code:: python

# Importing base classes that we need to derive
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.executors.base_executor import BaseExecutor
from airflow import AirflowViewPlugin, AirflowMacroPlugin
from flask_admin import expose

# Will show up under airflow.hooks.PluginHook
class PluginHook(BaseHook):
pass

# Will show up under airflow.operators.PluginOperator
class PluginOperator(BaseOperator):
pass

# Will show up under airflow.executors.PluginExecutor
class PluginExecutor(BaseExecutor):
pass

# Shows up in the UI in menu -> Plugins -> Test
class TestView(AirflowViewPlugin):
@expose('/')
def query(self):
return self.render("test.html", content="Hello galaxy!")
v = TestView(category="Plugins", name="Test")


# Available as other macros in templates {{ macros.foo_plugin.success() }}
def success():
return "Success!"
obj = AirflowMacroPlugin(namespace="foo_plugin")
obj.success = success