Skip to content


Subversion checkout URL

You can clone with
Download ZIP
Browse files

[Distributed Places] add solo connections and tcp-connection-died han…

  • Loading branch information...
commit 033cdc141378535c50dfdfd35c2a721e5333041f 1 parent ee46305
@tewk tewk authored
Showing with 65 additions and 23 deletions.
  1. +65 −23 collects/racket/place/distributed.rkt
88 collects/racket/place/distributed.rkt
@@ -335,6 +335,7 @@
(define/public (get-pid) pid)
+ (define/public (kill [force #t]) (subprocess-kill s force))
(define/public (wait-for-die) (subprocess-wait s))
(define/public (register nes)
(for/filter/fold/cons nes ([x (list s (list o "OUT") (list e "ERR"))])
@@ -389,6 +390,7 @@
(init-field [sub-ecs null])
(init-field [psbs null])
(init-field [spawned-nodes (make-hash)])
+ (init-field [solo-nodes (make-hash)])
(init-field [named-places (make-hash)])
(init-field [beacon #f])
(init-field [owner #f])
@@ -406,6 +408,10 @@
(hash-set! spawned-nodes key ec))
(define (find-spawned-node key)
(hash-ref spawned-nodes key #f))
+ (define (add-solo-node key ec)
+ (hash-set! solo-nodes key ec))
+ (define (find-solo-node key)
+ (hash-ref solo-nodes key #f))
(define (add-psb ec)
(set! psbs (append psbs (list ec))))
(define (add-named-place name np)
@@ -427,7 +433,8 @@
(add-place-channel-socket-bridge pch d ch-id)
(sconn-write-flush d (dcgm DCGM-TYPE-NEW-INTER-DCHANNEL src dest ch-id))]
[(or (place-channel? d) (place? d))
- (place-channel-put d m)])]
+ (place-channel-put d m)]
+ [else (raise (format "Unexpected channel type ~a" d))])]
[(dcgm 9 #;(== DCGM-TYPE-NEW-PLACE) -1 (and place-exec (list-rest type rest)) ch-id)
(match place-exec
[(list 'connect name)
@@ -470,8 +477,7 @@
(send pch forward msg)]
[(th-place-channel? pch)
(th-place-channel-put pch msg)]
- [else
- (raise "OOPS\n")])]
+ [else (raise (format "Unexpected channel type ~a" pch))])]
[(dcgm 6 #;(== DCGM-TYPE-SPAWN-REMOTE-PROCESS) src (list node-name node-port mod-path funcname) ch1)
(define node (spawn-remote-racket-node node-name #:listen-port node-port))
(for ([x (in-hash-values spawned-nodes)])
@@ -492,11 +498,15 @@
(set! owner src-channel)]
[(dcgm #;50 (== DCGM-NEW-NODE-CONNECT) -1 -1 (list node-name node-port))
(add-spawned-node (list node-name node-port) (new remote-node% [host-name node-name] [listen-port node-port]))]
- [(dcgm #;100 (== DCGM-CONTROL-NEW-NODE) -1 -1 (list node-name node-port))
+ [(dcgm #;100 (== DCGM-CONTROL-NEW-NODE) -1 solo (list node-name node-port))
(define node (spawn-remote-racket-node node-name #:listen-port node-port))
- (for ([x (in-hash-values spawned-nodes)])
- (send x notify-of-new-node node-name node-port))
- (add-spawned-node (list node-name node-port) node)]
+ (cond
+ [solo
+ (add-solo-node (list node-name node-port) node)]
+ [else
+ (for ([x (in-hash-values spawned-nodes)])
+ (send x notify-of-new-node node-name node-port))
+ (add-spawned-node (list node-name node-port) node)])]
[(dcgm #;101 (== DCGM-CONTROL-NEW-PLACE) dest -1 place-exec)
(define node (find-spawned-node dest))
(send node launch-place place-exec)]
@@ -547,11 +557,9 @@
[(is-a? x socket-connection%)
(sconn-get-forward-event x forward-mesg)]
[(or (place-channel? x) (place? x))
- (wrap-evt x (lambda (e)
- (forward-mesg e x)))]
+ (wrap-evt x (lambda (e) (forward-mesg e x)))]
[(channel? x)
- (wrap-evt x (lambda (e)
- (forward-mesg e x)))]
+ (wrap-evt x (lambda (e) (forward-mesg e x)))]
[else (raise (format "Unexpected channel type ~a" x))])
@@ -596,6 +604,11 @@
(for/fold ([n nes]) ([x (in-hash-values spawned-nodes)])
(send x register n))
+ [nes
+ (if solo-nodes
+ (for/fold ([n nes]) ([x (in-hash-values solo-nodes)])
+ (send x register n))
+ nes)]
[nes (register-beacon nes)]
@@ -642,7 +655,8 @@
[delay 1]
[background-connect #t]
[in #f]
- [out #f])
+ [out #f]
+ [remote-node #f])
(field [subchannels null]
[connecting #f]
[ch #f])
@@ -667,13 +681,20 @@
(set! out _out)
(set! connecting #f)])))
+ (define (handle-error e)
+ (cond
+ [remote-node => (lambda (n)
+ (send n tcp-connection-died host port))]
+ [else (raise (format "TCP connection to ~a:~a failed.\n" host port))]))
(define/public (add-subchannel id pch)
(set! subchannels (append subchannels (list (cons id pch)))))
(define/public (lookup-subchannel id) (cdr (assoc id subchannels)))
(define/public (_write-flush x)
(when (equal? out #f) (ensure-connected))
;(printf/f "SC ~a ~a\n" x out)
- (write-flush x out))
+ (with-handlers ([exn:fail? handle-error])
+ (write-flush x out)))
(define/public (remove-subchannel id)
(set! subchannels (filter-map
(lambda (x) (and (not (= (car x) id)) x))
@@ -682,7 +703,10 @@
(define/public (get-forward-event forwarder)
(when (equal? out #f) (ensure-connected))
(wrap-evt in (lambda (e)
- (forwarder (read in) this))))
+ (forwarder
+ (with-handlers ([exn:fail? handle-error])
+ (read in))
+ this))))
(define/public (read-message)
(when (equal? out #f) (ensure-connected))
@@ -730,9 +754,10 @@
(define (add-remote-place rp)
(set! remote-places (append remote-places(list rp))))
(define (spawn-node)
- (set! sp (new spawned-process% [cmdline-list cmdline-list] [parent this])))
+ (and cmdline-list
+ (set! sp (new spawned-process% [cmdline-list cmdline-list] [parent this]))))
(define (setup-socket-connection)
- (set! sc (new socket-connection% [host host-name] [port listen-port]))
+ (set! sc (new socket-connection% [host host-name] [port listen-port] [remote-node this]))
(sconn-write-flush sc (dcgm DCGM-TYPE-SET-OWNER -1 -1 "")))
(define (restart-node)
@@ -774,8 +799,7 @@
(th-place-channel-put pch msg)]
[(async-bi-channel? pch)
(async-bi-channel-put pch msg)]
- [else
- (raise "OOPS\n")])]
+ [else (raise (format "Unexpected channel type ~a" pch))])]
[(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg))
(define parent (send this get-router))
@@ -791,6 +815,19 @@
[else (log-debug (format"received message ~a" it))]))
(define/public (get-log-prefix) (format "PLACE ~a:~a" host-name listen-port))
+ (define/public (tcp-connection-died host port)
+ (log-debug (format "TCP connection~a:~a died, restarting node/connection" host-name listen-port))
+ (and sp (send sp kill))
+ (set! sp #f)
+ (cond
+ [cmdline-list (process-died null)]
+ [restart-on-exit
+ (if (equal? restart-on-exit #t)
+ (restart-node)
+ (send restart-on-exit restart restart-node))]
+ [else
+ (log-debug (format "No restart condition for ~a" (get-log-prefix)))]))
(define/public (process-died child)
(log-debug (format "Remote node pid ~a ~a:~a died" (get-sp-pid) host-name listen-port))
(set! sp #f)
@@ -799,8 +836,8 @@
(if (equal? restart-on-exit #t)
- (restart-node)
- (send restart-on-exit restart restart-node))]
+ (restart-node)
+ (send restart-on-exit restart restart-node))]
(log-debug (format "No restart cmdline arguments for ~a" (get-log-prefix)))])]
@@ -1452,14 +1489,19 @@
[(channel? ch) channel-put])
ch msg))
-(define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT])
- (*channel-put mrch (dcgm DCGM-CONTROL-NEW-NODE -1 -1 (list host listen-port))))
+(define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT]
+ #:solo [solo #f])
+ (*channel-put mrch (dcgm DCGM-CONTROL-NEW-NODE -1 solo (list host listen-port))))
(define/provide (mr-supervise-named-dynamic-place-at mrch dest name path func)
(*channel-put mrch (dcgm DCGM-CONTROL-NEW-PLACE dest -1 (list 'dynamic-place path func name))))
(define/provide (mr-connect-to mrch dest name)
- (define-values (ch1 ch2) (make-async-bi-channel))
+ (define-values (ch1 ch2)
+ (cond
+ [(channel? mrch) (make-async-bi-channel)]
+ [(place-channel? mrch) (place-channel)]
+ [else (raise (format "Unexpected channel type ~a" mrch))]))
(*channel-put mrch (dcgm DCGM-CONTROL-NEW-CONNECTION dest -1 (list name ch2)))
Please sign in to comment.
Something went wrong with that request. Please try again.