Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/Jille/raft-grpc-transport v1.5.0
github.com/Jille/raftadmin v1.2.1
github.com/cockroachdb/errors v1.11.1
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/raft v1.6.0
github.com/hashicorp/raft-boltdb/v2 v2.3.0
github.com/spaolacci/murmur3 v1.1.0
Expand All @@ -28,7 +29,6 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand Down
5 changes: 3 additions & 2 deletions kv/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"google.golang.org/grpc/reflection"
)

var raftHostformat = "localhost:6000%d"
var hostformat = "localhost:5000%d"

var kvs map[string]Store
Expand Down Expand Up @@ -65,6 +66,7 @@ func createNode(n int) []*grpc.Server {
for i := 0; i < n; i++ {
ctx := context.Background()
addr := fmt.Sprintf(hostformat, i)
raftAddr := fmt.Sprintf(raftHostformat, i)
_, port, err := net.SplitHostPort(addr)
if err != nil {
log.Fatalf("failed to parse local address (%q): %v", fmt.Sprintf(hostformat, i), err)
Expand All @@ -78,13 +80,12 @@ func createNode(n int) []*grpc.Server {
fsm := NewKvFSM(st)

kvs[strconv.Itoa(i)] = st
r, tm, err := NewRaft(ctx, strconv.Itoa(i), addr, fsm, i == 0, cfg)
r, err := NewRaft(ctx, strconv.Itoa(i), raftAddr, fsm, i == 0, cfg)
if err != nil {
log.Fatalf("failed to start raft: %v", err)
}
s := grpc.NewServer()
pb.RegisterRawKVServer(s, NewGRPCServer(fsm, st, r))
tm.Register(s)
leaderhealth.Setup(r, s, []string{"Example"})
raftadmin.Register(s, r)
reflection.Register(s)
Expand Down
49 changes: 37 additions & 12 deletions kv/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,26 @@ package kv

import (
"context"
"net"

transport "github.com/Jille/raft-grpc-transport"
"github.com/cockroachdb/errors"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func NewRaft(_ context.Context, myID, myAddress string, fsm raft.FSM, bootstrap bool, cfg raft.Configuration) (
*raft.Raft, *transport.Manager, error) {
const (
defaultTimeout = 10
maxPool = 3
)

func NewRaft(
_ context.Context,
myID string,
myAddress string,
fsm raft.FSM,
bootstrap bool,
cfg raft.Configuration) (
*raft.Raft, error) {
c := raft.DefaultConfig()
c.LocalID = raft.ServerID(myID)

Expand All @@ -20,21 +30,36 @@ func NewRaft(_ context.Context, myID, myAddress string, fsm raft.FSM, bootstrap
sdb := raft.NewInmemStore()
fss := raft.NewInmemSnapshotStore()

tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
})
advertise, err := net.ResolveTCPAddr("tcp", myAddress)
if err != nil {
return nil, errors.WithStack(err)
}

raftTransport, err := raft.NewTCPTransportWithLogger(
myAddress,
advertise,
maxPool,
defaultTimeout,
hclog.New(&hclog.LoggerOptions{
Name: "raft-net",
Output: hclog.DefaultOutput,
Level: hclog.DefaultLevel,
}))
if err != nil {
return nil, errors.WithStack(err)
}

r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport())
r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, raftTransport)
if err != nil {
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}

if bootstrap {
f := r.BootstrapCluster(cfg)
if err := f.Error(); err != nil {
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}
}

return r, tm, nil
return r, nil
}