forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
handler.go
156 lines (132 loc) · 4.31 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main
import (
"log"
"math/rand"
"net/http"
"net/url"
"strings"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/raft"
)
// Handler represents an HTTP handler for InfluxDB node.
// Depending on its role, it will serve many different endpoints.
//
// Requests are routed by URL path prefix to the raft, messaging, metadata or
// data endpoints. A nil Log, Broker or Server means the node does not serve
// the corresponding endpoints locally; the serve* methods then redirect the
// client elsewhere or answer with an error status.
type Handler struct {
// Log, when non-nil, serves raft requests locally and is also handed to
// the raft handler embedded in the messaging handler.
Log *raft.Log
// Broker, when non-nil, serves messaging requests locally. On a node
// without a Server, its broadcast topic supplies the data URLs that
// metadata and data requests are redirected to.
Broker *influxdb.Broker
// Server, when non-nil, serves metadata and data requests locally. On a
// node without a Broker or Log, its BrokerURLs are the redirect targets
// for messaging and raft requests.
Server *influxdb.Server
// Config supplies Authentication.Enabled and Logging.WriteTracing for the
// handlers built around Server. It is dereferenced without a nil check
// whenever Server is non-nil.
Config *Config
}
// NewHandler allocates a fresh Handler and returns a pointer to it.
// Log, Broker, Server and Config are all nil until the caller sets them.
func NewHandler() *Handler {
	return new(Handler)
}
// ServeHTTP dispatches the request to one of the serve* methods according to
// the prefix of the URL path. Paths that match none of the known prefixes are
// treated as data requests.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// FIXME: This is very brittle. Refactor to have common path prefix
	path := r.URL.Path

	switch {
	case strings.HasPrefix(path, "/raft"):
		h.serveRaft(w, r)
	case strings.HasPrefix(path, "/messaging"):
		h.serveMessaging(w, r)
	case strings.HasPrefix(path, "/data_nodes"),
		strings.HasPrefix(path, "/process_continuous_queries"),
		strings.HasPrefix(path, "/metastore"):
		h.serveMetadata(w, r)
	default:
		h.serveData(w, r)
	}
}
// serveMessaging handles requests for the messaging endpoints.
//
// With a broker configured, the request is served locally by a
// messaging.Handler built around the broker and a raft handler on h.Log.
// With only a server configured, the client is redirected to one of the
// server's broker URLs. With neither, a 503 is returned.
func (h *Handler) serveMessaging(w http.ResponseWriter, r *http.Request) {
	switch {
	case h.Broker != nil:
		// We're running a broker, so handle the broker endpoints here.
		brokerHandler := &messaging.Handler{
			Broker:      h.Broker.Broker,
			RaftHandler: &raft.Handler{Log: h.Log},
		}
		brokerHandler.ServeHTTP(w, r)

	case h.Server != nil:
		// No local broker: send the client to a valid broker instead.
		h.redirect(h.Server.BrokerURLs(), w, r)

	default:
		log.Println("no broker or server configured to handle messaging endpoints")
		http.Error(w, "server unavailable", http.StatusServiceUnavailable)
	}
}
// serveMetadata responds to cluster metadata requests.
//
// With a server configured, the request is served locally by the httpd
// cluster handler, using the authentication and write-tracing settings from
// h.Config. With only a broker configured, the client is redirected to one of
// the data URLs registered on the broker's broadcast topic, or receives a 404
// if that topic does not exist. With neither configured, a 503 is returned.
func (h *Handler) serveMetadata(w http.ResponseWriter, r *http.Request) {
	if h.Broker == nil && h.Server == nil {
		log.Println("no broker or server configured to handle metadata endpoints")
		http.Error(w, "server unavailable", http.StatusServiceUnavailable)
		return
	}

	if h.Server != nil {
		sh := httpd.NewClusterHandler(h.Server, h.Config.Authentication.Enabled, version)
		sh.WriteTrace = h.Config.Logging.WriteTracing
		sh.ServeHTTP(w, r)
		return
	}

	t := h.Broker.Topic(influxdb.BroadcastTopicID)
	if t == nil {
		http.Error(w, "not found", http.StatusNotFound)
		return
	}

	// Redirect to a valid data URL to handle the request. Use the topic that
	// was just nil-checked rather than looking it up again, so the value
	// dereferenced here is the one the check above covered.
	h.redirect(t.DataURLs(), w, r)
}
// serveRaft handles requests for the raft endpoints.
//
// With a raft log configured, the request is served locally by a raft.Handler
// on that log. With only a server configured, the client is redirected to one
// of the server's broker URLs. With neither, a 503 is returned.
func (h *Handler) serveRaft(w http.ResponseWriter, r *http.Request) {
	switch {
	case h.Log != nil:
		// A local raft log exists, so answer the request here.
		raftHandler := raft.Handler{Log: h.Log}
		raftHandler.ServeHTTP(w, r)

	case h.Server != nil:
		// No local log: send the client to a valid broker instead.
		h.redirect(h.Server.BrokerURLs(), w, r)

	default:
		log.Println("no broker or server configured to handle raft endpoints")
		http.Error(w, "server unavailable", http.StatusServiceUnavailable)
	}
}
// serveData responds to data requests, i.e. every request whose path is not
// claimed by the raft, messaging or metadata prefixes.
//
// With a server configured, the request is served locally by the httpd API
// handler, using the authentication and write-tracing settings from h.Config.
// With only a broker configured, the client is redirected to one of the data
// URLs registered on the broker's broadcast topic, or receives a 404 if that
// topic does not exist. With neither configured, a 503 is returned.
func (h *Handler) serveData(w http.ResponseWriter, r *http.Request) {
	if h.Broker == nil && h.Server == nil {
		log.Println("no broker or server configured to handle data endpoints")
		http.Error(w, "server unavailable", http.StatusServiceUnavailable)
		return
	}

	if h.Server != nil {
		sh := httpd.NewAPIHandler(h.Server, h.Config.Authentication.Enabled, version)
		sh.WriteTrace = h.Config.Logging.WriteTracing
		sh.ServeHTTP(w, r)
		return
	}

	t := h.Broker.Topic(influxdb.BroadcastTopicID)
	if t == nil {
		http.Error(w, "not found", http.StatusNotFound)
		return
	}

	// Redirect to a valid data URL to handle the request. Use the topic that
	// was just nil-checked rather than looking it up again, so the value
	// dereferenced here is the one the check above covered.
	h.redirect(t.DataURLs(), w, r)
}
// redirect answers the request with a 307 Temporary Redirect to a randomly
// chosen URL from u, keeping the requested path and query string. If u is an
// empty slice, a 503 is returned instead.
func (h *Handler) redirect(u []url.URL, w http.ResponseWriter, r *http.Request) {
	// No valid URLs, return an error
	if len(u) == 0 {
		http.Error(w, "service unavailable", http.StatusServiceUnavailable)
		return
	}

	// TODO: Log to internal stats to track how frequently this is happening. If
	// this is happening frequently, the clients are using a suboptimal endpoint

	// Take the path and query from the parsed URL rather than the raw
	// request-target: r.RequestURI may be in absolute form
	// ("GET http://host/path HTTP/1.1"), and appending that to the chosen base
	// URL would produce a malformed Location header. Fall back to the raw
	// value only when no parsed URL is available.
	uri := r.RequestURI
	if r.URL != nil {
		uri = r.URL.RequestURI()
	}

	// Redirect the client to a valid node that can handle the request
	target := u[rand.Intn(len(u))]
	http.Redirect(w, r, target.String()+uri, http.StatusTemporaryRedirect)
}