/
master.go
118 lines (98 loc) · 3.02 KB
/
master.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package cluster
import (
	"context"
	"errors"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/ByteStorage/FlyDB/lib/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"
)
// ListenSlave periodically sweeps m.Heartbeat and removes every entry whose
// last recorded heartbeat is older than the heartbeat timeout.
//
// The sweep runs every 200ms for as long as the process lives: the ticker
// channel is never closed, so this method never returns and is meant to be
// run in its own goroutine.
//
// NOTE(review): m.Heartbeat is read and modified here without any visible
// locking, while ReceiveHeartbeat writes to the same map. Confirm that access
// is serialized elsewhere, otherwise this is a data race.
func (m *Master) ListenSlave() {
	// heartbeatTimeout is how long an entry may go without a fresh heartbeat
	// before it is dropped.
	const heartbeatTimeout = 5 * time.Second

	timeTick := time.NewTicker(200 * time.Millisecond)
	for range timeTick.C {
		for k, t := range m.Heartbeat {
			// time.Since compares at full resolution and uses the monotonic
			// clock reading when t carries one, so the timeout is exactly
			// heartbeatTimeout rather than "5 to 6 seconds" (whole-second
			// Unix() truncation) and is not skewed by wall-clock adjustments.
			if time.Since(t) > heartbeatTimeout {
				delete(m.Heartbeat, k)
			}
		}
	}
}
// WaitForLeader blocks until m.Raft.LeaderCh() delivers a true value.
//
// The channel is polled without blocking once every 100ms; a false value or
// an empty channel simply waits for the next tick.
//
// NOTE(review): presumably a true value means this node itself acquired
// leadership (not merely that some leader exists) — confirm against the Raft
// implementation in use.
func (m *Master) WaitForLeader() {
	timeTick := time.NewTicker(100 * time.Millisecond)
	// Release the ticker once leadership is observed; the original left it
	// running after returning.
	defer timeTick.Stop()

	for range timeTick.C {
		select {
		case isLeader := <-m.Raft.LeaderCh():
			if isLeader {
				return
			}
		default:
			// Nothing pending on the leadership channel; try again next tick.
		}
	}
}
// NewRaft is a placeholder: its body is currently empty, so calling it has no
// effect.
func (m *Master) NewRaft() {
	// TODO: determine whether this is a newly started master or a restarted master.
}
// StartGrpcServer listens on m.Addr over TCP, serves a gRPC server with the
// standard health service registered, and then blocks until the process
// receives SIGINT or SIGTERM, at which point the server is stopped gracefully
// and the method returns.
//
// It panics if the listener cannot be created or if Serve fails.
//
// NOTE(review): only the health service is registered on the server here. If
// the master's own RPC methods are meant to be reachable through this
// listener, they still need to be registered — confirm.
func (m *Master) StartGrpcServer() {
	listener, err := net.Listen("tcp", m.Addr)
	if err != nil {
		panic(err)
	}

	server := grpc.NewServer()
	grpc_health_v1.RegisterHealthServer(server, health.NewServer())

	go func() {
		if err := server.Serve(listener); err != nil {
			panic(err)
		}
	}()

	// graceful shutdown
	//
	// SIGKILL can never be delivered to a handler, so registering it was a
	// no-op; SIGTERM is the catchable termination signal that process
	// managers send.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sig)
	<-sig

	// Stop accepting new connections and wait for in-flight RPCs to finish.
	// This also closes the listener.
	server.GracefulStop()
}
// ListenRequest is a placeholder: its body is currently empty, so calling it
// has no effect.
func (m *Master) ListenRequest() {
}
// Get is the master's handler for Get requests. It is not implemented yet.
//
// It always returns a nil response and a non-nil error. An error is returned
// instead of panicking because an unrecovered panic in a request handler
// would terminate the whole process.
func (m *Master) Get(ctx context.Context, in *proto.MasterGetRequest) (*proto.MasterGetResponse, error) {
	return nil, errors.New("master: Get is not implemented")
}
// Set is the master's handler for Set requests. It is not implemented yet.
//
// It always returns a nil response and a non-nil error. An error is returned
// instead of panicking because an unrecovered panic in a request handler
// would terminate the whole process.
func (m *Master) Set(ctx context.Context, in *proto.MasterSetRequest) (*proto.MasterSetResponse, error) {
	return nil, errors.New("master: Set is not implemented")
}
// Del is the master's handler for Del requests. It is not implemented yet.
//
// It always returns a nil response and a non-nil error. An error is returned
// instead of panicking because an unrecovered panic in a request handler
// would terminate the whole process.
func (m *Master) Del(ctx context.Context, in *proto.MasterDelRequest) (*proto.MasterDelResponse, error) {
	return nil, errors.New("master: Del is not implemented")
}
// Keys is the master's handler for Keys requests. It is not implemented yet.
//
// It always returns a nil response and a non-nil error. An error is returned
// instead of panicking because an unrecovered panic in a request handler
// would terminate the whole process.
func (m *Master) Keys(ctx context.Context, in *proto.MasterKeysRequest) (*proto.MasterKeysResponse, error) {
	return nil, errors.New("master: Keys is not implemented")
}
// Scan is the master's handler for Scan requests. It is not implemented yet.
//
// It always returns a nil response and a non-nil error. An error is returned
// instead of panicking because an unrecovered panic in a request handler
// would terminate the whole process.
func (m *Master) Scan(ctx context.Context, in *proto.MasterScanRequest) (*proto.MasterScanResponse, error) {
	return nil, errors.New("master: Scan is not implemented")
}
// Expire is the master's handler for Expire requests. It is not implemented
// yet.
//
// It always returns a nil response and a non-nil error. An error is returned
// instead of panicking because an unrecovered panic in a request handler
// would terminate the whole process.
func (m *Master) Expire(ctx context.Context, in *proto.MasterExpireRequest) (*proto.MasterExpireResponse, error) {
	return nil, errors.New("master: Expire is not implemented")
}
// TTL is the master's handler for TTL requests. It is not implemented yet.
//
// It always returns a nil response and a non-nil error. An error is returned
// instead of panicking because an unrecovered panic in a request handler
// would terminate the whole process.
func (m *Master) TTL(ctx context.Context, in *proto.MasterTTLRequest) (*proto.MasterTTLResponse, error) {
	return nil, errors.New("master: TTL is not implemented")
}
// Ping is the master's handler for Ping requests. It is not implemented yet.
//
// It always returns a nil response and a non-nil error. An error is returned
// instead of panicking because an unrecovered panic in a request handler
// would terminate the whole process.
func (m *Master) Ping(ctx context.Context, in *proto.MasterPingRequest) (*proto.MasterPingResponse, error) {
	return nil, errors.New("master: Ping is not implemented")
}
// Shutdown is the master's handler for Shutdown requests. It is not
// implemented yet.
//
// It always returns a nil response and a non-nil error. An error is returned
// instead of panicking because an unrecovered panic in a request handler
// would terminate the whole process.
func (m *Master) Shutdown(ctx context.Context, in *proto.MasterShutdownRequest) (*proto.MasterShutdownResponse, error) {
	return nil, errors.New("master: Shutdown is not implemented")
}
// RegisterSlave is the master's handler for RegisterSlave requests. It is not
// implemented yet.
//
// It always returns a nil response and a non-nil error. An error is returned
// instead of panicking because an unrecovered panic in a request handler
// would terminate the whole process.
func (m *Master) RegisterSlave(ctx context.Context, in *proto.MasterRegisterSlaveRequest) (*proto.MasterRegisterSlaveResponse, error) {
	return nil, errors.New("master: RegisterSlave is not implemented")
}
// CurrentLeader reports the leader address as returned by
// m.Raft.LeaderWithID().
//
// The first value from LeaderWithID is converted to a string and placed in
// the response's Leader field; the second value is discarded. The returned
// error is always nil.
func (m *Master) CurrentLeader(ctx context.Context, in *proto.MasterCurrentLeaderRequest) (*proto.MasterCurrentLeaderResponse, error) {
	leaderAddr, _ := m.Raft.LeaderWithID()

	resp := new(proto.MasterCurrentLeaderResponse)
	resp.Leader = string(leaderAddr)
	return resp, nil
}
// ReceiveHeartbeat records the current time in m.Heartbeat under the key
// in.Addr and replies with an empty response. The returned error is always
// nil.
//
// NOTE(review): ListenSlave iterates over and deletes from the same map, and
// no locking is visible in either method — confirm that access to
// m.Heartbeat is synchronized elsewhere.
func (m *Master) ReceiveHeartbeat(ctx context.Context, in *proto.MasterHeartbeatRequest) (*proto.MasterHeartbeatResponse, error) {
	receivedAt := time.Now()
	m.Heartbeat[in.Addr] = receivedAt

	return new(proto.MasterHeartbeatResponse), nil
}