Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

handle Snapshot and SnapshotRecovery requests in the http transporter

  • Loading branch information...
commit 4157c675e0b6d92461823fcbeb631489910d7ac5 1 parent 2c0c8f0
John Shahid jvshahid authored
Showing with 123 additions and 14 deletions.
  1. +123 −14 http_transporter.go
137 http_transporter.go
View
@@ -5,6 +5,8 @@ import (
"fmt"
"io"
"net/http"
+ "net/url"
+ "path"
)
// Parts from this transporter were heavily influenced by Peter Bougon's
@@ -19,12 +21,14 @@ import (
// An HTTPTransporter is a default transport layer used to communicate between
// multiple servers.
type HTTPTransporter struct {
- DisableKeepAlives bool
- prefix string
- appendEntriesPath string
- requestVotePath string
- httpClient http.Client
- Transport *http.Transport
+ DisableKeepAlives bool
+ prefix string
+ appendEntriesPath string
+ requestVotePath string
+ snapshotPath string
+ snapshotRecoveryPath string
+ httpClient http.Client
+ Transport *http.Transport
}
type HTTPMuxer interface {
@@ -40,11 +44,13 @@ type HTTPMuxer interface {
// Creates a new HTTP transporter with the given path prefix.
func NewHTTPTransporter(prefix string) *HTTPTransporter {
t := &HTTPTransporter{
- DisableKeepAlives: false,
- prefix: prefix,
- appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"),
- requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"),
- Transport: &http.Transport{DisableKeepAlives: false},
+ DisableKeepAlives: false,
+ prefix: prefix,
+ appendEntriesPath: joinPath(prefix, "/appendEntries"),
+ requestVotePath: joinPath(prefix, "/requestVote"),
+ snapshotPath: joinPath(prefix, "/snapshot"),
+ snapshotRecoveryPath: joinPath(prefix, "/snapshotRecovery"),
+ Transport: &http.Transport{DisableKeepAlives: false},
}
t.httpClient.Transport = t.Transport
return t
@@ -71,6 +77,16 @@ func (t *HTTPTransporter) RequestVotePath() string {
return t.requestVotePath
}
+// Retrieves the Snapshot path.
+func (t *HTTPTransporter) SnapshotPath() string {
+ return t.snapshotPath
+}
+
+// Retrieves the SnapshotRecovery path.
+func (t *HTTPTransporter) SnapshotRecoveryPath() string {
+ return t.snapshotRecoveryPath
+}
+
//------------------------------------------------------------------------------
//
// Methods
@@ -85,6 +101,8 @@ func (t *HTTPTransporter) RequestVotePath() string {
func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) {
mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
+ mux.HandleFunc(t.SnapshotPath(), t.snapshotHandler(server))
+ mux.HandleFunc(t.SnapshotRecoveryPath(), t.snapshotRecoveryHandler(server))
}
//--------------------------------------
@@ -99,7 +117,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re
return nil
}
- url := fmt.Sprintf("%s%s", peer.ConnectionString, t.AppendEntriesPath())
+ url := joinPath(peer.ConnectionString, t.AppendEntriesPath())
traceln(server.Name(), "POST", url)
t.Transport.ResponseHeaderTimeout = server.ElectionTimeout()
@@ -146,14 +164,67 @@ func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *Reques
return resp
}
+func joinPath(connectionString, thePath string) string {
+ u, err := url.Parse(connectionString)
+ if err != nil {
+ panic(err)
+ }
+ u.Path = path.Join(u.Path, thePath)
+ return u.String()
+}
+
// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
- return nil
+ var b bytes.Buffer
+ if _, err := req.Encode(&b); err != nil {
+ traceln("transporter.rv.encoding.error:", err)
+ return nil
+ }
+
+ url := joinPath(peer.ConnectionString, t.snapshotPath)
+ traceln(server.Name(), "POST", url)
+
+ httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
+ if httpResp == nil || err != nil {
+ traceln("transporter.rv.response.error:", err)
+ return nil
+ }
+ defer httpResp.Body.Close()
+
+ resp := &SnapshotResponse{}
+ if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
+ traceln("transporter.rv.decoding.error:", err)
+ return nil
+ }
+
+ return resp
}
// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
- return nil
+ var b bytes.Buffer
+ if _, err := req.Encode(&b); err != nil {
+ traceln("transporter.rv.encoding.error:", err)
+ return nil
+ }
+
+ url := joinPath(peer.ConnectionString, t.snapshotRecoveryPath)
+ traceln(server.Name(), "POST", url)
+
+ httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
+ if httpResp == nil || err != nil {
+ traceln("transporter.rv.response.error:", err)
+ return nil
+ }
+ defer httpResp.Body.Close()
+
+ resp := &SnapshotRecoveryResponse{}
+ if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
+ traceln("transporter.rv.decoding.error:", err)
+ return nil
+ }
+
+ return resp
}
//--------------------------------------
@@ -197,3 +268,41 @@ func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
}
}
}
+
+// Handles incoming Snapshot requests.
+func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ traceln(server.Name(), "RECV /snapshot")
+
+ req := &SnapshotRequest{}
+ if _, err := req.Decode(r.Body); err != nil {
+ http.Error(w, "", http.StatusBadRequest)
+ return
+ }
+
+ resp := server.RequestSnapshot(req)
+ if _, err := resp.Encode(w); err != nil {
+ http.Error(w, "", http.StatusInternalServerError)
+ return
+ }
+ }
+}
+
+// Handles incoming SnapshotRecovery requests.
+func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ traceln(server.Name(), "RECV /snapshotRecovery")
+
+ req := &SnapshotRecoveryRequest{}
+ if _, err := req.Decode(r.Body); err != nil {
+ http.Error(w, "", http.StatusBadRequest)
+ return
+ }
+
+ resp := server.SnapshotRecoveryRequest(req)
+ if _, err := resp.Encode(w); err != nil {
+ http.Error(w, "", http.StatusInternalServerError)
+ return
+ }
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.