/
kvstore.h
1280 lines (1109 loc) · 43.4 KB
/
kvstore.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2010 Couchbase, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "callbacks.h"
#include "collections/eraser_context.h"
#include "collections/kvstore.h"
#include <memcached/engine_common.h>
#include <utilities/hdrhistogram.h>
#include <relaxed_atomic.h>
#include <atomic>
#include <chrono>
#include <cstring>
#include <list>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
/* Forward declarations */
class BucketLogger;
class DiskDocKey;
class Item;
class KVStore;
class KVStoreConfig;
class PersistenceCallback;
class RollbackCB;
class RollbackResult;
namespace cb::mcbp {
class Request;
} // namespace cb::mcbp
namespace VB {
class Commit;
} // namespace VB
namespace Collections::VB {
struct PersistedStats;
} // namespace Collections::VB
class vb_bgfetch_item_ctx_t;
struct TransactionContext;
/// Queue of pending background fetches: document key -> bgfetch context.
using vb_bgfetch_queue_t =
        std::unordered_map<DiskDocKey, vb_bgfetch_item_ctx_t>;

/// Should a get fetch only the item's metadata (Yes) or the full item (No)?
enum class GetMetaOnly { Yes, No };

/// Callback used for bloom-filter related work (vbucket, key, flag).
using BloomFilterCBPtr = std::shared_ptr<Callback<Vbid&, const DocKey&, bool&>>;
/// Callback invoked for expired items (item, expiry time).
using ExpiredItemsCBPtr = std::shared_ptr<Callback<Item&, time_t&>>;

/// Which version of the data a snapshot/scan should be created from: an
/// older "historical" snapshot, or the current head of the file.
enum class SnapshotSource { Historical, Head };
/**
* Generic information about a KVStore file
*/
struct FileInfo {
    /// The number of items stored
    uint64_t items = 0;

    /// The number of deleted items stored
    uint64_t deletedItems = 0;

    /// The size on disk of the KVStore file
    uint64_t size = 0;

    /// Last purge sequence number
    uint64_t purgeSeqno = 0;
};
/// Statistics gathered over the course of a single compaction run.
struct CompactionStats {
    /// Number of collection items purged.
    size_t collectionsItemsPurged = 0;

    /// Number of deleted collection items purged.
    size_t collectionsDeletedItemsPurged = 0;

    /// Number of tombstones purged.
    uint64_t tombstonesPurged = 0;

    /// Number of (SyncWrite) prepares purged.
    uint64_t preparesPurged = 0;

    /// Total bytes of purged prepares.
    uint64_t prepareBytesPurged = 0;

    /// File information before compaction.
    FileInfo pre;

    /// File information after compaction.
    FileInfo post;
};
/// Parameters controlling a compaction run.
struct CompactionConfig {
    /// Timestamp threshold used when purging (items before this time).
    uint64_t purge_before_ts = 0;

    /// Seqno threshold used when purging (items before this seqno).
    uint64_t purge_before_seq = 0;

    /// Non-zero to drop deletes during compaction — confirm exact
    /// semantics against the implementation.
    uint8_t drop_deletes = 0;

    /// Whether erroneous tombstones should be retained (not purged).
    bool retain_erroneous_tombstones = false;

    /// Equality; defined out-of-line.
    bool operator==(const CompactionConfig& c) const;

    /// Inequality, expressed in terms of operator==.
    bool operator!=(const CompactionConfig& c) const {
        return !(*this == c);
    }
};
/// Bundles the configuration, callbacks and accumulated state for one
/// compaction of a vbucket file.
struct CompactionContext {
    /**
     * @param vbid the vbucket being compacted
     * @param config configuration for this compaction (copied, immutable)
     * @param purgeSeq initial value for max_purged_seq
     */
    CompactionContext(Vbid vbid,
                      const CompactionConfig& config,
                      uint64_t purgeSeq)
        : vbid(vbid), compactConfig(config), max_purged_seq(purgeSeq) {
    }

    /// The vbucket this compaction operates on.
    Vbid vbid;

    /// The configuration for this compaction.
    const CompactionConfig compactConfig;

    /// Highest seqno purged; seeded from the constructor argument.
    uint64_t max_purged_seq;

    /// Callback for bloom-filter related work during compaction.
    BloomFilterCBPtr bloomFilterCallback;

    /// Callback invoked for expired items found during compaction.
    ExpiredItemsCBPtr expiryCallback;

    /// Statistics accumulated over the compaction run.
    struct CompactionStats stats;

    /// pointer as context cannot be constructed until deeper inside storage
    std::unique_ptr<Collections::VB::EraserContext> eraserContext;

