/
writer.go
278 lines (251 loc) · 7.31 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
// Copyright 2016 Orion Labs, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rbs
import (
"bytes"
"fmt"
"io"
"time"
"github.com/garyburd/redigo/redis"
)
const (
	// DefaultMaxChunkSize is the default maximum number of bytes stored in a
	// single chunk: 1 KiB (1024 bytes).
	//
	// This is written as a shift on purpose: in Go `^` is bitwise XOR, so
	// `2 ^ 10` would evaluate to 8, not 1024.
	DefaultMaxChunkSize uint16 = 1 << 10

	// syncChannelPattern is the fmt pattern for the pub/sub channel used by
	// WriteStdPub; %s is replaced with the name of the hash being written.
	syncChannelPattern = "rbs:sync:%s"
)
// RedWriter stores a variable-length stream of bytes as numbered chunk
// fields on a single Redis hash. It is the concrete type returned (as an
// io.WriteCloser) by NewWriter.
type RedWriter struct {
	quit bool       // set by the first Close; later Write/Close calls fail
	conn redis.Conn // connection all commands are pipelined on; closed by Close
	name string     // Redis key of the hash being written

	next     uint16 // index of the next chunk to be written
	min, max uint16 // lower and upper bound on bytes per chunk

	carry    *bytes.Buffer // bytes held back until they can be emitted as a chunk
	trailers []CmdBuilder  // extra commands appended to every pipeline flush
}
// NewWriter returns an io.WriteCloser that streams bytes into the Redis hash
// stored at key name, sending all traffic over conn. The concrete type behind
// the returned interface is *RedWriter.
//
// It is intended for a growing stream whose total length is not known until
// the stream is closed, though streams of known/fixed length work as well.
//
// A single Redis hash is the only underlying data structure. Each chunk of
// bytes is stored as one field/value pair on that hash. Field names starting
// with "c:" (for chunk) are reserved by this implementation; clients may use
// any other field names on the same hash.
//
// Network round trips are minimized with Redis pipelining. On each Write, the
// input is cut into chunks of the configured size, the client's trailer
// commands (if any) are appended, and everything is flushed to Redis together.
// Bytes that would form a chunk below the minimum size are held back until a
// later Write or Close.
//
// On Close, if any bytes were written, one final pipeline carries the
// remaining buffered bytes, an end-of-stream marker, and the trailers.
//
// Defaults: chunks hold at most DefaultMaxChunkSize bytes, there is no
// minimum chunk size, and no trailers are configured. Callers will usually
// tune these for their workload (desired ingress/egress rates, Redis tuning,
// and so on).
//
// Configuration uses functional options, as described in
// http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
// If any option returns an error, the writer is closed (which also closes
// conn) and that error is returned with a nil writer.
//
// See: WriteExpire, WriteMaxChunk, WriteMinChunk, WriteStdPub, WriteTrailer
func NewWriter(conn redis.Conn, name string, options ...func(*RedWriter) error) (io.WriteCloser, error) {
	rw := new(RedWriter)
	rw.conn = conn
	rw.name = name
	rw.max = DefaultMaxChunkSize
	rw.carry = bytes.NewBuffer(make([]byte, 0, DefaultMaxChunkSize))

	for i := range options {
		if err := options[i](rw); err != nil {
			// The option's error is the one worth reporting; a failure while
			// tearing down the half-built writer is intentionally dropped.
			_ = rw.Close()
			return nil, err
		}
	}
	return rw, nil
}
// WriteMaxChunk overrides the default maximum number of bytes stored per
// chunk written to Redis, and replaces the writer's carry buffer with one
// pre-sized to max.
//
// It returns an error if max is zero, or if max is smaller than a minimum
// chunk size set by an earlier WriteMinChunk option. A zero maximum is
// rejected because Write could never move any bytes into a chunk and would
// loop forever.
func WriteMaxChunk(max uint16) func(*RedWriter) error {
	return func(rw *RedWriter) error {
		if max == 0 {
			return fmt.Errorf("size conflict: max chunk size must be > 0")
		}
		if rw.min > max {
			return fmt.Errorf("size conflict: %d > %d", rw.min, max)
		}
		rw.max = max
		rw.carry = bytes.NewBuffer(make([]byte, 0, max))
		return nil
	}
}
// WriteMinChunk sets the minimum number of bytes per chunk written to Redis.
// Bytes that would form a smaller chunk are kept in the carry buffer until a
// later Write supplies more data or the writer is closed. The option fails if
// min exceeds the writer's maximum chunk size as configured so far.
func WriteMinChunk(min uint16) func(*RedWriter) error {
	return func(rw *RedWriter) error {
		if min <= rw.max {
			rw.min = min
			return nil
		}
		return fmt.Errorf("size conflict: %d > %d", min, rw.max)
	}
}
// CmdBuilder produces a Redis command name together with its arguments, in
// the form passed to a connection's Send. A non-nil error aborts the
// pipeline flush that requested the command.
type CmdBuilder interface {
	Build() (cmd string, args []interface{}, err error)
}
// CmdBuilderFunc lets an ordinary function be used wherever a CmdBuilder is
// expected.
type CmdBuilderFunc func() (string, []interface{}, error)

