-
Notifications
You must be signed in to change notification settings - Fork 0
/
mutigroup.go
177 lines (172 loc) · 5.17 KB
/
mutigroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package dragonboat
import (
"bufio"
"context"
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"
"runtime"
"strings"
"syscall"
"time"
"github.com/lni/dragonboat/v4"
"github.com/lni/dragonboat/v4/config"
"github.com/lni/dragonboat/v4/logger"
"github.com/lni/goutils/syncutil"
)
// This example runs two Raft groups (shards) side by side on every NodeHost.
// Each group is identified by its own shard ID.

// shardID1 is the shard ID of the first Raft group.
const shardID1 uint64 = 100

// shardID2 is the shard ID of the second Raft group.
const shardID2 uint64 = 101
// addresses lists the Raft addresses of the three initial replicas. The set
// is fixed to keep the example simple; the entry at index i belongs to the
// replica whose ReplicaID is i+1.
var addresses = []string{
	"localhost:63001",
	"localhost:63002",
	"localhost:63003",
}
// main starts one NodeHost that hosts replicas of two Raft groups (shardID1
// and shardID2), then reads lines from stdin and proposes each line to one of
// the two groups. Lines ending with "?" go to the second group, everything
// else goes to the first. Typing "exit" (or closing stdin) shuts the program
// down.
//
// The -nodeid flag selects which of the three fixed replicas this process is;
// it must be 1, 2 or 3.
func main() {
	replicaID := flag.Int("nodeid", 1, "ReplicaID to use")
	flag.Parse()
	if *replicaID > 3 || *replicaID < 1 {
		fmt.Fprintf(os.Stderr, "invalid nodeid %d, it must be 1, 2 or 3\n", *replicaID)
		os.Exit(1)
	}
	// https://github.com/golang/go/issues/17393
	// 0xd is SIGPIPE on darwin.
	if runtime.GOOS == "darwin" {
		signal.Ignore(syscall.Signal(0xd))
	}
	initialMembers := make(map[uint64]string, len(addresses))
	for idx, v := range addresses {
		// key is the ReplicaID, ReplicaID is not allowed to be 0
		// value is the raft address
		initialMembers[uint64(idx+1)] = v
	}
	nodeAddr := initialMembers[uint64(*replicaID)]
	fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr)
	// change the log verbosity
	logger.GetLogger("raft").SetLevel(logger.ERROR)
	logger.GetLogger("rsm").SetLevel(logger.WARNING)
	logger.GetLogger("transport").SetLevel(logger.WARNING)
	logger.GetLogger("grpc").SetLevel(logger.WARNING)
	// config for raft
	// note the ShardID value is not specified here, it is filled in below
	// right before each StartReplica call so the same config value can be
	// reused for both groups.
	rc := config.Config{
		ReplicaID:          uint64(*replicaID),
		ElectionRTT:        5,
		HeartbeatRTT:       1,
		CheckQuorum:        true,
		SnapshotEntries:    10,
		CompactionOverhead: 5,
	}
	datadir := filepath.Join(
		"example-data",
		"multigroup-data",
		fmt.Sprintf("node%d", *replicaID))
	// config for the nodehost
	// by default, insecure transport is used, you can choose to use Mutual TLS
	// Authentication to authenticate both servers and clients. To use Mutual
	// TLS Authentication, set the MutualTLS field in NodeHostConfig to true, set
	// the CAFile, CertFile and KeyFile fields to point to the path of your CA
	// file, certificate and key files.
	// by default, TCP based RPC module is used, set the RaftRPCFactory field in
	// NodeHostConfig to rpc.NewRaftGRPC (github.com/lni/dragonboat/plugin/rpc) to
	// use gRPC based transport. To use gRPC based RPC module, you need to install
	// the gRPC library first -
	//
	// $ go get -u google.golang.org/grpc
	//
	nhc := config.NodeHostConfig{
		WALDir:         datadir,
		NodeHostDir:    datadir,
		RTTMillisecond: 200,
		RaftAddress:    nodeAddr,
		// RaftRPCFactory: rpc.NewRaftGRPC,
	}
	// create a NodeHost instance. it is a facade interface allowing access to
	// all functionalities provided by dragonboat.
	nh, err := dragonboat.NewNodeHost(nhc)
	if err != nil {
		panic(err)
	}
	// The NodeHost is closed exactly once, here, when main returns on the
	// normal shutdown path. The os.Exit calls below skip this deferred call.
	defer nh.Close()
	// start the replica of the first raft group
	rc.ShardID = shardID1
	if err := nh.StartReplica(initialMembers, false, NewStateMachine, rc); err != nil {
		fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
		os.Exit(1)
	}
	// start the replica of the second raft group
	// NOTE(review): an earlier comment here said the second group uses a
	// SecondStateMachine, but the code passes NewStateMachine for both groups;
	// confirm which one is intended.
	rc.ShardID = shardID2
	if err := nh.StartReplica(initialMembers, false, NewStateMachine, rc); err != nil {
		fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
		os.Exit(1)
	}
	raftStopper := syncutil.NewStopper()
	consoleStopper := syncutil.NewStopper()
	ch := make(chan string, 16)
	// console worker: reads stdin line by line and forwards each line to the
	// raft worker through ch.
	consoleStopper.RunWorker(func() {
		reader := bufio.NewReader(os.Stdin)
		for {
			s, err := reader.ReadString('\n')
			if err != nil {
				// stdin is closed or unreadable; closing ch lets the raft worker
				// drain what is already queued and then return.
				close(ch)
				return
			}
			// compare the trimmed line so that "exit\r\n" and surrounding
			// whitespace are accepted as well
			if strings.TrimSpace(s) == "exit" {
				// Stop signals the raft worker and waits for it to finish. That
				// also unblocks raftStopper.Wait() in main, whose deferred
				// nh.Close() then shuts the NodeHost down. nh.Close() is
				// deliberately not called here so the NodeHost is not closed
				// twice from two goroutines.
				raftStopper.Stop()
				return
			}
			ch <- s
		}
	})
	// raft worker: turns every line received on ch into a proposal.
	raftStopper.RunWorker(func() {
		// use NO-OP client session here
		// check the example in godoc to see how to use a regular client session
		cs1 := nh.GetNoOPSession(shardID1)
		cs2 := nh.GetNoOPSession(shardID2)
		for {
			select {
			case v, ok := <-ch:
				if !ok {
					return
				}
				ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
				// strip the trailing line terminator and surrounding whitespace
				msg := strings.TrimSpace(v)
				var err error
				// In this example, the strategy on how data is sharded across different
				// Raft groups is based on whether the input message ends with a "?".
				// In your application, you are free to choose strategies suitable for
				// your application.
				if strings.HasSuffix(msg, "?") {
					// user message ends with "?", make a proposal to update the second
					// raft group
					_, err = nh.SyncPropose(ctx, cs2, []byte(msg))
				} else {
					// message does not end with "?", make a proposal to update the
					// first raft group
					_, err = nh.SyncPropose(ctx, cs1, []byte(msg))
				}
				cancel()
				if err != nil {
					fmt.Fprintf(os.Stderr, "SyncPropose returned error %v\n", err)
				}
			case <-raftStopper.ShouldStop():
				return
			}
		}
	})
	// block until the raft worker has returned, either because "exit" was
	// typed or because stdin was closed
	raftStopper.Wait()
}