Skip to content

Commit

Permalink
testsuite: add a python test for service.add/remove
Browse files Browse the repository at this point in the history
Add python/t1000-service-add-remove.py to the testsuite to test
the connector-local usage of `service.add` and `service.remove`.
  • Loading branch information
grondo committed Oct 29, 2018
1 parent f80cf68 commit 1cfcb5d
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 2 deletions.
6 changes: 4 additions & 2 deletions t/Makefile.am
Expand Up @@ -120,7 +120,8 @@ TESTS = \
python/t0005-kvs.py \
python/t0006-request.py \
python/t0007-watchers.py \
python/t0008-jsc.py
python/t0008-jsc.py \
python/t1000-service-add-remove.py


if ENABLE_JOBSPEC
Expand Down Expand Up @@ -240,7 +241,8 @@ check_SCRIPTS = \
python/t0006-request.py \
python/t0007-watchers.py \
python/t0008-jsc.py \
python/t0009-security.py
python/t0009-security.py \
python/t1000-service-add-remove.py

check_PROGRAMS = \
shmem/backtoback.t \
Expand Down
131 changes: 131 additions & 0 deletions t/python/t1000-service-add-remove.py
@@ -0,0 +1,131 @@
#!/usr/bin/env python
import unittest
import errno
import flux.core as core
from flux.message import Message
from flux.core.inner import ffi
from flux.constants import(FLUX_MSGTYPE_REQUEST,
FLUX_MSGTYPE_RESPONSE)

from subflux import rerun_under_flux

def __flux_size():
return 2

def service_add(f, name):
future = f.service_register(name)
return f.future_get(future, ffi.NULL)

def service_remove(f, name):
future = f.service_unregister(name)
return f.future_get(future, ffi.NULL)

class TestServiceAddRemove(unittest.TestCase):
@classmethod
def setUpClass(self):
self.f = core.Flux()

@classmethod
def tearDownClass(self):
self.f.close()

def test_001_register_service(self):
rc = service_add(self.f, "foo")
self.assertEqual(rc, 0)

def test_002_service_add_eexist(self):
with self.assertRaises(Exception) as e:
service_add(self.f, "foo")
self.assertEqual(e.exception.errno, errno.EEXIST)

def test_003_add_service_message_watcher(self):
def service_cb(f, t, msg, arg):
rc = f.respond(msg, 0, msg.payload_str)
self.assertEqual(rc, 0)

self.f.watcher = self.f.msg_watcher_create(service_cb,
FLUX_MSGTYPE_REQUEST,
"foo.*")
self.assertIsNotNone(self.f.watcher)
self.f.watcher.start()

def test_004_service_rpc(self):
cb_called = [False] # So that cb_called[0] is mutable in inner function
p = {"test": "foo"}

def cb(f, t, msg, arg):
cb_called[0] = True
self.assertEqual(msg.payload, p)
f.reactor_stop(f.get_reactor())

w2 = self.f.msg_watcher_create(cb, FLUX_MSGTYPE_RESPONSE,
"foo.echo")
w2.start()
self.assertIsNotNone(w2, msg="msg_watcher_create response handler")

m = Message()
m.topic = "foo.echo"
m.payload = p
self.assertTrue(m is not None)
ret = self.f.send(m)
self.assertEqual(ret, 0)

ret = self.f.reactor_run(self.f.get_reactor(), 0)
self.assertTrue(ret >= 0)
self.assertTrue(cb_called[0])
w2.stop()
w2.destroy()

def test_005_unregister_service(self):
rc = service_remove(self.f, "foo")
self.assertEqual(rc, 0)

# done with message handler
self.f.watcher.destroy()

def test_006_unregister_service_enoent(self):
with self.assertRaises(EnvironmentError) as e:
service_remove(self.f, "foo")
self.assertEqual(e.exception.errno, errno.ENOENT)

def test_007_service_rpc_enosys(self):
fut = self.f.rpc_create("foo.echo")
with self.assertRaises(EnvironmentError) as e:
fut.get()
self.assertEqual(e.exception.errno, errno.ENOSYS)

def test_008_service_remove_on_disconnect(self):
from multiprocessing import Process
# Add "baz" service in a different process and let the
# process exit to cause a disconnect.
# then, ensure "baz" service was removed.
#
def add_service_and_disconnect():
import sys
h = core.Flux()
sys.exit(service_add(h, "baz"))
p = Process(target=add_service_and_disconnect)
p.start()
p.join()
self.assertEqual(p.exitcode, 0)

# Ensure no "baz" service remains
fut = self.f.rpc_create("baz.echo")
with self.assertRaises(EnvironmentError) as e:
fut.get()
self.assertEqual(e.exception.errno, errno.ENOSYS)

def test_009_service_add_remove_eproto(self):
fut = self.f.rpc_create("service.add")
with self.assertRaises(EnvironmentError) as e:
fut.get()
self.assertEqual(e.exception.errno, errno.EPROTO)
fut = self.f.rpc_create("service.remove")
with self.assertRaises(EnvironmentError) as e:
fut.get()
self.assertEqual(e.exception.errno, errno.EPROTO)

if __name__ == '__main__':
if rerun_under_flux(__flux_size()):
from pycotap import TAPTestRunner
unittest.main(testRunner=TAPTestRunner(), verbosity=4)

0 comments on commit 1cfcb5d

Please sign in to comment.