/
gossip.go
154 lines (124 loc) · 3.65 KB
/
gossip.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package gossip
import (
"net"
"strconv"
"github.com/hexablock/log"
"github.com/hexablock/vivaldi"
"github.com/euforia/gossip/transport"
"github.com/hashicorp/memberlist"
)
// Gossip is the top-level gossip struct. It manages multiple gossip pools
// that share one network transport and one coordinate client.
type Gossip struct {
	// name is the unique node name; it is copied into every pool's
	// memberlist config on registration.
	name string

	// advAddr is the advertise address used for state exchange.
	advAddr string
	// advPort is the advertise port used for state exchange.
	advPort int

	// coord is the vivaldi coordinate client handed to each registered pool.
	coord *vivaldi.Client

	// pools holds all registered gossip pools keyed by pool id.
	pools map[int32]*Pool

	// trans is the core network transport.
	trans *transport.NetTransport

	// log is the logger shared with every registered pool.
	log *log.Logger

	// debug turns on the underlying loggers; it is passed to each pool
	// config on registration.
	debug bool
}
// New returns a new Gossip instance based on the given config.
//
// conf.Validate() is invoked first and the (possibly adjusted) config is
// then used to build the vivaldi coordinate client and the shared network
// transport. The first construction error is returned and no Gossip is
// produced in that case.
//
// NOTE(review): if Validate reports problems through a return value, that
// value is not inspected here - confirm against the Config type.
func New(conf *Config) (*Gossip, error) {
	conf.Validate()

	// The coordinate client is built before the transport on purpose: when
	// the coordinate config is rejected we return without ever having
	// constructed a transport. The previous order created the transport
	// first and simply dropped it on this error path, with no cleanup.
	coord, err := vivaldi.NewClient(conf.Coordinate)
	if err != nil {
		return nil, err
	}

	// TODO: Change to use bind address
	trans, err := transport.NewNetTransport(&transport.Config{
		BindAddrs: []string{conf.BindAddr},
		BindPort:  conf.BindPort,
	})
	if err != nil {
		return nil, err
	}

	return &Gossip{
		name:    conf.Name,
		advAddr: conf.AdvertiseAddr,
		advPort: conf.AdvertisePort,
		coord:   coord,
		pools:   make(map[int32]*Pool, 1),
		trans:   trans,
		log:     conf.Logger,
		debug:   conf.Debug,
	}, nil
}
// GetPool looks up a gossip pool by the given id. The result is nil when no
// pool is stored under that id.
func (g *Gossip) GetPool(id int32) *Pool {
	// Indexing a map with an absent key yields the zero value, which for
	// *Pool is nil.
	return g.pools[id]
}
// ListPools returns the ids of all registered pools as a slice of int32.
// The ids come out in map iteration order, which is not specified.
func (g *Gossip) ListPools() []int32 {
	ids := make([]int32, len(g.pools))
	i := 0
	for id := range g.pools {
		ids[i] = id
		i++
	}
	return ids
}
// RegisterPool creates a new gossip pool with the given config and stores it
// under pconf.ID. All pools must be registered before gossip is started, as
// the addition of pools is not thread-safe.
//
// pconf is modified in place: its transport, node name, bind/advertise
// address and port, coordinate client, logger and debug flag are all
// overwritten with the values held by g.
func (g *Gossip) RegisterPool(pconf *PoolConfig) *Pool {
	// NOTE(review): the int32 pool id is narrowed to uint8 for the
	// transport, so ids outside 0-255 wrap around - confirm callers keep
	// ids within that range.
	poolTrans := g.trans.RegisterPool(uint8(pconf.ID))

	// Memberlist settings: the bind and advertise values are both taken
	// from the advertise address and port of g.
	pconf.Memberlist.Transport = poolTrans
	pconf.Memberlist.Name = g.name
	pconf.Memberlist.BindAddr = g.advAddr
	pconf.Memberlist.BindPort = g.advPort
	pconf.Memberlist.AdvertiseAddr = g.advAddr
	pconf.Memberlist.AdvertisePort = g.advPort

	// Resources shared by every pool.
	pconf.Vivaldi = g.coord
	pconf.Logger = g.log
	pconf.Debug = g.debug

	pool := NewPool(pconf)
	g.pools[pconf.ID] = pool
	return pool
}
// Start starts the underlying transport and then every registered gossip
// pool. It stops at the first pool that fails to start and returns that
// error; pools are visited in map iteration order, which is not specified.
// This should be called only after all pools have been registered.
func (g *Gossip) Start() (err error) {
	g.trans.Start()

	for _, pool := range g.pools {
		if err = pool.Start(); err != nil {
			return err
		}
	}
	return nil
}
// host returns the advertise address and port of this node joined into a
// single "host:port" string.
//
// net.JoinHostPort is used instead of plain concatenation so that an IPv6
// literal is wrapped in brackets (for example "[::1]:7946"); gluing the
// parts together with ":" would give "::1:7946", which cannot be split back
// into host and port.
func (g *Gossip) host() string {
	return net.JoinHostPort(g.advAddr, strconv.Itoa(g.advPort))
}
// ListenTCP returns a TCP Listener interface for native non-muxed protocols.
// It does not actually start listening; the returned Listener is backed by
// the transport's channel of incoming connections.
func (g *Gossip) ListenTCP() net.Listener {
	// The error from transport.ListenTCP is discarded because this method
	// has no error result.
	// NOTE(review): on failure the returned listener is whatever
	// transport.ListenTCP hands back alongside the error - confirm callers
	// can cope with that.
	listener, _ := transport.ListenTCP(g.host(), g.trans.TCPCh())
	return listener
}
// Listen returns a new muxed listener for the given id. The dialer must send
// the same id at the time of connection. An error is returned if the id is
// already taken. All listeners must be registered before gossip is actually
// started, as the addition of listeners is not thread-safe.
func (g *Gossip) Listen(id uint16) (net.Listener, error) {
	conns, err := g.trans.RegisterListener(id)
	if err != nil {
		return nil, err
	}
	return transport.ListenTCP(g.host(), conns)
}
// UDPPackets returns a read-only channel of incoming non-gossip UDP packets.
// This is useful for custom application transports, allowing network
// communication on a single port.
func (g *Gossip) UDPPackets() <-chan *memberlist.Packet {
	packets := g.trans.UDPCh()
	return packets
}