/
store.go
128 lines (116 loc) · 2.64 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package mdbhttp
import (
"bytes"
"compress/flate"
"compress/gzip"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
meter "github.com/alxarch/go-meter/v2"
)
// Storer is a remote Storer over HTTP.
//
// Its Store method POSTs each snapshot to URL as a gzip-compressed JSON
// document.
type Storer struct {
// HTTPClient performs the requests. When it is nil, Store falls back to
// http.DefaultClient.
HTTPClient
// URL is the endpoint snapshots are POSTed to.
URL string
}
// Store implements the EventStore interface by POSTing the snapshot r to
// c.URL as a gzip-compressed JSON document.
//
// The request is sent with c.HTTPClient, falling back to http.DefaultClient
// when no client is configured. Encoding, request-construction and transport
// errors are returned unchanged. A response whose status is anything other
// than 200 OK is reported as an error that names the status.
func (c *Storer) Store(r *meter.Snapshot) error {
	// The pooled buffer backs the request body, so it is handed back to the
	// pool only when the whole round trip is over (deferred calls run in
	// reverse order, i.e. after the response body has been closed).
	body := getSyncBuffer()
	defer putSyncBuffer(body)
	if err := body.Encode(r); err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPost, c.URL, &body.buffer)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Encoding", "gzip")
	req.Header.Set("Content-Type", "application/json")

	client := c.HTTPClient
	if client == nil {
		client = http.DefaultClient
	}
	res, err := client.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		// Build the message from the numeric code so that it is printed
		// exactly once; res.Status already embeds the code, which made the
		// previous "[%d] %s" format repeat it.
		return fmt.Errorf("invalid HTTP status: %d %s", res.StatusCode, http.StatusText(res.StatusCode))
	}
	return nil
}
// syncBuffer bundles a byte buffer with the gzip writer and JSON encoder that
// fill it, so that all three can be reused across calls through the
// syncBuffers pool.
type syncBuffer struct {
// buffer receives the gzip-compressed JSON output.
buffer bytes.Buffer
// gzip compresses into buffer; it is created lazily by Encode.
gzip *gzip.Writer
// json encodes into gzip; it is created lazily by Encode.
json *json.Encoder
}
// syncBuffers pools *syncBuffer values for reuse; values are taken with
// getSyncBuffer and handed back with putSyncBuffer.
var syncBuffers sync.Pool
// getSyncBuffer hands out a *syncBuffer, taking one from the syncBuffers pool
// when available and allocating a fresh zero value otherwise.
func getSyncBuffer() *syncBuffer {
	pooled := syncBuffers.Get()
	if pooled == nil {
		// Nothing to reuse: start from a zero value.
		return new(syncBuffer)
	}
	return pooled.(*syncBuffer)
}
// putSyncBuffer returns b to the syncBuffers pool so that a later
// getSyncBuffer call can reuse it. The caller must not use b afterwards.
func putSyncBuffer(b *syncBuffer) {
syncBuffers.Put(b)
}
// Encode discards whatever b.buffer held and replaces it with x serialized as
// JSON and compressed with gzip.
//
// The gzip writer and the JSON encoder are created on first use and reused on
// later calls. It returns the JSON encoding error, if any, or else the error
// from closing the gzip stream.
func (b *syncBuffer) Encode(x interface{}) error {
	b.buffer.Reset()

	// Point the compressor at the now-empty buffer, creating it if needed.
	if b.gzip != nil {
		b.gzip.Reset(&b.buffer)
	} else {
		b.gzip = gzip.NewWriter(&b.buffer)
	}

	// The encoder writes through the gzip writer, whose identity never
	// changes, so one encoder serves every call.
	if b.json == nil {
		b.json = json.NewEncoder(b.gzip)
	}

	encodeErr := b.json.Encode(x)
	if encodeErr != nil {
		return encodeErr
	}
	// Closing flushes the remaining compressed data and the gzip trailer.
	return b.gzip.Close()
}
// StoreHandler returns an HTTP endpoint for an EventStore.
//
// The handler decodes the request body as a JSON meter.Snapshot, stamps it
// with the current time when it carries none, and passes it to s.Store. It
// replies 400 Bad Request when the body cannot be decoded, 500 Internal
// Server Error when storing fails, and {"status":"OK"} otherwise.
func StoreHandler(s meter.Storer) http.HandlerFunc {
	// fail writes the standard text for code as the error response.
	fail := func(w http.ResponseWriter, code int) {
		http.Error(w, http.StatusText(code), code)
	}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()

		var snapshot meter.Snapshot
		if err := json.NewDecoder(r.Body).Decode(&snapshot); err != nil {
			fail(w, http.StatusBadRequest)
			return
		}
		if snapshot.Time.IsZero() {
			snapshot.Time = time.Now()
		}
		if err := s.Store(&snapshot); err != nil {
			fail(w, http.StatusInternalServerError)
			return
		}
		sendJSON(w, json.RawMessage(`{"status":"OK"}`))
	})
}
// InflateRequest is a middleware that transparently decompresses the request
// body before handing the request to next.
//
// A Content-Encoding of "gzip" is decoded with compress/gzip and "deflate" is
// decoded as a raw DEFLATE stream with compress/flate. In both cases the
// Content-Encoding header is removed so that next sees a plain body. Requests
// with any other (or no) Content-Encoding are passed through untouched.
//
// If the body announces gzip but does not start with a valid gzip header the
// middleware replies 400 Bad Request and next is not called.
func InflateRequest(next http.Handler) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Keep a handle on the original body: r.Body may be replaced below,
		// but the underlying stream still has to be closed.
		body := r.Body
		defer body.Close()
		switch r.Header.Get("Content-Encoding") {
		case "gzip":
			// gzip.NewReader parses the header eagerly and returns a nil
			// reader on failure; installing that nil reader as r.Body would
			// make the first Read downstream panic.
			zr, err := gzip.NewReader(body)
			if err != nil {
				code := http.StatusBadRequest
				http.Error(w, http.StatusText(code), code)
				return
			}
			r.Header.Del("Content-Encoding")
			r.Body = zr
		case "deflate":
			// flate.NewReader cannot fail up front; malformed input is
			// reported by Read.
			zr := flate.NewReader(body)
			r.Header.Del("Content-Encoding")
			r.Body = zr
		}
		next.ServeHTTP(w, r)
	}
}