    /// Invoked for each dropped key; defaults to a no-op.
    Collections::KVStore::DroppedCb droppedKeyCb =
            [](const DiskDocKey&, int64_t, bool, int64_t) {};

    /**
     * A function to call on completion of compaction (before we swap our files)
     * to correctly set in memory state such as the purge seqno.
     */
    std::function<void(CompactionContext&)> completionCallback;

    /// The SyncRepl HCS, can purge any prepares before the HCS.
    uint64_t highCompletedSeqno = 0;
};
/// Factory function returning a CompactionContext for the given vbucket,
/// compaction config and purge seqno.
using MakeCompactionContextCallback =
        std::function<std::shared_ptr<CompactionContext>(
                Vbid, CompactionConfig&, uint64_t)>;
/**
 * Context carried through the KVStore commit path; bundles the VB::Commit
 * data with deltas accumulated while persisting a batch of items.
 */
struct kvstats_ctx {
    explicit kvstats_ctx(VB::Commit& commitData) : commitData(commitData) {
    }

    /// flusher data for managing manifest changes, item counts, vbstate
    VB::Commit& commitData;

    /**
     * Delta of onDiskPrepares that we should add to the value tracked in
     * the persisted VB state before commit
     */
    size_t onDiskPrepareDelta = 0;

    /**
     * Delta of onDiskPrepareBytes that we should add to the value tracked in
     * the persisted VB state before commit.
     */
    ssize_t onDiskPrepareBytesDelta = 0;
};
/// CacheLookup callback whose callback() is a no-op; use when no in-memory
/// cache lookup should be performed during a scan.
class NoLookupCallback : public StatusCallback<CacheLookup> {
public:
    void callback(CacheLookup&) override {
    }
};
struct DBFileInfo {
    /// Total size of the file (what 'stat()' would return). Includes both
    /// current data (spaceUsed) plus any previous data which is no longer
    /// referenced in current file header.
    uint64_t fileSize = 0;

    /// Total size of "current" data in the file - sum of all
    /// keys+metdata+values (included deleted docs) plus overheads to manage it
    /// (indexes such as B-Trees, headers etc).
    uint64_t spaceUsed = 0;

    /// Total size of all SyncWrite prepares, both completed and pending.
    /// This can be used to adjust spaceUsed to give an estimate of how much
    /// data in the file is actually needed - completed prepares are no
    /// longer needed and can be purged during compaction - as such they can
    /// be considered part of the "Fragmented" count.
    uint64_t prepareBytes = 0;

    /**
     * @returns An estimate of the number of bytes which are "live" data and
     * hence are not subject to being discarded during compaction. Computed
     * as spaceUsed minus an estimate of the size of completed prepares
     * (which compaction will purge).
     * Note: All prepared SyncWrites (completed and in-progress) are used as
     *       an estimate for completed sync writes, given (a) it's difficult
     *       to track exactly how many prepares have been completed and (b)
     *       in general we expect the overwhelming majority of on-disk
     *       prepares to be completed.
     * If prepareBytes somehow exceeds spaceUsed the adjustment is skipped
     * (sanity check) and spaceUsed is returned unmodified.
     */
    uint64_t getEstimatedLiveData() const {
        return (spaceUsed > prepareBytes) ? (spaceUsed - prepareBytes)
                                          : spaceUsed;
    }
};
/// Result of running a KVStore scan.
enum scan_error_t {
    /// Scan ran to completion.
    scan_success,
    /// Scan stopped early and can be resumed by calling scan again
    /// (see the scan-context documentation in this file).
    scan_again,
    /// Scan encountered an error.
    scan_failed
};
/// Which documents a scan should include.
enum class DocumentFilter {
    /// Include all items (alive and deleted).
    ALL_ITEMS,
    /// Skip deleted items.
    NO_DELETES,
    /// Include all items, plus items belonging to dropped collections.
    ALL_ITEMS_AND_DROPPED_COLLECTIONS
};
/**
* When fetching documents from disk, what form should the value be returned?
*/
enum class ValueFilter {
    /// Only return the key & metadata (no value).
    KEYS_ONLY,
    /// Return key & metadata, and value. If value is compressed then return
    /// in compressed form.
    VALUES_COMPRESSED,
    /// Return key & metadata, and value. Value will be returned uncompressed.
    VALUES_DECOMPRESSED
};
struct vbucket_state;
/**
* Abstract file handle class to allow a DB file to be opened and held open
* for multiple KVStore methods.
*/
class KVFileHandle {
public:
    /// Virtual destructor: concrete backend handles are owned and destroyed
    /// via this base class (e.g. ScanContext::handle).
    virtual ~KVFileHandle() = default;
};
/// Base class holding the state of an in-progress KVStore scan.
class ScanContext {
public:
    /**
     * @param vbid the vbucket to scan
     * @param handle open file handle the scan reads from (ownership taken)
     * @param docFilter which documents the scan includes
     * @param valFilter in what form values are returned
     * @param cb GetValue callback, invoked per scanned item (ownership taken)
     * @param cl CacheLookup callback (ownership taken)
     * @param droppedCollections dropped collections known when the scan was
     *        created; used to build collectionsContext
     * @param maxSeqno upper seqno bound of the scan
     */
    ScanContext(Vbid vbid,
                std::unique_ptr<KVFileHandle> handle,
                DocumentFilter docFilter,
                ValueFilter valFilter,
                std::unique_ptr<StatusCallback<GetValue>> cb,
                std::unique_ptr<StatusCallback<CacheLookup>> cl,
                const std::vector<Collections::KVStore::DroppedCollection>&
                        droppedCollections,
                int64_t maxSeqno);

