Skip to content

Commit

Permalink
More client work.
Browse files Browse the repository at this point in the history
  • Loading branch information
gcr committed May 27, 2012
1 parent c27a0ed commit d640095
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 56 deletions.
75 changes: 53 additions & 22 deletions main.rkt
@@ -1,51 +1,82 @@
#lang racket

(require "client.rkt"
(for-syntax file/md5)
racket/serialize
mzlib/os
web-server/lang/serial-lambda)

(define workunit? string?)
(provide do-work connect-to-queue)
(provide current-client
do-work
for/work
)
(provide/contract
;; Ask for workers to dynamic-require the module path and call the
;; symbol with the given args. Registering this workunit costs one
;; roundtrip.
[do-work/call (->* (client? module-path? symbol?)
[connect-to-riot-server!
(->* (string?) (exact-integer? string?) any/c)]
[do-work/call (->* (module-path? symbol?)
#:rest (listof serializable?)
workunit?)]
;; Asks workers to eval the given datum. Costs one roundtrip.
[do-work/eval (-> client? serializable? workunit?)]
[do-work/eval (-> serializable? workunit?)]
;; Asks workers to evaluate the given serial-lambda.
[do-work/serial-lambda (-> client? serializable? workunit?)]
[do-work/serial-lambda (-> serializable? any/c workunit?)]
;; Wait until the cluster has finished the given workunit, and
;; either error with the given message or throw an exception
[wait-until-done (-> client? workunit? any/c)]
[wait-until-done (-> workunit? any/c)]
;; Calls the given thunk in a new thread when the workunit completes.
;; The first argument is #t if it error'd and #f if not. The second
;; argument is the client that finished this workunit, and the final
;; argument is the result.
[call-when-done (-> client? workunit? (-> boolean? any/c any/c any/c)
[call-when-done (-> workunit? (-> boolean? any/c any/c any/c)
any/c)])

(define-syntax-rule (do-work client body ...)
(do-work/serial-lambda client (serial-lambda () body ...)))
;; (do-work body ...)
;; Wraps `body ...` in a serial-lambda and hands it to
;; do-work/serial-lambda; the value of that call is the value of the form.
(define-syntax (do-work stx)
  (syntax-case stx ()
    [(_ body ...)
     ;; The tag is the md5 digest of the printed form of this entire
     ;; `do-work` expression, computed at expansion time. It is embedded
     ;; twice: as the first argument to do-work/serial-lambda and inside
     ;; the lambda itself. Comparing the two ensures that the client runs
     ;; the same version of the code that the master does.
     (with-syntax ([expected-tag
                    (bytes->string/utf-8
                     (md5 (format "~s" (syntax->datum stx))))])
       #'(do-work/serial-lambda
          expected-tag
          (serial-lambda (tag)
            (cond
              [(equal? tag expected-tag)
               body ...]
              [else (error 'worker "This worker has the wrong version of the code! Please restart the worker with the correct version.")]))))]))

(define (do-work/call client module-path symbol . args)
(client-add-workunit client
;; (for/work for-decl body ...)
;; Submits one `do-work` unit per iteration of `for-decl`. Only after every
;; unit has been queued does it call wait-until-done on each, in submission
;; order. Evaluates to the list of values wait-until-done returned.
(define-syntax-rule (for/work for-decl body ...)
  (let ([pending (for/list for-decl (do-work body ...))])
    (map wait-until-done pending)))

;; Parameter holding the queue connection that the do-work/... functions,
;; wait-until-done and call-when-done read. Until connect-to-riot-server!
;; has been called it holds a placeholder string.
(define current-client (make-parameter "not connected to a server"))

;; Connects to the queue at `host` via connect-to-queue and stores the
;; resulting connection in current-client. `port` defaults to 2355 and
;; `client-name` to this machine's hostname.
(define (connect-to-riot-server! host [port 2355] [client-name (gethostname)])
  (let ([connection (connect-to-queue host port client-name)])
    (current-client connection)))


;; Queues a workunit of the form
;;   (call-module module-path symbol serialized-arg ...)
;; on the current client, passing each element of `args` through
;; cleanup-serialize first. Returns whatever client-add-workunit returns.
(define (do-work/call module-path symbol . args)
  (client-add-workunit
   (current-client)
   `(call-module ,module-path ,symbol ,@(map cleanup-serialize args))))

(define (do-work/serial-lambda client lambda)
(client-add-workunit client
(list* 'serial-lambda (cleanup-serialize lambda))))
;; Queues a workunit of the form (serial-lambda nonce serialized-lambda)
;; on the current client. `nonce` is sent as-is; `lambda` is passed through
;; cleanup-serialize. Returns whatever client-add-workunit returns.
(define (do-work/serial-lambda nonce lambda)
  (client-add-workunit
   (current-client)
   `(serial-lambda ,nonce ,(cleanup-serialize lambda))))

(define (do-work/eval client datum)
(client-add-workunit client
(list* 'eval datum)))
;; Queues a workunit of the form (eval datum) on the current client.
;; `datum` is sent unmodified. Returns whatever client-add-workunit returns.
(define (do-work/eval datum)
  (client-add-workunit (current-client) `(eval ,datum)))

(define (wait-until-done client workunit)
(define (wait-until-done workunit)
(match-define (list key status client-name result)
(client-wait-for-finished-workunit client workunit))
(client-wait-for-finished-workunit
(current-client) workunit))
(if (equal? status 'done)
;; Finished
result
Expand All @@ -56,12 +87,12 @@
client-name
result)))

(define (call-when-done client workunit thunk)
(client-call-with-finished-workunit client workunit
(define (call-when-done workunit thunk)
(client-call-with-finished-workunit (current-client) workunit
(λ (key status client-name result)
(if (equal? status 'done)
(thunk #f client result)
(thunk #t client result)))))
(thunk #f client-name result)
(thunk #t client-name result)))))

(define (cleanup-serialize datum)
;; Serialize datum, while being sure to clean up the paths
Expand Down
63 changes: 55 additions & 8 deletions test.rkt
@@ -1,14 +1,22 @@
#lang racket
(require (planet gcr/riot))
(require mzlib/os
(planet gcr/riot))

(define cloud (connect-to-queue "localhost" 2355))

(define workunits
(for/list ([i (in-range 10)])
(do-work cloud
(+ i 5))))
;; Demo driver: prints "Running" locally, then submits ten workunits
;; through for/work (one per value of i from 0 to 9) and returns what
;; for/work evaluates to.
(define (run)
(displayln "Running")
;; The three body expressions below are what each workunit runs; the last
;; one, (+ 5 i), is the body's value. `i` is the only variable the body
;; takes from the enclosing loop.
(for/work ([i (in-range 10)])
(displayln "Hello from client")
(sleep 1)
(+ 5 i)))

(for/list ([p workunits]) (wait-until-done cloud p))
;; Entry point used only when this file is run directly: connects to the
;; riot server on localhost:2355 and then calls (run).
(module+ main
;; Note that we're NOT running this in the toplevel. If you use the
;; special `do-work' form, workers will (require) the module that
;; contains the code to run, and we don't want them submitting their
;; own workunits.
(connect-to-riot-server! "localhost" 2355)
(run))

;; Queue would be responsible for
;; - generating a UUID for this director node
Expand All @@ -21,4 +29,43 @@

;; Maybe use web-server/lang/serial-lambda ?

;; racket -p gcr/bonk/client --queue localhost:6344 --file test.rkt
;; racket -p gcr/bonk/client --queue localhost:6344 --file test.rkt

;; REMEMBER to NEVER put a (do-work) call in the toplevel! If this
;; file gets required by workers (which happens if you use do-work or
;; friends), each worker might queue up workunits themselves!


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

#|
Constraints on (do-work) and (for/work):
- Must be in its own file; won't work from toplevel.
- All variables that the body refers to must be serializable. This means
using serializable-struct (from racket/serialize) instead of plain structs.
- Related: When workers attempt to execute a workunit created by a
do-work form, they (require) the module and search for the code to be
executed. This has a number of implications:
- If you're running workers on other machines across a network, the
module you're running must be present on all of the worker machines.
- You must start the worker process in the same working directory
relative to your master, so each of the workers can find the module.
For example, if test.rkt lives in /home/michael/project/test.rkt on
the master machine and in /tmp/project/test.rkt on the workers, you
must start your racket process like this on the master:
cd /home/michael/project; racket test.rkt
and this on the workers:
cd /tmp/project; racket -p gcr/riot/worker
- If you change the code, you must copy the module to each of the
worker machines in turn and restart the workers.
- If a worker process crashes while executing a workunit, that
workunit may never complete.
|#
86 changes: 60 additions & 26 deletions worker.rkt
Expand Up @@ -4,48 +4,82 @@
racket/date
racket/cmdline
racket/match
racket/serialize
mzlib/os)

;; Placeholder workunit handler: ignores serialized-data, sleeps for one
;; second, and reports workunit `key` on queue connection `q` with the
;; constant result 42 and #f as the third argument.
;; The four steps listed below describe the intended behaviour; none of
;; them is implemented in this body.
(define (process-workunit q key serialized-data)
;; Deserialize data
;; Parse it
;; Run it, with error handling
;; Submit results or failure message
(sleep 1)
(client-complete-workunit! q key #f 42)
)

(module+ main
(define port (make-parameter 2355))
(define name (make-parameter (gethostname)))
(define queue-host
(command-line
#:once-each
[("--port") p
"Queue port to connect to. Default: 2355"
(port (string->number p))]
[("--name") client-name
"Client name to report to server. Default is this machine's hostname"
(name client-name)]
#:args (host)
host))
(define (connect-and-work host name port)
(date-display-format 'iso-8601)
(define (log . msg)
(printf "[~a] ~a\n" (date->string (current-date) (current-seconds))
(apply format msg))
(flush-output))

(log "Connecting to ~a port ~a" queue-host (port))
(define q (connect-to-queue queue-host (port) (name)))
(log "Connecting to ~a port ~a" host port)
(define q (connect-to-queue host port name))
(log "Connected")

;; Runs one workunit payload and returns whatever the work produces.
;; Exceptions are not caught here. Three payload shapes are recognised:
;;   (call-module mod-path symbol sarg ...)
;;   (serial-lambda tag sthunk)
;;   (eval datum)
(define (process-data serialized-data)
(match serialized-data
;; call-module: deserialize every argument, then apply the value that
;; `symbol` names in module `mod-path` to them.
[(list-rest 'call-module mod-path symbol sargs)
(define args (map deserialize sargs))
(log "Running mod: ~s symbol: ~a args: ~s" mod-path symbol args)
(apply (dynamic-require mod-path symbol) args)]
;; serial-lambda: deserialize the closure and call it with the tag that
;; was sent alongside it.
[(list 'serial-lambda tag sthunk)
(log "Running serialized lambda")
(match-define thunk (deserialize sthunk))
;; Thunk will error out if it has a different tag than it expects.
(thunk tag)]
;; eval: evaluate the datum in a fresh namespace that has racket/base
;; attached from the current namespace and then required into it.
;; NOTE(review): datum comes from the queue and is passed to eval as-is;
;; this looks safe only if everything able to submit work is trusted —
;; confirm.
[(list 'eval datum)
(log "Evaluating: ~s" datum)
(let ([ns (make-base-empty-namespace)])
(namespace-attach-module (current-namespace)
'racket/base
ns)
(parameterize ([current-namespace ns])
(namespace-require 'racket/base)
(eval datum)))]))

;; Executes the workunit identified by `key` and reports the outcome on
;; queue connection `q`.
;;   - Success: the value of (process-data serialized-data) is submitted
;;     with an error flag of #f, and #t is returned.
;;   - exn:fail raised anywhere in that step: the exception message is
;;     logged, submitted with an error flag of #t, and #f is returned.
(define (run-workunit q key serialized-data)
  ;; Handler for the failure path; its #f becomes run-workunit's result.
  (define (report-failure ex)
    (log "Failed ~a:\n~a"
         key
         (exn-message ex))
    (client-complete-workunit! q key #t
                               (exn-message ex))
    #f)
  (with-handlers ([exn:fail? report-failure])
    (client-complete-workunit! q key #f (process-data serialized-data))
    #t))


(let loop ()
(log "Waiting for more work")
(match-define
(list key serialized-data)
(client-wait-for-work q))
(log "Starting workunit ~a" key)
(if (process-workunit q key serialized-data)
(if (run-workunit q key serialized-data)
(log "Successfully completed workunit ~a" key)
(log "Failed workunit ~a" key))
(loop)))

;; Command-line entry point for the worker: parses --port, --name and the
;; required host argument, then calls connect-and-work with them.
(module+ main
;; Both options are parameters so that the command-line clauses below can
;; overwrite their defaults (2355 and this machine's hostname).
(define port (make-parameter 2355))
(define name (make-parameter (gethostname)))
;; command-line returns the value of its #:args body, i.e. the host string.
(define queue-host
(command-line
#:once-each
[("--port") p
"Queue port to connect to. Default: 2355"
;; NOTE(review): string->number yields #f for non-numeric input, which
;; would then be stored as the port — confirm whether validation is wanted.
(port (string->number p))]
[("--name") client-name
"Client name to report to server. Default is this machine's hostname"
(name client-name)]
#:args (host)
host))

(connect-and-work queue-host (name) (port)))

0 comments on commit d640095

Please sign in to comment.