forked from gammazero/nexus
/
localpeer.go
70 lines (57 loc) · 2.01 KB
/
localpeer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package transport
import (
"errors"
"github.com/gammazero/nexus/wamp"
)
const linkedPeersOutQueueSize = 16
// LinkedPeers creates two connected peers. Messages sent to one peer appear
// in the Recv of the other. This is used for connecting client sessions to
// the router.
func LinkedPeers() (wamp.Peer, wamp.Peer) {
// The channel used for the router to send messages to the client should be
// large enough to prevent blocking while waiting for a slow client, as a
// client may block on I/O. If the client does block, then the message
// should be dropped.
rToC := make(chan wamp.Message, linkedPeersOutQueueSize)
// The router will read from this channen and immediately dispatch the
// message to the broker or dealer. Therefore this channel can be
// unbuffered.
cToR := make(chan wamp.Message)
// router reads from and writes to client
r := &localPeer{rd: cToR, wr: rToC}
// client reads from and writes to router
c := &localPeer{rd: rToC, wr: cToR}
return c, r
}
// IsLocal returns true is the wamp.Peer is a localPeer. These do not need
// authentication since they are part of the same process.
func IsLocal(p wamp.Peer) bool {
_, ok := p.(*localPeer)
return ok
}
// localPeer implements Peer
type localPeer struct {
rd <-chan wamp.Message
wr chan<- wamp.Message
}
// Recv returns the channel this peer reads incoming messages from.
func (p *localPeer) Recv() <-chan wamp.Message { return p.rd }
// TrySend writes a message to the peer's outbound message channel.
func (p *localPeer) TrySend(msg wamp.Message) error {
select {
case p.wr <- msg:
default:
return errors.New("blocked")
}
return nil
}
// Send writes a message to the peer's outbound message channel.
// Typically called by clients, since it is OK for the router to block a client
// since this will not block other clients.
func (p *localPeer) Send(msg wamp.Message) error {
p.wr <- msg
return nil
}
// Close closes the outgoing channel, waking any readers waiting on data from
// this peer.
func (p *localPeer) Close() { close(p.wr) }