/
checkpoint_manager.h
721 lines (623 loc) · 27.6 KB
/
checkpoint_manager.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2018 Couchbase, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "checkpoint_types.h"
#include "cursor.h"
#include "ep_types.h"
#include "monotonic.h"
#include "queue_op.h"
#include "vbucket.h"
#include <memcached/engine_common.h>
#include <memcached/vbucket.h>
#include <memory>
#include <optional>
#include <unordered_map>
#include <utility>
// Forward declarations of types that the declarations in this header refer
// to.
class Checkpoint;
class CheckpointConfig;
class CheckpointCursor;
class EPStats;
class PreLinkDocumentContext;
class VBucket;
template <typename... RV>
class Callback;
// Scoped lock type. Several CheckpointManager *_UNLOCKED helpers take a
// `const LockHolder&` argument to 'prove' that the caller already holds
// CheckpointManager::queueLock.
using LockHolder = std::lock_guard<std::mutex>;
/**
 * A snapshot_range_t bundled with the (optional) seqnos that have to be
 * flushed to disk together with it. The HCS is carried here for Disk
 * checkpoints because we can't work out a correct PCS on a replica due to
 * de-dupe.
 */
struct CheckpointSnapshotRange {
    /// The wrapped snapshot range.
    snapshot_range_t range;

    /// HCS that should be flushed. Currently should only be set for Disk
    /// Checkpoint runs.
    std::optional<uint64_t> highCompletedSeqno{};

    /// HPS that should be flushed when the entire range has been persisted.
    /// This is the seqno of the latest prepare in this checkpoint.
    std::optional<uint64_t> highPreparedSeqno{};

    // The two accessors below forward to `range`, so that this type can be
    // used in the same way as a plain snapshot_range_t.

    /// @return the start of the wrapped range
    uint64_t getStart() const {
        return range.getStart();
    }

    /// @return the end of the wrapped range
    uint64_t getEnd() const {
        return range.getEnd();
    }
};
/**
* Representation of a checkpoint manager that maintains the list of checkpoints
* for a given vbucket.
*/
class CheckpointManager {
friend class Checkpoint;
friend class EventuallyPersistentEngine;
friend class Consumer;
friend class CheckpointManagerTestIntrospector;
public:
using FlusherCallback = std::shared_ptr<Callback<Vbid>>;
/// Return type of getNextItemsForCursor() and getItemsForCursor()
struct ItemsForCursor {
    ItemsForCursor() = default;

    ItemsForCursor(CheckpointType checkpointType,
                   std::optional<uint64_t> maxDeletedRevSeqno,
                   std::optional<uint64_t> highCompletedSeqno,
                   uint64_t visibleSeqno)
        : checkpointType(checkpointType),
          maxDeletedRevSeqno(maxDeletedRevSeqno),
          highCompletedSeqno(highCompletedSeqno),
          visibleSeqno(visibleSeqno) {
    }

    /// Low/high sequence numbers of the checkpoint(s) whose items were
    /// returned.
    std::vector<CheckpointSnapshotRange> ranges;

    /// True if there are still items available (i.e. the limit was hit).
    bool moreAvailable = {false};

    /**
     * The natural place for this is CheckpointSnapshotRange, as this is per
     * snapshot. Originally placed here as CM::getNextItemsForCursor() never
     * returns multiple snapshots of different types.
     */
    CheckpointType checkpointType = CheckpointType::Memory;

    /// maxDeletedRevSeqno for the items returned (if any).
    std::optional<uint64_t> maxDeletedRevSeqno = {};

    /**
     * HCS that must be sent to Replica when the Active is streaming a
     * Disk Checkpoint. The same as checkpoint-type, the natural place for
     * this is CheckpointSnapshotRange, where we already have it.
     *
     * I am not re-using the member in ranges for:
     * 1) keeping this change smaller, as the code in ActiveStream would
     *    require changes for dealing with it
     * 2) highlighting the fact that here we expect a *single* range when we
     *    use this member
     *
     * The correctness of the latter is ensured by the fact that
     * CM::getNextItemsForCursor() never returns multiple Disk Checkpoints.
     *
     * @todo: This member should be removed (and the one in SnapRange used)
     * as soon as we refactor the DCP stream code in CheckpointManager and
     * ActiveStream.
     */
    std::optional<uint64_t> highCompletedSeqno = {};

