diff --git a/go.mod b/go.mod index f987ea0b2..a0ccd3cde 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/Jille/raft-grpc-transport v1.5.0 github.com/Jille/raftadmin v1.2.1 github.com/cockroachdb/errors v1.11.1 + github.com/hashicorp/go-hclog v1.5.0 github.com/hashicorp/raft v1.6.0 github.com/hashicorp/raft-boltdb/v2 v2.3.0 github.com/spaolacci/murmur3 v1.1.0 @@ -28,7 +29,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect - github.com/hashicorp/go-hclog v1.5.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect diff --git a/kv/grpc_test.go b/kv/grpc_test.go index 9d0f77560..30b94225d 100644 --- a/kv/grpc_test.go +++ b/kv/grpc_test.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc/reflection" ) +var raftHostformat = "localhost:6000%d" var hostformat = "localhost:5000%d" var kvs map[string]Store @@ -65,6 +66,7 @@ func createNode(n int) []*grpc.Server { for i := 0; i < n; i++ { ctx := context.Background() addr := fmt.Sprintf(hostformat, i) + raftAddr := fmt.Sprintf(raftHostformat, i) _, port, err := net.SplitHostPort(addr) if err != nil { log.Fatalf("failed to parse local address (%q): %v", fmt.Sprintf(hostformat, i), err) @@ -78,13 +80,12 @@ func createNode(n int) []*grpc.Server { fsm := NewKvFSM(st) kvs[strconv.Itoa(i)] = st - r, tm, err := NewRaft(ctx, strconv.Itoa(i), addr, fsm, i == 0, cfg) + r, err := NewRaft(ctx, strconv.Itoa(i), raftAddr, fsm, i == 0, cfg) if err != nil { log.Fatalf("failed to start raft: %v", err) } s := grpc.NewServer() pb.RegisterRawKVServer(s, NewGRPCServer(fsm, st, r)) - tm.Register(s) leaderhealth.Setup(r, s, []string{"Example"}) raftadmin.Register(s, r) reflection.Register(s) diff --git a/kv/raft.go b/kv/raft.go index 7c055a086..5365e31aa 100644 --- a/kv/raft.go +++ b/kv/raft.go @@ -2,16 +2,26 @@ package kv import ( "context" + "net" - transport "github.com/Jille/raft-grpc-transport" "github.com/cockroachdb/errors" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/raft" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) -func NewRaft(_ context.Context, myID, myAddress string, fsm raft.FSM, bootstrap bool, cfg raft.Configuration) ( - *raft.Raft, *transport.Manager, error) { +const ( + defaultTimeout = 10 + maxPool = 3 +) + +func NewRaft( + _ context.Context, + myID string, + myAddress string, + fsm raft.FSM, + bootstrap bool, + cfg raft.Configuration) ( + *raft.Raft, error) { c := raft.DefaultConfig() c.LocalID = raft.ServerID(myID) @@ -20,21 +30,36 @@ func NewRaft(_ context.Context, myID, myAddress string, fsm raft.FSM, bootstrap sdb := raft.NewInmemStore() fss := raft.NewInmemSnapshotStore() - tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - }) + advertise, err := net.ResolveTCPAddr("tcp", myAddress) + if err != nil { + return nil, errors.WithStack(err) + } + + raftTransport, err := raft.NewTCPTransportWithLogger( + myAddress, + advertise, + maxPool, + defaultTimeout, + hclog.New(&hclog.LoggerOptions{ + Name: "raft-net", + Output: hclog.DefaultOutput, + Level: hclog.DefaultLevel, + })) + if err != nil { + return nil, errors.WithStack(err) + } - r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport()) + r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, raftTransport) if err != nil { - return nil, nil, errors.WithStack(err) + return nil, errors.WithStack(err) } if bootstrap { f := r.BootstrapCluster(cfg) if err := f.Error(); err != nil { - return nil, nil, errors.WithStack(err) + return nil, errors.WithStack(err) } } - return r, tm, nil + return r, nil }