/*
memberlist is a library that manages cluster membership and member failure
detection using a gossip-based protocol.

The use cases for such a library are far-reaching: all distributed systems
require membership, and memberlist is a reusable solution for managing
cluster membership and node failure detection.

memberlist is eventually consistent but converges quickly on average. The
speed at which it converges can be heavily tuned via various knobs on the
protocol. Node failures are detected and network partitions are partially
tolerated by attempting to communicate with potentially dead nodes through
multiple routes.
*/
package memberlist
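// A minimal usage sketch, assuming a reachable peer; the address below is a
// placeholder:
//
//	conf := DefaultLANConfig()
//	conf.Name = "node-a" // must be unique within the cluster
//	list, err := Create(conf)
//	if err != nil {
//		panic(err)
//	}
//	// Contact at least one known member to join the cluster.
//	if _, err := list.Join([]string{"10.0.0.1:7946"}); err != nil {
//		panic(err)
//	}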
import (
"container/list"
"fmt"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
multierror "github.com/hashicorp/go-multierror"
sockaddr "github.com/hashicorp/go-sockaddr"
"github.com/miekg/dns"
)
type Memberlist struct {
sequenceNum uint32 // Local sequence number
incarnation uint32 // Local incarnation number
numNodes uint32 // Number of known nodes (estimate)
pushPullReq uint32 // Number of push/pull requests
config *Config
shutdown int32 // Used as an atomic boolean value
shutdownCh chan struct{}
leave int32 // Used as an atomic boolean value
leaveBroadcast chan struct{}
shutdownLock sync.Mutex // Serializes calls to Shutdown
leaveLock sync.Mutex // Serializes calls to Leave
transport Transport
handoffCh chan struct{}
highPriorityMsgQueue *list.List
lowPriorityMsgQueue *list.List
msgQueueLock sync.Mutex
nodeLock sync.RWMutex
nodes []*nodeState // Known nodes
nodeMap map[string]*nodeState // Maps Addr.String() -> NodeState
nodeTimers map[string]*suspicion // Maps Addr.String() -> suspicion timer
awareness *awareness
tickerLock sync.Mutex
tickers []*time.Ticker
stopTick chan struct{}
probeIndex int
ackLock sync.Mutex
ackHandlers map[uint32]*ackHandler
broadcasts *TransmitLimitedQueue
logger *log.Logger
}
// BuildVsnArray creates the protocol version array (Vsn) advertised in alive
// messages: the supported protocol range and current version, followed by the
// delegate protocol range and current version.
func (conf *Config) BuildVsnArray() []uint8 {
return []uint8{
ProtocolVersionMin, ProtocolVersionMax, conf.ProtocolVersion,
conf.DelegateProtocolMin, conf.DelegateProtocolMax,
conf.DelegateProtocolVersion,
}
}
// newMemberlist creates the network listeners. It does not schedule
// execution of background maintenance.
func newMemberlist(conf *Config) (*Memberlist, error) {
if conf.ProtocolVersion < ProtocolVersionMin {
return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
} else if conf.ProtocolVersion > ProtocolVersionMax {
return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
}
if len(conf.SecretKey) > 0 {
if conf.Keyring == nil {
keyring, err := NewKeyring(nil, conf.SecretKey)
if err != nil {
return nil, err
}
conf.Keyring = keyring
} else {
if err := conf.Keyring.AddKey(conf.SecretKey); err != nil {
return nil, err
}
if err := conf.Keyring.UseKey(conf.SecretKey); err != nil {
return nil, err
}
}
}
if conf.LogOutput != nil && conf.Logger != nil {
return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.")
}
logDest := conf.LogOutput
if logDest == nil {
logDest = os.Stderr
}
logger := conf.Logger
if logger == nil {
logger = log.New(logDest, "", log.LstdFlags)
}
// Set up a network transport by default if a custom one wasn't given
// by the config.
transport := conf.Transport
if transport == nil {
nc := &NetTransportConfig{
BindAddrs: []string{conf.BindAddr},
BindPort: conf.BindPort,
Logger: logger,
}
// See comment below for details about the retry in here.
makeNetRetry := func(limit int) (*NetTransport, error) {
var err error
for try := 0; try < limit; try++ {
var nt *NetTransport
if nt, err = NewNetTransport(nc); err == nil {
return nt, nil
}
if strings.Contains(err.Error(), "address already in use") {
logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)
continue
}
}
return nil, fmt.Errorf("failed to obtain an address: %v", err)
}
// The dynamic bind port operation is inherently racy because
// even though we are using the kernel to find a port for us, we
// are attempting to bind multiple protocols (and potentially
// multiple addresses) with the same port number. We build in a
// few retries here since this often gets transient errors in
// busy unit tests.
limit := 1
if conf.BindPort == 0 {
limit = 10
}
nt, err := makeNetRetry(limit)
if err != nil {
return nil, fmt.Errorf("Could not set up network transport: %v", err)
}
if conf.BindPort == 0 {
port := nt.GetAutoBindPort()
conf.BindPort = port
conf.AdvertisePort = port
logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port)
}
transport = nt
}
m := &Memberlist{
config: conf,
shutdownCh: make(chan struct{}),
leaveBroadcast: make(chan struct{}, 1),
transport: transport,
handoffCh: make(chan struct{}, 1),
highPriorityMsgQueue: list.New(),
lowPriorityMsgQueue: list.New(),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier),
ackHandlers: make(map[uint32]*ackHandler),
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
logger: logger,
}
m.broadcasts.NumNodes = func() int {
return m.estNumNodes()
}
go m.streamListen()
go m.packetListen()
go m.packetHandler()
return m, nil
}
// Create will create a new Memberlist using the given configuration.
// This will not connect to any other node (see Join) yet, but will start
// all the listeners to allow other nodes to join this memberlist.
// After creating a Memberlist, the given configuration should no longer
// be modified by the user.
func Create(conf *Config) (*Memberlist, error) {
m, err := newMemberlist(conf)
if err != nil {
return nil, err
}
if err := m.setAlive(); err != nil {
m.Shutdown()
return nil, err
}
m.schedule()
return m, nil
}
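// A minimal sketch of Create with gossip encryption enabled. SecretKey must
// be 16, 24, or 32 bytes (AES-128/192/256); the zeroed key below is a
// placeholder for real random key material.
//
//	conf := DefaultLANConfig()
//	conf.SecretKey = make([]byte, 32) // placeholder; use a random key
//	list, err := Create(conf)
//	if err != nil {
//		log.Fatalf("memberlist: %v", err) // e.g. bind failure or bad key size
//	}
//	defer list.Shutdown()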
// Join is used to take an existing Memberlist and attempt to join a cluster
// by contacting all the given hosts and performing a state sync. Initially,
// the Memberlist only contains our own state, so doing this will cause
// remote nodes to become aware of the existence of this node, effectively
// joining the cluster.
//
// This returns the number of hosts successfully contacted and an error if
// none could be reached. If an error is returned, the node did not successfully
// join the cluster.
func (m *Memberlist) Join(existing []string) (int, error) {
numSuccess := 0
var errs error
for _, exist := range existing {
addrs, err := m.resolveAddr(exist)
if err != nil {
err = fmt.Errorf("Failed to resolve %s: %v", exist, err)
errs = multierror.Append(errs, err)
m.logger.Printf("[WARN] memberlist: %v", err)
continue
}
for _, addr := range addrs {
hp := joinHostPort(addr.ip.String(), addr.port)
if err := m.pushPullNode(hp, true); err != nil {
err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
errs = multierror.Append(errs, err)
m.logger.Printf("[DEBUG] memberlist: %v", err)
continue
}
numSuccess++
}
}
if numSuccess > 0 {
errs = nil
}
return numSuccess, errs
}
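// A sketch of Join's partial-failure semantics, assuming list was returned
// by Create; the peer addresses are placeholders.
//
//	n, err := list.Join([]string{"10.0.0.1:7946", "10.0.0.2:7946"})
//	if err != nil {
//		// err is non-nil only when no peer could be contacted
//		log.Fatalf("failed to join cluster: %v", err)
//	}
//	log.Printf("contacted %d peers during join", n)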
// ipPort holds information about a node we want to try to join.
type ipPort struct {
ip net.IP
port uint16
}
// tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host.
// The built-in Go resolver will do a UDP lookup first, and will only use TCP if
// the response has the truncate bit set, which isn't common on DNS servers like
// Consul's. By doing the TCP lookup directly, we get the best chance for the
// largest list of hosts to join. Since joins are relatively rare events, it's ok
// to do this rather expensive operation.
func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, error) {
// Don't attempt any TCP lookups against non-fully qualified domain
// names, since those will likely come from the resolv.conf file.
if !strings.Contains(host, ".") {
return nil, nil
}
// Make sure the domain name is terminated with a dot (we know there's
// at least one character at this point).
dn := host
if dn[len(dn)-1] != '.' {
dn = dn + "."
}
// See if we can find a server to try.
cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath)
if err != nil {
return nil, err
}
if len(cc.Servers) > 0 {
// We support host:port in the DNS config, but need to add the
// default port if one is not supplied.
server := cc.Servers[0]
if !hasPort(server) {
server = net.JoinHostPort(server, cc.Port)
}
// Do the lookup.
c := new(dns.Client)
c.Net = "tcp"
msg := new(dns.Msg)
msg.SetQuestion(dn, dns.TypeANY)
in, _, err := c.Exchange(msg, server)
if err != nil {
return nil, err
}
// Handle any IPs we get back that we can attempt to join.
var ips []ipPort
for _, r := range in.Answer {
switch rr := r.(type) {
			case *dns.A:
				ips = append(ips, ipPort{rr.A, defaultPort})
			case *dns.AAAA:
				ips = append(ips, ipPort{rr.AAAA, defaultPort})
			case *dns.CNAME:
				m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host)
}
}
return ips, nil
}
return nil, nil
}
// resolveAddr is used to resolve the given host string into a list of
// addresses and a port. If no port is supplied, the configured bind port
// is used as the default.
func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
// This captures the supplied port, or the default one.
hostStr = ensurePort(hostStr, m.config.BindPort)
host, sport, err := net.SplitHostPort(hostStr)
if err != nil {
return nil, err
}
lport, err := strconv.ParseUint(sport, 10, 16)
if err != nil {
return nil, err
}
port := uint16(lport)
// If it looks like an IP address we are done. The SplitHostPort() above
// will make sure the host part is in good shape for parsing, even for
// IPv6 addresses.
if ip := net.ParseIP(host); ip != nil {
		return []ipPort{{ip, port}}, nil
}
// First try TCP so we have the best chance for the largest list of
// hosts to join. If this fails it's not fatal since this isn't a standard
// way to query DNS, and we have a fallback below.
ips, err := m.tcpLookupIP(host, port)
if err != nil {
m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err)
}
if len(ips) > 0 {
return ips, nil
}
// If TCP didn't yield anything then use the normal Go resolver which
// will try UDP, then might possibly try TCP again if the UDP response
// indicates it was truncated.
ans, err := net.LookupIP(host)
if err != nil {
return nil, err
}
ips = make([]ipPort, 0, len(ans))
for _, ip := range ans {
ips = append(ips, ipPort{ip, port})
}
return ips, nil
}
// setAlive is used to mark this node as being alive. This is the same
// as if we received an alive notification for ourselves over our own
// network channel.
func (m *Memberlist) setAlive() error {
// Get the final advertise address from the transport, which may need
// to see which address we bound to.
addr, port, err := m.transport.FinalAdvertiseAddr(
m.config.AdvertiseAddr, m.config.AdvertisePort)
if err != nil {
return fmt.Errorf("Failed to get final advertise address: %v", err)
}
// Check if this is a public address without encryption
ipAddr, err := sockaddr.NewIPAddr(addr.String())
if err != nil {
return fmt.Errorf("Failed to parse interface addresses: %v", err)
}
ifAddrs := []sockaddr.IfAddr{
sockaddr.IfAddr{
SockAddr: ipAddr,
},
}
	_, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs)
	if err != nil {
		return fmt.Errorf("Failed to check interface addresses: %v", err)
	}
if len(publicIfs) > 0 && !m.config.EncryptionEnabled() {
m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
}
// Set any metadata from the delegate.
var meta []byte
if m.config.Delegate != nil {
meta = m.config.Delegate.NodeMeta(MetaMaxSize)
if len(meta) > MetaMaxSize {
panic("Node meta data provided is longer than the limit")
}
}
a := alive{
Incarnation: m.nextIncarnation(),
Node: m.config.Name,
Addr: addr,
Port: uint16(port),
Meta: meta,
Vsn: m.config.BuildVsnArray(),
}
m.aliveNode(&a, nil, true)
return nil
}
// LocalNode is used to return the local Node
func (m *Memberlist) LocalNode() *Node {
m.nodeLock.RLock()
defer m.nodeLock.RUnlock()
state := m.nodeMap[m.config.Name]
return &state.Node
}
// UpdateNode is used to trigger re-advertising the local node. This is
// primarily used with a Delegate to support dynamic updates to the local
// metadata. This will block until the update message is successfully
// broadcast to a member of the cluster (if any exist) or until the
// specified timeout is reached.
func (m *Memberlist) UpdateNode(timeout time.Duration) error {
// Get the node meta data
var meta []byte
if m.config.Delegate != nil {
meta = m.config.Delegate.NodeMeta(MetaMaxSize)
if len(meta) > MetaMaxSize {
panic("Node meta data provided is longer than the limit")
}
}
// Get the existing node
m.nodeLock.RLock()
state := m.nodeMap[m.config.Name]
m.nodeLock.RUnlock()
// Format a new alive message
a := alive{
Incarnation: m.nextIncarnation(),
Node: m.config.Name,
Addr: state.Addr,
Port: state.Port,
Meta: meta,
Vsn: m.config.BuildVsnArray(),
}
notifyCh := make(chan struct{})
m.aliveNode(&a, notifyCh, true)
// Wait for the broadcast or a timeout
if m.anyAlive() {
var timeoutCh <-chan time.Time
if timeout > 0 {
timeoutCh = time.After(timeout)
}
select {
case <-notifyCh:
case <-timeoutCh:
return fmt.Errorf("timeout waiting for update broadcast")
}
}
return nil
}
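// A sketch of re-advertising metadata: once the application's Delegate
// returns new bytes from NodeMeta, UpdateNode broadcasts a fresh alive
// message carrying them.
//
//	// (delegate now returns the updated metadata from NodeMeta)
//	if err := list.UpdateNode(10 * time.Second); err != nil {
//		log.Printf("update not broadcast in time: %v", err)
//	}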
// SendTo is deprecated in favor of SendBestEffort, which takes a *Node to
// target instead of a raw address.
func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
// Encode as a user message
buf := make([]byte, 1, len(msg)+1)
buf[0] = byte(userMsg)
buf = append(buf, msg...)
// Send the message
return m.rawSendMsgPacket(to.String(), nil, buf)
}
// SendToUDP is deprecated in favor of SendBestEffort.
func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
return m.SendBestEffort(to, msg)
}
// SendToTCP is deprecated in favor of SendReliable.
func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
return m.SendReliable(to, msg)
}
// SendBestEffort uses the unreliable packet-oriented interface of the transport
// to target a user message at the given node (this does not use the gossip
// mechanism). The maximum size of the message depends on the configured
// UDPBufferSize for this memberlist instance.
func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error {
// Encode as a user message
buf := make([]byte, 1, len(msg)+1)
buf[0] = byte(userMsg)
buf = append(buf, msg...)
// Send the message
return m.rawSendMsgPacket(to.Address(), to, buf)
}
// SendReliable uses the reliable stream-oriented interface of the transport to
// target a user message at the given node (this does not use the gossip
// mechanism). Delivery is guaranteed if no error is returned, and there is no
// limit on the size of the message.
func (m *Memberlist) SendReliable(to *Node, msg []byte) error {
return m.sendUserMsg(to.Address(), msg)
}
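// A sketch contrasting the two direct-messaging paths, assuming list is a
// *Memberlist and payload is an application-defined []byte:
//
//	for _, node := range list.Members() {
//		if node.Name == list.LocalNode().Name {
//			continue // skip ourselves
//		}
//		// Loss-tolerant, small payloads: packet path, limited by UDPBufferSize.
//		_ = list.SendBestEffort(node, []byte("ping"))
//		// Must-arrive or large payloads: stream path, no size limit.
//		_ = list.SendReliable(node, payload)
//	}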
// Members returns a list of all known live nodes. The node structures
// returned must not be modified. If you wish to modify a Node, make a
// copy first.
func (m *Memberlist) Members() []*Node {
m.nodeLock.RLock()
defer m.nodeLock.RUnlock()
nodes := make([]*Node, 0, len(m.nodes))
for _, n := range m.nodes {
if n.State != stateDead {
nodes = append(nodes, &n.Node)
}
}
return nodes
}
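// Since the returned Node structs must not be modified, copy before
// mutating; a minimal sketch:
//
//	for _, n := range list.Members() {
//		cp := *n // struct copy; note cp.Meta still shares its backing array
//		log.Printf("member %s at %s:%d", cp.Name, cp.Addr, cp.Port)
//	}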
// NumMembers returns the number of alive nodes currently known. Between
// the time of calling this and calling Members, the number of alive nodes
// may have changed, so this shouldn't be used to determine how many
// members will be returned by Members.
func (m *Memberlist) NumMembers() (alive int) {
m.nodeLock.RLock()
defer m.nodeLock.RUnlock()
for _, n := range m.nodes {
if n.State != stateDead {
alive++
}
}
return
}
// Leave will broadcast a leave message but will not shut down the background
// listeners, meaning the node will continue participating in gossip and state
// updates.
//
// This will block until the leave message is successfully broadcast to
// a member of the cluster (if any exist) or until the specified timeout
// is reached.
//
// This method is safe to call multiple times, but must not be called
// after the cluster is already shut down.
func (m *Memberlist) Leave(timeout time.Duration) error {
m.leaveLock.Lock()
defer m.leaveLock.Unlock()
if m.hasShutdown() {
panic("leave after shutdown")
}
if !m.hasLeft() {
atomic.StoreInt32(&m.leave, 1)
m.nodeLock.Lock()
state, ok := m.nodeMap[m.config.Name]
m.nodeLock.Unlock()
if !ok {
m.logger.Printf("[WARN] memberlist: Leave but we're not in the node map.")
return nil
}
d := dead{
Incarnation: state.Incarnation,
Node: state.Name,
}
m.deadNode(&d)
// Block until the broadcast goes out
if m.anyAlive() {
var timeoutCh <-chan time.Time
if timeout > 0 {
timeoutCh = time.After(timeout)
}
select {
case <-m.leaveBroadcast:
case <-timeoutCh:
return fmt.Errorf("timeout waiting for leave broadcast")
}
}
}
return nil
}
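// A minimal graceful-exit sketch: broadcast the leave first, then stop the
// listeners. Note that calling Leave after Shutdown panics.
//
//	if err := list.Leave(5 * time.Second); err != nil {
//		log.Printf("leave broadcast timed out: %v", err)
//	}
//	if err := list.Shutdown(); err != nil {
//		log.Printf("shutdown: %v", err)
//	}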
// anyAlive checks for any other alive node besides ourselves.
func (m *Memberlist) anyAlive() bool {
m.nodeLock.RLock()
defer m.nodeLock.RUnlock()
for _, n := range m.nodes {
if n.State != stateDead && n.Name != m.config.Name {
return true
}
}
return false
}
// GetHealthScore gives this instance's idea of how well it is meeting the soft
// real-time requirements of the protocol. Lower numbers are better, and zero
// means "totally healthy".
func (m *Memberlist) GetHealthScore() int {
return m.awareness.GetHealthScore()
}
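// A short monitoring sketch: zero is healthy, and higher scores cause the
// node to lengthen its own probe timeouts.
//
//	if s := list.GetHealthScore(); s > 0 {
//		log.Printf("memberlist health degraded, score %d", s)
//	}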
// ProtocolVersion returns the protocol version currently in use by
// this memberlist.
func (m *Memberlist) ProtocolVersion() uint8 {
// NOTE: This method exists so that in the future we can control
// any locking if necessary, if we change the protocol version at
// runtime, etc.
return m.config.ProtocolVersion
}
// Shutdown will stop any background maintenance of network activity
// for this memberlist, causing it to appear "dead". A leave message
// will not be broadcast beforehand, so the cluster being left will have
// to detect this node's shutdown using probing. If you wish to exit the
// cluster more gracefully, call Leave prior to shutting down.
//
// This method is safe to call multiple times.
func (m *Memberlist) Shutdown() error {
m.shutdownLock.Lock()
defer m.shutdownLock.Unlock()
if m.hasShutdown() {
return nil
}
// Shut down the transport first, which should block until it's
// completely torn down. If we kill the memberlist-side handlers
// those I/O handlers might get stuck.
if err := m.transport.Shutdown(); err != nil {
		m.logger.Printf("[ERR] Failed to shut down transport: %v", err)
}
// Now tear down everything else.
atomic.StoreInt32(&m.shutdown, 1)
close(m.shutdownCh)
m.deschedule()
return nil
}
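// hasShutdown reports whether Shutdown has been called.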
func (m *Memberlist) hasShutdown() bool {
return atomic.LoadInt32(&m.shutdown) == 1
}
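// hasLeft reports whether Leave has been called.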
func (m *Memberlist) hasLeft() bool {
return atomic.LoadInt32(&m.leave) == 1
}
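// getNodeState returns the current state of the node registered under addr.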
func (m *Memberlist) getNodeState(addr string) nodeStateType {
m.nodeLock.RLock()
defer m.nodeLock.RUnlock()
n := m.nodeMap[addr]
return n.State
}
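// getNodeStateChange returns the time of the last state change for the node
// registered under addr.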
func (m *Memberlist) getNodeStateChange(addr string) time.Time {
m.nodeLock.RLock()
defer m.nodeLock.RUnlock()
n := m.nodeMap[addr]
return n.StateChange
}
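// changeNode applies f to the node registered under addr while holding the
// node write lock.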
func (m *Memberlist) changeNode(addr string, f func(*nodeState)) {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
n := m.nodeMap[addr]
f(n)
}