-
Notifications
You must be signed in to change notification settings - Fork 2
/
writerproxy.go
133 lines (110 loc) · 3.8 KB
/
writerproxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package writerproxy
// To issue any commands to Event Horizon, you must contact a Writer. But that means
// that you have to have correct TLS configuration, know the authentication tokens,
// know the IP addresses of all the Writers, which Writers are responsible for
// which streams etc.
//
// To make things easier, the Pusher mirrors the API surface of a Writer, and
// handles all that above stuff for you, so you can just issue HTTP requests to
// a hardcoded IP (loopback) which will proxy your commands to a Writer.
//
// This is the proxy portion for this described setup.
import (
"encoding/json"
"github.com/function61/eventhorizon/config"
wtypes "github.com/function61/eventhorizon/writer/types"
"github.com/function61/eventhorizon/writer/writerclient"
"log"
"net/http"
"sync"
)
type Proxy struct {
serverDone *sync.WaitGroup
server *http.Server
writerClient *writerclient.Client
}
func New(confCtx *config.Context, writerClient *writerclient.Client) *Proxy {
// listen on loopback - this service is not intended to be called by
// other components than the application
p := &Proxy{
serverDone: &sync.WaitGroup{},
server: &http.Server{Addr: "127.0.0.1:9093"},
writerClient: writerClient,
}
// these paths probably should be /writerproxy, but we decided to keep the
// paths identical to Writer, so writerclient and pushlib writer proxy
// client can be as similar as possible
http.HandleFunc("/writer/append", func(w http.ResponseWriter, r *http.Request) {
var appendToStreamRequest wtypes.AppendToStreamRequest
if err := json.NewDecoder(r.Body).Decode(&appendToStreamRequest); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
output, err := p.writerClient.Append(&appendToStreamRequest)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(output)
})
http.HandleFunc("/writer/create_stream", func(w http.ResponseWriter, r *http.Request) {
var req wtypes.CreateStreamRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
output, err := p.writerClient.CreateStream(&req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(output)
})
http.HandleFunc("/writer/subscribe", func(w http.ResponseWriter, r *http.Request) {
var req wtypes.SubscribeToStreamRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err := p.writerClient.SubscribeToStream(&req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write([]byte("OK"))
})
http.HandleFunc("/writer/unsubscribe", func(w http.ResponseWriter, r *http.Request) {
var req wtypes.UnsubscribeFromStreamRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err := p.writerClient.UnsubscribeFromStream(&req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write([]byte("OK"))
})
return p
}
func (p *Proxy) Run() {
p.serverDone.Add(1)
go func() {
defer p.serverDone.Done()
if err := p.server.ListenAndServe(); err != nil {
// cannot panic, because this probably is an intentional close
log.Printf("writerproxy: ListenAndServe() error: %s", err)
}
}()
}
func (p *Proxy) Close() {
// now close the server gracefully ("shutdown")
// timeout could be given instead of nil as a https://golang.org/pkg/context/
if err := p.server.Shutdown(nil); err != nil {
panic(err) // failure/timeout shutting down the server gracefully
}
p.serverDone.Wait()
}