forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
153 lines (140 loc) · 5.01 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Spencer Kimball (spencer.kimball@gmail.com)
package gossip
import (
"bytes"
"encoding/gob"
"math/rand"
"net"
"sync"
"time"
"github.com/cockroachdb/cockroach/rpc"
"github.com/cockroachdb/cockroach/util"
"github.com/golang/glog"
)
// server is the receiving half of the gossip protocol: it keeps track
// of the incoming clients it is servicing and periodically wakes their
// blocked gossip requests so fresh information can be returned.
type server struct {
	interval time.Duration // Base period between wakeups; jittered before use

	// mu is held by Gossip, stop and onClose while they touch the
	// fields below.
	mu            sync.Mutex
	ready         *sync.Cond          // Built on mu; broadcast to release waiting gossip requests
	closed        bool                // Set once by stop; waiting requests then return an error
	is            *infoStore          // Infostore that incoming deltas are combined into
	incoming      *addrSet            // Addresses of the incoming clients being serviced
	clientAddrMap map[string]net.Addr // Client's local address (as string) -> client's server address
}
// newServer builds a server that wakes waiting gossip requests roughly
// every interval. The returned server starts with an empty infostore,
// room for MaxPeers incoming clients, and a condition variable bound
// to its own mutex.
func newServer(interval time.Duration) *server {
	srv := new(server)
	srv.is = newInfoStore(nil)
	srv.interval = interval
	srv.incoming = newAddrSet(MaxPeers)
	srv.clientAddrMap = map[string]net.Addr{}
	// The condition variable must share the server mutex so that
	// Gossip can wait on it while holding the lock.
	srv.ready = sync.NewCond(&srv.mu)
	return srv
}
// Gossip receives gossipped information from a peer node.
// The received delta is combined with the infostore, and this
// node's own gossip is returned to requesting client.
//
// A client not yet in the incoming set is registered if there is
// capacity; otherwise reply.Alternate is set to a randomly selected
// already-connected client and the call returns immediately with a
// nil error. Unless args.MaxSeq is -1, the call blocks until the next
// wakeup broadcast. If the server has been closed, an error is
// returned instead of a delta.
func (s *server) Gossip(args *GossipRequest, reply *GossipResponse) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// If there is no more capacity to accept incoming clients, return
	// a random already-being-serviced incoming client as an alternate.
	if !s.incoming.hasAddr(args.Addr) {
		if !s.incoming.hasSpace() {
			reply.Alternate = s.incoming.selectRandom()
			return nil
		}
		s.incoming.addAddr(args.Addr)
		// This lookup map allows the incoming client to be removed from
		// the incoming addr set when its connection is closed. See
		// server.onClose().
		s.clientAddrMap[args.LAddr.String()] = args.Addr
	}

	// Update infostore with gossipped infos.
	if args.Delta != nil {
		glog.V(1).Infof("received delta infostore from client %s: %s", args.Addr, args.Delta)
		s.is.combine(args.Delta)
	}

	// If requested max sequence is not -1, wait for the next wakeup.
	// Do not wait once the server is closed: stop() has already issued
	// its broadcast, so a request arriving afterwards would otherwise
	// sit blocked until some later, unrelated wakeup.
	if args.MaxSeq != -1 && !s.closed {
		s.ready.Wait() // releases s.mu while blocked, reacquires before returning
	}

	// The exit condition for waiting clients.
	if s.closed {
		return util.Errorf("gossip server shutdown")
	}

	// Return reciprocal delta.
	delta := s.is.delta(args.Addr, args.MaxSeq)
	if delta != nil {
		// If V(1), double check that we can gob-encode the infostore.
		// Problems here seem to very confusingly disappear into the RPC internals.
		if glog.V(1) {
			var buf bytes.Buffer
			if err := gob.NewEncoder(&buf).Encode(delta); err != nil {
				glog.Fatalf("infostore could not be encoded: %v", err)
			}
		}
		reply.Delta = delta
		glog.Infof("gossip: client %s sent %d info(s)", args.Addr, delta.infoCount())
	}
	return nil
}
// jitteredGossipInterval scales the configured interval by a random
// factor drawn from [0.75, 1.25) and returns the resulting duration.
func (s *server) jitteredGossipInterval() time.Duration {
	// rand.Float64 yields a value in [0, 1), so the factor spans
	// [0.75, 1.25).
	factor := 0.75 + 0.5*rand.Float64()
	return time.Duration(float64(s.interval) * factor)
}
// start records the rpc server's address in the infostore, registers
// this server under the name "Gossip", and installs onClose as the rpc
// server's close callback. It then launches a goroutine that
// broadcasts on the condition variable at a fixed period, waking all
// gossip requests blocked in Gossip. The period is jittered once, when
// the goroutine starts. The goroutine has no exit path and runs for
// the remaining life of the process.
func (s *server) start(rpcServer *rpc.Server) {
	s.is.NodeAddr = rpcServer.Addr()
	rpcServer.RegisterName("Gossip", s)
	rpcServer.AddCloseCallback(s.onClose)

	go func() {
		wakeups := time.Tick(s.jitteredGossipInterval())
		for {
			// Block until the next tick, then release every waiter.
			<-wakeups
			s.ready.Broadcast()
		}
	}()
}
// stop marks the server as closed and wakes every gossip request
// currently blocked on the condition variable so that each can observe
// the closed flag and return.
func (s *server) stop() {
	s.mu.Lock()
	s.closed = true
	s.ready.Broadcast() // wake up clients
	s.mu.Unlock()
}
// onClose is invoked by the rpcServer each time a connected client
// is closed. It removes the client from the incoming address set and
// drops the client's entry from the lookup map, so the map does not
// accumulate entries for connections that no longer exist.
func (s *server) onClose(conn net.Conn) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// The map is keyed by the client's local address, which is the
	// remote address of the connection as seen from this side.
	key := conn.RemoteAddr().String()
	if clientAddr, ok := s.clientAddrMap[key]; ok {
		s.incoming.removeAddr(clientAddr)
		delete(s.clientAddrMap, key)
	}
}