-
Notifications
You must be signed in to change notification settings - Fork 60
/
vbucket_filter.h
324 lines (281 loc) · 10.6 KB
/
vbucket_filter.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2017-Present Couchbase, Inc.
*
* Use of this software is governed by the Business Source License included
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
* in that file, in accordance with the Business Source License, use of this
* software will be governed by the Apache License, Version 2.0, included in
* the file licenses/APL2.txt.
*/
#pragma once
#include "collections/collections_types.h"
#include "item.h"
#include <memcached/dcp_stream_id.h>
#include <memcached/engine_common.h>
#include <memcached/engine_error.h>
#include <nlohmann/json_fwd.hpp>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
// Forward declarations only. In the declarations visible in this header,
// EventuallyPersistentEngine is used solely as a const-reference parameter
// type and SystemEventMessage is mentioned only in comments, so neither needs
// its full definition here.
class EventuallyPersistentEngine;
class SystemEventMessage;
namespace Collections::VB {
// Forward declarations: Filter only takes these two types as const-reference
// parameters (Manifest in the constructor and constructFromJson, ReadHandle in
// addCollection and addScope), so their full definitions are not needed here.
class Manifest;
class ReadHandle;
/**
* The VB filter object constructs from data that is yielded from DCP stream
* request, an optional string. The string is optional because a legacy DCP
* user will not set any string data, whilst a collection aware request can.
* However the collection aware request may contain an empty string. For
* reference, here's what the optional string means.
*
* - uninitialized: legacy stream-request. A client that can only ever receive
* the default collection.
* - empty string: A collection aware client that wants *everything* from the
* epoch. For example KV replication streams.
* - non-empty string: A collection aware client wanting to resume a stream or
* request specific collections (or both). The non-empty
* string is a JSON document as follows.
*
* A client can request individual collections or a group by defining an array
* of collection-IDs.
*
* Client wants two collections:
* {"collections":["0", "ac1"]}
*
* Client wants to resume a stream that was previously everything, client
* specifies the highest manifest UID they've received.
* {"uid":"5"}
*
* Why not both? Client wants to resume a subset of collections.
* {"collections":["0", "ac1"], "uid":"5"}
*
* The class exposes methods which are for use by the ActiveStream.
*
* Should a Mutation/Deletion/SystemEvent be included in the DCP stream?
* - checkAndUpdate
* checkAndUpdate is non-const because for SystemEvents which represent the
* drop of a collection, they will modify the Filter to remove the collection
* if the filter was targeting the collection.
*
*/
class Filter {
public:
    /**
     * Build a per-vbucket filter from the optional string supplied with a DCP
     * stream-request and the vbucket's collections manifest.
     *
     * Documented behaviour (the definition is not in this header, so confirm
     * against it): when the request filters collections, the resulting object
     * filters the intersection of the requested collections and the manifest,
     * logging when it finds it must drop a collection; when the request is
     * effectively a passthrough, so is the resulting VB filter.
     *
     * @param jsonFilter Optional string data that can contain JSON
     *        configuration info
     * @param manifest The vbucket's collection manifest.
     * @param cookie Cookie associated with the connection that is making the
     *        stream-request
     * @param engine reference to engine for checkPrivilege calls
     * @throws cb::engine_error
     */
    Filter(std::optional<std::string_view> jsonFilter,
           const ::Collections::VB::Manifest& manifest,
           gsl::not_null<const void*> cookie,
           const EventuallyPersistentEngine& engine);

    /**
     * Decide whether an Item may be sent on the stream, updating the filter
     * when required.
     *
     * If the item represents a collection deletion and this filter matches
     * that collection, the filter must be updated so that it can later be
     * seen to be empty, at which point the DCP stream can choose to close.
     * That is why this method is non-const.
     *
     * @param item an Item to be processed.
     * @return true if the Item is allowed to be sent on the DcpStream
     */
    bool checkAndUpdate(Item& item) {
        // Fast path: a passthrough filter admits everything, and an item in
        // the default collection only needs the defaultAllowed flag.
        // Short-circuit evaluation keeps the checks in their original order.
        const bool allowedWithoutLookup =
                passthrough ||
                (item.getKey().isInDefaultCollection() && defaultAllowed);

        // Everything else is decided by the non-inline slow path.
        return allowedWithoutLookup || checkAndUpdateSlow(item);
    }

    /**
     * Check whether the filter allows the given key. Unlike checkAndUpdate()
     * this never modifies the filter.
     *
     * Note: if the key's collection-ID is SystemEvent it is reported as
     * allowed, because not enough is known about the type/scope/collection it
     * represents to say it is not.
     *
     * @param key The document key to check.
     * @return true if the key should be allowed to be sent on the DcpStream
     */
    bool check(DocKey key) const {
        // Same fast path as checkAndUpdate(): passthrough first, then the
        // default-collection shortcut.
        const bool allowedWithoutLookup =
                passthrough ||
                (key.isInDefaultCollection() && defaultAllowed);

        // Otherwise defer to the non-inline slow path.
        return allowedWithoutLookup || checkSlow(key);
    }

    /**
     * @return true if the filter is empty
     */
    bool empty() const;

    /**
     * Check the privilege revision for any change. If it changed, update our
     * copy of the revision and then evaluate the required privileges.
     *
     * @param cookie Cookie of the connection whose privileges are checked
     * @param engine reference to engine used for the privilege checks
     * @return engine status - success/no_access/unknown_scope|collection
     */
    cb::engine_errc checkPrivileges(gsl::not_null<const void*> cookie,
                                    const EventuallyPersistentEngine& engine);

    /**
     * Add statistics for this filter; currently this just depicts the
     * object's state.
     */
    void addStats(const AddStatFn& add_stat,
                  const void* c,
                  const std::string& prefix,
                  Vbid vb) const;