    /**
     * The max visible seqno for first Checkpoint returned, e.g. if multiple
     * Checkpoints are returned as follows:
     *   cp1[mutation:1, prepare:2] cp2[mutation:3,mutation:4]
     * This value would be 1
     *
     * Explicitly initialised to 0: without an initialiser an object created
     * via `ItemsForCursor x;` (defaulted constructor) would carry an
     * indeterminate value here, and reading it would be undefined behaviour.
     */
    uint64_t visibleSeqno = 0;

    /// Set only for persistence cursor, resets the CM state after flush.
    UniqueFlushHandle flushHandle;
};
/**
 * Outcome of expelUnreferencedCheckpointItems(): how many items were expelled
 * and an estimate of the memory that will be recovered as a result.
 */
struct ExpelResult {
    /// Number of items that have been expelled.
    size_t expelCount{0};

    /// Estimate of the amount of memory that will be recovered.
    size_t estimateOfFreeMemory{0};
};
CheckpointManager(EPStats& st,
Vbid vbucket,
CheckpointConfig& config,
int64_t lastSeqno,
uint64_t lastSnapStart,
uint64_t lastSnapEnd,
uint64_t maxVisibleSeqno,
FlusherCallback cb);
uint64_t getOpenCheckpointId();
uint64_t getLastClosedCheckpointId();
void setOpenCheckpointId(uint64_t id);
/**
* Remove closed unreferenced checkpoints.
*
* NOTE(review): this comment used to say the removed checkpoints are
* returned "through the vector", but the signature has no vector parameter;
* that wording looks stale - confirm against the implementation.
*
* @param vbucket the vbucket that this checkpoint manager belongs to.
* @param newOpenCheckpointCreated the flag indicating if the new open
* checkpoint was created as a result of running this function.
* @param limit Max number of checkpoints that can be removed.
* No limit by default, overridden only for testing.
* @return the number of non-meta items that are purged from checkpoint
*/
size_t removeClosedUnrefCheckpoints(
VBucket& vbucket,
bool& newOpenCheckpointCreated,
size_t limit = std::numeric_limits<size_t>::max());
/**
* Attempt to expel (i.e. eject from memory) items in the oldest checkpoint
* that still has a cursor registered in it. This is to help avoid very large
* checkpoints which consume a large amount of memory.
* @returns ExpelResult - this is a structure containing two elements. The
* first element is the number of items that have been expelled. The second
* element is an estimate of the amount of memory that will be recovered.
*/
ExpelResult expelUnreferencedCheckpointItems();
/**
* Register the cursor for getting items whose bySeqno values are between
* startBySeqno and endBySeqno, and close the open checkpoint if endBySeqno
* belongs to the open checkpoint.
*
* NOTE(review): the description above refers to an `endBySeqno`, which is
* not a parameter of this function - it looks stale; confirm the actual
* behaviour against the implementation.
*
* @param name name of the cursor to register (presumably the key used in
* the `cursors` index - confirm).
* @param startBySeqno start bySeqno.
* @return Cursor registration result which consists of (1) the bySeqno with
* which the cursor can start and (2) flag indicating if the cursor starts
* with the first item on a checkpoint.
*/
CursorRegResult registerCursorBySeqno(const std::string& name,
uint64_t startBySeqno);
/**
* Remove the cursor for a given connection.
* @param cursor pointer to the client's cursor; can be null, and is
* non-const so that its currentCheckpoint member can be set to
* checkpointList.end() to prevent further use of the currentCheckpoint
* iterator.
* @return true if the cursor is removed successfully.
*/
bool removeCursor(CheckpointCursor* cursor);
/**
* Removes the backup persistence cursor created at getItemsForCursor().
*/
void removeBackupPersistenceCursor();
/**
* Moves the pcursor back to the backup cursor.
* Note that:
* 1) it is logical move, the function has constant complexity
* 2) the backup cursor is logically removed (as it becomes the new
* pcursor)
* @return aggregated flush stats to roll back the VBucket counters by
*/
VBucket::AggregatedFlushStats resetPersistenceCursor();
/**
* Queue an item to be written to persistent layer.
* @param vb the vbucket that a new item is pushed into.
* @param qi item to be persisted.
* @param generateBySeqno yes/no generate the seqno for the item
* @param generateCas presumably yes/no generate the CAS for the item, by
* analogy with generateBySeqno - TODO confirm
* @param preLinkDocumentContext A context object needed for the
* pre link document API in the server API. It is notified
* with the generated CAS before the object is made available
* for other threads. May be nullptr if the document originates
* from a context where the document shouldn't be updated.
* @param assignedSeqnoCallback a function that is called with the seqno
* of the item. This is called with the queueLock held.
* @return true if an item queued increases the size of persistence queue
* by 1.
*/
bool queueDirty(VBucket& vb,
queued_item& qi,
const GenerateBySeqno generateBySeqno,
const GenerateCas generateCas,
PreLinkDocumentContext* preLinkDocumentContext,
std::function<void(int64_t)> assignedSeqnoCallback = {});
/*
* Queue writing of the VBucket's state to persistent layer.
* @param vb the vbucket that a new item is pushed into.
*/
void queueSetVBState(VBucket& vb);
/**
* Add all outstanding items for the given cursor to the vector. Only
* fetches items for contiguous Checkpoints of the same type.
*
* @param cursor CheckpointCursor to read items from and advance
* @param items container which items will be appended to.
* @return An ItemsForCursor for the items appended to `items`.
* NOTE(review): this used to read "the low/high sequence number added to
* `items` on success, or (0,0) if no items were added", which does not
* match the ItemsForCursor return type - confirm what is returned when no
* items are added.
*/
CheckpointManager::ItemsForCursor getNextItemsForCursor(
CheckpointCursor* cursor, std::vector<queued_item>& items);
/**
 * Convenience wrapper: getNextItemsForCursor() applied to the persistence
 * cursor. Per the contract of that function, only items of contiguous
 * Checkpoints of the same type are fetched.
 *
 * @param items container which items will be appended to.
 * @return the ItemsForCursor produced by getNextItemsForCursor() for the
 *         persistence cursor.
 */
CheckpointManager::ItemsForCursor getNextItemsForPersistence(
        std::vector<queued_item>& items) {
    auto* const pcursor = persistenceCursor;
    return getNextItemsForCursor(pcursor, items);
}
/**
* Add items for the given cursor to the vector, stopping on a checkpoint
* boundary which is greater or equal to `approxLimit`. The cursor is
* advanced to point after the items fetched.
*
* Can fetch items of contiguous Memory Checkpoints.
* Never fetches (1) items of contiguous Disk checkpoints or (2) items of
* checkpoints of different types.
*
* Note: It is only valid to fetch complete checkpoints; as such we cannot
* limit to a precise number of items.
*
* @param cursor CheckpointCursor to read items from and advance
* @param[in/out] items container which items will be appended to.
* @param approxLimit Approximate number of items to add.
* @return An ItemsForCursor object containing:
* range: the low/high sequence number of the checkpoints(s) added to
* `items`;
* moreAvailable: true if there are still items available for this
* checkpoint (i.e. the limit was hit).
* maxDeletedRevSeqno for the items returned
* highCompletedSeqno for the items returned
* visibleSeqno: the max visible seqno, initialised to that of the first
* checkpoint; this works for the ActiveStream use-case, which just
* needs a single value to seed its snapshot loop.
*/
ItemsForCursor getItemsForCursor(CheckpointCursor* cursor,
std::vector<queued_item>& items,
size_t approxLimit);
/**
 * Persistence flavour of getItemsForCursor(): the call is forwarded with the
 * persistence cursor, so items are appended up to a checkpoint boundary
 * which is greater or equal to `approxLimit` and the persistence cursor is
 * advanced to point after the items fetched. Only items of contiguous
 * Checkpoints of the same type are fetched.
 *
 * Note: It is only valid to fetch complete checkpoints; as such the number
 * of items cannot be limited precisely.
 *
 * @param[in/out] items container which items will be appended to.
 * @param approxLimit Approximate number of items to add.
 * @return An ItemsForCursor object containing:
 *         range: the low/high sequence number of the checkpoint(s) added to
 *                `items`;
 *         moreAvailable: true if there are still items available for this
 *                checkpoint (i.e. the limit was hit).
 */
ItemsForCursor getItemsForPersistence(std::vector<queued_item>& items,
                                      size_t approxLimit) {
    return this->getItemsForCursor(persistenceCursor, items, approxLimit);
}
/**
 * @return the total number of items (meta items included) across the
 *         checkpoints that belong to this checkpoint manager, as currently
 *         stored in the atomic `numItems` counter.
 */
size_t getNumItems() const {
    return numItems.load();
}
/**
* Returns the number of non-meta items in the currently open checkpoint.
*/
size_t getNumOpenChkItems() const;
/* WARNING! This method can return inaccurate counts - see MB-28431. It
* at *least* can suffer from overcounting by at least 1 (in scenarios as
* yet not clear).
* As such it is *not* safe to use when a precise count of remaining
* items is needed.
*
* Returns the count of Items (excluding meta items) that the given cursor
* has yet to process (i.e. between the cursor's current position and the
* end of the last checkpoint).
*/
size_t getNumItemsForCursor(const CheckpointCursor* cursor) const;
/* WARNING! This method can return inaccurate counts - see MB-28431. It
* at *least* can suffer from overcounting by at least 1 (in scenarios as
* yet not clear).
* As such it is *not* safe to use when a precise count of remaining
* items is needed.
*
* Returns the count of Items (excluding meta items) that the persistence
* cursor has yet to process (i.e. between the cursor's current position and
* the end of the last checkpoint).
*/
size_t getNumItemsForPersistence() const {
return getNumItemsForCursor(persistenceCursor);
}
void clear(vbucket_state_t vbState);
/**
* Clear all the checkpoints managed by this checkpoint manager.
*/
void clear(VBucket& vb, uint64_t seqno);
/// @return read-only access to the CheckpointConfig referenced by this
///         manager
const CheckpointConfig& getCheckpointConfig() const {
    return this->checkpointConfig;
}
void addStats(const AddStatFn& add_stat, const void* cookie);
/**
* Create a new open checkpoint by force.
*
* @param force create a new checkpoint even if the existing one
* contains no non-meta items
* @return the new open checkpoint id
*/
uint64_t createNewCheckpoint(bool force = false);
/**
* Return memory consumption of all the checkpoints managed
*/
size_t getMemoryUsage_UNLOCKED() const;
size_t getMemoryUsage() const;
/**
* Return memory overhead of all the checkpoints managed
*/
size_t getMemoryOverhead_UNLOCKED() const;
/**
* Return memory overhead of all the checkpoints managed
*/
size_t getMemoryOverhead() const;
/**
* Return memory consumption of unreferenced checkpoints
*/
size_t getMemoryUsageOfUnrefCheckpoints() const;
/**
* Function returns a list of cursors to drop so as to unreference
* certain checkpoints within the manager, invoked by the cursor-dropper.
* @return a container of weak_ptr to cursors
*/
std::vector<Cursor> getListOfCursorsToDrop();
/**
* @return True if at least one checkpoint is unreferenced and can
* be removed.
*/
bool hasClosedCheckpointWhichCanBeRemoved() const;
/**
* @return true if only the backup pcursor is blocking checkpoint removal.
* Ie, some checkpoints will be eligible for removal as soon as the backup
* pcursor is removed.
*/
bool isEligibleForCheckpointRemovalAfterPersistence() const;
void createSnapshot(uint64_t snapStartSeqno,
uint64_t snapEndSeqno,
std::optional<uint64_t> highCompletedSeqno,
CheckpointType checkpointType,
uint64_t maxVisibleSnapEnd);
/**
* Extend the open checkpoint to contain more mutations. Allowed only for
* Memory checkpoints.
* Note:
* 1) We forbid merging of checkpoints of different type for multiple
* reasons (eg, MB-42780).
* 2) Extending a Disk checkpoint would be theoretically possible, but the
* function doesn't support it (eg, we would need to update other quantities
* like the HCS). Adding support for that doesn't seem necessary. The
* original idea behind "extending a checkpoint" is that under load the
* active may send many/tiny snapshots. Creating a checkpoint for every
* snapshot would be unnecessarily expensive at runtime and also we would
* end up quickly with a huge CheckpointList, which would degrade the
* performance of some code-paths in the CM (eg, checkpoint removal).
*
* @param snapEnd
* @param maxVisibleSnapEnd
* @throws std::logic_error If the user tries to extend a Disk checkpoint
*/
void extendOpenCheckpoint(uint64_t snapEnd, uint64_t maxVisibleSnapEnd);
snapshot_info_t getSnapshotInfo();
uint64_t getOpenSnapshotStartSeqno() const;
/**
* Return the visible end seqno for the current snapshot. This logically
* matches the end which would be returned by getSnapshotInfo, but for the
* visible end.
*
* @return The end seqno for the current snapshot. For replication, if only
* a marker has been received, the value returned is for the prev
* complete snapshot.
*/
uint64_t getVisibleSnapshotEndSeqno() const;
void notifyFlusher();
int64_t getHighSeqno() const;
uint64_t getMaxVisibleSeqno() const;
/// @return the raw pointer to the persistence cursor; may be nullptr
CheckpointCursor* getPersistenceCursor() const {
    CheckpointCursor* const pcursor = persistenceCursor;
    return pcursor;
}
/// @return the backup-pcursor
std::shared_ptr<CheckpointCursor> getBackupPersistenceCursor();
void dump() const;
/**
* Take the cursors from another checkpoint manager and reset them in the
* process - used as part of vbucket reset.
* @param other the manager we are taking cursors from
*/
void takeAndResetCursors(CheckpointManager& other);
/// @return true if the current open checkpoint is a DiskCheckpoint
bool isOpenCheckpointDisk();
void updateStatsForStateChange(vbucket_state_t from, vbucket_state_t to);
/**
* Sets the callback to be invoked whenever memory usage changes due to a
* new queued item or checkpoint removal (or checkpoint expelling, in
* versions this is implemented in). This allows changes in checkpoint
* memory usage to be monitored.
*/
void setOverheadChangedCallback(
std::function<void(int64_t delta)> callback);
/**
* Gets the callback to be invoked whenever memory usage changes due to a
* new queued item or checkpoint removal (or checkpoint expelling, in
* versions this is implemented in).
*/
std::function<void(int64_t delta)> getOverheadChangedCallback() const;
/**
* Member std::function variable, to allow us to inject code into
* removeCursor_UNLOCKED() for unit tests (MB-36146).
*
* NOTE(review): the member is named runGetItemsHook while the text above
* names removeCursor_UNLOCKED() as the injection point - confirm where the
* hook is actually invoked.
*/
std::function<void(const CheckpointCursor* cursor, Vbid vbid)>
runGetItemsHook;
protected:
/**
* Advance the given cursor. Protected as it's valid to call this from
* getItemsForCursor but not from anywhere else (as it will return an entire
* checkpoint and never leave a cursor placed at the checkpoint_end).
*
* Note: This function skips empty items. If the cursor moves into a new
* checkpoint, then after this call it will point to the checkpoint_start
* item of the new checkpoint.
*
* @param cursor the cursor to advance
* @return true if advanced, false otherwise
*/
bool incrCursor(CheckpointCursor& cursor);
uint64_t getOpenCheckpointId_UNLOCKED(const LockHolder& lh);
uint64_t getLastClosedCheckpointId_UNLOCKED(const LockHolder& lh);
void setOpenCheckpointId_UNLOCKED(const LockHolder& lh, uint64_t id);
// Helper method for queueing methods - update the global and per-VBucket
// stats after queueing a new item to a checkpoint.
// Must be called with queueLock held (LockHolder passed in as argument to
// 'prove' this).
void updateStatsForNewQueuedItem_UNLOCKED(const LockHolder& lh,
VBucket& vb,
const queued_item& qi);
/**
* Function to invoke whenever memory usage changes due to a new
* queued item or checkpoint removal (or checkpoint expelling, in versions
* this is implemented in).
* Initialised to a callback that does nothing.
* Must be declared before checkpointList to ensure it still exists
* when any Checkpoints within the list are destroyed during destruction
* of this CheckpointManager.
*/
std::function<void(int64_t delta)> overheadChangedCallback{[](int64_t) {}};
bool removeCursor_UNLOCKED(CheckpointCursor* cursor);
CursorRegResult registerCursorBySeqno_UNLOCKED(const LockHolder& lh,
const std::string& name,
uint64_t startBySeqno);
/**
* Called by getItemsForCursor() for registering a copy of the persistence
* cursor before pcursor moves.
* The copy is used for resetting the pcursor to the backup position (in
* the case of flush failure) for re-attempting the flush.
*
* The function forbids to overwrite an existing backup pcursor.
*
* @param lh Lock to CM::queueLock
* @throws logic_error if the user attempts to overwrite the backup pcursor
*/
void registerBackupPersistenceCursor(const LockHolder& lh);
size_t getNumItemsForCursor_UNLOCKED(const CheckpointCursor* cursor) const;
void clear_UNLOCKED(vbucket_state_t vbState, uint64_t seqno);
/*
* @return a reference to the open checkpoint
*/
Checkpoint& getOpenCheckpoint_UNLOCKED(const LockHolder& lh) const;
/*
* Closes the current open checkpoint and adds a new open checkpoint to
* the checkpointList.
*
* @param id for the new checkpoint
* @param snapStartSeqno for the new checkpoint
* @param snapEndSeqno for the new checkpoint
* @param visibleSnapEnd for the new checkpoint
* @param highCompletedSeqno optional SyncRep HCS to be flushed to disk
* @param checkpointType is the checkpoint created from a replica receiving
* a disk snapshot?
*/
void addNewCheckpoint_UNLOCKED(uint64_t id,
uint64_t snapStartSeqno,
uint64_t snapEndSeqno,
uint64_t visibleSnapEnd,
std::optional<uint64_t> highCompletedSeqno,
CheckpointType checkpointType);
/*
* Closes the current open checkpoint and adds a new open checkpoint to
* the checkpointList.
* Note: the function sets snapStart and snapEnd to 'lastBySeqno' for the
* new checkpoint.
*
* @param id for the new checkpoint
*/
void addNewCheckpoint_UNLOCKED(uint64_t id);
/*
* Add an open checkpoint to the checkpointList.
*
* @param id for the new checkpoint
* @param snapStartSeqno for the new checkpoint
* @param snapEndSeqno for the new checkpoint
* @param highCompletedSeqno the SyncRepl HCS to be flushed to disk
* @param checkpointType is the checkpoint created from a replica receiving
* a disk snapshot?
*/
void addOpenCheckpoint(uint64_t id,
uint64_t snapStart,
uint64_t snapEnd,
uint64_t visibleSnapEnd,
std::optional<uint64_t> highCompletedSeqno,
CheckpointType checkpointType);
/**
* Moves the cursor to the empty item in the next checkpoint (if any).
*
* @param cursor the cursor to move
* @return true if the cursor has moved, false otherwise
*/
bool moveCursorToNextCheckpoint(CheckpointCursor &cursor);
/**
* Check the current open checkpoint to see if we need to create the new open checkpoint.
* @param forceCreation is to indicate if a new checkpoint is created due to online update or
* high memory usage.
* @param timeBound is to indicate if time bound should be considered in creating a new
* checkpoint.
* @return the previous open checkpoint Id if we create the new open checkpoint. Otherwise
* return 0.
*/
uint64_t checkOpenCheckpoint_UNLOCKED(const LockHolder& lh,
bool forceCreation,
bool timeBound);
bool isLastMutationItemInCheckpoint(CheckpointCursor &cursor);
bool isCheckpointCreationForHighMemUsage_UNLOCKED(const LockHolder& lh,
const VBucket& vbucket);
void resetCursors();
queued_item createCheckpointItem(uint64_t id,
Vbid vbid,
queue_op checkpoint_op);
CheckpointList checkpointList;
EPStats &stats;
CheckpointConfig &checkpointConfig;
mutable std::mutex queueLock;
const Vbid vbucketId;
// Total number of items (including meta items) in /all/ checkpoints managed
// by this object.
std::atomic<size_t> numItems;
Monotonic<int64_t> lastBySeqno;
/**
* The highest seqno of all items that are visible, i.e. normal mutations or
* mutations which have been prepared->committed. The main use of this value
* is to give clients that don't support sync-replication a view of the
* vbucket which they can receive (via dcp), i.e this value would not change
* to the seqno of a prepare.
*/
Monotonic<int64_t> maxVisibleSeqno;
/**
* cursors: stores all known CheckpointCursor objects which are held via
* shared_ptr. When a client creates a cursor we store the shared_ptr and
* give out a weak_ptr allowing cursors to be simply de-registered. We use
* the client's chosen name as the key
*/
using cursor_index =
std::unordered_map<std::string, std::shared_ptr<CheckpointCursor>>;
cursor_index cursors;
const FlusherCallback flusherCB;
static constexpr const char* pCursorName = "persistence";
Cursor pCursor;
CheckpointCursor* persistenceCursor = nullptr;
// Name of the backup persistence cursor.
// Only for persistence, we register a copy of the cursor before the cursor
// moves. Then:
// 1) if flush succeeds, we just remove the copy
// 2) if flush fails, we reset the pcursor to the copy
//
// That allows us to rely entirely on the CM for re-attempting the flush
// after failure.
static constexpr const char* backupPCursorName = "persistence-backup";
/**
* Flush stats that are accounted when we persist an item between the
* backup and persistence cursors. Should the flush fail we need to undo
* the stat updates or we'll overcount them.
*/
VBucket::AggregatedFlushStats persistenceFailureStatOvercounts;
friend std::ostream& operator<<(std::ostream& os, const CheckpointManager& m);
};
/**
* Outputs a textual description of the CheckpointManager.
* @param os the stream to write to
* @param m the CheckpointManager to describe
*/
std::ostream& operator<<(std::ostream& os, const CheckpointManager& m);