    virtual ~ScanContext() = default;

    /// @return the GetValue callback (const overload).
    const StatusCallback<GetValue>& getValueCallback() const {
        return *callback;
    }

    /// @return the GetValue callback.
    StatusCallback<GetValue>& getValueCallback() {
        return *callback;
    }

    /// @return the CacheLookup callback (const overload).
    const StatusCallback<CacheLookup>& getCacheCallback() const {
        return *lookup;
    }

    /// @return the CacheLookup callback.
    StatusCallback<CacheLookup>& getCacheCallback() {
        return *lookup;
    }

    /// The vbucket being scanned.
    const Vbid vbid;

    /// Seqno of the most recently read item (0 until something is read).
    int64_t lastReadSeqno{0};

    /// Open file handle the scan reads from.
    const std::unique_ptr<KVFileHandle> handle;

    /// Which documents the scan includes.
    const DocumentFilter docFilter;

    /// In what form values are returned.
    const ValueFilter valFilter;

    /// Per-item GetValue callback.
    std::unique_ptr<StatusCallback<GetValue>> callback;

    /// Pre-load CacheLookup callback.
    std::unique_ptr<StatusCallback<CacheLookup>> lookup;

    /// Logger used for this scan; raw pointer (presumably non-owning —
    /// confirm against the constructor's definition).
    BucketLogger* logger;

    /// Collections state for the scan (built from droppedCollections).
    const Collections::VB::ScanContext collectionsContext;

    /// Upper seqno bound of the scan.
    int64_t maxSeqno;
};
/// ScanContext for scans ordered by sequence number.
class BySeqnoScanContext : public ScanContext {
public:
    /**
     * @param cb GetValue callback (ownership taken)
     * @param cl CacheLookup callback (ownership taken)
     * @param vb the vbucket to scan
     * @param handle open file handle (ownership taken)
     * @param start seqno the scan begins from (stored in startSeqno)
     * @param end upper seqno bound of the scan
     * @param purgeSeqno the file's purge seqno
     * @param _docFilter which documents the scan includes
     * @param _valFilter in what form values are returned
     * @param _documentCount document count of the snapshot being scanned
     * @param vbucketState persisted vbucket state of the file
     * @param droppedCollections dropped collections at scan creation time
     * @param timestamp optional timestamp for the data
     */
    BySeqnoScanContext(
            std::unique_ptr<StatusCallback<GetValue>> cb,
            std::unique_ptr<StatusCallback<CacheLookup>> cl,
            Vbid vb,
            std::unique_ptr<KVFileHandle> handle,
            int64_t start,
            int64_t end,
            uint64_t purgeSeqno,
            DocumentFilter _docFilter,
            ValueFilter _valFilter,
            uint64_t _documentCount,
            const vbucket_state& vbucketState,
            const std::vector<Collections::KVStore::DroppedCollection>&
                    droppedCollections,
            std::optional<uint64_t> timestamp = {});

    /// First seqno of the scan.
    const int64_t startSeqno;

    /// Purge seqno of the file at the time the scan was created.
    const uint64_t purgeSeqno;

    /// Document count of the snapshot being scanned.
    const uint64_t documentCount;

    /**
     * The highest seqno of a mutation or commit on disk. Used for backfill
     * for non sync-write aware connections as the snapshot end to ensure the
     * snapshot end matches the last item sent (aborts and prepares are skipped
     * for such connections).
     */
    const uint64_t maxVisibleSeqno;

    /**
     * The on disk "High Completed Seqno". This number changes in different ways
     * when compared to the one in memory so has been named differently. The
     * seqno will be read from disk and sent to a replica in a snapshot marker
     * so that we can optimise warmup after having received a disk snapshot.
     * This is necessary due to de-duplication as a replica will see logical
     * commits out of order. It cannot update the HCS value reliably with the
     * information received and perform the warmup optimisation so the active
     * node will send a persistedCompletedSeqno value which it will write at the
     * end of the snapshot. This seqno is also used to optimise local warmup.
     */
    const uint64_t persistedCompletedSeqno;

