Skip to content

Commit

Permalink
Merge pull request #432 from ghjm/run_from_receptor
Browse files Browse the repository at this point in the history
Allow running jobs via Receptor 

Reviewed-by: https://github.com/apps/ansible-zuul
  • Loading branch information
ansible-zuul[bot] committed May 8, 2020
2 parents 1e5eab5 + 41dabc4 commit 68e09e4
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 5 deletions.
36 changes: 35 additions & 1 deletion ansible_runner/__main__.py
Expand Up @@ -42,6 +42,11 @@
from ansible_runner.runner import Runner
from ansible_runner.exceptions import AnsibleRunnerException

if sys.version_info >= (3, 0):
from ansible_runner.receptor_plugin import receptor_import
else:
receptor_import = False

VERSION = pkg_resources.require("ansible_runner")[0].version

DEFAULT_ROLES_PATH = os.getenv('ANSIBLE_ROLES_PATH', None)
Expand Down Expand Up @@ -336,6 +341,26 @@ def main(sys_args=None):
"ansible-playbook output (default=None)"
)

# Receptor options

runner_group.add_argument(
"--via-receptor",
default=None,
help="Run the job on a Receptor node rather than locally"
)

runner_group.add_argument(
"--receptor-peer",
default=None,
help="peer connection to use to reach the Receptor network"
)

runner_group.add_argument(
"--receptor-node-id",
default=None,
help="Receptor node-id to use for the local node"
)

# ansible options

ansible_group = parser.add_argument_group(
Expand Down Expand Up @@ -516,6 +541,12 @@ def main(sys_args=None):
if not (args.module or args.role) and not args.playbook:
parser.exit(status=1, message="The -p option must be specified when not using -m or -r\n")

if args.via_receptor and not receptor_import:
parser.exit(status=1, message="The --via-receptor option requires Receptor to be installed.\n")

if args.via_receptor and args.command != 'run':
parser.exit(status=1, message="Only the 'run' command is supported via Receptor.\n")

output.configure()

# enable or disable debug mode
Expand Down Expand Up @@ -590,7 +621,10 @@ def main(sys_args=None):
resource_profiling_memory_poll_interval=args.resource_profiling_memory_poll_interval,
resource_profiling_pid_poll_interval=args.resource_profiling_pid_poll_interval,
resource_profiling_results_dir=args.resource_profiling_results_dir,
limit=args.limit)
limit=args.limit,
via_receptor=args.via_receptor,
receptor_peer=args.receptor_peer,
receptor_node_id=args.receptor_node_id)
if args.cmdline:
run_options['cmdline'] = args.cmdline

Expand Down
29 changes: 25 additions & 4 deletions ansible_runner/interface.py
Expand Up @@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.
#
import sys
import threading
import logging

Expand All @@ -28,6 +29,11 @@
check_isolation_executable_installed,
)

if sys.version_info >= (3, 0):
from ansible_runner.receptor_plugin import run_via_receptor, receptor_import
else:
receptor_import = False

logging.getLogger('ansible-runner').addHandler(logging.NullHandler())


