Skip to content

Commit

Permalink
pmap - Terminate threads on timeout in fully concurrent mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
shirok committed Nov 22, 2023
1 parent d799f1b commit fcb86b3
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 14 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
2023-11-21 Shiro Kawai <shiro@acm.org>

* lib/control/pmap.scm (run-map): Terminate threads when
timeout reaches with fully-concurrent-mapper.
https://github.com/shirok/Gauche/issues/964

* src/libsys.scm (absolute-time): Added. Handy to canonicalize
'timeout' argument.

Expand Down
43 changes: 33 additions & 10 deletions doc/modutil.texi
Original file line number Diff line number Diff line change
Expand Up @@ -2773,16 +2773,39 @@ CPUを有効活用できます。
@c COMMON

@c EN
The optional @var{timeout} and @var{timeout-val} arguments are
passed to @code{thread-join!} (@pxref{Thread procedures}).
It is useful when I/O operations may take too long and you
want to guarantee the entire operation finishes within certain
time limit.
@c JP
省略可能な@var{timeout}と@var{timeout-val}は@code{thread-join!}に渡されます
(@ref{Thread procedures}参照)。
I/O操作が非常に長くかかる可能性があり、@code{pmap}呼び出しにかかる時間を
制限したい場合に便利です。
The optional @var{timeout} argument can specify
the timeout value for threads to run. As in @code{thread-join!},
it can be either a @code{<time>} object to specify an absolute
point of time, a real number for the number of seconds from
the current time, or @code{#f} to run it indefinitely.
The default is @code{#f}.

If timeout reaches, @var{timeout-val} is used in place of
the result of @var{proc}, which is defaulted to @code{#f}.
@c JP
省略可能な@var{timeout}引数で、スレッドにタイムアウトを指定できます。
@code{thread-join!}のtimeout引数と同様に、@code{<time>}オブジェクトによる
絶対時刻、実数による現在時刻からの相対秒、そしてタイムアウトを設けない@code{#f}が
指定できます。省略時はタイムアウトしません。

タイムアウトした場合、@var{timeout-val}が@var{proc}の結果の代わりに使われます。
@var{timeout-val}の省略時値は@code{#f}です。
@c COMMON

@c EN
NB: If @var{timeout} reaches, the running threads are terminated by
@code{thread-terminate!}. If the thread is locking a mutex
when it occurs, the mutex becomes `abandoned' state. The resources
being used then may not be properly cleaned up. If you need
to ensure proper cleanup in bounded time, you need to code it in @var{proc}
explicitly.
@c JP
註: スレッドがタイムアウトした場合、
走行中のスレッドは@code{thread-terminate!}で終了させられます。
この時スレッドがmutexをロックしていればそれはabandoned状態になりますし、
リソースを掴んでいたら適切なクリーンアップがなされないかもしれません。
スレッドの実行を制限時間内に納め、かつクリーンアップを確実にしたい場合は、
そういうロジックを明示的に@var{proc}に組み込んでください。
@c COMMON
@end defun

Expand Down
13 changes: 10 additions & 3 deletions lib/control/pmap.scm
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
(use gauche.mop.singleton)
(use data.queue)
(use scheme.list)
(use srfi.19)
(use control.thread-pool)
(use control.job)
(export pmap pfind pany
Expand Down Expand Up @@ -255,11 +256,17 @@
(make <fully-concurrent-mapper> :timeout timeout :timeout-val timeout-val))

(define-method run-map ((mapper <fully-concurrent-mapper>) proc coll)
(let ([ts (map (^e (make-thread (^[] (proc e)))) coll)]
[timeout (~ mapper'timeout)]
(let ([unique (list #f)]
[ts (map (^e (make-thread (^[] (proc e)))) coll)]
[timeout (absolute-time (~ mapper'timeout))]
[timeout-val (~ mapper'timeout-val)])
(%start-threads ts)
(map (cut thread-join! <> timeout timeout-val) ts)))
(if timeout
($ map (^r (if (and (pair? r) (eq? (car r) unique))
(begin (thread-terminate! (cdr r)) timeout-val)
r))
$ map (^t (thread-join! t timeout (cons unique t))) ts)
(map thread-join! ts))))

(define-method run-select ((mapper <fully-concurrent-mapper>) proc coll)
(define signaled (atom #f))
Expand Down
9 changes: 8 additions & 1 deletion test/control.scm
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,14 @@
(terminate-all! pool))))
(test* "pmap (fully concurrent)"
(map (cut * <> 2) (iota 25))
(pmap (cut * <> 2) (iota 25) :mapper (make-fully-concurrent-mapper)))]
(pmap (cut * <> 2) (iota 25) :mapper (make-fully-concurrent-mapper)))
(let ([flag #t])
(test* "pmap (fully concurrent, timeout)"
(and flag '(timeout timeout timeout timeout timeout))
(pmap (^n (begin (sys-pause) (set! flag #t) n))
(iota 5)
:mapper (make-fully-concurrent-mapper 0.2 'timeout))))
]
[else])


Expand Down

0 comments on commit fcb86b3

Please sign in to comment.