-
Notifications
You must be signed in to change notification settings - Fork 1
/
agent.go
223 lines (185 loc) · 5.02 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package main
import (
"crypto/tls"
"errors"
"net"
"net/http"
"regexp"
"strconv"
"time"
"github.com/30x/changeagent/common"
"github.com/30x/changeagent/communication"
"github.com/30x/changeagent/raft"
"github.com/30x/changeagent/storage"
"github.com/golang/glog"
"github.com/gorilla/mux"
)
/*
ChangeAgent is a server that implements the Raft protocol, plus the "changeagent"
API.
*/
type ChangeAgent struct {
stor storage.Storage
raft *raft.Service
router *mux.Router
markedDown int32
uriPrefix string
}
const (
// NormalChange denotes a Raft proposal that will appear to everyone in the change log.
// We may introduce additional change types in the future.
NormalChange = 0
dbCacheSize = 10 * 1024 * 1024
plainTextContent = "text/plain"
jsonContent = "application/json"
)
/*
StartChangeAgent starts an instance of changeagent with its API listening on a specific
HTTP "mux".
"dbFile" denotes the name of the base directory for the local RocksDB database.
"httpMux" must have been previously created using the "net/http" package,
and it must listen for HTTP requests.
If "uriPrefix" is not the empty string, then every API call will require that
it be prepended. In other words, "/changes" will become "/prefix/changes".
The prefix must not end with a "/".
*/
func StartChangeAgent(
dbFile string,
httpMux *http.ServeMux,
uriPrefix string,
comm communication.Communication) (*ChangeAgent, error) {
if uriPrefix != "" {
if uriPrefix[len(uriPrefix)-1] == '/' {
return nil, errors.New("Invalid URI prefix: Must not end with a slash")
}
if uriPrefix[0] != '/' {
uriPrefix = "/" + uriPrefix
}
}
stor, err := storage.CreateRocksDBStorage(dbFile, dbCacheSize)
if err != nil {
return nil, err
}
agent := &ChangeAgent{
stor: stor,
router: mux.NewRouter(),
uriPrefix: uriPrefix,
}
raft, err := raft.StartRaft(comm, stor, agent)
if err != nil {
return nil, err
}
agent.raft = raft
comm.SetRaft(raft)
agent.initDiagnosticAPI(uriPrefix)
agent.initChangesAPI(uriPrefix)
agent.initHooksAPI(uriPrefix)
agent.initClusterAPI(uriPrefix)
agent.initConfigAPI(uriPrefix)
httpMux.Handle("/", agent.router)
return agent, nil
}
/*
Close stops changeagent.
*/
func (a *ChangeAgent) Close() {
a.raft.Close()
a.stor.Close()
}
/*
Delete deletes the database, cleaning out the contents of the DB
directory. "Close" must be called first.
*/
func (a *ChangeAgent) Delete() {
a.stor.Delete()
}
/*
GetRaftState returns the state of the internal Raft implementation.
*/
func (a *ChangeAgent) GetRaftState() raft.State {
return a.raft.GetState()
}
func (a *ChangeAgent) makeProposal(proposal *common.Entry) (*common.Entry, error) {
// Timestamp and otherwise update the proposal
proposal.Timestamp = time.Now()
// Send the raft proposal. This happens asynchronously.
newIndex, err := a.raft.Propose(proposal)
if err != nil {
glog.Warningf("Fatal error making Raft proposal: %v", err)
return nil, err
}
glog.V(2).Infof("Proposed new change with index %d", newIndex)
err = a.raft.WaitForCommit(newIndex)
if err == nil {
newEntry := &common.Entry{
Index: newIndex,
}
return newEntry, nil
}
return nil, err
}
/*
Commit is called by the Raft implementation when an entry has reached
commit state. However, we do not do anything here today.
*/
func (a *ChangeAgent) Commit(entry *common.Entry) error {
// Nothing to do now. Perhaps we take this interface out.
return nil
}
func writeError(resp http.ResponseWriter, code int, err error) {
glog.Errorf("Returning error %d: %s", code, err)
msg := marshalError(err)
resp.Header().Set("Content-Type", jsonContent)
resp.WriteHeader(code)
resp.Write([]byte(msg))
}
var jsonContentRe = regexp.MustCompile("^application/json(;.*)?$")
func isJSON(resp http.ResponseWriter, req *http.Request) bool {
if !jsonContentRe.MatchString(req.Header.Get("Content-Type")) {
writeError(resp, http.StatusUnsupportedMediaType, errors.New("Unsupported content type"))
return false
}
return true
}
func startListener(port int, key, cert, cas string) (net.Listener, int, error) {
addr := &net.TCPAddr{
Port: port,
}
tcpListener, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, 0, err
}
var listener net.Listener = tcpListener
success := false
defer func() {
if !success {
listener.Close()
}
}()
if key != "" || cert != "" {
if key == "" || cert == "" {
return nil, 0, errors.New("Both -key and -cert must be set")
}
tlsCert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, 0, err
}
tlsCfg := tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
if cas != "" {
caPool, err := communication.LoadCertPool(cas)
if err != nil {
return nil, 0, err
}
// If we have "cas" then also verify clients
tlsCfg.ClientCAs = caPool
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert
}
listener = tls.NewListener(tcpListener, &tlsCfg)
}
success = true
_, portStr, _ := net.SplitHostPort(listener.Addr().String())
listenPort, _ := strconv.Atoi(portStr)
return listener, listenPort, err
}