Expand Down Expand Up @@ -132,6 +138,9 @@ def run(**kwargs):
:param fact_cache_type: A string of the type of fact cache to use. Defaults to 'jsonfile'.
:param omit_event_data: Omits extra ansible event data from event payload (stdout and event still included)
:param only_failed_event_data: Omits extra ansible event data unless it's a failed event (stdout and event still included)
:param via_receptor: If set, specifies a Receptor node-id on which the job will be run remotely
:param receptor_peer: Specifies the Receptor listener, in URL format, to use to connect to the Receptor network
:param receptor_node_id: Specifies the node-id to assign to the local Receptor ephemeral node
:type private_data_dir: str
:type ident: str
:type json_mode: bool
Expand Down Expand Up @@ -171,12 +180,24 @@ def run(**kwargs):
:type fact_cache_type: str
:type omit_event_data: bool
:type only_failed_event_data: bool
:type via_receptor: str
:type receptor_peer: str
:type receptor_node_id: str
:returns: A :py:class:`ansible_runner.runner.Runner` object
:returns: A :py:class:`ansible_runner.runner.Runner` object, or a simple object containing `rc` if run remotely
'''
r = init_runner(**kwargs)
r.run()
return r
via_receptor = kwargs.pop('via_receptor', None)
receptor_peer = kwargs.pop('receptor_peer', None)
receptor_node_id = kwargs.pop('receptor_node_id', None)
if via_receptor:
if not receptor_import:
raise RuntimeError('Receptor is not installed or could not be imported')
r = run_via_receptor(via_receptor, receptor_peer, receptor_node_id, kwargs)
return r
else:
r = init_runner(**kwargs)
r.run()
return r


def run_async(**kwargs):
Expand Down
173 changes: 173 additions & 0 deletions ansible_runner/receptor_plugin.py
@@ -0,0 +1,173 @@
import logging
import json
import os
import time
import zipfile
import tempfile
import uuid
import asyncio

import ansible_runner.interface

try:
import receptor
from receptor.config import ReceptorConfig
from receptor.controller import Controller
receptor_import = True
except ImportError:
receptor_import = False

logger = logging.getLogger(__name__)


class UUIDEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, uuid.UUID):
return obj.hex
return json.JSONEncoder.default(self, obj)


# List of kwargs options to the run method that should be sent to the remote executor.
remote_run_options = (
'forks',
'host_pattern',
'ident',
'ignore_logging',
'inventory',
'limit',
'module',
'module_args',
'omit_event_data',
'only_failed_event_data',
'playbook',
'verbosity',
)


def run_via_receptor(via_receptor, receptor_peer, receptor_node_id, run_options):

async def read_responses():
event_handler = run_options.get('event_handler', None)
status_handler = run_options.get('status_handler', None)
while True:
message = await controller.recv()
if message.header.get("eof", False):
break
elif message.payload:
content = json.loads(message.payload.readall())
c_header = content[0]
c_type = c_header['type']
if c_type == 'event':
data = content[1]
if event_handler:
event_handler(data)
if 'stdout' in data and data['stdout']:
print(data['stdout'])
elif c_type == 'status':
data = content[1]
if status_handler:
status_handler(data, None)
elif c_type == 'error':
result.errored = True
print("Error from remote:", content[1])

async def run_func():
if receptor_node_id != via_receptor:
add_peer_task = controller.add_peer(receptor_peer)
start_wait = time.time()
while True:
if add_peer_task and add_peer_task.done() and not add_peer_task.result():
raise RuntimeError('Cannot connect to peer')
if controller.receptor.router.node_is_known(via_receptor):
break
if time.time() - start_wait > 5:
if not add_peer_task.done():
add_peer_task.cancel()
raise RuntimeError('Timed out waiting for peer')
await asyncio.sleep(0.1)
await controller.send(payload=tmpf.name, recipient=via_receptor, directive='ansible_runner:execute')
await controller.loop.create_task(read_responses())

if not receptor_peer:
receptor_peer = 'receptor://localhost'
remote_options = {key: value for key, value in run_options.items() if key in remote_run_options}

with tempfile.NamedTemporaryFile() as tmpf:

# Create archive
with zipfile.ZipFile(tmpf, 'w', compression=zipfile.ZIP_DEFLATED, allowZip64=True) as zip:
private_data_dir = run_options.get('private_data_dir', None)
if private_data_dir:
for dirpath, dirs, files in os.walk(private_data_dir):
relpath = os.path.relpath(dirpath, private_data_dir)
if relpath == ".":
relpath = ""
for file in files:
zip.write(os.path.join(dirpath, file), arcname=os.path.join(relpath, file))
kwargs = json.dumps(remote_options, cls=UUIDEncoder)
zip.writestr('kwargs', kwargs)
zip.close()
tmpf.flush()

# Run the job via Receptor
if receptor_node_id:
receptor_args = f"--node-id {receptor_node_id} node --server-disable".split()
else:
receptor_args = "node --server-disable".split()
config = ReceptorConfig(receptor_args)
config._is_ephemeral = True
controller = Controller(config)
try:
result = type('Receptor_Runner_Result', (), {'rc': 0, 'errored': False})
controller.run(run_func)
except Exception as exc:
result.errored = True
setattr(result, 'exception', exc)
print(str(exc))
finally:
controller.cleanup_tmpdir()

return result


# We set these parameters locally rather than using receptor.plugin_utils
# because this still needs to parse even when our import of receptor failed.
def receptor_plugin_export(func):
if receptor_import:
func.receptor_export = True
func.payload_type = receptor.FILE_PAYLOAD
return func


@receptor_plugin_export
def execute(message, config, result_queue):
private_dir = None
try:
private_dir = tempfile.TemporaryDirectory()
with zipfile.ZipFile(message, 'r') as zip:
zip.extractall(path=private_dir.name)

kwargs_path = os.path.join(private_dir.name, 'kwargs')
if os.path.exists(kwargs_path):
with open(kwargs_path, "r") as kwf:
kwargs = json.load(kwf)
if not isinstance(kwargs, dict):
raise ValueError("Invalid kwargs data")
else:
kwargs = dict()

kwargs["quiet"] = True
kwargs["private_data_dir"] = private_dir.name
kwargs["event_handler"] = lambda item: result_queue.put(json.dumps([{'type': 'event'}, item]))
kwargs["status_handler"] = lambda item, runner_config: result_queue.put(json.dumps([{'type': 'status'}, item]))
kwargs["finished_callback"] = lambda runner: result_queue.put(json.dumps([{'type': 'finished'}]))

ansible_runner.interface.run(**kwargs)

except Exception as exc:
logger.exception(exc)
result_queue.put(json.dumps([{'type': 'error'}, str(exc)]))

finally:
if private_dir:
private_dir.cleanup()
1 change: 1 addition & 0 deletions docs/index.rst
Expand Up @@ -43,6 +43,7 @@ Examples of this could include:
standalone
python_interface
container
receptor


Indices and tables
Expand Down
51 changes: 51 additions & 0 deletions docs/receptor.rst
@@ -0,0 +1,51 @@
.. _receptor:

Remote job execution via Receptor
=================================

`Receptor <http://www.github.com/project-receptor/receptor>`_ is a system for remotely executing jobs and returning results,
over a mesh overlay network that can easily be extended across complex underlying network. Receptor support within
**Ansible Runner** enables running Ansible jobs remotely, via a Receptor network.

Command Line Interface
----------------------

To run a job remotely, the new `--via-receptor` command-line parameter is used. For example, to run the demo job on
a remote Receptor node named node1, the following command can be used::

$ ansible-runner run demo/ -p test.yml --via-receptor node1

By default, Ansible Runner will attempt to contact the Receptor mesh by connecting to a local Receptor node listening
on the default port. This behavior can be controlled using the `--receptor-peer` option::

$ ansible-runner run demo/ -p test.yml --via-receptor node1 --receptor-peer receptor://host.example.com:1234

In this example, Ansible Runner will access the Receptor mesh by connecting to TCP port 1234 on host.example.com and
sending a message addressed to the Receptor node-id `node1`. The Receptor instance on port 1234 does not need to be
`node1` - reaching `node1` may involve additional hops on the Receptor overlay network.

In the current implementation, Ansible Runner communicates with a Receptor peer by launching a new, short-duration
Receptor node within Ansible Runner's local Python interpreter. Normally, this new node will be given a unique
temporary node-id generated by the system. If it is necessary to use a specific node-id, this can be
achieved as follows::

$ ansible-runner run demo/ -p test.yml --via-receptor node1 --receptor-node-id runner-node

In this example, Ansible Runner will connect to the default Receptor port on localhost, identifying itself as
`runner-node`, and will then send the job to the Receptor node `node1`.

Python API
----------

Jobs can be executed via Receptor by passing the `via_receptor`, `receptor_peer` and `receptor_node_id` parameters to
`receptor.run`. The parameters work as described in the above section. Note that the Receptor interface does not yet
support `run_async`.

Server Configuration
--------------------

To receive jobs, all that is needed is an operational Receptor node, with Ansible Runner installed in the Python
environment from which Receptor is running. Receptor will detect, via setuptools, that Ansible Runner is installed.



1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -26,6 +26,7 @@
],
zip_safe=False,
entry_points={
'receptor.worker': 'ansible_runner = ansible_runner.receptor_plugin',
'console_scripts': [
'ansible-runner = ansible_runner.__main__:main'
]
Expand Down

0 comments on commit 68e09e4

Please sign in to comment.