    /// Timestamp for the data (if available)
    const std::optional<uint64_t> timestamp;
};
/**
* ByIdRange describes a sub-set of 'keys' from the lexicographically ordered
* ById index.
* keys = {k | k >= startKey and k < endKey}
* E.g. startKey="b" and endKey="c" when the ById index is:
* {"a", "b", "ba", "bb", "c" }
* yields:
* {"b", "ba", "bb"}
*/
struct ByIdRange {
    /// @param start inclusive start key; @param end exclusive end key
    ByIdRange(DiskDocKey start, DiskDocKey end)
        : startKey(std::move(start)), endKey(std::move(end)) {
    }

    /// Inclusive start of the range (k >= startKey).
    DiskDocKey startKey;

    /// Exclusive end of the range (k < endKey).
    DiskDocKey endKey;

    /// Whether this range has been successfully scanned — confirm exact
    /// semantics with the KVStore implementation that sets it.
    bool rangeScanSuccess{false};
};
/// ScanContext for scans over the lexicographically ordered by-id (key)
/// index, covering one or more ByIdRange key ranges.
class ByIdScanContext : public ScanContext {
public:
    ByIdScanContext(std::unique_ptr<StatusCallback<GetValue>> cb,
                    std::unique_ptr<StatusCallback<CacheLookup>> cl,
                    Vbid vb,
                    std::unique_ptr<KVFileHandle> handle,
                    std::vector<ByIdRange> ranges,
                    DocumentFilter _docFilter,
                    ValueFilter _valFilter,
                    const std::vector<Collections::KVStore::DroppedCollection>&
                            droppedCollections,
                    int64_t maxSeqno);

    /// The key ranges this scan covers.
    std::vector<ByIdRange> ranges;

    // Key should be set by KVStore when a scan must be paused, this is where
    // a scan can resume from
    DiskDocKey lastReadKey;
};
/// Histograms and counters for low-level file operations (read/write/sync).
struct FileStats {
    FileStats() = default;

    // Read time length
    Hdr1sfMicroSecHistogram readTimeHisto;
    // Distance from last read
    Hdr1sfInt32Histogram readSeekHisto;
    // Size of read
    Hdr1sfInt32Histogram readSizeHisto;
    // Write time length
    Hdr1sfMicroSecHistogram writeTimeHisto;
    // Write size
    Hdr1sfInt32Histogram writeSizeHisto;
    // Time spent in sync
    Hdr1sfMicroSecHistogram syncTimeHisto;
    // Read count per open() / close() pair
    Hdr1sfInt32Histogram readCountHisto;
    // Write count per open() / close() pair
    Hdr1sfInt32Histogram writeCountHisto;
    // total bytes read from disk.
    cb::RelaxedAtomic<size_t> totalBytesRead{0};
    // Total bytes written to disk.
    cb::RelaxedAtomic<size_t> totalBytesWritten{0};

    /// @return memory footprint of the contained histograms; defined
    ///         out-of-line.
    size_t getMemFootPrint() const;

    /// Reset all histograms/counters; defined out-of-line.
    void reset();
};
/**
* Stats and timings for KVStore
*/
class KVStoreStats {
public:
    KVStoreStats();

    /// Resets all statistics to their initial value.
    void reset();

    // the number of docs committed
    cb::RelaxedAtomic<size_t> docsCommitted;
    // the number of open() calls
    cb::RelaxedAtomic<size_t> numOpen;
    // the number of close() calls
    cb::RelaxedAtomic<size_t> numClose;
    // the number of vbuckets loaded
    cb::RelaxedAtomic<size_t> numLoadedVb;

    // Stats tracking failures:
    cb::RelaxedAtomic<size_t> numCompactionFailure;
    cb::RelaxedAtomic<size_t> numGetFailure;
    cb::RelaxedAtomic<size_t> numSetFailure;
    cb::RelaxedAtomic<size_t> numDelFailure;
    cb::RelaxedAtomic<size_t> numOpenFailure;
    cb::RelaxedAtomic<size_t> numVbSetFailure;

    /**
     * Number of documents read (full and meta-only) from disk for background
     * fetch operations.
     */
    cb::RelaxedAtomic<size_t> io_bg_fetch_docs_read;
    //! Number of logical write operations (i.e. one per saved doc; not
    //  considering how many actual pwrite() calls were made).
    cb::RelaxedAtomic<size_t> io_num_write;
    //! Document bytes (key+meta+value) read for background fetch operations.
    cb::RelaxedAtomic<size_t> io_bgfetch_doc_bytes;
    //! Number of bytes written (key + value + application rev metadata)
    cb::RelaxedAtomic<size_t> io_document_write_bytes;

