/
httpapi.go
91 lines (82 loc) · 2.34 KB
/
httpapi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package raft
import (
"context"
"io/ioutil"
"net/http"
"strconv"
"fmt"
"github.com/coreos/etcd/raft/raftpb"
)
// Handler for a http based httpRaftAPI backed by raft
type httpRaftAPI struct {
confChangeC chan<- raftpb.ConfChange
}
func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
switch {
case r.Method == "POST":
url, err := ioutil.ReadAll(r.Body)
if err != nil {
rlog.Error(fmt.Sprintf("Failed to convert ID for conf change (%v)", err.Error()))
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}
nodeID, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
rlog.Error(fmt.Sprintf("Failed to convert ID for conf change (%v)", err.Error()))
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeID,
Context: url,
}
h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusCreated)
case r.Method == "DELETE":
nodeID, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
rlog.Error(fmt.Sprintf("Failed to convert ID for conf change (%v)", err.Error()))
http.Error(w, "Failed on DELETE", http.StatusBadRequest)
return
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeID,
}
h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusAccepted)
default:
w.Header().Add("Allow", "POST")
w.Header().Add("Allow", "DELETE")
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func serveHTTPRaftAPI(ctx context.Context, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
srv := &http.Server{
Addr: "localhost:" + strconv.Itoa(port),
Handler: &httpRaftAPI{
confChangeC: confChangeC,
},
}
go func() {
if err := srv.ListenAndServe(); err != nil {
rlog.Error(fmt.Sprintf("ListenAndServe have a err: (%v)", err.Error()))
}
}()
select {
case <-ctx.Done():
srv.Close()
case err := <-errorC:
srv.Close()
if err != nil {
rlog.Error(fmt.Sprintf("the errorC chan receive a err (%v)\n", err.Error()))
}
}
}