/
api.go
165 lines (139 loc) · 4.37 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package raftchunking
import (
"bytes"
"crypto/rand"
"encoding/binary"
"fmt"
"io"
"time"
"github.com/hashicorp/go-raftchunking/types"
"github.com/hashicorp/raft"
proto "google.golang.org/protobuf/proto"
)
var (
// ChunkSize is the threshold used for breaking a large value into chunks.
// Defaults to the suggested max data size for the raft library.
//
// ChunkingApply reads this value when it is called and uses it as the
// maximum number of bytes placed into a single chunk, so it should hold a
// positive value before any chunked apply is issued.
ChunkSize = raft.SuggestedMaxDataSize
)
// errorFuture is a future that has already failed: it carries a fixed error
// and no result. ChunkingApply returns it when an operation is rejected
// before any chunk has been handed to the apply function.
type errorFuture struct {
	err error
}

// Error reports the stored error immediately.
func (f errorFuture) Error() error { return f.err }

// Response always yields nil, since a failed operation has no response.
func (errorFuture) Response() interface{} { return nil }

// Index always yields 0, since nothing was applied.
func (errorFuture) Index() uint64 { return 0 }
// multiFuture is a future specialized for the chunking case. It contains some
// number of other futures in the order in which data was chunked and sent to
// apply. ChunkingApply builds one by appending the future returned by the
// apply function for each chunk, so element i corresponds to chunk i.
type multiFuture []raft.ApplyFuture
// Error calls Error on each contained future in order. It returns the first
// non-nil error it encounters without consulting the futures that follow, and
// returns nil once every contained future has reported success (or when there
// are no futures at all).
func (m multiFuture) Error() error {
	for i := 0; i < len(m); i++ {
		err := m[i].Error()
		if err != nil {
			return err
		}
	}
	return nil
}
// Index reports the index of the final contained future, i.e. the one for the
// last chunk. Callers are expected to have called Error first, so by then the
// last future's Index reflects the Apply of the final chunk.
func (m multiFuture) Index() uint64 {
	if n := len(m); n > 0 {
		return m[n-1].Index()
	}
	// Not expected to happen, but an empty future must not panic.
	return 0
}
// Response reports whatever the final contained future (the one for the last
// chunk) returns from its own Response method.
func (m multiFuture) Response() interface{} {
	last := len(m) - 1
	if last < 0 {
		// Not expected to happen, but an empty future must not panic.
		return nil
	}
	return m[last].Response()
}
// ApplyFunc is the callback ChunkingApply invokes once per chunk. It receives
// the log entry built for that chunk together with the timeout passed to
// ChunkingApply, and returns the future tracking that individual apply.
// NOTE(review): the signature matches a raft log-apply method; confirm which
// function callers actually pass in.
type ApplyFunc func(raft.Log, time.Duration) raft.ApplyFuture
// ChunkingApply takes in a byte slice and chunks into ChunkSize (or less if
// EOF) chunks, calling Apply on each. It requires a corresponding wrapper
// around the FSM to handle reconstructing on the other end.
//
// Parameters:
//   - cmd is the payload to split. Each chunk is a fresh copy, so the logs
//     handed to applyFunc do not alias cmd.
//   - extensions, if passed in, is attached to the final chunk's chunk info so
//     it can be set as the Extensions value on the Apply once all chunks are
//     received.
//   - timeout is forwarded to every applyFunc call; it bounds each individual
//     operation, not the total.
//   - applyFunc is called once per chunk, in order.
//
// The return value is a future whose Error() walks the underlying Apply
// futures in order. Note that any error indicates that the entire operation
// will not be applied, assuming the correct FSM wrapper is used.
//
// An error future is returned, and applyFunc is never called, when ChunkSize
// is not positive, when the random operation number cannot be generated, or
// when chunk metadata cannot be marshaled. If cmd is empty no chunks are
// produced: applyFunc is never called, extensions is not sent, and the
// returned future reports no error.
func ChunkingApply(cmd, extensions []byte, timeout time.Duration, applyFunc ApplyFunc) raft.ApplyFuture {
	// Read the package-level setting once so every chunk of this operation is
	// cut with the same size, and reject values that cannot make progress: a
	// zero size would loop forever appending empty chunks and a negative size
	// would panic when allocating the chunk buffer.
	chunkSize := ChunkSize
	if chunkSize <= 0 {
		return errorFuture{err: fmt.Errorf("invalid chunk size %d: must be greater than zero", chunkSize)}
	}

	// Generate a random op num via 64 random bits. These only have to be
	// unique across _in flight_ chunk operations until a Term changes so
	// should be fine.
	rb := make([]byte, 8)
	n, err := rand.Read(rb)
	if err != nil {
		return errorFuture{err: err}
	}
	if n != 8 {
		return errorFuture{err: fmt.Errorf("expected to read %d bytes for op num, read %d", 8, n)}
	}
	opNum := binary.BigEndian.Uint64(rb)

	// Number of chunks needed, rounding up for a trailing partial chunk. Only
	// used as a capacity hint below.
	numChunks := len(cmd) / chunkSize
	if len(cmd)%chunkSize != 0 {
		numChunks++
	}

	// We break into chunks first so that we know how many chunks there will be
	// to put in NumChunks in the extensions info. Each chunk is copied out of
	// cmd rather than resliced so the resulting logs own their data.
	byteChunks := make([][]byte, 0, numChunks)
	reader := bytes.NewReader(cmd)
	for reader.Len() > 0 {
		size := reader.Len()
		if size > chunkSize {
			size = chunkSize
		}
		b := make([]byte, size)
		// ReadFull either fills b completely or reports an error, so no
		// separate short-read check is needed.
		if _, err := io.ReadFull(reader, b); err != nil {
			return errorFuture{err: fmt.Errorf("error reading chunk from buf: %w", err)}
		}
		byteChunks = append(byteChunks, b)
	}

	// Create the underlying chunked logs. All of them are built before any is
	// applied so that a marshaling failure aborts the operation without a
	// partial set of chunks having been submitted.
	logs := make([]raft.Log, 0, len(byteChunks))
	for i, chunk := range byteChunks {
		chunkInfo := &types.ChunkInfo{
			OpNum:       opNum,
			SequenceNum: uint32(i),
			NumChunks:   uint32(len(byteChunks)),
		}

		// If extensions were passed in attach them to the last chunk so it
		// will go through Apply at the end.
		if i == len(byteChunks)-1 {
			chunkInfo.NextExtensions = extensions
		}

		chunkBytes, err := proto.Marshal(chunkInfo)
		if err != nil {
			return errorFuture{err: fmt.Errorf("error marshaling chunk info: %w", err)}
		}
		logs = append(logs, raft.Log{
			Data:       chunk,
			Extensions: chunkBytes,
		})
	}

	// Submit the chunks in order, collecting one future per chunk.
	mf := make(multiFuture, 0, len(logs))
	for i := range logs {
		mf = append(mf, applyFunc(logs[i], timeout))
	}
	return mf
}