Skip to content
This repository has been archived by the owner on Sep 6, 2018. It is now read-only.

Commit

Permalink
Merge pull request #154 from influxdb/http-transport-snapshot-support
Browse files Browse the repository at this point in the history
handle Snapshot and SnapshotRecovery requests in the http transporter
  • Loading branch information
benbjohnson committed Jan 19, 2014
2 parents 2c0c8f0 + 4157c67 commit 175a2df
Showing 1 changed file with 123 additions and 14 deletions.
137 changes: 123 additions & 14 deletions http_transporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"path"
)

// Parts from this transporter were heavily influenced by Peter Bougon's
Expand All @@ -19,12 +21,14 @@ import (
// An HTTPTransporter is a default transport layer used to communicate between
// multiple servers.
type HTTPTransporter struct {
DisableKeepAlives bool
prefix string
appendEntriesPath string
requestVotePath string
httpClient http.Client
Transport *http.Transport
DisableKeepAlives bool
prefix string
appendEntriesPath string
requestVotePath string
snapshotPath string
snapshotRecoveryPath string
httpClient http.Client
Transport *http.Transport
}

type HTTPMuxer interface {
Expand All @@ -40,11 +44,13 @@ type HTTPMuxer interface {
// Creates a new HTTP transporter with the given path prefix.
func NewHTTPTransporter(prefix string) *HTTPTransporter {
t := &HTTPTransporter{
DisableKeepAlives: false,
prefix: prefix,
appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"),
requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"),
Transport: &http.Transport{DisableKeepAlives: false},
DisableKeepAlives: false,
prefix: prefix,
appendEntriesPath: joinPath(prefix, "/appendEntries"),
requestVotePath: joinPath(prefix, "/requestVote"),
snapshotPath: joinPath(prefix, "/snapshot"),
snapshotRecoveryPath: joinPath(prefix, "/snapshotRecovery"),
Transport: &http.Transport{DisableKeepAlives: false},
}
t.httpClient.Transport = t.Transport
return t
Expand All @@ -71,6 +77,16 @@ func (t *HTTPTransporter) RequestVotePath() string {
return t.requestVotePath
}

// Retrieves the Snapshot path.
func (t *HTTPTransporter) SnapshotPath() string {
return t.snapshotPath
}

// Retrieves the SnapshotRecovery path.
func (t *HTTPTransporter) SnapshotRecoveryPath() string {
return t.snapshotRecoveryPath
}

//------------------------------------------------------------------------------
//
// Methods
Expand All @@ -85,6 +101,8 @@ func (t *HTTPTransporter) RequestVotePath() string {
func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) {
mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
mux.HandleFunc(t.SnapshotPath(), t.snapshotHandler(server))
mux.HandleFunc(t.SnapshotRecoveryPath(), t.snapshotRecoveryHandler(server))
}

//--------------------------------------
Expand All @@ -99,7 +117,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re
return nil
}

url := fmt.Sprintf("%s%s", peer.ConnectionString, t.AppendEntriesPath())
url := joinPath(peer.ConnectionString, t.AppendEntriesPath())
traceln(server.Name(), "POST", url)

t.Transport.ResponseHeaderTimeout = server.ElectionTimeout()
Expand Down Expand Up @@ -146,14 +164,67 @@ func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *Reques
return resp
}

func joinPath(connectionString, thePath string) string {
u, err := url.Parse(connectionString)
if err != nil {
panic(err)
}
u.Path = path.Join(u.Path, thePath)
return u.String()
}

// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
return nil
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
traceln("transporter.rv.encoding.error:", err)
return nil
}

url := joinPath(peer.ConnectionString, t.snapshotPath)
traceln(server.Name(), "POST", url)

httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
if httpResp == nil || err != nil {
traceln("transporter.rv.response.error:", err)
return nil
}
defer httpResp.Body.Close()

resp := &SnapshotResponse{}
if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
traceln("transporter.rv.decoding.error:", err)
return nil
}

return resp
}

// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
return nil
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
traceln("transporter.rv.encoding.error:", err)
return nil
}

url := joinPath(peer.ConnectionString, t.snapshotRecoveryPath)
traceln(server.Name(), "POST", url)

httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
if httpResp == nil || err != nil {
traceln("transporter.rv.response.error:", err)
return nil
}
defer httpResp.Body.Close()

resp := &SnapshotRecoveryResponse{}
if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
traceln("transporter.rv.decoding.error:", err)
return nil
}

return resp
}

//--------------------------------------
Expand Down Expand Up @@ -197,3 +268,41 @@ func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
}
}
}

// Handles incoming Snapshot requests.
func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /snapshot")

req := &SnapshotRequest{}
if _, err := req.Decode(r.Body); err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}

resp := server.RequestSnapshot(req)
if _, err := resp.Encode(w); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
}
}
}

// Handles incoming SnapshotRecovery requests.
func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /snapshotRecovery")

req := &SnapshotRecoveryRequest{}
if _, err := req.Decode(r.Body); err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}

resp := server.SnapshotRecoveryRequest(req)
if _, err := resp.Encode(w); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
}
}
}

0 comments on commit 175a2df

Please sign in to comment.