Skip to content
This repository has been archived by the owner on Feb 11, 2021. It is now read-only.

Commit

Permalink
Server interface.
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Oct 14, 2013
1 parent e138889 commit 8f6f82f
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 245 deletions.
2 changes: 1 addition & 1 deletion command.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func init() {
// A command represents an action to be taken on the replicated state machine.
type Command interface {
CommandName() string
Apply(server *Server) (interface{}, error)
Apply(server Server) (interface{}, error)
}

type CommandEncoder interface {
Expand Down
14 changes: 7 additions & 7 deletions http_transporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (t *HTTPTransporter) RequestVotePath() string {
//--------------------------------------

// Applies Raft routes to an HTTP router for a given server.
func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) {
func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) {
mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
}
Expand All @@ -87,7 +87,7 @@ func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) {
//--------------------------------------

// Sends an AppendEntries RPC to a peer.
func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
traceln("transporter.ae.encoding.error:", err)
Expand Down Expand Up @@ -115,7 +115,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
}

// Sends a RequestVote RPC to a peer.
func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
traceln("transporter.rv.encoding.error:", err)
Expand Down Expand Up @@ -143,12 +143,12 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque
}

// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
return nil
}

// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
return nil
}

Expand All @@ -157,7 +157,7 @@ func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer
//--------------------------------------

// Handles incoming AppendEntries requests.
func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc {
func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /appendEntries")

Expand All @@ -176,7 +176,7 @@ func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc
}

// Handles incoming RequestVote requests.
func (t *HTTPTransporter) requestVoteHandler(server *Server) http.HandlerFunc {
func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /requestVote")

Expand Down
14 changes: 7 additions & 7 deletions http_transporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ func TestHTTPTransporter(t *testing.T) {
transporter := NewHTTPTransporter("/raft")
transporter.DisableKeepAlives = true

servers := []*Server{}
f0 := func(server *Server, httpServer *http.Server) {
servers := []Server{}
f0 := func(server Server, httpServer *http.Server) {
// Stop the leader and wait for an election.
server.Stop()
time.Sleep(testElectionTimeout * 2)
Expand All @@ -25,15 +25,15 @@ func TestHTTPTransporter(t *testing.T) {
}
server.Start()
}
f1 := func(server *Server, httpServer *http.Server) {
f1 := func(server Server, httpServer *http.Server) {
}
f2 := func(server *Server, httpServer *http.Server) {
f2 := func(server Server, httpServer *http.Server) {
}
runTestHttpServers(t, &servers, transporter, f0, f1, f2)
}

// Starts multiple independent Raft servers wrapped with HTTP servers.
func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTransporter, callbacks ...func(*Server, *http.Server)) {
func runTestHttpServers(t *testing.T, servers *[]Server, transporter *HTTPTransporter, callbacks ...func(Server, *http.Server)) {
var wg sync.WaitGroup
httpServers := []*http.Server{}
listeners := []net.Listener{}
Expand Down Expand Up @@ -94,7 +94,7 @@ func BenchmarkSpeed(b *testing.B) {
transporter := NewHTTPTransporter("/raft")
transporter.DisableKeepAlives = true

servers := []*Server{}
servers := []Server{}

for i := 0; i < 3; i++ {
port := 9000 + i
Expand Down Expand Up @@ -145,7 +145,7 @@ func BenchmarkSpeed(b *testing.B) {
}
}

func send(c chan bool, s *Server) {
func send(c chan bool, s Server) {
for i := 0; i < 20; i++ {
s.Do(&NOPCommand{})
}
Expand Down
4 changes: 2 additions & 2 deletions join_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package raft
// Join command interface
type JoinCommand interface {
CommandName() string
Apply(server *Server) (interface{}, error)
Apply(server Server) (interface{}, error)
NodeName() string
}

Expand All @@ -18,7 +18,7 @@ func (c *DefaultJoinCommand) CommandName() string {
return "raft:join"
}

func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error) {
func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) {
err := server.AddPeer(c.Name, c.ConnectionString)

return []byte("join"), err
Expand Down
4 changes: 2 additions & 2 deletions leave_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package raft
// Leave command interface
type LeaveCommand interface {
CommandName() string
Apply(server *Server) (interface{}, error)
Apply(server Server) (interface{}, error)
NodeName() string
}

Expand All @@ -17,7 +17,7 @@ func (c *DefaultLeaveCommand) CommandName() string {
return "raft:leave"
}

func (c *DefaultLeaveCommand) Apply(server *Server) (interface{}, error) {
func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error) {
err := server.RemovePeer(c.Name)

return []byte("leave"), err
Expand Down
2 changes: 1 addition & 1 deletion nop_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func (c NOPCommand) CommandName() string {
return "raft:nop"
}

func (c NOPCommand) Apply(server *Server) (interface{}, error) {
func (c NOPCommand) Apply(server Server) (interface{}, error) {
return nil, nil
}

Expand Down
4 changes: 2 additions & 2 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
server *Server
server *server
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
prevLogIndex uint64
Expand All @@ -29,7 +29,7 @@ type Peer struct {
//------------------------------------------------------------------------------

// Creates a new peer.
func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
func newPeer(server *server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
return &Peer{
server: server,
Name: name,
Expand Down
Loading

0 comments on commit 8f6f82f

Please sign in to comment.