forked from cockroachdb/cockroach
/
replica_sideload.go
218 lines (199 loc) · 8.2 KB
/
replica_sideload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package storage
import (
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/coreos/etcd/raft/raftpb"
"github.com/pkg/errors"
)
// errSideloadedFileNotFound is the error sideloadStorage.Get and
// sideloadStorage.Purge return when the requested file is not present.
var errSideloadedFileNotFound = errors.New("sideloaded file not found")

// sideloadStorage is the interface used for Raft SSTable sideloading.
// Implementations do not need to be thread safe.
type sideloadStorage interface {
	// PutIfNotExists writes the given contents to the file specified by the
	// given index and term. It does not perform the write if the file already
	// exists.
	PutIfNotExists(_ context.Context, index, term uint64, contents []byte) error
	// Get loads the file at the given index and term. It returns
	// errSideloadedFileNotFound when no such file is present.
	Get(_ context.Context, index, term uint64) ([]byte, error)
	// Purge removes the file at the given index and term. It may also remove
	// any leftover files at the same index and earlier terms, but is not
	// required to do so. When no file exists at the given index and term, it
	// returns errSideloadedFileNotFound.
	Purge(_ context.Context, index, term uint64) error
	// Clear removes the files that may have been written by this
	// sideloadStorage.
	Clear(context.Context) error
	// TruncateTo removes all files belonging to an index strictly smaller
	// than the given one.
	TruncateTo(_ context.Context, index uint64) error
	// Filename returns an absolute path to the file whose contents Get would
	// return. It does not check whether the file actually exists.
	Filename(_ context.Context, index, term uint64) (string, error)
}
// maybeSideloadEntriesRaftMuLocked must be handed the "fat" entries that are
// about to be appended to the Raft log. Sideloadable entries are rewritten
// here: fat proposals go in, thin proposals come out, and the stripped
// payloads are handed to r.raftMu.sideloaded. Call this before the log
// modifications are persisted; doing it the other way around is incorrect,
// because an ill-timed crash would leave thin proposals without their files.
//
// The passed-in slice is not mutated.
func (r *Replica) maybeSideloadEntriesRaftMuLocked(
	ctx context.Context, entriesToAppend []raftpb.Entry,
) ([]raftpb.Entry, error) {
	// lookupInFlight returns the in-memory command of a proposal this replica
	// still tracks in r.mu.proposals, which lets the caller skip unmarshaling
	// the fat entry. r.mu is held only for the duration of the map lookup.
	//
	// TODO(tschottdorf): allocating this closure could be expensive. If so make
	// it a method on Replica.
	lookupInFlight := func(id storagebase.CmdIDKey) (storagebase.RaftCommand, bool) {
		r.mu.Lock()
		defer r.mu.Unlock()
		if proposal, found := r.mu.proposals[id]; found {
			return proposal.command, true
		}
		return storagebase.RaftCommand{}, false
	}
	return maybeSideloadEntriesImpl(ctx, entriesToAppend, r.raftMu.sideloaded, lookupInFlight)
}
// maybeSideloadEntriesImpl iterates through the provided slice of entries. If
// no sideloadable entries are found, it returns the same slice. Otherwise, it
// returns a new slice in which all applicable entries have been sideloaded to
// the specified sideloadStorage. maybeRaftCommand is called when sideloading is
// necessary and can optionally supply a pre-Unmarshaled RaftCommand (which
// usually is provided by the Replica in-flight proposal map.
func maybeSideloadEntriesImpl(
ctx context.Context,
entriesToAppend []raftpb.Entry,
sideloaded sideloadStorage,
maybeRaftCommand func(storagebase.CmdIDKey) (storagebase.RaftCommand, bool),
) ([]raftpb.Entry, error) {
cow := false
for i := range entriesToAppend {
var err error
if sniffSideloadedRaftCommand(entriesToAppend[i].Data) {
log.Event(ctx, "sideloading command in append")
if !cow {
// Avoid mutating the passed-in entries directly. The caller
// wants them to remain "fat".
log.Eventf(ctx, "copying entries slice of length %d", len(entriesToAppend))
cow = true
entriesToAppend = append([]raftpb.Entry(nil), entriesToAppend...)
}
ent := &entriesToAppend[i]
cmdID, data := DecodeRaftCommand(ent.Data) // cheap
strippedCmd, ok := maybeRaftCommand(cmdID)
if ok {
// Happy case: we have this proposal locally (i.e. we proposed
// it). In this case, we can save unmarshalling the fat proposal
// because it's already in-memory.
if strippedCmd.ReplicatedEvalResult.AddSSTable == nil {
log.Fatalf(ctx, "encountered sideloaded non-AddSSTable command: %+v", strippedCmd)
}
log.Eventf(ctx, "command already in memory")
// The raft proposal is immutable. To respect that, shallow-copy
// the (nullable) AddSSTable struct which we intend to modify.
addSSTableCopy := *strippedCmd.ReplicatedEvalResult.AddSSTable
strippedCmd.ReplicatedEvalResult.AddSSTable = &addSSTableCopy
} else {
// Bad luck: we didn't have the proposal in-memory, so we'll
// have to unmarshal it.
log.Event(ctx, "proposal not already in memory; unmarshaling")
if err := strippedCmd.Unmarshal(data); err != nil {
return nil, err
}
}
if strippedCmd.ReplicatedEvalResult.AddSSTable == nil {
// Still no AddSSTable; someone must've proposed a v2 command
// but not becaused it contains an inlined SSTable. Strange, but
// let's be future proof.
log.Warning(ctx, "encountered sideloaded Raft command without inlined payload")
continue
}
// Actually strip the command.
dataToSideload := strippedCmd.ReplicatedEvalResult.AddSSTable.Data
strippedCmd.ReplicatedEvalResult.AddSSTable.Data = nil
{
var err error
data, err = strippedCmd.Marshal()
if err != nil {
return nil, errors.Wrap(err, "while marshalling stripped sideloaded command")
}
}
ent.Data = encodeRaftCommandV2(cmdID, data)
log.Eventf(ctx, "writing payload at index=%d term=%d", ent.Index, ent.Term)
if err = sideloaded.PutIfNotExists(ctx, ent.Index, ent.Term, dataToSideload); err != nil {
return nil, err
}
}
}
return entriesToAppend, nil
}
// sniffSideloadedRaftCommand reports whether data is encoded as a sideloaded
// Raft command, i.e. whether its leading version byte equals
// raftVersionSideloaded. Empty (or nil) data is never considered sideloaded.
func sniffSideloadedRaftCommand(data []byte) (sideloaded bool) {
	if len(data) == 0 {
		return false
	}
	sideloaded = data[0] == byte(raftVersionSideloaded)
	return sideloaded
}
// maybeInlineSideloadedRaftCommand takes an entry and inspects it. If its
// command encoding version indicates a sideloaded entry, it uses the entryCache
// or sideloadStorage to inline the payload, returning a new entry (which must
// be treated as immutable by the caller). It returns nil when inlining does
// not apply. That is the case when the entry is not sideloaded, or when its
// command carries no AddSSTable payload to inline.
//
// If a payload is missing, returns an error whose Cause() is
// errSideloadedFileNotFound.
func maybeInlineSideloadedRaftCommand(
	ctx context.Context,
	rangeID roachpb.RangeID,
	ent raftpb.Entry,
	sideloaded sideloadStorage,
	entryCache *raftEntryCache,
) (*raftpb.Entry, error) {
	if !sniffSideloadedRaftCommand(ent.Data) {
		return nil, nil
	}
	log.Event(ctx, "inlining sideloaded SSTable")
	// We could unmarshal this yet again, but if it's committed we
	// are very likely to have appended it recently, in which case
	// we can save work.
	cachedSingleton, _, _ := entryCache.getEntries(
		nil, rangeID, ent.Index, ent.Index+1, 1<<20,
	)
	if len(cachedSingleton) > 0 {
		log.Event(ctx, "using cache hit")
		return &cachedSingleton[0], nil
	}

	// Out of luck, for whatever reason the inlined proposal isn't in the cache.
	// ent is a by-value parameter, so the modifications below only touch our
	// local copy. Its Data slice is replaced, never written through.
	log.Event(ctx, "inlined entry not cached")
	cmdID, data := DecodeRaftCommand(ent.Data)
	ent.Data = nil // no reuse of potentially shared slice

	var command storagebase.RaftCommand
	if err := command.Unmarshal(data); err != nil {
		return nil, err
	}
	if command.ReplicatedEvalResult.AddSSTable == nil {
		// maybeSideloadEntriesImpl tolerates sideloaded-version commands
		// without an AddSSTable: it logs and leaves them as they are, so no
		// payload file was written and there is nothing to inline.
		// Dereferencing AddSSTable here would panic, and reading from
		// sideloaded would only fail with a not-found error.
		log.Warning(ctx, "encountered sideloaded Raft command without inlined payload")
		return nil, nil
	}

	sideloadedData, err := sideloaded.Get(ctx, ent.Index, ent.Term)
	if err != nil {
		return nil, errors.Wrap(err, "loading sideloaded data")
	}
	command.ReplicatedEvalResult.AddSSTable.Data = sideloadedData

	fatData, err := command.Marshal()
	if err != nil {
		return nil, err
	}
	ent.Data = encodeRaftCommandV2(cmdID, fatData)
	return &ent, nil
}