Permalink
Browse files

Add simple worker pool.

  • Loading branch information...
1 parent 608ad5b commit 86344202d55d644823cbce3e00fb6956b5f4c2d9 @greghendershott committed Sep 7, 2012
Showing with 98 additions and 0 deletions.
  1. +98 −0 pool.rkt
View
@@ -0,0 +1,98 @@
+#lang racket
+
+(require racket/async-channel
+ "util.rkt"
+ )
+
+(provide (struct-out pool)
+ make-worker-pool
+ delete-worker-pool
+ with-worker-pool
+ add-job
+ get-results
+ )
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+
+;; This probably should get moved to a general-purpose utility library.
+;;
+;; Perhaps someday it should get interface compatible variations that
+;; use (say) places as well as threads.
+
+(struct pool
+ (threads ;(listof thread?)
+ todo ;async-channel? expected to contain (-> any/c) job procs
+ done ;async-channel? expected to contain any/c results
+ ))
+;; Don't need to provide this; opaque to users.
+
+(define/contract (make-worker-pool num-threads)
+ (exact-positive-integer? . -> . pool?)
+ (define todo (make-async-channel))
+ (define done (make-async-channel))
+ (define (worker-thread)
+ (let loop ()
+ (let ([p (async-channel-get todo)])
+ (cond [(procedure? p)
+ (async-channel-put done (p))
+ (loop)]
+ [else
+ (error 'worker-thread "expected procedure, got" p)]))))
+ (pool (for/list ([n (in-range num-threads)])
+ (thread worker-thread))
+ todo
+ done))
+
+(define/contract (delete-worker-pool p)
+ (pool? . -> . any)
+ (for ([t (in-list (pool-threads p))])
+ (kill-thread t)))
+
+;; This is safer than using make-worker-pool and
+;; delete-worker-pool. If there is an exception, delete-worker-pool
+;; will still be called to clean up.
+(define/contract (with-worker-pool num-threads proc)
+ (exact-positive-integer? (pool? . -> . any) . -> . any)
+ (define p (make-worker-pool num-threads))
+ (dynamic-wind (lambda () (void))
+ (lambda () (proc p))
+ (lambda () (delete-worker-pool p))))
+
+;; A job is a procedure that takes no arguments, and returns any/c,
+;; which is put the done channel.
+(define/contract (add-job pool proc)
+ (pool? (-> any/c) . -> . any)
+ (async-channel-put (pool-todo pool) proc))
+
+;; Gets the specified number of results, blocking until they are
+;; available.
+(define/contract (get-results p n)
+ (pool? exact-nonnegative-integer? . -> . any/c)
+ (let loop ([xs '()]
+ [n n])
+ ;; (displayln (tr "get-results" n xs))
+ (cond [(zero? n) xs]
+ [else (loop (cons (async-channel-get (pool-done p)) xs)
+ (sub1 n))])))
+
+(module+ test
+ (require "run-suite.rkt")
+ (def/run-test-suite
+ (test-case
+ "worker pool"
+ (define (try num-threads num-jobs)
+ (define results
+ (with-worker-pool
+ num-threads
+ (lambda (pool)
+ (for ([i (in-range num-jobs)])
+ (add-job pool (lambda () (sleep (random)) i)))
+ (get-results pool num-jobs))))
+ (check-equal? (sort results <)
+ (for/list ([i (in-range num-jobs)])
+ i)))
+ ;; Try a few permutations of threads and jobs.
+ (try 4 20)
+ (try 1 10)
+ (try 10 1)
+ )))

0 comments on commit 8634420

Please sign in to comment.