    /**
     * @return true if this filter was constructed for a non-collection aware
     *         client, i.e. system events are not allowed.
     */
    bool isLegacyFilter() const {
        return !systemEventsAllowed;
    }

    /// @return the number of entries in the filter's container
    size_t size() const {
        return filter.size();
    }

    /**
     * @return the collection-ID of the first entry in the container.
     *
     * The begin iterator is dereferenced unconditionally, so the caller must
     * ensure the container is not empty.
     */
    CollectionID front() const {
        const auto firstEntry = filter.begin();
        return firstEntry->first;
    }

    /// @return the stream-ID stored in this filter
    cb::mcbp::DcpStreamId getStreamId() const {
        return streamId;
    }

    /// Storage for the filtered collections: collection-ID -> scope-ID.
    using Container = ::std::unordered_map<CollectionID, ScopeID>;

    /// @return iterator to the first entry of the container
    Container::const_iterator begin() const {
        return filter.begin();
    }

    /// @return past-the-end iterator of the container
    Container::const_iterator end() const {
        return filter.end();
    }

    /// @return true if this filter represents a single collection
    bool singleCollection() const {
        // A passthrough filter never counts as a single-collection filter.
        if (passthrough) {
            return false;
        }
        return filter.size() == 1;
    }

    /**
     * Check if the filter does not filter collections.
     * @return true if the filter is a pass-through filter
     */
    bool isPassThroughFilter() const {
        return passthrough;
    }

    /**
     * Check if the filter represents a collection filter, i.e. one which
     * allows a subset of collections. Note that a legacy filter is different:
     * it allows only the default collection and no system events, whereas
     * this returns true if a subset of collections is permitted together
     * with their system events.
     */
    bool isCollectionFilter() const {
        // Neither passthrough nor legacy (De Morgan form of the two negated
        // checks; evaluation order is unchanged).
        return !(isPassThroughFilter() || isLegacyFilter());
    }

    /**
     * Dump this to std::cerr
     */
    void dump() const;

protected:
    /**
     * Constructor helper method for parsing the JSON.
     *
     * @param json the parsed JSON configuration
     * @param manifest the vbucket's collection manifest
     * @return first: success or unknown_collection/unknown_scope; second:
     *         manifest-ID when first is != success.
     */
    [[nodiscard]] std::pair<cb::engine_errc, uint64_t> constructFromJson(
            const nlohmann::json& json,
            const Collections::VB::Manifest& manifest);

    /**
     * Helper which examines the given collection object against the manifest
     * and adds it to the internal container, or throws an exception.
     *
     * @return true if the collection is known and added
     */
    [[nodiscard]] bool addCollection(const nlohmann::json& object,
                                     const ::Collections::VB::ReadHandle& rh);

    /**
     * Helper which examines the given scope object against the manifest and
     * adds the associated collections to the internal container.
     *
     * @return true if the scope is known and added
     */
    [[nodiscard]] bool addScope(const nlohmann::json& object,
                                const ::Collections::VB::ReadHandle& rh);

    /**
     * Does the filter allow the system event? E.g. a "meat,dairy" filter
     * shouldn't allow delete events for the "fruit" collection.
     *
     * May update the filter if we are filtering on scopes and the event is
     * an add or delete collection.
     *
     * @param item the system-event Item to check
     * @return true if the filter says this event should be allowed
     */
    bool checkAndUpdateSystemEvent(const Item& item);

    /// Non-inline slow path of checkAndUpdate().
    bool checkAndUpdateSlow(Item& item);

    /// Non-inline slow path of check(key).
    bool checkSlow(DocKey key) const;

    /**
     * Remove the collection of the item from the filter.
     *
     * @param item the system-event Item identifying the collection
     * @return true if a collection was removed from this filter
     */
    bool remove(const Item& item);

    /**
     * Called when the Item represents a collection system event.
     */
    bool processCollectionEvent(const Item& item);

    /**
     * Called when the Item represents a scope system event.
     */
    bool processScopeEvent(const Item& item);

    /**
     * Enable the default collection at construction time.
     */
    void enableDefaultCollection();

    /**
     * Disable the default collection.
     */
    void disableDefaultCollection();

    /**
     * Insert the collection; will toggle defaultAllowed "if found"
     * (presumably when cid is the default collection - TODO confirm against
     * the definition).
     */
    void insertCollection(CollectionID cid, ScopeID sid);

    /// The collections this filter is tracking.
    Container filter;

    /// Scope-ID associated with the filter, when one is set.
    std::optional<ScopeID> scopeID;

    // An optional is used so that no special value is needed to mean "unset".
    std::optional<uint32_t> lastCheckedPrivilegeRevision;

    /// Stream-ID returned by getStreamId().
    cb::mcbp::DcpStreamId streamId = {};

    bool scopeIsDropped = false;
    /// Fast-path flag: default-collection keys are allowed without a lookup.
    bool defaultAllowed = false;
    /// Fast-path flag: everything is allowed.
    bool passthrough = false;
    /// False for a legacy filter (see isLegacyFilter()).
    bool systemEventsAllowed = false;

    friend std::ostream& operator<<(std::ostream& os, const Filter& filter);

    // Keys and types used in JSON parsing
    static const char* CollectionsKey;
    static const char* ScopeKey;
    static const char* UidKey;
    static const char* StreamIdKey;
};
/// Stream-insertion operator for Filter. It is also declared as a friend of
/// the class, which gives its definition access to Filter's non-public
/// members; only the declaration appears in this header.
std::ostream& operator<<(std::ostream& os, const Filter& filter);
} // namespace Collections::VB