forked from gambit/gambit
-
Notifications
You must be signed in to change notification settings - Fork 1
/
dc.scm
703 lines (588 loc) · 23 KB
/
dc.scm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
;==============================================================================
; File: "dc.scm", Time-stamp: <2008-12-15 11:52:03 feeley>
; Copyright (c) 2005-2008 by Marc Feeley, All Rights Reserved.
;==============================================================================
(##namespace ("dc#"))
(##include "~~lib/gambit#.scm")
(##include "dc#.scm")
(declare
(standard-bindings)
(extended-bindings)
(block)
(not safe)
)
;==============================================================================
; Implementation of the basic Termite procedures:
;
; (self) returns a reference to the current process
;
; (pid? obj) returns #t iff obj is a reference to a process
;
; (make-tag) returns a unique object (testable with eq?)
;
; (! to msg) sends "msg" to the process referenced by "to"
;
; (?) retrieves oldest message from the process' mailbox
;
; (!? to msg) sends the message #(from tag msg) to the process referenced
; by "to" (where "from" is a reference to the current process
; and "tag" is a unique object) and waits for a message of the
; form #(tag result) and returns "result"
(define self current-thread)
(define pid? thread?)
(define make-tag gensym)
(define ! thread-send)
(define ? thread-receive)
(define (!? to msg)
(let ((tag (gensym)))
(thread-send to (vector (current-thread) tag msg))
(recv (#(,tag result) result))))
;==============================================================================
; Implementation of the distributed computing features of Termite:
;
; (make-tcp-node ip-address #!optional port-num name)
; Returns a reference to a node accessible with TCP/IP.
;
; (goto node)
; Moves the current process to the given node.
;------------------------------------------------------------------------------
; Types useful for identifying nodes in a distributed system.
(define-type node-locals
id: 37f40836-0e86-495d-b8c2-c0aeea3f0898
node ; thread representing the node
node-id ; node id
exported-objects-obj2descr ; table mapping objects to descriptors
exported-objects-descr2obj ; inverse mapping of previous table
exported-objects-mutex ; mutex for accessing the tables
last-serial-number ; serial number counter
initial-continuation ; initial continuation
)
(define (spawn-thread thunk)
(let* ((locals (current-node-locals))
(thread (make-thread thunk)))
(node-locals-set thread locals) ; all threads know on which node they are
(thread-start! thread)))
(define (current-node-locals)
(node-locals-get (current-thread)))
(define (node-locals-get thread)
(thread-specific thread))
(define (node-locals-set thread locals)
(thread-specific-set! thread locals))
; A TCP/IP address contains the IP address and the port number.
(define-type tcp-address
id: b721e7aa-ea98-4e07-aea3-aa073aebe32f
ip-address ; a 4 or 16 element u8vector, e.g. #u8(192 168 0 100)
port-num ; a 16 bit exact integer
)
; A "node id" identifies a node globally. In a distributed system
; there cannot be two different nodes with the same node id (where
; "same" is determined by equal?).
(define-type tcp-node-id
id: 4332e406-4f1f-498f-a7ae-cd8917d8fc67
tcp-address ; TCP/IP address of the server
name ; name of the node on this server (any Scheme object
; which can be tested with equal?)
)
;------------------------------------------------------------------------------
; Implementation of make-tcp-node procedure.
(define default-tcp-node-port-number 9000)
(define default-tcp-node-name #f)
(define (make-tcp-node
ip-address
#!optional
(port-num default-tcp-node-port-number)
(name default-tcp-node-name))
(let ((ip-adr
(if (string? ip-address)
(main-ip-address ip-address)
ip-address)))
(node-id->node
(make-tcp-node-id (make-tcp-address ip-adr port-num) name))))
;------------------------------------------------------------------------------
; Tables to maintain a global address space for objects.
(define (current-node)
(let ((locals (current-node-locals)))
(node-locals-node locals)))
(define (current-node-id)
(let ((locals (current-node-locals)))
(node-locals-node-id locals)))
(define (current-node-name)
(let ((locals (current-node-locals)))
(tcp-node-id-name (node-locals-node-id locals))))
(define (enter-exported-object obj descr)
(let ((locals (current-node-locals)))
(table-set! (node-locals-exported-objects-obj2descr locals) obj descr)
(table-set! (node-locals-exported-objects-descr2obj locals) descr obj)))
(define (object->descr obj create-descr)
(let ((locals (current-node-locals)))
(mutex-lock! (node-locals-exported-objects-mutex locals))
(let ((descr
(or (table-ref (node-locals-exported-objects-obj2descr locals)
obj
#f)
(let ((d (create-descr obj)))
(enter-exported-object obj d)
d))))
(mutex-unlock! (node-locals-exported-objects-mutex locals))
descr)))
(define (descr->object descr create-object)
(let ((locals (current-node-locals)))
(mutex-lock! (node-locals-exported-objects-mutex locals))
(let ((object
(or (table-ref (node-locals-exported-objects-descr2obj locals)
descr
#f)
(let ((obj (create-object descr)))
(enter-exported-object obj descr)
obj))))
(mutex-unlock! (node-locals-exported-objects-mutex locals))
object)))
(define (new-local-serial-number)
(let* ((locals (current-node-locals))
(n (+ (node-locals-last-serial-number locals) 1)))
(node-locals-last-serial-number-set! locals n)
n))
; Type definition for globalized object descriptors.
(define-type globalized-object
id: 1eb6750f-f6a5-444c-a05d-701809958f2e
extender: define-type-of-globalized-object
; the creator and serial-num denote the object's identity
creator ; the node or node id where this object was created
serial-num ; the serial-number of this object on that node
)
; Handling of globalized uninterned symbols.
(define-type-of-globalized-object globalized-uninterned-symbol
id: 857a112b-78f8-4940-936e-bc7021c49569
equality-skip: ; the following fields are not used by equal? (so that
; equal? tests object identity)
name
hash
global-var?
)
(define (globalize-uninterned-symbol obj)
(object->descr
obj
(lambda (obj)
(make-globalized-uninterned-symbol
(current-node-id)
(new-local-serial-number)
(symbol->string obj)
(symbol-hash obj)
(##global-var? obj)))))
(define (localize-uninterned-symbol descr)
(let ((sym
(descr->object
descr
(lambda (descr)
(make-uninterned-symbol
(string-append "new-" (globalized-uninterned-symbol-name descr))
(globalized-uninterned-symbol-hash descr))))))
(if (globalized-uninterned-symbol-global-var? descr)
(##make-global-var sym))
sym))
; Handling of globalized threads.
(define-type-of-globalized-object globalized-thread
id: eaf68ece-be2d-451d-b3be-c6f9106cac40
equality-skip: ; the following fields are not used by equal? (so that
; equal? tests object identity)
home ; the node where this thread was last located
stamp ; exact integer stamp on location information (so that more recent
; location information takes precedence over older information)
)
(define (globalize-thread obj)
(object->descr
obj
(lambda (obj)
(let ((cnode (current-node)))
(make-globalized-thread
(current-node-id)
(new-local-serial-number)
cnode
0)))))
(define (node-id->node node-id)
(localize-thread (node-id->descr node-id)))
(define (node-id->descr node-id)
(make-globalized-thread node-id #f #f #f))
(define (localize-thread descr)
(let ((thread
(descr->object descr create-thread-proxy-from-descr)))
(if (globalized-thread-home descr)
(update-home! (globalize-thread thread) descr))
thread))
(define (create-thread-proxy-from-descr descr)
(spawn-thread (lambda () (become-proxy descr))))
(define (become-proxy descr)
(if (globalized-thread-home descr)
(become-thread-proxy descr)
(become-node-proxy descr)))
(define (update-home! descr new-descr)
(if (> (globalized-thread-stamp new-descr)
(globalized-thread-stamp descr))
(let ((locals (current-node-locals)))
(mutex-lock! (node-locals-exported-objects-mutex locals))
(globalized-thread-home-set!
descr
(globalized-thread-home new-descr))
(globalized-thread-stamp-set!
descr
(globalized-thread-stamp new-descr))
(mutex-unlock! (node-locals-exported-objects-mutex locals)))))
(define (move-home! descr node)
(let ((locals (current-node-locals)))
(mutex-lock! (node-locals-exported-objects-mutex locals))
(globalized-thread-home-set! descr node)
(let ((stamp (globalized-thread-stamp descr)))
(globalized-thread-stamp-set! descr (+ stamp 1)))
(mutex-unlock! (node-locals-exported-objects-mutex locals))))
; Handling of node messages.
(define-type homecoming-message
id: f92c9d35-1aff-42d6-8652-3f22611bc99a
thread ; thread that is migrating to this node
cont ; thread's continuation
)
(define-type forward-message
id: 8c209f18-baf7-4964-b949-d3e9bbec64e6
message ; message that was sent
destination ; thread where the message is destined
path ; list of nodes that forwarded this message
)
(define-type update-home-message
id: 452ce3ce-fa2a-4f83-9977-e7ae506f2023
thread ; thread to update
)
; for debugging:
(##define-macro (debug . lst) #f)
'
(define (debug . lst)
(for-each display lst)
(newline))
(define (nod)
(tcp-node-id-name (current-node-id)))
(define (2str x)
(object->string x))
(define (become-node node-id thunk)
((call-with-current-continuation
(lambda (cont)
(let ((locals
(make-node-locals (current-thread)
node-id
(make-table test: eq?)
(make-table test: equal?)
(make-mutex)
0
cont)))
(node-locals-set (current-thread) locals))
(enter-exported-object (current-thread) (node-id->descr node-id))
(thunk)
(let loop ()
(let ((msg (thread-receive)))
(debug "NODE " (nod) " <--- " (2str msg))
(cond ((homecoming-message? msg)
(let ((thread (homecoming-message-thread msg)))
(thread-send thread msg)) ; let thread proxy handle it
(loop))
((forward-message? msg)
(let ((message (forward-message-message msg))
(destination (forward-message-destination msg))
(path (forward-message-path msg)))
(let* ((descr (globalize-thread destination))
(home (globalized-thread-home descr))
(node (current-node)))
(if (eq? home node)
(begin
(debug "NODE " (nod) " updating " (cdr path))
(for-each
(lambda (n)
(thread-send n
(make-update-home-message
destination)))
(cdr path))
(thread-send destination message))
(thread-send home
(make-forward-message
message
destination
(cons node path))))))
(loop))
((update-home-message? msg)
(loop))
(else
(error "node received unknown message" msg)))))))))
; Handling of node proxy messages.
(define (become-node-proxy descr)
(debug "NODE " (tcp-node-id-name (globalized-object-creator descr)) " PROXY STARTING " (2str (current-thread)))
(let loop1 ()
(let ((msg (thread-receive)))
(debug "NODE " (tcp-node-id-name (globalized-object-creator descr)) " PROXY opening TCP connection")
(let* ((node-id
(globalized-object-creator descr))
(connection
(open-tcp-client
(list server-address:
(tcp-address-ip-address
(tcp-node-id-tcp-address node-id))
port-number:
(tcp-address-port-num
(tcp-node-id-tcp-address node-id))))))
(write-object (tcp-node-id-name node-id) connection)
(let loop2 ((msg msg))
(debug "NODE " (tcp-node-id-name (globalized-object-creator descr)) " PROXY ===> " (2str msg))
(write-object msg connection)
(force-output connection)
(let ((next-msg (thread-receive 10 connection)))
(if (eq? next-msg connection)
(begin
(debug "NODE " (tcp-node-id-name (globalized-object-creator descr)) " PROXY closing TCP connection")
(close-port connection)
(loop1))
(loop2 next-msg))))))))
; Handling of thread proxy messages.
(define (become-thread-proxy descr)
(debug "THREAD " (tcp-node-id-name (globalized-object-creator descr)) " sn=" (globalized-object-serial-num descr) " PROXY STARTING " (2str (current-thread)))
(let loop ()
(let ((msg (thread-receive)))
(debug "THREAD " (tcp-node-id-name (globalized-object-creator descr)) " sn=" (globalized-object-serial-num descr) " PROXY <--- " msg)
(cond ((and (homecoming-message? msg)
(eq? (homecoming-message-thread msg)
(current-thread)))
(let ((cont (homecoming-message-cont msg)))
(cont (void)))) ; return from call/cc in goto
(else
(let ((home (globalized-thread-home descr)))
(thread-send home
(make-forward-message
msg
(current-thread)
(list (current-node))))
(loop)))))))
; Implementation of goto procedure.
(define (goto node)
(call-with-current-continuation
(lambda (cont)
((node-locals-initial-continuation ; reset current continuation
(current-node-locals))
(lambda ()
(let* ((ct (current-thread))
(descr (globalize-thread ct)))
(move-home! descr node) ; set the new home
(thread-send node (make-homecoming-message ct cont))
(become-thread-proxy descr)))))))
; Implementation of on procedure.
(define (on node thunk)
(let ((here (current-node)))
(goto node)
(let ((result (thunk)))
(goto here)
result)))
; Handling of globalized ports.
(define-type-of-globalized-object globalized-port
id: 4cef3774-be86-43f1-ab18-f32f3cdedd16
equality-skip: ; the following fields are not used by equal? (so that
; equal? tests object identity)
in-thread
out-thread
name
)
(define (make-port-to-thread-pump port thread)
(spawn-thread
(lambda ()
(let loop ()
(let ((x (read-char port)))
(thread-send thread x)
(if (not (eof-object? x))
(loop)
(close-input-port port)))))))
(define (make-thread-to-port-pump port)
(spawn-thread
(lambda ()
(let loop ()
(let ((x (thread-receive)))
(if (not (eof-object? x))
(begin
(write-char x port)
(force-output port)
(loop))
(close-output-port port)))))))
(define (make-input-port-server port)
(spawn-thread
(lambda ()
(let loop ()
(let ((t (thread-receive)))
(let ((x (read-char port)))
(thread-send t x)
(if (not (eof-object? x))
(loop)
(close-input-port port))))))))
(define (make-input-port-server-to-port-pump thread oport iport)
(define (wake-up-reader t)
(input-port-timeout-set!
iport
-inf.0 ; time out immediately if no input available
(lambda ()
(input-port-timeout-set! iport +inf.0) ; block on next read
(thread-send thread t) ; ask for some more input
#t))) ; thread will block when this procedure returns
(let ((t
(make-thread
(lambda ()
(let loop ()
(let ((x (thread-receive)))
(if (not (eof-object? x))
(begin
(write-char x oport)
(force-output oport)
(wake-up-reader (current-thread))
(loop))
(close-output-port oport))))))))
(wake-up-reader t)
(thread-start! t)))
(define (globalize-port obj)
(object->descr
obj
(lambda (obj)
(make-globalized-port
(current-node-id)
(new-local-serial-number)
(and (input-port? obj)
(make-input-port-server obj))
(and (output-port? obj)
(make-thread-to-port-pump obj))
(##port-name obj)))))
(define (localize-port descr)
(descr->object
descr
(lambda (descr)
(let ((in-thread (globalized-port-in-thread descr))
(out-thread (globalized-port-out-thread descr)))
(if out-thread
(if in-thread
(receive (ioport oiport) (open-string-pipe)
(make-port-to-thread-pump oiport out-thread)
(make-input-port-server-to-port-pump in-thread oiport ioport)
ioport)
(receive (oport iport) (open-string-pipe '(direction: output))
(make-port-to-thread-pump iport out-thread)
oport))
(receive (iport oport) (open-string-pipe '(direction: input))
(make-input-port-server-to-port-pump in-thread oport iport)
iport))))))
;------------------------------------------------------------------------------
; Serialization/deserialization of objects.
(define (externalize obj)
(cond ((uninterned-symbol? obj)
(globalize-uninterned-symbol obj))
((thread? obj)
(globalize-thread obj))
((port? obj)
(globalize-port obj))
(else
obj)))
(define (internalize obj)
(cond ((globalized-uninterned-symbol? obj)
(localize-uninterned-symbol obj))
((globalized-thread? obj)
(localize-thread obj))
((globalized-port? obj)
(localize-port obj))
(else
obj)))
(define (write-object obj port)
(let* ((serialized-obj
(object->u8vector obj externalize))
(len
(u8vector-length serialized-obj))
(serialized-len
(u8vector (bitwise-and len #xff)
(bitwise-and (arithmetic-shift len -8) #xff)
(bitwise-and (arithmetic-shift len -16) #xff)
(bitwise-and (arithmetic-shift len -24) #xff))))
(write-subu8vector serialized-len 0 4 port)
(write-subu8vector serialized-obj 0 len port)))
(define (read-object port)
(let* ((serialized-len
(u8vector 0 0 0 0))
(n
(read-subu8vector serialized-len 0 4 port)))
(cond ((= 0 n)
#!eof)
((not (= 4 n))
(error "deserialization error"))
(else
(let* ((len
(+ (u8vector-ref serialized-len 0)
(arithmetic-shift (u8vector-ref serialized-len 1) 8)
(arithmetic-shift (u8vector-ref serialized-len 2) 16)
(arithmetic-shift (u8vector-ref serialized-len 3) 24)))
(serialized-obj
(make-u8vector len))
(n
(read-subu8vector serialized-obj 0 len port)))
(if (not (eqv? len n))
(error "deserialization error")
(u8vector->object serialized-obj internalize)))))))
;------------------------------------------------------------------------------
(define (become-tcp-node port-num node-name thunk)
(become-node
(make-tcp-node-id
(make-tcp-address (main-ip-address (host-name)) port-num)
node-name)
(lambda ()
(tcp-server-register-node-locals port-num node-name (current-node-locals))
(start-tcp-server port-num)
(spawn-thread thunk))))
(define tcp-server-registry (make-table test: equal?))
(define tcp-server-registry-mutex (make-mutex))
(define (tcp-server-lookup-node-locals port-num node-name)
(let ((key (cons port-num node-name)))
(mutex-lock! tcp-server-registry-mutex)
(let ((result (table-ref tcp-server-registry key #f)))
(mutex-unlock! tcp-server-registry-mutex)
result)))
(define (tcp-server-register-node-locals port-num node-name locals)
(let ((key (cons port-num node-name)))
(mutex-lock! tcp-server-registry-mutex)
(table-set! tcp-server-registry key locals)
(mutex-unlock! tcp-server-registry-mutex)))
(define (start-tcp-server port-num)
(spawn-thread
(lambda ()
; The call to open-tcp-server will raise an exception and
; terminate this thread if a tcp-server is currently
; running. This ensures the uniqueness of the server.
(let ((connection-port
(with-exception-catcher
(lambda (e)
(thread-terminate! (current-thread)))
(lambda ()
(open-tcp-server
(list server-address: "*"
port-number: port-num
backlog: 1024
reuse-address: #t))))))
(let loop ()
(let ((connection (read connection-port)))
(tcp-server-connection-request-handle port-num connection))
(loop))))))
(define (tcp-server-connection-request-handle port-num connection)
(spawn-thread
(lambda ()
(let ((node-name (read-object connection)))
(debug "RELAY " node-name " STARTING " (2str (current-thread)))
(if (not (eof-object? node-name))
(let ((locals
(tcp-server-lookup-node-locals port-num node-name)))
(if locals
(begin
(node-locals-set (current-thread) locals)
(let loop ()
(let ((msg (read-object connection)))
(if (not (eof-object? msg))
(begin
(debug "RELAY " node-name " <=== " (2str msg))
(thread-send (node-locals-node locals) msg)
(loop))))))
(error "received connection request for nonexistent node"))))
(close-port connection)))))
(define (main-ip-address host)
(car (host-info-addresses (host-info host))))
;==============================================================================