    /* for flush and vb delete, no error handling in KVStore, such
     * failure should be tracked in MC-engine */

    // How long it takes us to complete a read
    Hdr1sfMicroSecHistogram readTimeHisto;
    // How big are our reads?
    Hdr1sfInt32Histogram readSizeHisto;
    // How long it takes us to complete a write
    Hdr1sfMicroSecHistogram writeTimeHisto;
    // Number of logical bytes written to disk for each document saved
    // (document key + meta + value).
    Hdr1sfInt32Histogram writeSizeHisto;
    // Time spent in delete() calls.
    Hdr1sfMicroSecHistogram delTimeHisto;
    // Time spent in commit
    Hdr1sfMicroSecHistogram commitHisto;
    // Time spent in compaction
    Hdr1sfMicroSecHistogram compactHisto;
    // Time spent in saving documents to disk
    Hdr1sfMicroSecHistogram saveDocsHisto;
    // Batch size while saving documents
    Hdr1sfInt32Histogram batchSize;
    // Time spent in vbucket snapshot
    Hdr1sfMicroSecHistogram snapshotHisto;

    // Count and histogram filesystem read()s per getMulti() request
    cb::RelaxedAtomic<size_t> getMultiFsReadCount;
    Hdr1sfInt32Histogram getMultiFsReadHisto;

    // Histogram of filesystem read()s per getMulti() request, divided by
    // the number of documents fetched; gives an average read() count
    // per fetched document.
    Hdr1sfInt32Histogram getMultiFsReadPerDocHisto;

    /// Histogram of disk Write Amplification ratios for each batch of items
    /// flushed to disk (each saveDocs() call).
    /// Encoded as integer, by multiplying the floating-point ratio by 10 -
    /// e.g. ratio of 3.3 -> 33
    HdrHistogram flusherWriteAmplificationHisto{
            0, 1000, 2, HdrHistogram::Iterator::IterMode::Percentiles};

    // Stats from the underlying OS file operations
    FileStats fsStats;

    // Underlying stats for OS file operations during compaction
    FileStats fsStatsCompaction;

    /// @return sum of the memory footprint of all histograms in this object.
    size_t getMemFootPrint() const {
        return readTimeHisto.getMemFootPrint() +
               readSizeHisto.getMemFootPrint() +
               writeTimeHisto.getMemFootPrint() +
               writeSizeHisto.getMemFootPrint() +
               delTimeHisto.getMemFootPrint() + compactHisto.getMemFootPrint() +
               snapshotHisto.getMemFootPrint() + commitHisto.getMemFootPrint() +
               saveDocsHisto.getMemFootPrint() + batchSize.getMemFootPrint() +
               getMultiFsReadHisto.getMemFootPrint() +
               getMultiFsReadPerDocHisto.getMemFootPrint() +
               fsStats.getMemFootPrint() + fsStatsCompaction.getMemFootPrint() +
               flusherWriteAmplificationHisto.getMemFootPrint();
    }
};
/**
* Properties of the storage layer.
*
* If concurrent filesystem access is possible, maxConcurrency() will
* be greater than one. One will need to determine whether more than
* one writer is possible as well as whether more than one reader is
* possible.
*/
class StorageProperties {
public:
    enum class EfficientVBDump : bool { Yes, No };
    enum class EfficientVBDeletion : bool { Yes, No };
    enum class PersistedDeletion : bool { Yes, No };
    enum class EfficientGet : bool { Yes, No };
    /**
     * Does the KVStore allow externally driven compactions (driven via
     * ns_server/EPBucket) whilst we do writes?
     */
    enum class ConcurrentWriteCompact : bool { Yes, No };
    enum class ByIdScan : bool { Yes, No };

    /// Construct with an explicit value for every capability.
    StorageProperties(EfficientVBDump efficientDump,
                      EfficientVBDeletion efficientDeletion,
                      PersistedDeletion persistedDeletion,
                      EfficientGet efficientGetArg,
                      ConcurrentWriteCompact concurrentWriteCompact,
                      ByIdScan byIdScanArg)
        : efficientVBDump(efficientDump),
          efficientVBDeletion(efficientDeletion),
          persistedDeletions(persistedDeletion),
          efficientGet(efficientGetArg),
          concWriteCompact(concurrentWriteCompact),
          byIdScan(byIdScanArg) {
    }

    /// @return true if a single vbucket can be dumped efficiently.
    bool hasEfficientVBDump() const {
        return efficientVBDump == EfficientVBDump::Yes;
    }

