Skip to content
This repository has been archived by the owner on Sep 6, 2018. It is now read-only.

Commit

Permalink
Merge pull request #151 from xiangli-cmu/refactor_proto
Browse files Browse the repository at this point in the history
WIP Refactor proto
  • Loading branch information
benbjohnson committed Jan 23, 2014
2 parents 639ffa3 + 353d5f3 commit ad05eb4
Show file tree
Hide file tree
Showing 32 changed files with 4,971 additions and 349 deletions.
52 changes: 18 additions & 34 deletions append_entries_request.go
@@ -1,10 +1,11 @@
package raft

import (
"code.google.com/p/goprotobuf/proto"
"github.com/goraft/raft/protobuf"
"io"
"io/ioutil"

"code.google.com/p/gogoprotobuf/proto"
"github.com/goraft/raft/protobuf"
)

// The request sent to a server to append entries to the log.
Expand All @@ -14,43 +15,38 @@ type AppendEntriesRequest struct {
PrevLogTerm uint64
CommitIndex uint64
LeaderName string
Entries []*LogEntry
Entries []*protobuf.LogEntry
}

// Creates a new AppendEntries request.
func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint64, commitIndex uint64, leaderName string, entries []*LogEntry) *AppendEntriesRequest {
func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint64,
commitIndex uint64, leaderName string, entries []*LogEntry) *AppendEntriesRequest {
pbEntries := make([]*protobuf.LogEntry, len(entries))

for i := range entries {
pbEntries[i] = entries[i].pb
}

return &AppendEntriesRequest{
Term: term,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
CommitIndex: commitIndex,
LeaderName: leaderName,
Entries: entries,
Entries: pbEntries,
}
}

// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {

protoEntries := make([]*protobuf.ProtoAppendEntriesRequest_ProtoLogEntry, len(req.Entries))

for i, entry := range req.Entries {
protoEntries[i] = &protobuf.ProtoAppendEntriesRequest_ProtoLogEntry{
Index: proto.Uint64(entry.Index),
Term: proto.Uint64(entry.Term),
CommandName: proto.String(entry.CommandName),
Command: entry.Command,
}
}

pb := &protobuf.ProtoAppendEntriesRequest{
pb := &protobuf.AppendEntriesRequest{
Term: proto.Uint64(req.Term),
PrevLogIndex: proto.Uint64(req.PrevLogIndex),
PrevLogTerm: proto.Uint64(req.PrevLogTerm),
CommitIndex: proto.Uint64(req.CommitIndex),
LeaderName: proto.String(req.LeaderName),
Entries: protoEntries,
Entries: req.Entries,
}

p, err := proto.Marshal(pb)
Expand All @@ -70,9 +66,7 @@ func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error) {
return -1, err
}

totalBytes := len(data)

pb := &protobuf.ProtoAppendEntriesRequest{}
pb := new(protobuf.AppendEntriesRequest)
if err := proto.Unmarshal(data, pb); err != nil {
return -1, err
}
Expand All @@ -82,17 +76,7 @@ func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error) {
req.PrevLogTerm = pb.GetPrevLogTerm()
req.CommitIndex = pb.GetCommitIndex()
req.LeaderName = pb.GetLeaderName()
req.Entries = pb.GetEntries()

req.Entries = make([]*LogEntry, len(pb.Entries))

for i, entry := range pb.Entries {
req.Entries[i] = &LogEntry{
Index: entry.GetIndex(),
Term: entry.GetTerm(),
CommandName: entry.GetCommandName(),
Command: entry.Command,
}
}

return totalBytes, nil
return len(data), nil
}
67 changes: 35 additions & 32 deletions append_entries_response.go
@@ -1,70 +1,73 @@
package raft

import (
"code.google.com/p/goprotobuf/proto"
"github.com/goraft/raft/protobuf"
"io"
"io/ioutil"

"code.google.com/p/gogoprotobuf/proto"
"github.com/goraft/raft/protobuf"
)

// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
Term uint64
// the current index of the server
Index uint64
Success bool
CommitIndex uint64
peer string
append bool
pb *protobuf.AppendEntriesResponse
peer string
append bool
}

// Creates a new AppendEntries response.
func newAppendEntriesResponse(term uint64, success bool, index uint64, commitIndex uint64) *AppendEntriesResponse {
pb := &protobuf.AppendEntriesResponse{
Term: proto.Uint64(term),
Index: proto.Uint64(index),
Success: proto.Bool(success),
CommitIndex: proto.Uint64(commitIndex),
}

return &AppendEntriesResponse{
Term: term,
Success: success,
Index: index,
CommitIndex: commitIndex,
pb: pb,
}
}

func (aer *AppendEntriesResponse) Index() uint64 {
return aer.pb.GetIndex()
}

func (aer *AppendEntriesResponse) CommitIndex() uint64 {
return aer.pb.GetCommitIndex()
}

func (aer *AppendEntriesResponse) Term() uint64 {
return aer.pb.GetTerm()
}

func (aer *AppendEntriesResponse) Success() bool {
return aer.pb.GetSuccess()
}

// Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error) {
pb := &protobuf.ProtoAppendEntriesResponse{
Term: proto.Uint64(resp.Term),
Index: proto.Uint64(resp.Index),
CommitIndex: proto.Uint64(resp.CommitIndex),
Success: proto.Bool(resp.Success),
}
p, err := proto.Marshal(pb)
b, err := proto.Marshal(resp.pb)
if err != nil {
return -1, err
}

return w.Write(p)
return w.Write(b)
}

// Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)

if err != nil {
return -1, err
}

totalBytes := len(data)

pb := &protobuf.ProtoAppendEntriesResponse{}
if err := proto.Unmarshal(data, pb); err != nil {
resp.pb = new(protobuf.AppendEntriesResponse)
if err := proto.Unmarshal(data, resp.pb); err != nil {
return -1, err
}

resp.Term = pb.GetTerm()
resp.Index = pb.GetIndex()
resp.CommitIndex = pb.GetCommitIndex()
resp.Success = pb.GetSuccess()

return totalBytes, nil
return len(data), nil
}

0 comments on commit ad05eb4

Please sign in to comment.