-
Notifications
You must be signed in to change notification settings - Fork 62
/
registry.go
150 lines (121 loc) · 3.2 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package p2p
import (
"fmt"
"io"
net "gx/ipfs/QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1/go-libp2p-net"
manet "gx/ipfs/QmX3U3YXCQ6UYBxq2LVWF8dARS1hPUTEYLrSx654Qyxyw6/go-multiaddr-net"
ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
)
// ListenerInfo holds information on a p2p listener.
type ListenerInfo struct {
// Application protocol identifier.
Protocol string
// Node identity
Identity peer.ID
// Local protocol stream address.
Address ma.Multiaddr
// Local protocol stream listener.
Closer io.Closer
// Flag indicating whether we're still accepting incoming connections, or
// whether this application listener has been shutdown.
Running bool
Registry *ListenerRegistry
}
// Close closes the listener. Does not affect child streams
func (c *ListenerInfo) Close() error {
c.Closer.Close()
err := c.Registry.Deregister(c.Protocol)
return err
}
// ListenerRegistry is a collection of local application protocol listeners.
type ListenerRegistry struct {
Listeners []*ListenerInfo
}
// Register registers listenerInfo2 in this registry
func (c *ListenerRegistry) Register(listenerInfo *ListenerInfo) {
c.Listeners = append(c.Listeners, listenerInfo)
}
// Deregister removes p2p listener from this registry
func (c *ListenerRegistry) Deregister(proto string) error {
foundAt := -1
for i, a := range c.Listeners {
if a.Protocol == proto {
foundAt = i
break
}
}
if foundAt != -1 {
c.Listeners = append(c.Listeners[:foundAt], c.Listeners[foundAt+1:]...)
return nil
}
return fmt.Errorf("failed to deregister proto %s", proto)
}
// StreamInfo holds information on active incoming and outgoing p2p streams.
type StreamInfo struct {
HandlerID uint64
Protocol string
LocalPeer peer.ID
LocalAddr ma.Multiaddr
RemotePeer peer.ID
RemoteAddr ma.Multiaddr
Local manet.Conn
Remote net.Stream
Registry *StreamRegistry
}
// Close closes stream endpoints and deregisters it
func (s *StreamInfo) Close() error {
s.Local.Close()
s.Remote.Close()
s.Registry.Deregister(s.HandlerID)
return nil
}
// Reset closes stream endpoints and deregisters it
func (s *StreamInfo) Reset() error {
s.Local.Close()
s.Remote.Reset()
s.Registry.Deregister(s.HandlerID)
return nil
}
func (s *StreamInfo) startStreaming() {
go func() {
_, err := io.Copy(s.Local, s.Remote)
if err != nil {
s.Reset()
} else {
s.Close()
}
}()
go func() {
_, err := io.Copy(s.Remote, s.Local)
if err != nil {
s.Reset()
} else {
s.Close()
}
}()
}
// StreamRegistry is a collection of active incoming and outgoing protocol app streams.
type StreamRegistry struct {
Streams []*StreamInfo
nextID uint64
}
// Register registers a stream to the registry
func (c *StreamRegistry) Register(streamInfo *StreamInfo) {
streamInfo.HandlerID = c.nextID
c.Streams = append(c.Streams, streamInfo)
c.nextID++
}
// Deregister deregisters stream from the registry
func (c *StreamRegistry) Deregister(handlerID uint64) {
foundAt := -1
for i, s := range c.Streams {
if s.HandlerID == handlerID {
foundAt = i
break
}
}
if foundAt != -1 {
c.Streams = append(c.Streams[:foundAt], c.Streams[foundAt+1:]...)
}
}