[AIRFLOW-2193] Add ROperator for using R #3115

Closed · wants to merge 18 commits
99 changes: 99 additions & 0 deletions airflow/contrib/operators/r_operator.py
@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from builtins import bytes
import os
from tempfile import NamedTemporaryFile

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory
from airflow.utils.operator_helpers import context_to_airflow_vars

import rpy2.robjects as robjects
from rpy2.rinterface import RRuntimeError


class ROperator(BaseOperator):
"""
Execute an R script or command

If BaseOperator.do_xcom_push is True, the last line written to stdout
will also be pushed to an XCom when the R command completes

:param r_command: The command or a reference to an R script (must have
'.r' extension) to be executed (templated)
:type r_command: string
:param env: Optional list of environment variables and their (string)
values to set (templated). Unlike `BashOperator`, this does not
replace the current environment, although it can be used to override
existing values. Values can be read in R with `Sys.getenv()`.
:type env: dict
:param output_encoding: encoding output from R (default: 'utf-8')
:type output_encoding: string

"""

template_fields = ('r_command', 'env',)
template_ext = ('.r', '.R')
ui_color = '#C8D5E6'

@apply_defaults
def __init__(
self,
r_command,
env={},

Member commented:

Mutable defaults should be avoided in Python.

Suggested change:

    -            env={},
    +            env=None,

and then in the function:

    self.env = env or {}

output_encoding='utf-8',
*args, **kwargs):

super(ROperator, self).__init__(*args, **kwargs)
self.r_command = r_command
self.env = env
self.output_encoding = output_encoding

def execute(self, context):
"""
Execute the R command or script in a temporary directory
"""

# Export additional environment variables
os.environ.update(self.env)

# Export context as environment variables
airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
self.log.info('Exporting the following env vars:\n%s',
'\n'.join(["{}={}".format(k, v)
for k, v in
airflow_context_vars.items()]))
os.environ.update(airflow_context_vars)

with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:

f.write(bytes(self.r_command, 'utf_8'))
f.flush()
fname = f.name
script_location = os.path.abspath(fname)

self.log.info("Temporary script location: %s", script_location)
self.log.info("Running command(s):\n%s", self.r_command)

try:
res = robjects.r.source(fname, echo=False)

@dlamblin (Contributor) commented on Feb 19, 2019:

Just a comment with a question: is it uncommon to write R scripts that take arguments or use environment variables?

The BashOperator has a similar limitation, but it is just flexible enough to take a script written to be called with arguments and have templated arguments passed in, with the script used as-is.

That is, you can write any of:

bash_command = 'some_command {{params.arg}}',  # this command is templated
bash_command = 'script.sh',  # this file's contents are templated
bash_command = 'script.sh {{params.arg}}',  # this file is executable; it is not templated but is passed a templated arg

This PR supports only the first two cases for R.

People with pre-existing R scripts that use arguments will likely need to rewrite them with templating in mind, or fall back to calling their R scripts with BashOperator and templated parameters. Alternatively, this operator could take an r_args list parameter, but I don't really see a way that source can accept parameters, and commandArgs doesn't seem to be something that could be set up for the source call.

If you agree that most new and existing users of R in Airflow would end up needing to rewrite their R scripts for this operator, and that's an issue, then it should be addressed (maybe in a later PR, really). While doing that, it would also seem that taking an env dict for setting variables with Sys.setenv could help with on-boarding R tasks into Airflow DAGs.
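
To illustrate, a rough sketch of those two supported patterns with this operator (task ids, params, and the script name are illustrative; it assumes a dag object and imports defined elsewhere):

    r_inline = ROperator(
        task_id='r_inline',
        r_command='cat("{{ params.arg }}")',  # the command string itself is templated
        params={'arg': 'hello'},
        dag=dag,
    )

    r_script = ROperator(
        task_id='r_script',
        r_command='analysis.R',  # '.r'/'.R' reference: the file's contents are templated, then sourced
        dag=dag,
    )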

Author commented:

I use environment variables quite a bit, so I am planning to re-introduce support for them through an env parameter. They were included in the original version of the operator, but were removed when I switched to rpy2.

Similarly, I'm going to look into passing values into the R environment, which is very similar, but different.

Although I do occasionally use command-line arguments for R scripts, they seem like a less natural fit here, since this operator essentially source()s the R code.
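
As a rough sketch of what that env support might look like from the DAG side (the variable name and value are illustrative; it assumes a dag object defined elsewhere):

    r_env_task = ROperator(
        task_id='r_env_example',
        r_command='cat(Sys.getenv("MY_SETTING"))',
        env={'MY_SETTING': 'some-value'},  # merged into os.environ, not a replacement for it
        dag=dag,
    )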

Author commented:

Re-introduced the env parameter for exporting environment variables in 9f997a2.

Contributor commented:

Thanks, it looks good.
I had a nit I forgot to mention about the comment referring to pickling the XCom: that's a configuration setting, and XComs can also be marshaled to JSON rather than pickled. But really… never mind that. LGTM.

Author commented:

I'm still not feeling 100% about the xcom. I'll update the comment about pickling for sure.

There aren't a lot of operators to use as an example for how to properly respect do_xcom_push. The BashOperator just unconditionally returns a value, so I followed its lead. But in testing, my tasks always push the value, even when I've set do_xcom_push=False.

@ashb, any suggestions?

Member commented:

That should be okay if you are based off the latest master:

    result = task_copy.execute(context=context)
    # If the task returns a result, push an XCom containing it
    if task_copy.do_xcom_push and result is not None:
        self.xcom_push(key=XCOM_RETURN_KEY, value=result)

If collecting the result is expensive, it sometimes makes sense to also check self.do_xcom_push in the operator (e.g. the docker operator does this, as it doesn't make sense to collect the logs and then just throw them away).
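
Applied to this operator, that kind of guard might look roughly like the following at the end of execute(); this is a sketch, not part of the diff:

    # Only return a value (and thus push an XCom) when the task asks for it.
    if getattr(self, 'do_xcom_push', False):
        return res
    return None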

Contributor commented:

@briandconnelly I'm happy to find this ROperator, thanks. I currently use the DockerOperator to execute R scripts, but the Docker containers seem to consume more memory than I expect (with concurrent tasks).

Just looking at the code (though I'm not familiar with Python), it seems it copies the R script to a temporary file and then sources it. I wonder whether you have considered the following use case: there are three files: .Rprofile, which specifies some settings and functions; report.Rmd, which generates a report file; and send.R, which renders the Rmd and sends the report to someone by email.

except RRuntimeError as e:
self.log.error("Received R error: %s", e)
res = None

# This will be a pickled rpy2.robjects.vectors.ListVector
return res
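
For reference, a minimal sketch of how the operator might be wired into a DAG (the dag id, dates, command, and env values here are illustrative, not part of the PR):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.r_operator import ROperator

    dag = DAG(
        'r_operator_example',
        start_date=datetime(2019, 1, 1),
        schedule_interval='@once',
    )

    hello_r = ROperator(
        task_id='hello_r',
        r_command='cat("Hello from R on {{ ds }}")',  # inline command; templated by Airflow
        env={'GREETING': 'airflow'},  # readable in R via Sys.getenv("GREETING")
        dag=dag,
    )
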
4 changes: 3 additions & 1 deletion setup.py
@@ -210,6 +210,7 @@ def write_version(filename=os.path.join(*['airflow',
pinot = ['pinotdb==0.1.1']
postgres = ['psycopg2>=2.7.4']
qds = ['qds-sdk>=1.10.4']
r = ['rpy2>=2.9.5']
rabbitmq = ['librabbitmq>=1.6.1']
redis = ['redis>=2.10.5,<3.0.0']
salesforce = ['simple-salesforce>=0.72']
@@ -261,7 +262,7 @@ def write_version(filename=os.path.join(*['airflow',
docker + ssh + kubernetes + celery + redis + gcp_api +
datadog + zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins +
druid + pinot + segment + snowflake + elasticsearch +
atlas + azure + aws)
atlas + azure + aws + r)

# Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
if PY3:
@@ -369,6 +370,7 @@ def do_setup():
'pinot': pinot,
'postgres': postgres,
'qds': qds,
'r': r,
'rabbitmq': rabbitmq,
'redis': redis,
'salesforce': salesforce,
102 changes: 102 additions & 0 deletions tests/contrib/operators/test_r_operator.py
@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function, unicode_literals

import unittest

from airflow import configuration, DAG
from airflow.contrib.operators.r_operator import ROperator
from airflow.models import TaskInstance
from airflow.utils import timezone


DEFAULT_DATE = timezone.datetime(2016, 1, 1)


class ROperatorTest(unittest.TestCase):
"""Test the ROperator"""

def setUp(self):
super(ROperatorTest, self).setUp()
configuration.load_test_config()
self.dag = DAG(
'test_roperator_dag',
default_args={
'owner': 'airflow',
'start_date': DEFAULT_DATE
},
schedule_interval='@once'
)

self.xcom_test_str = 'Hello Airflow'
self.task_xcom = ROperator(
task_id='test_r_xcom',
r_command='cat("Ignored Line\n{}")'.format(self.xcom_test_str),
xcom_push=True,
dag=self.dag
)

def test_xcom_output(self):
"""Test whether Xcom output is produced using last line"""

self.task_xcom.do_xcom_push = True

ti = TaskInstance(
task=self.task_xcom,
execution_date=timezone.utcnow()
)

ti.run()
self.assertIsNotNone(ti.duration)

self.assertEqual(
ti.xcom_pull(task_ids=self.task_xcom.task_id, key='return_value'),
self.xcom_test_str
)

def test_xcom_none(self):
"""Test whether no Xcom output is produced when push=False"""

self.task_xcom.do_xcom_push = False

ti = TaskInstance(
task=self.task_xcom,
execution_date=timezone.utcnow(),
)

ti.run()
self.assertIsNotNone(ti.duration)
self.assertIsNone(ti.xcom_pull(task_ids=self.task_xcom.task_id))

def test_command_template(self):
"""Test whether templating works properly with r_command"""

task = ROperator(
task_id='test_cmd_template',
r_command='cat("{{ ds }}")',
dag=self.dag
)

ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.render_templates()

self.assertEqual(
ti.task.r_command,
'cat("{}")'.format(DEFAULT_DATE.date().isoformat())
)


if __name__ == '__main__':
unittest.main()