-
Notifications
You must be signed in to change notification settings - Fork 1
/
cluster.go
130 lines (101 loc) · 2.91 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package gostore
import (
"fmt"
"github.com/hashicorp/memberlist"
"github.com/sirupsen/logrus"
"log"
"math/rand"
"net"
"os"
"strconv"
"time"
)
type Node interface {
Address() string
SameAs(other Node) bool
}
type NodeRef struct {
host string
port uint16
}
type memberlistDelegate struct {
router *Router
}
type Cluster struct {
logger *logrus.Logger
memberList *memberlist.Memberlist
router Router
}
func (node NodeRef) Address() string {
return net.JoinHostPort(node.host, strconv.Itoa(int(node.port)))
}
func (node NodeRef) SameAs(other Node) bool {
return node.Address() == other.Address()
}
func (node NodeRef) String() string {
return node.Address()
}
func (cluster *Cluster) createMemberList(port int) {
hostNumber := rand.New(rand.NewSource(time.Now().UnixNano())).Int()
hostName, _ := os.Hostname()
delegate := &memberlistDelegate{router: &cluster.router}
config := memberlist.DefaultLocalConfig()
config.Name = fmt.Sprintf("%s-%X", hostName, hostNumber)
config.BindPort = port
config.AdvertisePort = port
config.Logger = log.New(cluster.logger.Writer(), "", 0)
config.Events = delegate
list, err := memberlist.Create(config)
if err != nil {
cluster.logger.Fatalf("Failed to create memberlist: %s", err)
}
cluster.memberList = list
}
// NotifyJoin is invoked when a node is detected to have joined.
// The Node argument must not be modified.
func (delegate *memberlistDelegate) NotifyJoin(node *memberlist.Node) {
delegate.router.AddNode(NodeRef{host: node.Addr.String(), port: node.Port - 1})
}
// NotifyLeave is invoked when a node is detected to have left.
// The Node argument must not be modified.
func (delegate *memberlistDelegate) NotifyLeave(node *memberlist.Node) {
delegate.router.RemoveNode(NodeRef{host: node.Addr.String(), port: node.Port - 1})
}
// NotifyUpdate is invoked when a node is detected to have
// updated, usually involving the meta data. The Node argument
// must not be modified.
func (delegate *memberlistDelegate) NotifyUpdate(node *memberlist.Node) {
// nothing to do
}
func (cluster Cluster) LocalNode() Node {
local := cluster.memberList.LocalNode()
return NodeRef{host: local.Addr.String(), port: local.Port - 1}
}
func (cluster Cluster) Members() []Node {
var nodes []Node
for _, member := range cluster.memberList.Members() {
nodes = append(nodes, NodeRef{host: member.Addr.String(), port: member.Port - 1})
}
return nodes
}
func (cluster Cluster) ResponsibleNode(key string) Node {
return cluster.router.ResponsibleNode(key)
}
func (cluster *Cluster) Join(member string) error {
_, err := cluster.memberList.Join([]string{member})
return err
}
func (cluster *Cluster) Shutdown() error {
if cluster.memberList == nil {
return nil
}
return cluster.memberList.Shutdown()
}
func NewCluster(logger *logrus.Logger, port int) *Cluster {
cluster := &Cluster{
logger: logger,
router: NewRouter(),
}
cluster.createMemberList(port)
return cluster
}