diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ec501f6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +Created .gitignore file for flag type go. +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work +bin +*.log diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d819e58 --- /dev/null +++ b/Makefile @@ -0,0 +1,29 @@ +.PHONY: all + +all: raft-server client + +update-proto: + protoc --proto_path=./proto --go_out=proto/raft --go_opt=paths=source_relative raft.proto + protoc --proto_path=./proto --go_out=proto/server --go_opt=paths=source_relative server.proto + +raft-server: + go build -o bin/raft cmd/server/main.go + +client: + go build -o bin/client cmd/client/main.go + +run-3-server: + echo "running on port 8000" && ./bin/raft -port=8000 -log_path=server1.log & + echo "running on port 8001" && ./bin/raft -port=8001 -log_path=server2.log & + echo "running on port 8002" && ./bin/raft -port=8002 -log_path=server3.log & + +run-5-server: + echo "running on port 8000" && ./bin/raft -port=8000 -log_path=server1.log & + echo "running on port 8001" && ./bin/raft -port=8001 -log_path=server2.log & + echo "running on port 8002" && ./bin/raft -port=8002 -log_path=server3.log & + echo "running on port 8003" && ./bin/raft -port=8003 -log_path=server4.log & + echo "running on port 8004" && ./bin/raft -port=8004 -log_path=server5.log & + +clean: + rm -rf bin + rm *.log diff --git a/cmd/client/main.go b/cmd/client/main.go new file mode 100644 index 0000000..5a8da6e --- /dev/null +++ b/cmd/client/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "flag" + "fmt" + + raftPb "github.com/adylanrff/raft-algorithm/proto/raft" + serverPb "github.com/adylanrff/raft-algorithm/proto/server" + "github.com/adylanrff/raft-algorithm/raft" + "github.com/adylanrff/raft-algorithm/rpc" + "github.com/adylanrff/raft-algorithm/util" +) + +var port int +var logPath string + +func init() { + flag.IntVar(&port, "port", 8000, "server target port") + flag.StringVar(&logPath, "log_path", "server.log", "log path") + + flag.Parse() +} + +func main() { + util.InitLogger(logPath) + + raftMsg := rpc.ServerMessageDTO{ + ServerMessage: &serverPb.ServerMessage{ + Method: raft.RaftMethodName_RequestVotes, + Payload: &serverPb.ServerMessage_ServerRequest{ + ServerRequest: &serverPb.ServerRequest{ + Request: &serverPb.ServerRequest_RequestVoteRequest{ + RequestVoteRequest: &raftPb.RequestVoteRequest{}, + }, + }, + }, + }, + } + + rpcClient := rpc.NewRPCClient() + resp, err := rpcClient.Call(fmt.Sprintf("0.0.0.0:%d", port), &raftMsg) + + fmt.Printf("resp=%+v,err=%v", resp, err) +} diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..d258b01 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,67 @@ +package main + +import ( + "flag" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/adylanrff/raft-algorithm/raft" + "github.com/adylanrff/raft-algorithm/rpc" + "github.com/adylanrff/raft-algorithm/util" + log "github.com/sirupsen/logrus" +) + +var port int +var logPath string + +func init() { + flag.IntVar(&port, "port", 8000, "server port") + flag.StringVar(&logPath, "log_path", "server.log", "log path") + + flag.Parse() + rand.Seed(time.Now().UnixNano()) +} + +func main() { + util.InitLogger(logPath) + + address := fmt.Sprintf("127.0.0.1:%d", port) + serverID := raft.GetRaftServerIDFromAddress(address) + + raftHandler := raft.NewRaft(serverID, address, raft.RaftConfig{ + ElectionTimeout: 1000 * time.Millisecond, + IdleTimeout: 500 * time.Millisecond, + ClusterMemberAddreses: []string{ + "127.0.0.1:8000", + "127.0.0.1:8001", + "127.0.0.1:8002", + "127.0.0.1:8003", + "127.0.0.1:8004", + }, + }) + + raftServerHandler := raft.NewRaftServerhandler(raftHandler) + + server := rpc.NewServer(address) + server.AddHandler(raft.RaftMethodName_RequestVotes, raftServerHandler.RequestVoteHandler) + server.AddHandler(raft.RaftMethodName_AppendEntries, raftServerHandler.AppendEntriesHandler) + + var wg sync.WaitGroup + + wg.Add(2) + go func() { + defer wg.Done() + server.Run() + }() + + go func() { + defer wg.Done() + raftHandler.Run() + }() + + wg.Wait() + + log.Debug("server terminated") +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7943df9 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module github.com/adylanrff/raft-algorithm + +go 1.19 + +require ( + github.com/sirupsen/logrus v1.9.0 // indirect + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect + google.golang.org/protobuf v1.28.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..cf7d1bd --- /dev/null +++ b/go.sum @@ -0,0 +1,17 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/proto/raft.proto b/proto/raft.proto new file mode 100644 index 0000000..58411e7 --- /dev/null +++ b/proto/raft.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; +option go_package = "github.com/adylanrff/raft-algorithm/proto/raft"; + +// RequestVote Method +message RequestVoteRequest { + int32 term = 1; + string candidate_id = 2; + int32 last_log_index = 3; + int32 last_log_term = 4; +} + +message RequestVoteResponse { + int32 term = 1; + bool vote_granted = 2; +} + +// AppendEntries Method +message AppendEntriesRequest { + int32 term = 1; + string leader_id = 2; +} + +message AppendEntriesResponse { + int32 term = 1; + bool success = 2; +} diff --git a/proto/raft/raft.pb.go b/proto/raft/raft.pb.go new file mode 100644 index 0000000..c9ffdb3 --- /dev/null +++ b/proto/raft/raft.pb.go @@ -0,0 +1,394 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.9 +// source: raft.proto + +package raft + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// RequestVote Method +type RequestVoteRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Term int32 `protobuf:"varint,1,opt,name=term,proto3" json:"term,omitempty"` + CandidateId string `protobuf:"bytes,2,opt,name=candidate_id,json=candidateId,proto3" json:"candidate_id,omitempty"` + LastLogIndex int32 `protobuf:"varint,3,opt,name=last_log_index,json=lastLogIndex,proto3" json:"last_log_index,omitempty"` + LastLogTerm int32 `protobuf:"varint,4,opt,name=last_log_term,json=lastLogTerm,proto3" json:"last_log_term,omitempty"` +} + +func (x *RequestVoteRequest) Reset() { + *x = RequestVoteRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RequestVoteRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RequestVoteRequest) ProtoMessage() {} + +func (x *RequestVoteRequest) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RequestVoteRequest.ProtoReflect.Descriptor instead. +func (*RequestVoteRequest) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{0} +} + +func (x *RequestVoteRequest) GetTerm() int32 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *RequestVoteRequest) GetCandidateId() string { + if x != nil { + return x.CandidateId + } + return "" +} + +func (x *RequestVoteRequest) GetLastLogIndex() int32 { + if x != nil { + return x.LastLogIndex + } + return 0 +} + +func (x *RequestVoteRequest) GetLastLogTerm() int32 { + if x != nil { + return x.LastLogTerm + } + return 0 +} + +type RequestVoteResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Term int32 `protobuf:"varint,1,opt,name=term,proto3" json:"term,omitempty"` + VoteGranted bool `protobuf:"varint,2,opt,name=vote_granted,json=voteGranted,proto3" json:"vote_granted,omitempty"` +} + +func (x *RequestVoteResponse) Reset() { + *x = RequestVoteResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RequestVoteResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RequestVoteResponse) ProtoMessage() {} + +func (x *RequestVoteResponse) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RequestVoteResponse.ProtoReflect.Descriptor instead. +func (*RequestVoteResponse) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{1} +} + +func (x *RequestVoteResponse) GetTerm() int32 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *RequestVoteResponse) GetVoteGranted() bool { + if x != nil { + return x.VoteGranted + } + return false +} + +// AppendEntries Method +type AppendEntriesRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Term int32 `protobuf:"varint,1,opt,name=term,proto3" json:"term,omitempty"` + LeaderId string `protobuf:"bytes,2,opt,name=leader_id,json=leaderId,proto3" json:"leader_id,omitempty"` +} + +func (x *AppendEntriesRequest) Reset() { + *x = AppendEntriesRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AppendEntriesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AppendEntriesRequest) ProtoMessage() {} + +func (x *AppendEntriesRequest) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AppendEntriesRequest.ProtoReflect.Descriptor instead. +func (*AppendEntriesRequest) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{2} +} + +func (x *AppendEntriesRequest) GetTerm() int32 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *AppendEntriesRequest) GetLeaderId() string { + if x != nil { + return x.LeaderId + } + return "" +} + +type AppendEntriesResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Term int32 `protobuf:"varint,1,opt,name=term,proto3" json:"term,omitempty"` + Success bool `protobuf:"varint,2,opt,name=success,proto3" json:"success,omitempty"` +} + +func (x *AppendEntriesResponse) Reset() { + *x = AppendEntriesResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_raft_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AppendEntriesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AppendEntriesResponse) ProtoMessage() {} + +func (x *AppendEntriesResponse) ProtoReflect() protoreflect.Message { + mi := &file_raft_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AppendEntriesResponse.ProtoReflect.Descriptor instead. +func (*AppendEntriesResponse) Descriptor() ([]byte, []int) { + return file_raft_proto_rawDescGZIP(), []int{3} +} + +func (x *AppendEntriesResponse) GetTerm() int32 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *AppendEntriesResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +var File_raft_proto protoreflect.FileDescriptor + +var file_raft_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x72, 0x61, 0x66, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x95, 0x01, 0x0a, + 0x12, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x61, 0x6e, 0x64, 0x69, + 0x64, 0x61, 0x74, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x63, + 0x61, 0x6e, 0x64, 0x69, 0x64, 0x61, 0x74, 0x65, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, + 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, + 0x12, 0x22, 0x0a, 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x74, 0x65, 0x72, + 0x6d, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x4c, 0x6f, 0x67, + 0x54, 0x65, 0x72, 0x6d, 0x22, 0x4c, 0x0a, 0x13, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x56, + 0x6f, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, + 0x65, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, + 0x21, 0x0a, 0x0c, 0x76, 0x6f, 0x74, 0x65, 0x5f, 0x67, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x76, 0x6f, 0x74, 0x65, 0x47, 0x72, 0x61, 0x6e, 0x74, + 0x65, 0x64, 0x22, 0x47, 0x0a, 0x14, 0x41, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, + 0x69, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, + 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x1b, + 0x0a, 0x09, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x49, 0x64, 0x22, 0x45, 0x0a, 0x15, 0x41, + 0x70, 0x70, 0x65, 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, + 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, + 0x73, 0x73, 0x42, 0x30, 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x61, 0x64, 0x79, 0x6c, 0x61, 0x6e, 0x72, 0x66, 0x66, 0x2f, 0x72, 0x61, 0x66, 0x74, 0x2d, + 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, + 0x72, 0x61, 0x66, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_raft_proto_rawDescOnce sync.Once + file_raft_proto_rawDescData = file_raft_proto_rawDesc +) + +func file_raft_proto_rawDescGZIP() []byte { + file_raft_proto_rawDescOnce.Do(func() { + file_raft_proto_rawDescData = protoimpl.X.CompressGZIP(file_raft_proto_rawDescData) + }) + return file_raft_proto_rawDescData +} + +var file_raft_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_raft_proto_goTypes = []interface{}{ + (*RequestVoteRequest)(nil), // 0: RequestVoteRequest + (*RequestVoteResponse)(nil), // 1: RequestVoteResponse + (*AppendEntriesRequest)(nil), // 2: AppendEntriesRequest + (*AppendEntriesResponse)(nil), // 3: AppendEntriesResponse +} +var file_raft_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_raft_proto_init() } +func file_raft_proto_init() { + if File_raft_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_raft_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RequestVoteRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RequestVoteResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AppendEntriesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_raft_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AppendEntriesResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_raft_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_raft_proto_goTypes, + DependencyIndexes: file_raft_proto_depIdxs, + MessageInfos: file_raft_proto_msgTypes, + }.Build() + File_raft_proto = out.File + file_raft_proto_rawDesc = nil + file_raft_proto_goTypes = nil + file_raft_proto_depIdxs = nil +} diff --git a/proto/server.proto b/proto/server.proto new file mode 100644 index 0000000..e01ba71 --- /dev/null +++ b/proto/server.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; +option go_package = "github.com/adylanrff/raft-algorithm/proto/server"; + +import "raft.proto"; + +enum ServerMessageType { + ServerMessageType_Unknown = 0; + ServerMessageType_ServerRequest = 1; + ServerMessageType_ServerResponse = 2; +} + +message ServerMessage { + string method = 1; + oneof payload { + ServerRequest server_request = 3; + ServerResponse server_response = 4; + } +} + +message ServerRequest { + oneof request { + RequestVoteRequest request_vote_request = 1; + AppendEntriesRequest append_entries_request = 2; + } +} + +message ServerResponse { + oneof response { + RequestVoteResponse request_vote_response = 1; + AppendEntriesResponse append_entries_response = 2; + ErrorResponse error_response = 3; + } +} + +message ErrorResponse { + int32 error_code = 1; + string error_msg = 2; +} diff --git a/proto/server/server.pb.go b/proto/server/server.pb.go new file mode 100644 index 0000000..2602e7d --- /dev/null +++ b/proto/server/server.pb.go @@ -0,0 +1,578 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.9 +// source: server.proto + +package server + +import ( + raft "github.com/adylanrff/raft-algorithm/proto/raft" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ServerMessageType int32 + +const ( + ServerMessageType_ServerMessageType_Unknown ServerMessageType = 0 + ServerMessageType_ServerMessageType_ServerRequest ServerMessageType = 1 + ServerMessageType_ServerMessageType_ServerResponse ServerMessageType = 2 +) + +// Enum value maps for ServerMessageType. +var ( + ServerMessageType_name = map[int32]string{ + 0: "ServerMessageType_Unknown", + 1: "ServerMessageType_ServerRequest", + 2: "ServerMessageType_ServerResponse", + } + ServerMessageType_value = map[string]int32{ + "ServerMessageType_Unknown": 0, + "ServerMessageType_ServerRequest": 1, + "ServerMessageType_ServerResponse": 2, + } +) + +func (x ServerMessageType) Enum() *ServerMessageType { + p := new(ServerMessageType) + *p = x + return p +} + +func (x ServerMessageType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ServerMessageType) Descriptor() protoreflect.EnumDescriptor { + return file_server_proto_enumTypes[0].Descriptor() +} + +func (ServerMessageType) Type() protoreflect.EnumType { + return &file_server_proto_enumTypes[0] +} + +func (x ServerMessageType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ServerMessageType.Descriptor instead. +func (ServerMessageType) EnumDescriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{0} +} + +type ServerMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Method string `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"` + // Types that are assignable to Payload: + // *ServerMessage_ServerRequest + // *ServerMessage_ServerResponse + Payload isServerMessage_Payload `protobuf_oneof:"payload"` +} + +func (x *ServerMessage) Reset() { + *x = ServerMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_server_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerMessage) ProtoMessage() {} + +func (x *ServerMessage) ProtoReflect() protoreflect.Message { + mi := &file_server_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerMessage.ProtoReflect.Descriptor instead. +func (*ServerMessage) Descriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{0} +} + +func (x *ServerMessage) GetMethod() string { + if x != nil { + return x.Method + } + return "" +} + +func (m *ServerMessage) GetPayload() isServerMessage_Payload { + if m != nil { + return m.Payload + } + return nil +} + +func (x *ServerMessage) GetServerRequest() *ServerRequest { + if x, ok := x.GetPayload().(*ServerMessage_ServerRequest); ok { + return x.ServerRequest + } + return nil +} + +func (x *ServerMessage) GetServerResponse() *ServerResponse { + if x, ok := x.GetPayload().(*ServerMessage_ServerResponse); ok { + return x.ServerResponse + } + return nil +} + +type isServerMessage_Payload interface { + isServerMessage_Payload() +} + +type ServerMessage_ServerRequest struct { + ServerRequest *ServerRequest `protobuf:"bytes,3,opt,name=server_request,json=serverRequest,proto3,oneof"` +} + +type ServerMessage_ServerResponse struct { + ServerResponse *ServerResponse `protobuf:"bytes,4,opt,name=server_response,json=serverResponse,proto3,oneof"` +} + +func (*ServerMessage_ServerRequest) isServerMessage_Payload() {} + +func (*ServerMessage_ServerResponse) isServerMessage_Payload() {} + +type ServerRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Request: + // *ServerRequest_RequestVoteRequest + // *ServerRequest_AppendEntriesRequest + Request isServerRequest_Request `protobuf_oneof:"request"` +} + +func (x *ServerRequest) Reset() { + *x = ServerRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_server_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerRequest) ProtoMessage() {} + +func (x *ServerRequest) ProtoReflect() protoreflect.Message { + mi := &file_server_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerRequest.ProtoReflect.Descriptor instead. +func (*ServerRequest) Descriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{1} +} + +func (m *ServerRequest) GetRequest() isServerRequest_Request { + if m != nil { + return m.Request + } + return nil +} + +func (x *ServerRequest) GetRequestVoteRequest() *raft.RequestVoteRequest { + if x, ok := x.GetRequest().(*ServerRequest_RequestVoteRequest); ok { + return x.RequestVoteRequest + } + return nil +} + +func (x *ServerRequest) GetAppendEntriesRequest() *raft.AppendEntriesRequest { + if x, ok := x.GetRequest().(*ServerRequest_AppendEntriesRequest); ok { + return x.AppendEntriesRequest + } + return nil +} + +type isServerRequest_Request interface { + isServerRequest_Request() +} + +type ServerRequest_RequestVoteRequest struct { + RequestVoteRequest *raft.RequestVoteRequest `protobuf:"bytes,1,opt,name=request_vote_request,json=requestVoteRequest,proto3,oneof"` +} + +type ServerRequest_AppendEntriesRequest struct { + AppendEntriesRequest *raft.AppendEntriesRequest `protobuf:"bytes,2,opt,name=append_entries_request,json=appendEntriesRequest,proto3,oneof"` +} + +func (*ServerRequest_RequestVoteRequest) isServerRequest_Request() {} + +func (*ServerRequest_AppendEntriesRequest) isServerRequest_Request() {} + +type ServerResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Response: + // *ServerResponse_RequestVoteResponse + // *ServerResponse_AppendEntriesResponse + // *ServerResponse_ErrorResponse + Response isServerResponse_Response `protobuf_oneof:"response"` +} + +func (x *ServerResponse) Reset() { + *x = ServerResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_server_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerResponse) ProtoMessage() {} + +func (x *ServerResponse) ProtoReflect() protoreflect.Message { + mi := &file_server_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerResponse.ProtoReflect.Descriptor instead. +func (*ServerResponse) Descriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{2} +} + +func (m *ServerResponse) GetResponse() isServerResponse_Response { + if m != nil { + return m.Response + } + return nil +} + +func (x *ServerResponse) GetRequestVoteResponse() *raft.RequestVoteResponse { + if x, ok := x.GetResponse().(*ServerResponse_RequestVoteResponse); ok { + return x.RequestVoteResponse + } + return nil +} + +func (x *ServerResponse) GetAppendEntriesResponse() *raft.AppendEntriesResponse { + if x, ok := x.GetResponse().(*ServerResponse_AppendEntriesResponse); ok { + return x.AppendEntriesResponse + } + return nil +} + +func (x *ServerResponse) GetErrorResponse() *ErrorResponse { + if x, ok := x.GetResponse().(*ServerResponse_ErrorResponse); ok { + return x.ErrorResponse + } + return nil +} + +type isServerResponse_Response interface { + isServerResponse_Response() +} + +type ServerResponse_RequestVoteResponse struct { + RequestVoteResponse *raft.RequestVoteResponse `protobuf:"bytes,1,opt,name=request_vote_response,json=requestVoteResponse,proto3,oneof"` +} + +type ServerResponse_AppendEntriesResponse struct { + AppendEntriesResponse *raft.AppendEntriesResponse `protobuf:"bytes,2,opt,name=append_entries_response,json=appendEntriesResponse,proto3,oneof"` +} + +type ServerResponse_ErrorResponse struct { + ErrorResponse *ErrorResponse `protobuf:"bytes,3,opt,name=error_response,json=errorResponse,proto3,oneof"` +} + +func (*ServerResponse_RequestVoteResponse) isServerResponse_Response() {} + +func (*ServerResponse_AppendEntriesResponse) isServerResponse_Response() {} + +func (*ServerResponse_ErrorResponse) isServerResponse_Response() {} + +type ErrorResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ErrorCode int32 `protobuf:"varint,1,opt,name=error_code,json=errorCode,proto3" json:"error_code,omitempty"` + ErrorMsg string `protobuf:"bytes,2,opt,name=error_msg,json=errorMsg,proto3" json:"error_msg,omitempty"` +} + +func (x *ErrorResponse) Reset() { + *x = ErrorResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_server_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ErrorResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ErrorResponse) ProtoMessage() {} + +func (x *ErrorResponse) ProtoReflect() protoreflect.Message { + mi := &file_server_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ErrorResponse.ProtoReflect.Descriptor instead. +func (*ErrorResponse) Descriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{3} +} + +func (x *ErrorResponse) GetErrorCode() int32 { + if x != nil { + return x.ErrorCode + } + return 0 +} + +func (x *ErrorResponse) GetErrorMsg() string { + if x != nil { + return x.ErrorMsg + } + return "" +} + +var File_server_proto protoreflect.FileDescriptor + +var file_server_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0a, + 0x72, 0x61, 0x66, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa7, 0x01, 0x0a, 0x0d, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x16, 0x0a, 0x06, + 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, + 0x74, 0x68, 0x6f, 0x64, 0x12, 0x37, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x72, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0d, + 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3a, 0x0a, + 0x0f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x22, 0xb2, 0x01, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x47, 0x0a, 0x14, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x5f, 0x76, 0x6f, 0x74, 0x65, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x56, 0x6f, + 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x12, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x4d, 0x0a, 0x16, 0x61, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x5f, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, + 0x73, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x15, 0x2e, 0x41, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x14, 0x61, 0x70, 0x70, 0x65, 0x6e, 0x64, + 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x42, 0x09, + 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xf3, 0x01, 0x0a, 0x0e, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4a, 0x0a, 0x15, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x76, 0x6f, 0x74, 0x65, 0x5f, 0x72, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x48, 0x00, 0x52, 0x13, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x56, 0x6f, 0x74, 0x65, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x50, 0x0a, 0x17, 0x61, 0x70, 0x70, 0x65, + 0x6e, 0x64, 0x5f, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x41, 0x70, 0x70, 0x65, + 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x48, 0x00, 0x52, 0x15, 0x61, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x69, + 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x37, 0x0a, 0x0e, 0x65, 0x72, + 0x72, 0x6f, 0x72, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x48, 0x00, 0x52, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x42, 0x0a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x4b, 0x0a, 0x0d, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x43, 0x6f, 0x64, 0x65, 0x12, + 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x2a, 0x7d, 0x0a, 0x11, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x54, 0x79, 0x70, + 0x65, 0x12, 0x1d, 0x0a, 0x19, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x5f, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00, + 0x12, 0x23, 0x0a, 0x1f, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x54, 0x79, 0x70, 0x65, 0x5f, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x10, 0x01, 0x12, 0x24, 0x0a, 0x20, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x5f, 0x53, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x10, 0x02, 0x42, 0x32, 0x5a, 0x30, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x64, 0x79, 0x6c, 0x61, 0x6e, + 0x72, 0x66, 0x66, 0x2f, 0x72, 0x61, 0x66, 0x74, 0x2d, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, + 0x68, 0x6d, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_server_proto_rawDescOnce sync.Once + file_server_proto_rawDescData = file_server_proto_rawDesc +) + +func file_server_proto_rawDescGZIP() []byte { + file_server_proto_rawDescOnce.Do(func() { + file_server_proto_rawDescData = protoimpl.X.CompressGZIP(file_server_proto_rawDescData) + }) + return file_server_proto_rawDescData +} + +var file_server_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_server_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_server_proto_goTypes = []interface{}{ + (ServerMessageType)(0), // 0: ServerMessageType + (*ServerMessage)(nil), // 1: ServerMessage + (*ServerRequest)(nil), // 2: ServerRequest + (*ServerResponse)(nil), // 3: ServerResponse + (*ErrorResponse)(nil), // 4: ErrorResponse + (*raft.RequestVoteRequest)(nil), // 5: RequestVoteRequest + (*raft.AppendEntriesRequest)(nil), // 6: AppendEntriesRequest + (*raft.RequestVoteResponse)(nil), // 7: RequestVoteResponse + (*raft.AppendEntriesResponse)(nil), // 8: AppendEntriesResponse +} +var file_server_proto_depIdxs = []int32{ + 2, // 0: ServerMessage.server_request:type_name -> ServerRequest + 3, // 1: ServerMessage.server_response:type_name -> ServerResponse + 5, // 2: ServerRequest.request_vote_request:type_name -> RequestVoteRequest + 6, // 3: ServerRequest.append_entries_request:type_name -> AppendEntriesRequest + 7, // 4: ServerResponse.request_vote_response:type_name -> RequestVoteResponse + 8, // 5: ServerResponse.append_entries_response:type_name -> AppendEntriesResponse + 4, // 6: ServerResponse.error_response:type_name -> ErrorResponse + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name +} + +func init() { file_server_proto_init() } +func file_server_proto_init() { + if File_server_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_server_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_server_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_server_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_server_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ErrorResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_server_proto_msgTypes[0].OneofWrappers = []interface{}{ + (*ServerMessage_ServerRequest)(nil), + (*ServerMessage_ServerResponse)(nil), + } + file_server_proto_msgTypes[1].OneofWrappers = []interface{}{ + (*ServerRequest_RequestVoteRequest)(nil), + (*ServerRequest_AppendEntriesRequest)(nil), + } + file_server_proto_msgTypes[2].OneofWrappers = []interface{}{ + (*ServerResponse_RequestVoteResponse)(nil), + (*ServerResponse_AppendEntriesResponse)(nil), + (*ServerResponse_ErrorResponse)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_server_proto_rawDesc, + NumEnums: 1, + NumMessages: 4, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_server_proto_goTypes, + DependencyIndexes: file_server_proto_depIdxs, + EnumInfos: file_server_proto_enumTypes, + MessageInfos: file_server_proto_msgTypes, + }.Build() + File_server_proto = out.File + file_server_proto_rawDesc = nil + file_server_proto_goTypes = nil + file_server_proto_depIdxs = nil +} diff --git a/raft/client.go b/raft/client.go new file mode 100644 index 0000000..9ad111e --- /dev/null +++ b/raft/client.go @@ -0,0 +1,76 @@ +package raft + +import ( + serverPb "github.com/adylanrff/raft-algorithm/proto/server" + "github.com/adylanrff/raft-algorithm/raft/model" + "github.com/adylanrff/raft-algorithm/rpc" +) + +type RaftClient interface { + // TODO: add entries implementation + AppendEntries(address string, req *model.AppendEntriesRequestDTO) (resp *model.AppendEntriesResponseDTO, err error) + RequestVote(address string, req *model.RequestVoteRequestDTO) (resp *model.RequestVoteResponseDTO, err error) +} + +type defaultRaftClient struct { + rpcClient rpc.RPCClient +} + +// AppendEntries implements RaftClient +// TODO: implement entries +func (c *defaultRaftClient) AppendEntries(address string, req *model.AppendEntriesRequestDTO) (resp *model.AppendEntriesResponseDTO, err error) { + msg := &rpc.ServerMessageDTO{ + ServerMessage: &serverPb.ServerMessage{ + Method: RaftMethodName_AppendEntries, + Payload: &serverPb.ServerMessage_ServerRequest{ + ServerRequest: &serverPb.ServerRequest{ + Request: &serverPb.ServerRequest_AppendEntriesRequest{ + AppendEntriesRequest: req.AppendEntriesRequest, + }, + }, + }, + }, + } + + rsp, err := c.rpcClient.Call(address, msg) + if err != nil { + return nil, err + } + + appendEntriesResp := rsp.GetServerResponse().GetAppendEntriesResponse() + return &model.AppendEntriesResponseDTO{ + AppendEntriesResponse: appendEntriesResp, + }, nil +} + +// RequestVote implements RaftClient +func (c *defaultRaftClient) RequestVote(address string, req *model.RequestVoteRequestDTO) (resp *model.RequestVoteResponseDTO, err error) { + msg := &rpc.ServerMessageDTO{ + ServerMessage: &serverPb.ServerMessage{ + Method: RaftMethodName_RequestVotes, + Payload: &serverPb.ServerMessage_ServerRequest{ + ServerRequest: &serverPb.ServerRequest{ + Request: &serverPb.ServerRequest_RequestVoteRequest{ + RequestVoteRequest: req.RequestVoteRequest, + }, + }, + }, + }, + } + + rsp, err := c.rpcClient.Call(address, msg) + if err != nil { + return nil, err + } + + requestVoteRsp := rsp.GetServerResponse().GetRequestVoteResponse() + return &model.RequestVoteResponseDTO{ + RequestVoteResponse: requestVoteRsp, + }, nil +} + +func NewRaftClient(rpcClient rpc.RPCClient) RaftClient { + return &defaultRaftClient{ + rpcClient: rpcClient, + } +} diff --git a/raft/config.go b/raft/config.go new file mode 100644 index 0000000..b9be8a5 --- /dev/null +++ b/raft/config.go @@ -0,0 +1,11 @@ +package raft + +import "time" + +type RaftConfig struct { + ElectionTimeout time.Duration + IdleTimeout time.Duration + // ClusterMemberAddresses list all of the cluster member address + // take note to also include our own address in this + ClusterMemberAddreses []string +} diff --git a/raft/constants.go b/raft/constants.go new file mode 100644 index 0000000..16c289a --- /dev/null +++ b/raft/constants.go @@ -0,0 +1,7 @@ +package raft + +// RaftMethodName - method name for raft +const ( + RaftMethodName_RequestVotes = "raft.request_votes" + RaftMethodName_AppendEntries = "raft.append_entries" +) diff --git a/raft/model/append_entries.go b/raft/model/append_entries.go new file mode 100644 index 0000000..c9728f7 --- /dev/null +++ b/raft/model/append_entries.go @@ -0,0 +1,17 @@ +package model + +import raftPb "github.com/adylanrff/raft-algorithm/proto/raft" + +type AppendEntriesRequestDTO struct { + *raftPb.AppendEntriesRequest +} + +func NewAppendEntriesRequestDTO() *AppendEntriesRequestDTO { + return &AppendEntriesRequestDTO{ + AppendEntriesRequest: &raftPb.AppendEntriesRequest{}, + } +} + +type AppendEntriesResponseDTO struct { + *raftPb.AppendEntriesResponse +} diff --git a/raft/model/request_vote.go b/raft/model/request_vote.go new file mode 100644 index 0000000..caf7189 --- /dev/null +++ b/raft/model/request_vote.go @@ -0,0 +1,35 @@ +package model + +import raftPb "github.com/adylanrff/raft-algorithm/proto/raft" + +type RequestVoteRequestDTO struct { + *raftPb.RequestVoteRequest +} + +func NewRequestVoteRequestDTO( + term int32, + candidateID string, + lastLogIndex int32, + lastLogTerm int32) *RequestVoteRequestDTO { + return &RequestVoteRequestDTO{ + RequestVoteRequest: &raftPb.RequestVoteRequest{ + Term: term, + CandidateId: candidateID, + LastLogIndex: lastLogIndex, + LastLogTerm: int32(lastLogTerm), + }, + } +} + +type RequestVoteResponseDTO struct { + *raftPb.RequestVoteResponse +} + +func NewRequestVoteResponseDTO(term int32, voteGranted bool) *RequestVoteResponseDTO { + return &RequestVoteResponseDTO{ + RequestVoteResponse: &raftPb.RequestVoteResponse{ + Term: term, + VoteGranted: voteGranted, + }, + } +} diff --git a/raft/model/server_state.go b/raft/model/server_state.go new file mode 100644 index 0000000..0c0932c --- /dev/null +++ b/raft/model/server_state.go @@ -0,0 +1,90 @@ +package model + +import ( + "sync" + + log "github.com/sirupsen/logrus" +) + +// RaftState +type RaftState struct { + sync.Mutex + + // persistent state + ID string + CurrentTerm int32 + VotedFor string // CandidateID for the current term election + Logs []RaftLog + Role RaftRole + + // volatile state + CommitIndex int32 + LastApplied int32 + + // leaders only + NextIndex map[string]int + MatchIndex map[string]int +} + +func (s *RaftState) Vote(candidateID string) { + log.WithFields(log.Fields{ + "id": s.ID, + "vote_for": candidateID, + }).Info("voting done") + + s.VotedFor = candidateID +} + +func (s *RaftState) ChangeRole(raftRole RaftRole) { + log.WithFields(log.Fields{ + "role_before": s.Role.String(), + "role_after": raftRole.String(), + "id": s.ID, + }).Info("raft role changed") + + s.Role = raftRole +} + +func NewRaftState(id string) RaftState { + return RaftState{ + ID: id, + CurrentTerm: 0, + VotedFor: "", + Logs: make([]RaftLog, 0), + Role: RaftRoleFollower, + + CommitIndex: 0, + LastApplied: 0, + + // for leaders only, set to nil first + NextIndex: nil, + MatchIndex: nil, + } +} + +// RaftLog +type RaftLog struct { + Term int32 +} + +// RaftRole +type RaftRole int32 + +const ( + RaftRoleFollower RaftRole = iota + RaftRoleCandidate + RaftRoleLeader +) + +func (r RaftRole) String() string { + switch r { + case RaftRoleCandidate: + return "candidate" + case RaftRoleFollower: + return "follower" + case RaftRoleLeader: + return "leader" + } + + return "" +} diff --git a/raft/raft.go b/raft/raft.go new file mode 100644 index 0000000..4f0f031 --- /dev/null +++ b/raft/raft.go @@ -0,0 +1,242 @@ +package raft + +import ( + "math/rand" + "time" + + "github.com/adylanrff/raft-algorithm/raft/model" + "github.com/adylanrff/raft-algorithm/rpc" + log "github.com/sirupsen/logrus" +) + +type Raft interface { + RequestVote(req *model.RequestVoteRequestDTO) (*model.RequestVoteResponseDTO, error) + AppendEntries(req *model.AppendEntriesRequestDTO) (*model.AppendEntriesResponseDTO, error) +} + +type defaultRaft struct { + address string + state model.RaftState + config RaftConfig + + raftClient RaftClient + + receiveAppendEntriesSignal chan struct{} + sendAppendEntriesSignal chan struct{} +} + +func NewRaft(id string, address string, cfg RaftConfig) *defaultRaft { + electionTimeout := rand.Intn(int(cfg.ElectionTimeout*2)-int(cfg.ElectionTimeout)) + int(cfg.ElectionTimeout) + cfg.ElectionTimeout = time.Duration(electionTimeout) + + return &defaultRaft{ + address: address, + state: model.NewRaftState(id), + config: cfg, + raftClient: NewRaftClient(rpc.NewRPCClient()), + + receiveAppendEntriesSignal: make(chan struct{}), + sendAppendEntriesSignal: make(chan struct{}), + } +} + +// RequestVote implements RaftServerHandler +func (r *defaultRaft) RequestVote(req *model.RequestVoteRequestDTO) (*model.RequestVoteResponseDTO, error) { + r.state.Lock() + defer r.state.Unlock() + + log.WithFields(log.Fields{ + "id": r.state.ID, + "votedFor": r.state.VotedFor, + "req": req.RequestVoteRequest, + "method": "RequestVote", + }).Debug("request_vote_request_start") + + // reply false if the term is less than our current term + if req.GetTerm() < r.state.CurrentTerm { + log.WithFields(log.Fields{ + "candidate_term": req.GetTerm(), + "current_term": r.state.CurrentTerm, + }).Error("outdated term") + return model.NewRequestVoteResponseDTO(r.state.CurrentTerm, false), nil + } + + // Reply false also if we already voted and it is not the candidate ID + if r.state.VotedFor != req.GetCandidateId() && r.state.VotedFor != "" { + log.WithFields(log.Fields{ + "voted_for": r.state.VotedFor, + "candidate_id": req.GetCandidateId(), + }).Error("already voted before") + return model.NewRequestVoteResponseDTO(r.state.CurrentTerm, false), nil + } + + // Grant vote! + r.state.VotedFor = req.GetCandidateId() + resp := model.NewRequestVoteResponseDTO(r.state.CurrentTerm, true) + log.WithFields(log.Fields{ + "id": r.state.ID, + "votedFor": r.state.VotedFor, + "resp": resp, + "method": "RequestVote", + }).Debug("request_vote_request_end") + + return resp, nil +} + +// AppendEntries implements RaftServerHandler +func (r *defaultRaft) AppendEntries(req *model.AppendEntriesRequestDTO) (*model.AppendEntriesResponseDTO, error) { + log.WithFields(log.Fields{ + "req": req.AppendEntriesRequest, + "method": "AppendEntries", + }).Debug("append_entries_request_start") + + // received a heartbeat + r.receiveAppendEntriesSignal <- struct{}{} + return &model.AppendEntriesResponseDTO{}, nil +} + +func (r *defaultRaft) Run() error { + go r.watcher() + + for { + switch r.state.Role { + case model.RaftRoleFollower: + r.doFollowerAction() + case model.RaftRoleCandidate: + r.doCandidateAction() + case model.RaftRoleLeader: + r.doLeaderAction() + } + } +} + +func (r *defaultRaft) doFollowerAction() { + log.WithFields(log.Fields{}).Infof("do follower action start") + + for { + select { + case <-time.After(r.config.ElectionTimeout): + // be a candidate + // TODO: fix race condition possibilities, + // should not happen because the only thread that modifies the state.role is this thread + r.state.ChangeRole(model.RaftRoleCandidate) + return + case <-r.receiveAppendEntriesSignal: + log.Debug("append entries received") + } + } +} + +func (r *defaultRaft) doCandidateAction() { + log.WithFields(log.Fields{}).Infof("do candidate action startzzz") + + votes := 1 + majority := len(r.config.ClusterMemberAddreses) / 2 + + log.WithFields(log.Fields{}).Infof("do candidate action send election") + voteRespChan := r.doElection() + + for { + select { + case resp, ok := <-voteRespChan: + if ok && resp.VoteGranted { + votes++ + } + if votes > majority { + r.state.ChangeRole(model.RaftRoleLeader) + return + } + case <-time.After(r.config.ElectionTimeout): + log.WithFields(log.Fields{}).Infof("election timeout") + // do another election + return + case <-r.receiveAppendEntriesSignal: + log.WithFields(log.Fields{}).Infof("heartbeat received") + r.state.Lock() + r.state.ChangeRole(model.RaftRoleFollower) + r.state.VotedFor = "" + r.state.Unlock() + return + } + } +} + +func (r *defaultRaft) doElection() chan *model.RequestVoteResponseDTO { + log.WithFields(log.Fields{}).Infof("do election start") + + r.state.Lock() + defer r.state.Unlock() + + // 1. increment current term + r.state.CurrentTerm++ + // 2. vote for self + r.state.VotedFor = r.state.ID + + // Do voting + log.WithFields(log.Fields{}).Infof("do voting") + requestVoteReq := model.NewRequestVoteRequestDTO( + r.state.CurrentTerm, + r.state.ID, + int32(len(r.state.Logs)), + 0) + + voteRespChan := make(chan *model.RequestVoteResponseDTO, len(r.config.ClusterMemberAddreses)) + + for _, memberAddress := range r.config.ClusterMemberAddreses { + if memberAddress != r.address { + go func(memberAddress string) { + resp, err := r.raftClient.RequestVote(memberAddress, requestVoteReq) + if err != nil { + log.WithFields(log.Fields{ + "err": err, + }).Error("error requesting vote") + return + } + + log.WithFields(log.Fields{ + "id": r.state.ID, + "address": memberAddress, + "resp": resp, + }).Infof("do voting done") + voteRespChan <- resp + }(memberAddress) + } + } + + return voteRespChan +} + +func (r *defaultRaft) doLeaderAction() { + log.WithFields(log.Fields{}).Infof("do leader action start") + + for { + select { + case <-r.sendAppendEntriesSignal: + log.Debug("append entries sent") + case <-time.After(r.config.IdleTimeout): + log.Debug("idle timeout") + for _, memberAddress := range r.config.ClusterMemberAddreses { + if memberAddress != r.address { + go r.raftClient.AppendEntries(memberAddress, model.NewAppendEntriesRequestDTO()) + } + } + } + } +} + +// for debugging purposes +func (r *defaultRaft) watcher() { + ticker := time.NewTicker(time.Millisecond * 2000) + + for { + select { + case <-ticker.C: + log.WithFields(log.Fields{ + "id": r.state.ID, + "role": r.state.Role, + "term": r.state.CurrentTerm, + "voted_for": r.state.VotedFor, + }).Debug("raft node status") + } + } +} diff --git a/raft/server_handler.go b/raft/server_handler.go new file mode 100644 index 0000000..0564738 --- /dev/null +++ b/raft/server_handler.go @@ -0,0 +1,70 @@ +package raft + +import ( + raftModel "github.com/adylanrff/raft-algorithm/raft/model" + "github.com/adylanrff/raft-algorithm/rpc" + + serverPb "github.com/adylanrff/raft-algorithm/proto/server" +) + +// TODO: This sucks... do a better way to handle various request/response +// especially the response generation +// perhaps codegen would be a good way (just like what grpc do) + +type RaftServerHandler struct { + raftHandler Raft +} + +func NewRaftServerhandler(raftHandler Raft) *RaftServerHandler { + return &RaftServerHandler{ + raftHandler: raftHandler, + } +} + +func (h *RaftServerHandler) AppendEntriesHandler(req *rpc.ServerMessageDTO) (resp *rpc.ServerMessageDTO, err error) { + appendEntriesDTO := &raftModel.AppendEntriesRequestDTO{ + AppendEntriesRequest: req.GetServerRequest().GetAppendEntriesRequest(), + } + + appendEntriesResp, appendEntriesErr := h.raftHandler.AppendEntries(appendEntriesDTO) + + resp = &rpc.ServerMessageDTO{ + ServerMessage: &serverPb.ServerMessage{ + Method: req.GetMethod(), + Payload: &serverPb.ServerMessage_ServerResponse{ + ServerResponse: &serverPb.ServerResponse{ + Response: &serverPb.ServerResponse_AppendEntriesResponse{ + AppendEntriesResponse: appendEntriesResp.AppendEntriesResponse, + }, + }, + }, + }, + } + + err = appendEntriesErr + + return +} + +func (h *RaftServerHandler) RequestVoteHandler(req *rpc.ServerMessageDTO) (resp *rpc.ServerMessageDTO, err error) { + requestVoteDTO := &raftModel.RequestVoteRequestDTO{ + RequestVoteRequest: req.GetServerRequest().GetRequestVoteRequest(), + } + + requestVoteResp, requestVoteErr := h.raftHandler.RequestVote(requestVoteDTO) + resp = &rpc.ServerMessageDTO{ + ServerMessage: &serverPb.ServerMessage{ + Method: req.GetMethod(), + Payload: &serverPb.ServerMessage_ServerResponse{ + ServerResponse: &serverPb.ServerResponse{ + Response: &serverPb.ServerResponse_RequestVoteResponse{ + RequestVoteResponse: requestVoteResp.RequestVoteResponse, + }, + }, + }, + }, + } + err = requestVoteErr + + return +} diff --git a/raft/util.go b/raft/util.go new file mode 100644 index 0000000..497357e --- /dev/null +++ b/raft/util.go @@ -0,0 +1,7 @@ +package raft + +func GetRaftServerIDFromAddress(address string) string { + // TODO: Implement some other server id / hash alg to generate server id + // for now just use the address + return address +} diff --git a/rpc/client.go b/rpc/client.go new file mode 100644 index 0000000..588257e --- /dev/null +++ b/rpc/client.go @@ -0,0 +1,42 @@ +package rpc + +import ( + "net" +) + +type RPCClient interface { + Call(address string, payload *ServerMessageDTO) (resp *ServerMessageDTO, err error) +} + +type defaultRpcClient struct { +} + +// Call implements RPCClient +func (*defaultRpcClient) Call(address string, payload *ServerMessageDTO) (resp *ServerMessageDTO, err error) { + conn, err := net.Dial("tcp", address) + if err != nil { + return nil, err + } + defer conn.Close() + + byteSrvMsg, err := payload.ToBytes() + if err != nil { + return nil, err + } + + _, err = conn.Write(byteSrvMsg) + if err != nil { + return nil, err + } + + resp, err = ParseServerMessage(conn) + if err != nil { + return nil, err + } + + return +} + +func NewRPCClient() RPCClient { + return &defaultRpcClient{} +} diff --git a/rpc/parser.go b/rpc/parser.go new file mode 100644 index 0000000..2440402 --- /dev/null +++ b/rpc/parser.go @@ -0,0 +1,69 @@ +package rpc + +import ( + "errors" + "io" + + serverPb "github.com/adylanrff/raft-algorithm/proto/server" + log "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" +) + +func ParseServerMessage(reader io.Reader) (*ServerMessageDTO, error) { + byteSize, err := readByteSize(reader) + if err != nil { + return nil, err + } + + return readServerMessage(reader, byteSize) +} + +func readByteSize(reader io.Reader) (int, error) { + byteSizeBuf := make([]byte, 1) + n, err := reader.Read(byteSizeBuf) + if err != nil { + if err != io.EOF { + log.WithFields(log.Fields{ + "n": n, + }).Error("read error") + } + + return 0, err + } + + byteSize := int(byteSizeBuf[0]) + return byteSize, nil +} + +func readServerMessage(reader io.Reader, byteSize int) (*ServerMessageDTO, error) { + messageBuf := make([]byte, byteSize) + n, err := reader.Read(messageBuf) + if err != nil { + if err != io.EOF { + log.WithFields(log.Fields{ + "n": n, + }).Error("read error") + } + + return nil, err + } + + if n != byteSize { + log.WithFields(log.Fields{ + "n": n, + "header_byte_size": byteSize, + }).Error("byte size differs from header size") + return nil, errors.New("unexpected byte size") + } + + var serverMessage serverPb.ServerMessage + + err = proto.Unmarshal(messageBuf[:n], &serverMessage) + if err != nil { + return nil, err + } + + return &ServerMessageDTO{ + ServerMessage: &serverMessage, + }, nil +} diff --git a/rpc/server.go b/rpc/server.go new file mode 100644 index 0000000..f02d9b9 --- /dev/null +++ b/rpc/server.go @@ -0,0 +1,129 @@ +package rpc + +import ( + "errors" + "io" + "net" + + log "github.com/sirupsen/logrus" +) + +type Handler func(req *ServerMessageDTO) (resp *ServerMessageDTO, err error) + +// Server - implements the raft server +type Server struct { + address string + Handlers map[string]Handler + + Parse func(reader io.Reader) (req *ServerMessageDTO, err error) +} + +func NewServer(address string) *Server { + return &Server{ + address: address, + Parse: ParseServerMessage, + Handlers: make(map[string]Handler), + } +} + +func (s *Server) Run() { + srv, err := net.Listen("tcp", s.address) + if err != nil { + panic(err) + } + defer srv.Close() + + log.WithFields(log.Fields{ + "address": s.address, + }).Info("running server") + + for { + conn, err := srv.Accept() + if err != nil { + log.WithFields(log.Fields{"err": err}).Error("connection eror") + continue + } + + // TODO: add workercount limiter + go s.handleConn(conn) + } +} + +func (s *Server) AddHandler(method string, handler Handler) { + s.Handlers[method] = handler +} + +func (s *Server) handleConn(conn net.Conn) error { + var ( + err error + msg *ServerMessageDTO + ) + + defer func() { + // TODO: use better way, create ErrorDTO + if err != nil { + errorServerMessageDTO := NewErrorMessageDTO(-1, err.Error()) + errorServerMessageBytes, convErr := errorServerMessageDTO.ToBytes() + if convErr != nil { + log.WithFields( + log.Fields{ + "errorDTO": errorServerMessageDTO, + "err": convErr, + }).Error("convert error") + + errBytes := []byte(convErr.Error()) + msg := append([]byte{byte(len(errBytes))}, errBytes...) + _, err = conn.Write(msg) + if err != nil { + log.WithFields( + log.Fields{ + "errorDTO": errorServerMessageDTO, + "err": convErr, + }).Error("write error") + } + return + } + conn.Write(errorServerMessageBytes) + } + }() + + msg, err = s.Parse(conn) + if err != nil { + log.WithFields(log.Fields{"err": err}).Error("parse message error") + return err + } + + resp, err := s.handleMsg(msg) + if err != nil { + log.WithFields(log.Fields{"err": err}).Error("handle error") + return err + } + + respByte, err := resp.ToBytes() + if err != nil { + log.WithFields(log.Fields{"err": err}).Error("handle error") + return err + } + + var ( + n int + ) + n, err = conn.Write(respByte) + log.WithFields(log.Fields{ + "err": err, + "bytes_written": n, + "resp": resp, + }).Debug("write response") + + return err +} + +func (s *Server) handleMsg(msg *ServerMessageDTO) (resp *ServerMessageDTO, err error) { + method := msg.GetMethod() + handler, ok := s.Handlers[method] + if !ok { + return nil, errors.New("unrecognized methods") + } + + return handler(msg) +} diff --git a/rpc/server_message.go b/rpc/server_message.go new file mode 100644 index 0000000..ce0d5c6 --- /dev/null +++ b/rpc/server_message.go @@ -0,0 +1,44 @@ +package rpc + +import ( + serverPb "github.com/adylanrff/raft-algorithm/proto/server" + "google.golang.org/protobuf/proto" +) + +type ServerMessageDTO struct { + *serverPb.ServerMessage +} + +func (dto *ServerMessageDTO) ToBytes() ([]byte, error) { + marshalledMessage, err := proto.Marshal(dto.ServerMessage) + if err != nil { + return nil, err + } + + return append([]byte{byte(len(marshalledMessage))}, marshalledMessage...), nil +} + +type ErrorServerMessageDTO struct { + *ServerMessageDTO +} + +func NewErrorMessageDTO(errorCode int32, errorMsg string) *ErrorServerMessageDTO { + errorResponse := &serverPb.ErrorResponse{ + ErrorCode: errorCode, + ErrorMsg: errorMsg, + } + + return &ErrorServerMessageDTO{ + ServerMessageDTO: &ServerMessageDTO{ + ServerMessage: &serverPb.ServerMessage{ + Payload: &serverPb.ServerMessage_ServerResponse{ + ServerResponse: &serverPb.ServerResponse{ + Response: &serverPb.ServerResponse_ErrorResponse{ + ErrorResponse: errorResponse, + }, + }, + }, + }, + }, + } +} diff --git a/util/logger.go b/util/logger.go new file mode 100644 index 0000000..6437087 --- /dev/null +++ b/util/logger.go @@ -0,0 +1,18 @@ +package util + +import ( + "os" + + log "github.com/sirupsen/logrus" +) + +func InitLogger(logPath string) { + log.SetFormatter(&log.TextFormatter{}) + log.SetLevel(log.DebugLevel) + + f, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0755) + if err != nil { + panic(err) + } + log.SetOutput(f) +}