/
main.rkt
210 lines (193 loc) · 7.77 KB
/
main.rkt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
#lang racket/base
(require racket/contract
racket/list
racket/match
racket/tcp
racket/async-channel
racket/date
mzlib/thread
file/md5)
;; A queue is a list of workunits along with an asynchronous channel
;; that serializes unsafe actions on that queue.
(struct queue (workunits clients-waiting-for-work) #:mutable)
(define workunit-key? any/c)
(define workunit-status? (or/c 'waiting 'running 'done 'error))
;; A workunit is a (potentially completed) item of work to be handed
;; out to clients.
(struct workunit (key
status
client
result
data
on-complete-thunks
last-status-change)
#:mutable)
(provide (struct-out workunit))
(provide/contract
[start-queue-server (-> exact-integer? any/c)]
;; don't use these:
[make-queue (-> queue?)]
[queue-ref (-> queue? workunit-key? (or/c workunit? #f))]
[make-workunit-key (-> any/c workunit-key?)]
[queue-add-workunit! (-> queue? workunit-key? any/c any/c)]
[queue-on-workunit-completion (-> queue? workunit-key? any/c any/c)]
[queue-call-with-work! (-> queue? any/c (-> workunit-key? boolean?) any/c)]
[queue-complete-workunit! (-> queue? workunit-key? boolean? any/c any/c)])
(define (make-queue)
(queue (make-hash) (list)))
;; Get the given workunit.
(define (queue-ref queue key [default #f])
(hash-ref (queue-workunits queue) key default))
(define (make-workunit-key data)
(bytes->string/utf-8 (md5 (format "~s" data))))
(define (queue-pick-workunit queue status)
(for/first ([(key wu) (in-hash (queue-workunits queue))]
#:when (eq? status (workunit-status wu)))
wu))
(define (queue-dispatch-work! queue)
;; If there are clients waiting for work, well send it to them gosh
;; golly
(when (not (empty? (queue-clients-waiting-for-work queue)))
(define next-wu (queue-pick-workunit queue 'waiting))
(when next-wu
(match-define (list client client-thunk)
(first (queue-clients-waiting-for-work queue)))
(set-queue-clients-waiting-for-work!
queue
(rest (queue-clients-waiting-for-work queue)))
;; The client thunk can choose to reject this workunit, for
;; example if the client disconnects before we can give them
;; something to work on. In that case, we'll just remove their
;; thunk from our list of idle clients.
(when (client-thunk next-wu)
(set-workunit-status! next-wu 'running)
(set-workunit-client! next-wu client)
(set-workunit-last-status-change!
next-wu
(current-inexact-milliseconds)))
(queue-dispatch-work! queue))))
;; Add work to the queue
(define (queue-add-workunit! queue key data)
(unless (hash-has-key? (queue-workunits queue) key)
(define wu (workunit key 'waiting #f #f data '()
(current-inexact-milliseconds)))
(hash-set! (queue-workunits queue) key wu)
(queue-dispatch-work! queue))
key)
;; Call thunk with a workunit key when there's more work available.
;; A client will call this to register their willingness to perform
;; work, for example.
(define (queue-call-with-work! queue client thunk)
(define client-thunk-data (list client thunk))
(set-queue-clients-waiting-for-work!
queue
(append (queue-clients-waiting-for-work queue)
(list client-thunk-data)))
(queue-dispatch-work! queue))
;; Add a thunk to be called when the given workunit finishes.
(define (queue-on-workunit-completion queue key thunk)
(define wu (queue-ref queue key))
(when wu
(case (workunit-status wu)
[(done error) (thunk wu)]
[else (set-workunit-on-complete-thunks! wu
(cons thunk (workunit-on-complete-thunks wu)))])))
;; Called when a client finishes a workunit.
(define (queue-complete-workunit! queue key error? result)
(define wu (queue-ref queue key))
(when wu
(set-workunit-status! wu (if error? 'error 'done))
(set-workunit-result! wu result)
(set-workunit-last-status-change! wu
(current-inexact-milliseconds))
(for ([thunk (in-list (workunit-on-complete-thunks wu))])
(thunk wu))
(set-workunit-on-complete-thunks! wu '())))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; The actual server.
(define (start-queue-server port)
(date-display-format 'iso-8601)
(define (log . msg)
(printf "[~a] ~a\n" (date->string (current-date) (current-seconds))
(apply format msg))
(flush-output))
(define chan (make-async-channel))
(define q (make-queue))
(thread (λ() (let loop () ((async-channel-get chan)) (loop))))
;; put thunks on this channel to execute them.
(define (handle-cxn in out)
(define handler-cust (current-custodian))
(define-syntax-rule (errguard-λ (args ...) body ...)
(λ (args ...)
(with-handlers ([exn:fail:network? (λ(ex) (displayln "Net err"))]
[exn:fail?
(λ(ex)
((error-display-handler)
(format "Client error: ~a"
(exn-message ex))
ex)
(flush-output)
(custodian-shutdown-all handler-cust)
#f)])
body ...)))
(define-syntax-rule (q-action body ...)
(async-channel-put chan
(errguard-λ () body ...)))
(custodian-limit-memory handler-cust (* 10 1024 1024))
(define (send datum)
(write datum out)
(display "\n" out)
(flush-output out))
(let/ec exit
(match-define (list 'hello-from client) (read in))
(define-values (my-ip your-ip) (tcp-addresses out))
(log "new connection: ~s ip: ~a" client your-ip)
(let loop ()
(match (read in)
[(? eof-object?) (log "disconnect: ~a" client) (exit)]
[(list 'workunit-info wu-key)
(q-action
(define wu (queue-ref q wu-key))
(match-define
(workunit key status wu-client result data _ last-change)
(or wu (workunit wu-key #f #f #f #f #f #f)))
(send (list 'workunit key status wu-client result last-change)))]
[(list 'wait-for-work)
(log "~s is waiting for work" client)
(q-action
(queue-call-with-work! q client
(errguard-λ (wu)
(log "assigned ~a to ~s" (workunit-key wu) client)
(send (list 'assigned-workunit
(workunit-key wu)
(workunit-data wu)))
(flush-output)
#t ;; accept this one
)))]
[(list 'add-workunit! data)
(q-action
(define new-key (make-workunit-key data))
(log "workunit: ~a" new-key)
(queue-add-workunit! q new-key data)
(send (list 'added-workunit new-key)))]
[(list 'monitor-workunit-completion key)
(q-action
(queue-on-workunit-completion
q
key
(errguard-λ (wu)
(send (list 'workunit-complete
key
(workunit-status wu) ;; may be error, for ex
(workunit-client wu)
(workunit-result wu))))))]
[(list 'complete-workunit! key error? result)
(if error?
(log "~s FAILED workunit: ~a" client key)
(log "~s completed workunit: ~a" client key))
(q-action
(queue-complete-workunit! q key error? result))]
[other (error "wasn't expecting this from client:" other)])
(loop))))
(log "Listening for connections on port ~a" port)
(run-server port handle-cxn #f))