    /// @return true if a vbucket can be deleted efficiently all at once.
    bool hasEfficientVBDeletion() const {
        return efficientVBDeletion == EfficientVBDeletion::Yes;
    }

    /// @return true if deletions are persisted to disk.
    bool hasPersistedDeletions() const {
        return persistedDeletions == PersistedDeletion::Yes;
    }

    /// @return true if multiple get operations can be batch-processed.
    bool hasEfficientGet() const {
        return efficientGet == EfficientGet::Yes;
    }

    /// @return true if the underlying storage supports concurrent writing
    ///         and compacting.
    bool hasConcWriteCompact() const {
        return concWriteCompact == ConcurrentWriteCompact::Yes;
    }

    /// @return true if scans in id (key) order are supported.
    bool hasByIdScan() const {
        return byIdScan == ByIdScan::Yes;
    }

private:
    EfficientVBDump efficientVBDump;
    EfficientVBDeletion efficientVBDeletion;
    PersistedDeletion persistedDeletions;
    EfficientGet efficientGet;
    ConcurrentWriteCompact concWriteCompact;
    ByIdScan byIdScan;
};
/**
* Base class representing kvstore operations.
*/
class KVStore {
public:
/// Result of flushing a Deletion, passed to the PersistenceCallback.
enum class FlushStateDeletion { Delete, DocNotFound, Failed };
/// Result of flushing a Mutation, passed to the PersistenceCallback.
enum class FlushStateMutation { Insert, Update, Failed };
explicit KVStore(bool read_only = false);
virtual ~KVStore();
/**
* Called when the engine is going away so we can shutdown any backend tasks
* the underlying store creates, to prevent them from racing with destruction.
*/
virtual void deinitialize() {
}
/**
* Allow the kvstore to add extra statistics information
* back to the client
* @param add_stat the callback function to add statistics
* @param c the cookie to pass to the callback function
* @param args additional arguments to be parsed; may be empty
*/
virtual void addStats(const AddStatFn& add_stat,
const void* c,
const std::string& args);
/**
* Request the specified statistic name from the kvstore.
*
* @param name The name of the statistic to fetch.
* @param[out] value Value of the given stat (if exists).
* @return True if the stat exists, is of type size_t and was successfully
* returned, else false.
*/
virtual bool getStat(const char* name, size_t& value) {
return false;
}
/**
* Show kvstore specific timing stats.
*
* @param add_stat the callback function to add statistics
* @param c the cookie to pass to the callback function
*/
virtual void addTimingStats(const AddStatFn& add_stat, const void* c);
/**
* Resets kvstore specific stats
*/
void resetStats() {
st.reset();
}
size_t getMemFootPrint() {
return st.getMemFootPrint();
}
/**
* Reset the vbucket to a clean state.
*/
virtual void reset(Vbid vbid) = 0;
/**
* Begin a transaction (if not already in one).
*
* @param txCtx A transaction context to associate with this transaction.
* The context will be passed to each operations' completion
* callback, so this can be used to hold state common to the entire
* transaction without having to duplicate it in every Callback.
*
* @return false if we cannot begin a transaction
*/
bool begin(std::unique_ptr<TransactionContext> txCtx);
/**
* Commit a transaction (unless not currently in one).
*
* @param commitData a reference to a VB::Commit object which is required
* for persisted metadata updates and collection item counting
* @return false if the commit fails
*/
virtual bool commit(VB::Commit& commitData) = 0;
/**
* Rollback the current transaction.
*/
virtual void rollback() = 0;
/**
* Get the properties of the underlying storage.
*/
virtual StorageProperties getStorageProperties() = 0;
/**
* Set an item into the kv store.
*
* @param item The item to store
* @param cb Callback object which will be invoked when the set() has been
* persisted to disk.
*/
virtual void set(queued_item item) = 0;
/**
* Get an item from the kv store.
* @param key The document key to fetch.
* @param vb The vbucket to fetch from.
* @param filter In what form should the item be fetched?
* Item::getDatatype() will reflect the format they are returned in.
*/
virtual GetValue get(const DiskDocKey& key,
Vbid vb,
ValueFilter filter) = 0;
/**
* Convenience version of get() which fetches the value uncompressed.
*/
GetValue get(const DiskDocKey& key, Vbid vb) {
return get(key, vb, ValueFilter::VALUES_DECOMPRESSED);
}
/**
* Retrieve the document with a given key from the underlying storage
* @param kvFileHandle the open file to get from
* @param key the key of a document to be retrieved
* @param vb vbucket id of a document
* @param filter In what form should the item be fetched?
* Item::getDatatype() will reflect the format they are returned in.
* @return the result of the get
*/
virtual GetValue getWithHeader(const KVFileHandle& kvFileHandle,
const DiskDocKey& key,
Vbid vb,
ValueFilter filter) = 0;
/**
* Set the max bucket quota to the given size.
*
* @param size The new max bucket quota size.
*/
virtual void setMaxDataSize(size_t size) {
// Might be overloaded to do some work
}
/**
* Retrieve multiple documents from the underlying storage system at once.
*
* @param vb vbucket id of a document
* @param itms list of items whose documents are going to be retrieved.
*/
virtual void getMulti(Vbid vb, vb_bgfetch_queue_t& itms) {
throw std::runtime_error("Backend does not support getMulti()");
}
/**
* Callback for getRange().
* @param value The fetched value. Note r-value receiver can modify (e.g.
* move-from) it if desired.
*/
using GetRangeCb = std::function<void(GetValue&& value)>;
/**
* Get a range of items from a single vBucket
* (if supported by the kv store).
*
* Searches the given vBucket for all items with keys in the half-open
* range [startKey,endKey). For each item found invokes the given callback.
*
* @param vb vBucket id to fetch from.
* @param startKey The key to start searching at. Search includes this key.
* @param endKey The key to end searching at. Search excludes this key.
* @param callback Callback invoked for each key found.
* @throws std::runtime_error if the range scan could not be successfully
* completed. (Note: finding zero documents in the given range is
* considered successful).
*/
virtual void getRange(Vbid vb,
const DiskDocKey& startKey,
const DiskDocKey& endKey,
const GetRangeCb& cb) {
throw std::runtime_error("Backend does not support getRange()");
}
/**
* Delete an item from the kv store.
*
* @param item The item to delete
*/
virtual void del(queued_item item) = 0;
/**
* Delete a given vbucket database instance from underlying storage
*
* @param vbucket vbucket id
* @param fileRev the revision of the file to delete
*/
virtual void delVBucket(Vbid vbucket, uint64_t fileRev) = 0;
/**
* Get a list of all persisted vbuckets (with their states).
*/
virtual std::vector<vbucket_state *> listPersistedVbuckets() = 0;
/**
* Get a list of all persisted engine and DCP stats. This API is mainly
* invoked during warmup to get the engine stats from the previous session.
*
* @param stats map instance where the engine stats from the previous
* session is stored.
*/
virtual void getPersistedStats(std::map<std::string, std::string> &stats) {
(void) stats;
}
/**
* Persist a snapshot of a collection of stats.
*/
bool snapshotStats(const std::map<std::string, std::string> &m);
/**
* Snapshot vbucket state
* @param vbucketId id of the vbucket that needs to be snapshotted
* @param vbstate state of the vbucket
* @param cb stats callback
*/
virtual bool snapshotVBucket(Vbid vbucketId,
const vbucket_state& vbstate) = 0;
/**
* Compact a database file.
*
* @param vbLock a lock to serialize compaction and flusher to the
* specific vbucket. When called the lock is _HELD_
* so engines who don't need exclusive access should
* release the lock. The lock may be held or released
* upon return, the caller will take the appropriate action.
* @param c shared_ptr to the CompactionContext that includes various
* callbacks and compaction parameters
* @return true if the compaction was successful
*/
virtual bool compactDB(std::unique_lock<std::mutex>& vbLock,
std::shared_ptr<CompactionContext> c) = 0;
virtual vbucket_state* getVBucketState(Vbid vbid) = 0;
void setVBucketState(Vbid vbid, const vbucket_state& vbs);
/**
* Get the number of deleted items that are persisted to a vbucket file
*
* @param vbid The vbucket if of the file to get the number of deletes for.
* @returns the number of deletes which are persisted
* @throws std::runtime_error (and subclasses) if it was not possible to
* obtain a count of persisted deletes.
*/
virtual size_t getNumPersistedDeletes(Vbid vbid) = 0;
/**
* This method will return information about the file whose id
* is passed in as an argument. The information returned contains
* the item count, file size and space used.
*
* @throws std::runtime_error (and subclasses) if it was not possible to
* obtain the DB file info.
*/
virtual DBFileInfo getDbFileInfo(Vbid dbFileId) = 0;
/**
* This method will return file size and space used for the
* entire KV store
*/
virtual DBFileInfo getAggrDbFileInfo() = 0;
/**
* This method will return the total number of items in the vbucket
*
* vbid - vbucket id
*/
virtual size_t getItemCount(Vbid vbid) = 0;
/**
* Rollback the specified vBucket to the state it had at rollbackseqno.
*
* On success, the vBucket should have discarded *at least* back to the
* specified rollbackseqno; if necessary it is valid to rollback further.
* A minimal implementation is permitted to rollback to zero.
*
* @param vbid VBucket to rollback
* @param rollbackseqno Sequence number to rollback to (minimum).
* @param cb For each mutation which has been rolled back (i.e. from the
* selected rollback point to the latest); invoke this callback with the Key
* of the now-discarded update. Callers can use this to undo the effect of
* the discarded updates on their in-memory view.
* @return success==true and details of the sequence numbers after rollback
* if rollback succeeded; else false.
*/
virtual RollbackResult rollback(Vbid vbid,
uint64_t rollbackseqno,
std::unique_ptr<RollbackCB>) = 0;
/**
* This method is called before persisting a batch of data if you'd like to
* do stuff to them that might improve performance at the IO layer.
*/
void optimizeWrites(std::vector<queued_item>& items);
/**
* This method is called after persisting a batch of data to perform any
* pending tasks on the underlying KVStore instance.
*/
virtual void pendingTasks() = 0;
uint64_t getLastPersistedSeqno(Vbid vbid);
bool isReadOnly() const {
return readOnly;
}
bool isReadWrite() const {
return !isReadOnly();
}
KVStoreStats& getKVStoreStat() {
return st;
}
/**
* Get all_docs API, to return the list of all keys in the store
* @param vbid vbucket id of which to collect keys from
* @param start_key key of where to start the scan from once found all keys
* after this should be returned.
* @param count the max number of keys that should be collected by kvstore
* implementation for this vbucket
* @param cb shared pointer to a callback function
* @return engine status code
*/
virtual ENGINE_ERROR_CODE getAllKeys(
Vbid vbid,
const DiskDocKey& start_key,
uint32_t count,
std::shared_ptr<StatusCallback<const DiskDocKey&>> cb) = 0;
/// Does the backend support historical snapshots
virtual bool supportsHistoricalSnapshots() const {
return false;
}
/**
* Create a KVStore seqno range Scan Context with the given options.
* On success, returns a unique_pointer to the ScanContext. The caller can
* then call scan() to execute the scan. The scan context is locked
* to a single version (snapshot) of the database (it does not change
* while the scan is running). The snapshot may either be "historical"
* (returns all of the data (from start seqno) up to the oldest snapshot
* available containing start seqno), or it may be "current" containing
* all of the data "right now").
*
* The caller specifies two callback objects - GetValue and CacheLookup:
*
* 1. GetValue callback is invoked for each object loaded from disk, for
* the caller to process that item.
* If the callback has status ENGINE_SUCCESS then scanning continues.
* If the callback has status ENGINE_ENOMEM then the scan is paused -
* scan() returns early allowing caller to reduce memory pressure.
* If scan() is called again it will resume at the _same_ item which
* returned ENGINE_ENOMEM last time.
*
* 2. CacheLookup callback is an optimization to avoid loading data from
* disk for already-resident items - it is invoked _before_ loading the
* item's value from disk, to give ep-engine's in-memory cache the
* opportunity to fulfill the item (assuming the item is in memory).
* If this callback has status ENGINE_KEY_EEXISTS then the document is
* considered to have been handled purely from memory and the GetValue
* callback is skipped.
* If this callback has status ENGINE_SUCCESS then it wasn't fulfilled
* from memory, and will instead be loaded from disk and GetValue
* callback invoked.
*
* @param cb GetValue callback - ownership passes to the returned object
* @param cl Cache lookup callback - ownership passes to the returned object
* @param vbid The vbucket to scan
* @param startSeqno The seqno to begin scanning from
* @param options DocumentFilter for the scan - e.g. return deleted items
* @param valOptions ValueFilter - e.g. return the document body
* @param source - Should a historical or the current head be used
* @return a BySeqnoScanContext, null if there's an error
*/
virtual std::unique_ptr<BySeqnoScanContext> initBySeqnoScanContext(
std::unique_ptr<StatusCallback<GetValue>> cb,
std::unique_ptr<StatusCallback<CacheLookup>> cl,
Vbid vbid,
uint64_t startSeqno,
DocumentFilter options,
ValueFilter valOptions,
SnapshotSource source) = 0;
/**
* Create a KVStore id range Scan Context with the given options.
* On success, returns a unique_pointer to the ScanContext. The caller can
* then call scan() to execute the scan.
*
* The caller specifies two callback objects - GetValue and CacheLookup:
*
* 1. GetValue callback is invoked for each object loaded from disk, for
* the caller to process that item.
* 2. CacheLookup callback is an optimization to avoid loading data from
* disk for already-resident items - it is invoked _before_ loading the
* item's value from disk, to give ep-engine's in-memory cache the
* opportunity to fulfil the item (assuming the item is in memory).