forked from gammazero/nexus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
peer.go
64 lines (54 loc) · 1.53 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package wamp
import (
"context"
"errors"
"time"
)
// Peer is the interface implemented by endpoints communicating via WAMP.
type Peer interface {
	// Send sends the message to the peer.
	Send(Message) error

	// SendCtx sends the message to the peer, and uses a context to cancel or
	// timeout sending the message when blocked waiting to write to the peer.
	SendCtx(context.Context, Message) error

	// TrySend performs a non-blocking send.  It returns an error if the send
	// would block.
	TrySend(Message) error

	// Close closes the peer connection and the channel returned from Recv().
	Close()

	// Recv returns a receive-only channel of messages from the peer.
	Recv() <-chan Message

	// IsLocal returns true if the session is local.
	IsLocal() bool
}
// RecvTimeout receives a message from a peer within the specified time.
//
// It waits on the channel returned by p.Recv() for at most t.  It returns the
// received message, or a non-nil error if the receive channel is closed or if
// no message arrives before the timeout elapses.
func RecvTimeout(p Peer, t time.Duration) (Message, error) {
	// Use an explicit timer instead of time.After so that the timer can be
	// stopped as soon as this function returns, rather than lingering until
	// it fires.
	timer := time.NewTimer(t)
	defer timer.Stop()

	select {
	case msg, open := <-p.Recv():
		if !open {
			return nil, errors.New("receive channel closed")
		}
		return msg, nil
	case <-timer.C:
		return nil, errors.New("timeout waiting for message")
	}
}
// SendCtx delivers msg on the write-only channel wr.  If the channel is not
// ready to accept the message, the call waits until either the send completes
// or ctx is done; in the latter case the context's error is returned.
func SendCtx(ctx context.Context, wr chan<- Message, msg Message) error {
	select {
	case wr <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// TrySend attempts to deliver msg on the write-only channel wr without
// waiting.  If the channel cannot accept the message immediately, an error is
// returned and the message is not sent.
func TrySend(wr chan<- Message, msg Message) error {
	select {
	case wr <- msg:
		return nil
	default:
		return errors.New("blocked")
	}
}