-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
socket.go
131 lines (113 loc) · 2.31 KB
/
socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Package socket provides a pseudo socket
package socket
import (
"io"
"github.com/focalsolution/micro-go-micro/transport"
)
// Socket is our pseudo socket for transport.Socket
type Socket struct {
id string
// closed
closed chan bool
// remote addr
remote string
// local addr
local string
// send chan
send chan *transport.Message
// recv chan
recv chan *transport.Message
}
func (s *Socket) SetLocal(l string) {
s.local = l
}
func (s *Socket) SetRemote(r string) {
s.remote = r
}
// Accept passes a message to the socket which will be processed by the call to Recv
func (s *Socket) Accept(m *transport.Message) error {
select {
case s.recv <- m:
return nil
case <-s.closed:
return io.EOF
}
}
// Process takes the next message off the send queue created by a call to Send
func (s *Socket) Process(m *transport.Message) error {
select {
case msg := <-s.send:
*m = *msg
case <-s.closed:
// see if we need to drain
select {
case msg := <-s.send:
*m = *msg
return nil
default:
return io.EOF
}
}
return nil
}
func (s *Socket) Remote() string {
return s.remote
}
func (s *Socket) Local() string {
return s.local
}
func (s *Socket) Send(m *transport.Message) error {
// make copy
msg := &transport.Message{
Header: make(map[string]string),
Body: make([]byte, len(m.Body)),
}
// copy headers
for k, v := range m.Header {
msg.Header[k] = v
}
// copy body
copy(msg.Body, m.Body)
// send a message
select {
case s.send <- msg:
case <-s.closed:
return io.EOF
}
return nil
}
func (s *Socket) Recv(m *transport.Message) error {
// receive a message
select {
case msg := <-s.recv:
// set message
*m = *msg
case <-s.closed:
return io.EOF
}
// return nil
return nil
}
// Close closes the socket
func (s *Socket) Close() error {
select {
case <-s.closed:
// no op
default:
close(s.closed)
}
return nil
}
// New returns a new pseudo socket which can be used in the place of a transport socket.
// Messages are sent to the socket via Accept and receives from the socket via Process.
// SetLocal/SetRemote should be called before using the socket.
func New(id string) *Socket {
return &Socket{
id: id,
closed: make(chan bool),
local: "local",
remote: "remote",
send: make(chan *transport.Message, 128),
recv: make(chan *transport.Message, 128),
}
}