Skip to content

Commit

Permalink
Merge pull request #3215 from grondo/issue#3186
Browse files Browse the repository at this point in the history
python: allow Future.get() and wait_for() to be interrupted by Ctrl-C
  • Loading branch information
mergify[bot] committed Sep 18, 2020
2 parents 0f18d03 + 21edf51 commit 4fee45e
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 6 deletions.
4 changes: 3 additions & 1 deletion src/bindings/python/flux/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from typing import Dict

from flux.util import check_future_error
from flux.util import check_future_error, interruptible
from flux.wrapper import Wrapper, WrapperPimpl
from flux.core.inner import ffi, lib, raw

Expand Down Expand Up @@ -141,10 +141,12 @@ def reset(self):
def is_ready(self):
return self.pimpl.is_ready()

@interruptible
def wait_for(self, timeout=-1.0):
self.pimpl.wait_for(timeout)
return self

@interruptible
def get(self):
"""
Base Future.get() method. Does not return a result, just blocks
Expand Down
2 changes: 2 additions & 0 deletions src/bindings/python/flux/job/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def get_event(self, autoreset=True):
"""
result = ffi.new("char *[1]")
try:
# Block until Future is ready:
self.wait_for()
RAW.event_watch_get(self.pimpl, result)
except OSError as exc:
if exc.errno == errno.ENODATA:
Expand Down
4 changes: 3 additions & 1 deletion src/bindings/python/flux/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from flux.future import Future
from flux.core.inner import ffi, lib, raw
import flux.constants
from flux.util import encode_payload, encode_topic
from flux.util import encode_payload, encode_topic, interruptible


class RPC(Future):
Expand Down Expand Up @@ -66,13 +66,15 @@ def __init__(
pimpl_t=self.RPCInnerWrapper,
)

@interruptible
def get_str(self):
payload_str = ffi.new("char *[1]")
self.pimpl.flux_rpc_get(payload_str)
if payload_str[0] == ffi.NULL:
return None
return ffi.string(payload_str[0]).decode("utf-8")

@interruptible
def get(self):
resp_str = self.get_str()
if resp_str is None:
Expand Down
17 changes: 17 additions & 0 deletions src/bindings/python/flux/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import math
import argparse
import traceback
import signal
from datetime import timedelta
from string import Formatter

Expand Down Expand Up @@ -55,6 +56,22 @@ def func_wrapper(calling_obj, *args, **kwargs):
return func_wrapper


def interruptible(func):
"""Make a method interruptible via Ctrl-C
Necessary for methods that may block when calling into the C API.
"""

def func_wrapper(calling_obj, *args, **kwargs):
handler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal.SIG_DFL)
retval = func(calling_obj, *args, **kwargs)
signal.signal(signal.SIGINT, handler)
return retval

return func_wrapper


def encode_payload(payload):
if payload is None or payload == ffi.NULL:
payload = ffi.NULL
Expand Down
7 changes: 4 additions & 3 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,14 @@ EXTRA_DIST= \

dist_check_SCRIPTS = \
$(TESTSCRIPTS) \
issues/t0441-kvs-put-get.sh \
issues/t0505-msg-handler-reg.lua \
issues/t0821-kvs-segfault.sh \
issues/t0441-kvs-put-get.sh \
issues/t0505-msg-handler-reg.lua \
issues/t0821-kvs-segfault.sh \
issues/t1760-kvs-use-after-free.sh \
issues/t2281-service-add-crash.sh \
issues/t2284-initial-program-format-chars.sh \
issues/t2686-shell-input-race.sh \
issues/t3186-python-future-get-sigint.sh \
python/__init__.py \
python/subflux.py \
python/tap \
Expand Down
37 changes: 37 additions & 0 deletions t/issues/t3186-python-future-get-sigint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/bin/sh
# future/rpc.get() should be interruptible:

SERVICE="t3186"
waitfile=${SHARNESS_TEST_SRCDIR}/scripts/waitfile.lua

log() { echo "t3186: $@" >&2; }
die() { log "$@"; exit 1; }

cat <<EOF >get-intr.py || die "Failed to create test script"
import flux
from flux.future import Future
f = flux.Flux()
Future(f.service_register("$SERVICE")).get()
print("get-intr.py: Added service $SERVICE", flush=True)
# The following should block until interrupted:
f.rpc("${SERVICE}.echo").get()
EOF

log "Created test script get-intr.py"

flux python get-intr.py >t3186.log 2>&1 &
pid=$!

log "Started PID=$pid"

$waitfile --timeout=10 --pattern="Added service" t3186.log

log "Sending SIGINT to $pid"
kill -INT $pid || die "Failed to kill PID $pid"
wait $!
STATUS=$?
test $STATUS -eq 130 || die "process exited with $STATUS expected 130"

log "Python script exited with status $STATUS"
2 changes: 1 addition & 1 deletion t/t4000-issues-test-driver.t
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ echo "# $0: flux session size will be ${SIZE}"

for testscript in ${FLUX_SOURCE_DIR}/t/issues/*; do
testname=`basename $testscript`
test_expect_success $testname $testscript
test_expect_success $testname "run_timeout 30 $testscript"
done

test_done

0 comments on commit 4fee45e

Please sign in to comment.