// Build satisfies CmdBuilder by calling the function itself and returning
// its results unchanged.
func (f CmdBuilderFunc) Build() (cmd string, args []interface{}, err error) {
	cmd, args, err = f()
	return cmd, args, err
}
// WriteTrailer registers an arbitrary Redis command, produced by c, that is
// appended to every pipeline flush performed by Write or Close. Trailers are
// sent in the order they were registered.
//
// A nil c is rejected with an error here. Otherwise it would only surface
// later, as a nil-interface panic the first time the writer flushes.
func WriteTrailer(c CmdBuilder) func(*RedWriter) error {
	return func(rw *RedWriter) error {
		if c == nil {
			return fmt.Errorf("nil trailer command builder")
		}
		rw.trailers = append(rw.trailers, c)
		return nil
	}
}
// WriteExpire appends a Redis EXPIRE command for the underlying hash to every
// pipeline flush, resetting the hash's time-to-live to sec seconds each time.
//
// sec must be non-zero. Redis treats EXPIRE with a non-positive timeout as an
// immediate delete, so a zero value would wipe the stream on every flush.
func WriteExpire(sec uint16) func(*RedWriter) error {
	return func(rw *RedWriter) error {
		if sec == 0 {
			return fmt.Errorf("invalid expire: 0 seconds would delete %q", rw.name)
		}
		rw.trailers = append(
			rw.trailers,
			CmdBuilderFunc(func() (string, []interface{}, error) {
				return "EXPIRE", []interface{}{rw.name, sec}, nil
			}),
		)
		return nil
	}
}
// WriteStdPub appends a standard Redis PUBLISH to every pipeline flush. The
// message is published on this hash's sync channel (syncChannelPattern
// filled in with the hash name), and its payload is the current Unix time in
// seconds at the moment the command is built. It is intended to be used with
// a standard read-side subscription, so readers learn when new bytes are
// available.
func WriteStdPub() func(*RedWriter) error {
	return func(rw *RedWriter) error {
		channel := fmt.Sprintf(syncChannelPattern, rw.name)
		publish := func() (string, []interface{}, error) {
			args := []interface{}{channel, time.Now().Unix()}
			return "PUBLISH", args, nil
		}
		rw.trailers = append(rw.trailers, CmdBuilderFunc(publish))
		return nil
	}
}
// writeChunk queues (via Send) an HSET that stores b under the field for the
// next chunk index, formatted with chunkNumber. It then advances the chunk
// counter. The command is transmitted when the pipeline is flushed.
//
// The counter is a uint16, so at most 65535 chunks (indices 0..65534) can be
// written. Allowing one more would wrap the counter to 0, after which the
// next chunk would silently overwrite chunk 0 and Close would skip the
// end-of-stream marker. An error is returned instead.
func (rw *RedWriter) writeChunk(b []byte) error {
	if rw.next == ^uint16(0) {
		return fmt.Errorf("chunk limit reached: %q already holds %d chunks", rw.name, rw.next)
	}
	chunk := fmt.Sprintf(chunkNumber, rw.next)
	if err := rw.conn.Send("HSET", rw.name, chunk, b); err != nil {
		return err
	}
	rw.next++
	return nil
}
// flush queues every configured trailer command behind whatever has already
// been sent on the connection, then flushes the connection's output buffer.
// It stops at the first trailer whose Build or Send fails and returns that
// error without flushing.
//
// NOTE(review): replies to the pipelined commands are never read here (no
// Receive/Do), so errors reported by Redis itself are not surfaced to
// callers. Confirm whether that is intended.
func (rw *RedWriter) flush() error {
	for i := range rw.trailers {
		name, args, err := rw.trailers[i].Build()
		if err == nil {
			err = rw.conn.Send(name, args...)
		}
		if err != nil {
			return err
		}
	}
	return rw.conn.Flush()
}
// Close conforms to the io.Closer interface.
//
// Any buffered (leftover) bytes are written as a final chunk. If at least one
// chunk was written, the index of the last chunk is stored under the
// chunkLast field as the end-of-stream marker. Those commands and the
// configured trailers go out in one final pipeline flush.
//
// Close always closes the underlying Redis connection. The first error
// encountered is returned. If everything else succeeded, an error from
// closing the connection is returned rather than discarded. Calling Close
// more than once returns io.ErrClosedPipe.
func (rw *RedWriter) Close() (err error) {
	if rw.quit {
		return io.ErrClosedPipe
	}
	rw.quit = true
	defer func() {
		// Always release the connection, but never let its error mask an
		// earlier, more specific failure.
		if cerr := rw.conn.Close(); err == nil {
			err = cerr
		}
	}()

	flush := false
	if rw.carry.Len() > 0 {
		if werr := rw.writeChunk(rw.carry.Bytes()); werr != nil {
			return werr
		}
		flush = true
	}
	if rw.next > 0 {
		last := rw.next - 1
		if serr := rw.conn.Send("HSET", rw.name, chunkLast, last); serr != nil {
			return serr
		}
		flush = true
	}
	if flush {
		return rw.flush()
	}
	return nil
}
// Write conforms to the io.Writer interface.
//
// Incoming bytes are appended to any leftovers carried over from earlier
// calls. They are cut into chunks of at most the configured maximum size, and
// each chunk is queued as its own HSET. Once p is consumed, the chunks plus
// any trailers are flushed to Redis together as one pipeline. Whenever the
// carried bytes plus the rest of p would form a chunk smaller than the
// configured minimum, they stay in the carry buffer for a later Write or
// Close instead.
//
// Each call therefore performs at most one pipeline flush, and none when
// every byte is held back. On success it reports len(p) bytes written; on
// any error it reports 0. Writing after Close returns io.ErrClosedPipe.
func (rw *RedWriter) Write(p []byte) (int, error) {
	if rw.quit {
		return 0, io.ErrClosedPipe
	}
	total := len(p)
	pending := false
	for rest := p; len(rest) > 0; {
		if len(rest)+rw.carry.Len() < int(rw.min) {
			rw.carry.Write(rest)
			break
		}
		room := int(rw.max) - rw.carry.Len()
		if room > len(rest) {
			room = len(rest)
		}
		wrote, err := rw.carry.Write(rest[:room])
		if err != nil {
			return 0, err
		}
		rest = rest[wrote:]
		if err := rw.writeChunk(rw.carry.Bytes()); err != nil {
			return 0, err
		}
		pending = true
		rw.carry.Reset()
	}
	if !pending {
		return total, nil
	}
	if err := rw.flush(); err != nil {
		return 0, err
	}
	return total, nil
}