-
Notifications
You must be signed in to change notification settings - Fork 29
/
tapconnection.hh
1212 lines (1014 loc) · 32.9 KB
/
tapconnection.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#ifndef TAPCONNECTION_HH
#define TAPCONNECTION_HH 1
#include <set>
#include "common.hh"
#include "atomic.hh"
#include "mutex.hh"
#include "locks.hh"
#include "vbucket.hh"
// forward decl
class EventuallyPersistentEngine;
class TapConnMap;
class BackFillVisitor;
class TapBGFetchCallback;
class CompleteBackfillOperation;
class Dispatcher;
class Item;
struct TapStatBuilder;
struct TapAggStatBuilder;
struct PopulateEventsBody;
#define MAX_TAP_KEEP_ALIVE 3600
#define MAX_TAKEOVER_TAP_LOG_SIZE 500
#define TAP_OPAQUE_ENABLE_AUTO_NACK 0
#define TAP_OPAQUE_INITIAL_VBUCKET_STREAM 1
#define TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC 2
#define TAP_OPAQUE_OPEN_CHECKPOINT 3
#define TAP_OPAQUE_START_ONLINEUPDATE 4
#define TAP_OPAQUE_STOP_ONLINEUPDATE 5
#define TAP_OPAQUE_REVERT_ONLINEUPDATE 6
#define TAP_OPAQUE_CLOSE_TAP_STREAM 7
#define TAP_OPAQUE_CLOSE_BACKFILL 8
/**
 * A tap event that represents a change to the state of a vbucket.
 *
 * The tap stream may include events other than data mutation events,
 * but the data structures in the TapProducer only store a key
 * for each item to send. We don't want to add more data to those elements,
 * because that could potentially consume a lot of memory (the tap queue
 * may have a lot of elements).
 */
class TapVBucketEvent {
public:
    /**
     * Build an event with all three fields populated.
     *
     * @param ev Type of event
     * @param b The bucket this event belongs to
     * @param s The state change for this event
     */
    TapVBucketEvent(tap_event_t ev, uint16_t b, vbucket_state_t s)
        : event(ev),
          vbucket(b),
          state(s)
    {
        // EMPTY
    }

    //! Type of event
    tap_event_t event;
    //! The bucket this event belongs to
    uint16_t vbucket;
    //! The state change for this event
    vbucket_state_t state;
};
/**
* Represents an item that has been sent over tap, but may need to be
* rolled back if acks fail.
*/
class TapLogElement {
public:
TapLogElement(uint32_t s, const TapVBucketEvent &e) :
seqno(s),
event(e.event),
vbucket(e.vbucket),
state(e.state)
{
// EMPTY
}
TapLogElement(uint32_t s, const queued_item &qi) :
seqno(s),
vbucket(qi->getVBucketId()),
item(qi)
{
switch(item->getOperation()) {
case queue_op_set:
event = TAP_MUTATION;
break;
case queue_op_del:
event = TAP_DELETION;
break;
case queue_op_flush:
event = TAP_FLUSH;
break;
case queue_op_checkpoint_start:
event = TAP_CHECKPOINT_START;
break;
case queue_op_checkpoint_end:
event = TAP_CHECKPOINT_END;
break;
case queue_op_online_update_start:
event = TAP_OPAQUE;
state = (vbucket_state_t)htonl(TAP_OPAQUE_START_ONLINEUPDATE);
break;
case queue_op_online_update_end:
event = TAP_OPAQUE;
state = (vbucket_state_t)htonl(TAP_OPAQUE_STOP_ONLINEUPDATE);
break;
case queue_op_online_update_revert:
event = TAP_OPAQUE;
state = (vbucket_state_t)htonl(TAP_OPAQUE_REVERT_ONLINEUPDATE);
break;
default:
break;
}
}
uint32_t seqno;
tap_event_t event;
uint16_t vbucket;
vbucket_state_t state;
queued_item item;
};
/**
* An item queued for background fetch from tap.
*/
class TapBGFetchQueueItem {
public:
    /**
     * Capture the immutable description of one pending background fetch.
     *
     * @param k the item's key
     * @param i the id of the item to fetch
     * @param vb the vbucket ID
     * @param vbv the vbucket version
     */
    TapBGFetchQueueItem(const std::string &k, uint64_t i,
                        uint16_t vb, uint16_t vbv)
        : key(k),
          id(i),
          vbucket(vb),
          vbversion(vbv)
    {
        // EMPTY
    }

    //! Key of the item to fetch
    const std::string key;
    //! Id of the item to fetch
    const uint64_t id;
    //! The vbucket ID
    const uint16_t vbucket;
    //! The vbucket version
    const uint16_t vbversion;
};
/**
 * Values stored in TapCheckpointState::state.
 */
typedef enum {
    backfill,
    checkpoint_start,
    checkpoint_end,
    checkpoint_end_synced
} tap_checkpoint_state;

/**
 * Checkpoint state of each vbucket in TAP stream.
 */
class TapCheckpointState {
public:
    /**
     * Create a state with every field zeroed and the state set to backfill,
     * so that no member is ever left indeterminate.
     */
    TapCheckpointState() :
        vbucket(0), currentCheckpointId(0), openCheckpointIdAtBackfillEnd(0),
        lastSeqNum(0), lastItem(false), state(backfill) {}

    /**
     * @param vb the vbucket this state belongs to
     * @param checkpointId id of the checkpoint currently referenced
     * @param s the initial checkpoint state
     */
    TapCheckpointState(uint16_t vb, uint64_t checkpointId, tap_checkpoint_state s) :
        vbucket(vb), currentCheckpointId(checkpointId),
        openCheckpointIdAtBackfillEnd(0), lastSeqNum(0), lastItem(false), state(s) {}

    /**
     * Copy every field of the other state. Earlier only vbucket,
     * currentCheckpointId and state were copied, which left the remaining
     * members of the copy uninitialized.
     */
    TapCheckpointState(const TapCheckpointState &other) :
        vbucket(other.vbucket),
        currentCheckpointId(other.currentCheckpointId),
        openCheckpointIdAtBackfillEnd(other.openCheckpointIdAtBackfillEnd),
        lastSeqNum(other.lastSeqNum),
        lastItem(other.lastItem),
        state(other.state) {}

    uint16_t vbucket;
    // Id of the checkpoint that is currently referenced by the given TAP client's cursor.
    uint64_t currentCheckpointId;
    // Id of the current open checkpoint at the time of backfill completion.
    uint64_t openCheckpointIdAtBackfillEnd;
    // Last sequence number sent to the slave.
    uint32_t lastSeqNum;
    // True if the TAP cursor reaches to the last item at its current checkpoint.
    bool lastItem;
    tap_checkpoint_state state;
};
/**
 * An abstract class representing a TAP connection. There are two different
 * types of TAP connection: a producer and a consumer. Producers need to be
 * able to be kept across connections, but consumers don't contain
 * anything that can't be recreated.
 */
class TapConnection {
protected:
    /**
     * We need to be able to generate unique names, so let's just use a 64 bit counter
     */
    static Atomic<uint64_t> tapCounter;

    /**
     * The engine that owns the connection
     */
    EventuallyPersistentEngine &engine;

    /**
     * The cookie representing this connection (provided by the memcached code)
     */
    const void *cookie;

    /**
     * The name for this connection
     */
    std::string name;

    /**
     * Tap connection creation time
     */
    rel_time_t created;

    /**
     * When this tap connection expires.
     */
    rel_time_t expiryTime;

    /**
     * Is this tap connection connected?
     */
    bool connected;

    /**
     * Should we disconnect as soon as possible?
     */
    bool disconnect;

    /**
     * Number of times this connection was disconnected
     */
    Atomic<size_t> numDisconnects;

    //! Does the connection support tap acks?
    bool supportAck;

    //! Does the connection support checkpoint synchronization?
    bool supportCheckpointSync;

    //! Reservation flag (see releaseReference())
    Atomic<bool> reserved;

    /**
     * Initialise a connection that starts out connected, with no pending
     * disconnect, no ack / checkpoint-sync support and not reserved.
     *
     * @param theEngine the engine that owns the connection
     * @param c the cookie representing this connection
     * @param n the name for this connection
     */
    TapConnection(EventuallyPersistentEngine &theEngine,
                  const void *c, const std::string &n) :
        engine(theEngine),
        cookie(c),
        name(n),
        created(ep_current_time()),
        expiryTime((rel_time_t)-1),
        connected(true),
        disconnect(false),
        numDisconnects(0),
        supportAck(false),
        supportCheckpointSync(false),
        reserved(false)
    { /* EMPTY */ }

    /**
     * Emit a single stat named "<connection name>:<nm>" whose value is the
     * stream representation of val.
     *
     * @param nm the stat name (appended to the connection name)
     * @param val the value to report
     * @param add_stat the callback receiving the stat
     * @param c the cookie passed through to the callback
     */
    template <typename T>
    void addStat(const char *nm, T val, ADD_STAT add_stat, const void *c) {
        std::stringstream tap;
        tap << name << ":" << nm;
        std::stringstream value;
        value << val;
        // Materialise both strings once so that the pointer and the length
        // handed to the callback come from the same object.
        const std::string n = tap.str();
        const std::string v = value.str();
        add_stat(n.data(), static_cast<uint16_t>(n.length()),
                 v.data(), static_cast<uint32_t>(v.length()),
                 c);
    }

    /**
     * Emit a boolean stat as the text "true" or "false".
     */
    void addStat(const char *nm, bool val, ADD_STAT add_stat, const void *c) {
        addStat(nm, val ? "true" : "false", add_stat, c);
    }

public:
    /**
     * Release the reference "upstream".
     * @param force Should we force the release upstream even if the
     *              internal state indicates that the object isn't
     *              reserved upstream.
     */
    void releaseReference(bool force = false);

    //! cookie used by this connection
    const void *getCookie() const;

    //! cookie used by this connection
    void setCookie(const void *c) {
        cookie = c;
    }

    //! Hand out the next value of the shared tap counter.
    static uint64_t nextTapId() {
        return tapCounter++;
    }

    //! Build a name of the form "eq_tapq:anon_<id>" from the next tap id.
    static std::string getAnonName() {
        std::stringstream s;
        s << "eq_tapq:anon_";
        s << nextTapId();
        return s.str();
    }

    virtual ~TapConnection() { /* EMPTY */ }

    virtual const std::string &getName() const { return name; }
    void setName(const std::string &n) { name.assign(n); }

    void setReserved(bool r) { reserved = r; }
    bool isReserved() const { return reserved; }

    //! Label reported as the "type" stat; supplied by each subclass.
    virtual const char *getType() const = 0;

    /**
     * Report the stats common to every tap connection. The "disconnects"
     * stat is only emitted once at least one disconnect was counted.
     */
    virtual void addStats(ADD_STAT add_stat, const void *c) {
        addStat("type", getType(), add_stat, c);
        addStat("created", created, add_stat, c);
        addStat("connected", connected, add_stat, c);
        addStat("pending_disconnect", doDisconnect(), add_stat, c);
        addStat("supports_ack", supportAck, add_stat, c);
        addStat("reserved", reserved, add_stat, c);
        if (numDisconnects > 0) {
            addStat("disconnects", numDisconnects, add_stat, c);
        }
    }

    //! Hook invoked for processed events; the base implementation ignores them.
    virtual void processedEvent(tap_event_t event, ENGINE_ERROR_CODE ret) {
        (void)event;
        (void)ret;
    }

    /**
     * Some of the TAP objects may have large amounts of internal data
     * to clean up. To avoid blocking the dispatcher for a long time just clean
     * up some items at the time.
     *
     * @return true if all of the internal data structures are cleaned up and
     *         its safe to kill the object
     */
    virtual bool cleanSome() {
        return true;
    }

    void setSupportAck(bool ack) {
        supportAck = ack;
    }

    bool supportsAck() const {
        return supportAck;
    }

    void setSupportCheckpointSync(bool checkpointSync) {
        supportCheckpointSync = checkpointSync;
    }

    bool supportsCheckpointSync() const {
        return supportCheckpointSync;
    }

    void setExpiryTime(rel_time_t t) {
        expiryTime = t;
    }

    rel_time_t getExpiryTime() const {
        return expiryTime;
    }

    //! Update the connected flag; every transition request to "not connected"
    //! bumps the disconnect counter.
    void setConnected(bool s) {
        if (!s) {
            ++numDisconnects;
        }
        connected = s;
    }

    bool isConnected() const {
        return connected;
    }

    bool doDisconnect() const {
        return disconnect;
    }

    void setDisconnect(bool val) {
        disconnect = val;
    }
};
/**
 * Holder class for the consumer side of a TAP connection (getType()
 * reports "consumer"). It carries one pair of counters per tap event type.
 */
class TapConsumer : public TapConnection {
private:
// Per-event-type counters. NOTE(review): the names suggest "events seen" and
// "events that failed"; they are maintained outside this header -- confirm
// against processedEvent()'s definition.
Atomic<size_t> numDelete;
Atomic<size_t> numDeleteFailed;
Atomic<size_t> numFlush;
Atomic<size_t> numFlushFailed;
Atomic<size_t> numMutation;
Atomic<size_t> numMutationFailed;
Atomic<size_t> numOpaque;
Atomic<size_t> numOpaqueFailed;
Atomic<size_t> numVbucketSet;
Atomic<size_t> numVbucketSetFailed;
Atomic<size_t> numCheckpointStart;
Atomic<size_t> numCheckpointStartFailed;
Atomic<size_t> numCheckpointEnd;
Atomic<size_t> numCheckpointEndFailed;
Atomic<size_t> numUnknown;
public:
// Same parameters as the TapConnection constructor: owning engine, cookie
// and connection name. Defined out of line.
TapConsumer(EventuallyPersistentEngine &theEngine,
const void *c,
const std::string &n);
// Overrides of the TapConnection hooks; defined out of line.
virtual void processedEvent(tap_event_t event, ENGINE_ERROR_CODE ret);
virtual void addStats(ADD_STAT add_stat, const void *c);
// Label reported through the "type" stat.
virtual const char *getType() const { return "consumer"; };
// Checkpoint / online-update command handling and backfill-phase tracking
// for a vbucket; all defined out of line.
virtual bool processCheckpointCommand(tap_event_t event, uint16_t vbucket,
uint64_t checkpointId);
virtual void checkVBOpenCheckpoint(uint16_t);
virtual bool processOnlineUpdateCommand(uint32_t event, uint16_t vbucket);
void setBackfillPhase(bool isBackfill, uint16_t vbucket);
bool isBackfillPhase(uint16_t vbucket);
};
/**
* Class used by the EventuallyPersistentEngine to keep track of all
* information needed per Tap connection.
*/
class TapProducer : public TapConnection {
public:
virtual void addStats(ADD_STAT add_stat, const void *c);
virtual void processedEvent(tap_event_t event, ENGINE_ERROR_CODE ret);
virtual const char *getType() const {
    // Label identifying this connection as the producing side.
    return "producer";
}
virtual bool cleanSome();
bool isSuspended() const;
void setSuspended(bool value);
bool isTimeForNoop();
void setTimeForNoop();
/**
 * Record the completion of one pending backfill and disconnect the
 * connection if nothing is left to do. Takes queueLock.
 */
void completeBackfill() {
    LockHolder guard(queueLock);
    // The counter is only decremented while it is positive.
    if (pendingBackfillCounter > 0) {
        --pendingBackfillCounter;
    }
    completeBackfillCommon_UNLOCKED();
}
/**
 * Count one more disk backfill as outstanding. Takes queueLock.
 */
void scheduleDiskBackfill() {
    LockHolder guard(queueLock);
    ++diskBackfillCounter;
}
/**
 * Record the completion of one disk backfill and disconnect the
 * connection if nothing is left to do. Takes queueLock.
 */
void completeDiskBackfill() {
    LockHolder guard(queueLock);
    // The counter is only decremented while it is positive.
    if (diskBackfillCounter > 0) {
        --diskBackfillCounter;
    }
    completeBackfillCommon_UNLOCKED();
}
/**
* Invoked each time a background item fetch completes.
*/
void gotBGItem(Item *item, bool implicitEnqueue);
/**
* Invoked once per batch bg fetch job.
*/
void completedBGFetchJob();
private:
friend class EventuallyPersistentEngine;
friend class TapConnMap;
friend class BackFillVisitor;
friend class TapBGFetchCallback;
friend struct TapStatBuilder;
friend struct TapAggStatBuilder;
friend struct PopulateEventsBody;
/**
 * Shared tail of the backfill completion paths: request a disconnect
 * when the connection is both complete and idle. Both callers in this
 * header hold queueLock when invoking it.
 */
void completeBackfillCommon_UNLOCKED() {
    const bool nothingLeft = complete_UNLOCKED() && idle_UNLOCKED();
    if (!nothingLeft) {
        return;
    }
    // There is no data for this connection..
    // Just go ahead and disconnect it.
    setDisconnect(true);
}
/**
 * Add a new item to the tap queue. You need to hold the queue lock
 * before calling this function.
 * The item is ignored if its vbucket isn't accepted by the
 * TapProducer's vbucket filter.
 *
 * @param it the item to add
 * @return true if the queue was empty before the call
 */
bool addEvent_UNLOCKED(const queued_item &it) {
    const bool accepted = vbucketFilter(it->getVBucketId());
    const bool wasEmpty = queue->empty();
    if (accepted) {
        queue->push_back(it);
        ++queueSize;
        queueMemSize.incr(sizeof(queued_item));
    }
    return wasEmpty;
}
/**
 * Add a new item to the tap queue, taking queueLock for the duration.
 * The item is ignored if its vbucket isn't accepted by the
 * TapProducer's vbucket filter.
 *
 * @param it the item to add
 * @return true if the queue was empty before the call
 */
bool addEvent(const queued_item &it) {
    LockHolder guard(queueLock);
    const bool wasEmpty = addEvent_UNLOCKED(it);
    return wasEmpty;
}
/**
* Add a key to the tap queue. You need the queue lock to call this
* @return true if the the queue was empty
*/
bool addEvent_UNLOCKED(const std::string &key, uint16_t vbid, enum queue_operation op) {
queued_item qi(new QueuedItem(key, vbid, op));
return addEvent_UNLOCKED(qi);
}
/**
 * Locking variant of the key-based addEvent_UNLOCKED(): takes queueLock
 * and queues the key.
 *
 * @return true if the queue was empty before the call
 */
bool addEvent(const std::string &key, uint16_t vbid, enum queue_operation op) {
    LockHolder guard(queueLock);
    const bool wasEmpty = addEvent_UNLOCKED(key, vbid, op);
    return wasEmpty;
}
void addTapLogElement_UNLOCKED(const queued_item &qi) {
if (supportAck) {
TapLogElement log(seqno, qi);
tapLog.push_back(log);
}
}
/**
 * Locking variant of addTapLogElement_UNLOCKED(): takes queueLock and
 * logs the queued item.
 */
void addTapLogElement(const queued_item &qi) {
    LockHolder guard(queueLock);
    addTapLogElement_UNLOCKED(qi);
}
void addTapLogElement_UNLOCKED(const TapVBucketEvent &e) {
if (supportAck) {
// add to the log!
TapLogElement log(seqno, e);
tapLog.push_back(log);
}
}
/**
* Get the next item from the queue
*/
queued_item next(bool &shouldPause);
// Push ev onto the high priority vbucket event queue. Takes no lock itself;
// the locking wrapper addVBucketHighPriority() acquires queueLock first.
void addVBucketHighPriority_UNLOCKED(TapVBucketEvent &ev) {
vBucketHighPriority.push(ev);
}
/**
 * Add a new high priority TapVBucketEvent to this TapProducer. A high
 * priority TapVBucketEvent will bypass the normal queue of events to
 * be sent to the client, and be sent the next time it is possible to
 * send data over the tap connection. Takes queueLock.
 *
 * @param ev the event to queue
 */
void addVBucketHighPriority(TapVBucketEvent &ev) {
    LockHolder guard(queueLock);
    addVBucketHighPriority_UNLOCKED(ev);
}
/**
 * Get the next high priority TapVBucketEvent for this TapProducer.
 *
 * Events whose vbucket is rejected by the vbucket filter are dropped and
 * the following event is tried instead. TAP_OPAQUE events carrying the
 * auto-nack, checkpoint-sync or close-backfill command skip the filter.
 * Every returned event bumps recordsFetched and is added to the tap log.
 *
 * @return the next event, or a TAP_PAUSE event when the queue is empty
 */
TapVBucketEvent nextVBucketHighPriority_UNLOCKED() {
    if (vBucketHighPriority.empty()) {
        return TapVBucketEvent(TAP_PAUSE, 0, vbucket_state_active);
    }

    TapVBucketEvent ev = vBucketHighPriority.front();
    vBucketHighPriority.pop();

    // We might have objects in our queue that aren't in our filter
    // If so, just skip them..
    bool exemptFromFilter = false;
    if (ev.event == TAP_OPAQUE) {
        opaqueCommandCode = (uint32_t)ev.state;
        exemptFromFilter =
            opaqueCommandCode == htonl(TAP_OPAQUE_ENABLE_AUTO_NACK) ||
            opaqueCommandCode == htonl(TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC) ||
            opaqueCommandCode == htonl(TAP_OPAQUE_CLOSE_BACKFILL);
    }
    if (!exemptFromFilter && !vbucketFilter(ev.vbucket)) {
        return nextVBucketHighPriority_UNLOCKED();
    }

    ++recordsFetched;
    addTapLogElement_UNLOCKED(ev);
    return ev;
}
/**
 * Locking variant of nextVBucketHighPriority_UNLOCKED(): takes queueLock
 * and fetches the next high priority event.
 */
TapVBucketEvent nextVBucketHighPriority() {
    LockHolder guard(queueLock);
    return nextVBucketHighPriority_UNLOCKED();
}
// Push ev onto the low priority vbucket event queue. Takes no lock itself;
// the locking wrapper addVBucketLowPriority() acquires queueLock first.
void addVBucketLowPriority_UNLOCKED(TapVBucketEvent &ev) {
vBucketLowPriority.push(ev);
}
/**
 * Add a new low priority TapVBucketEvent to this TapProducer. A low
 * priority TapVBucketEvent will only be sent when the tap connection
 * doesn't have any other events to send. Takes queueLock.
 *
 * @param ev the event to queue
 */
void addVBucketLowPriority(TapVBucketEvent &ev) {
    LockHolder guard(queueLock);
    addVBucketLowPriority_UNLOCKED(ev);
}
/**
 * Get the next low priority TapVBucketEvent for this TapProducer.
 *
 * Events whose vbucket is rejected by the vbucket filter are dropped and
 * the next low priority event is tried instead. (Previously a rejected
 * event caused a jump into the high priority queue, so a high priority
 * event could be returned from here out of turn.)
 * Every returned event bumps recordsFetched and is added to the tap log.
 *
 * @return the next event, or a TAP_PAUSE event when no acceptable event
 *         is queued
 */
TapVBucketEvent nextVBucketLowPriority_UNLOCKED() {
    while (!vBucketLowPriority.empty()) {
        TapVBucketEvent ev = vBucketLowPriority.front();
        vBucketLowPriority.pop();
        // We might have objects in our queue that aren't in our filter
        // If so, just skip them..
        if (!vbucketFilter(ev.vbucket)) {
            continue;
        }
        ++recordsFetched;
        addTapLogElement_UNLOCKED(ev);
        return ev;
    }
    return TapVBucketEvent(TAP_PAUSE, 0, vbucket_state_active);
}
/**
 * Locking variant of nextVBucketLowPriority_UNLOCKED(): takes queueLock
 * and fetches the next low priority event.
 */
TapVBucketEvent nextVBucketLowPriority() {
    LockHolder guard(queueLock);
    return nextVBucketLowPriority_UNLOCKED();
}
// Push item onto the checkpoint message queue. Takes no lock itself;
// the locking wrapper addCheckpointMessage() acquires queueLock first.
void addCheckpointMessage_UNLOCKED(const queued_item &item) {
checkpointMsgs.push(item);
}
/**
 * Add a checkpoint start / end message to the checkpoint message queue.
 * These messages are used for synchronizing checkpoints between tap
 * producer and consumer. Takes queueLock.
 *
 * @param item the checkpoint message to queue
 */
void addCheckpointMessage(const queued_item &item) {
    LockHolder guard(queueLock);
    addCheckpointMessage_UNLOCKED(item);
}
/**
 * Get the next checkpoint message whose vbucket passes the vbucket filter.
 * Rejected messages are dropped. A returned message bumps
 * checkpointMsgCounter and recordsFetched and is added to the tap log.
 *
 * @return the next message, or a placeholder item (empty key, vbucket
 *         0xffff, queue_op_empty) when no acceptable message is queued
 */
queued_item nextCheckpointMessage_UNLOCKED() {
    while (!checkpointMsgs.empty()) {
        queued_item msg = checkpointMsgs.front();
        checkpointMsgs.pop();
        if (!vbucketFilter(msg->getVBucketId())) {
            continue;
        }
        ++checkpointMsgCounter;
        ++recordsFetched;
        addTapLogElement_UNLOCKED(msg);
        return msg;
    }
    return queued_item(new QueuedItem("", 0xffff, queue_op_empty));
}
/**
 * Locking variant of nextCheckpointMessage_UNLOCKED(): takes queueLock
 * and fetches the next checkpoint message.
 */
queued_item nextCheckpointMessage() {
    LockHolder guard(queueLock);
    return nextCheckpointMessage_UNLOCKED();
}
/**
 * @return true if the live queue holds an item or the checkpoints have
 *         a next item
 */
bool hasQueuedItem_UNLOCKED() {
    if (!queue->empty()) {
        return true;
    }
    return hasNextFromCheckpoints_UNLOCKED();
}
/**
 * @return true if there is no background fetch work queued, fetched or
 *         in flight, and no queued item either
 */
bool empty_UNLOCKED() {
    const bool noBgWork = bgQueueSize == 0 && bgResultSize == 0 &&
                          (bgJobIssued - bgJobCompleted) == 0;
    return noBgWork && !hasQueuedItem_UNLOCKED();
}
/**
 * @return true if the connection is empty and additionally has no
 *         vbucket events, no checkpoint messages and no tap log entries
 */
bool idle_UNLOCKED() {
    if (!empty_UNLOCKED()) {
        return false;
    }
    const bool noVBucketEvents = vBucketLowPriority.empty() &&
                                 vBucketHighPriority.empty();
    return noVBucketEvents && checkpointMsgs.empty() && tapLog.empty();
}
/**
 * Locking variant of idle_UNLOCKED(): takes queueLock.
 */
bool idle() {
    LockHolder guard(queueLock);
    const bool isIdle = idle_UNLOCKED();
    return isIdle;
}
// True when bgResultSize is non-zero. Reads the counter without taking
// queueLock.
bool hasItem() {
return bgResultSize != 0;
}
/**
 * Locking variant of hasQueuedItem_UNLOCKED(): takes queueLock.
 */
bool hasQueuedItem() {
    LockHolder guard(queueLock);
    const bool queued = hasQueuedItem_UNLOCKED();
    return queued;
}
/**
 * Locking variant of empty_UNLOCKED(): takes queueLock.
 */
bool empty() {
    LockHolder guard(queueLock);
    const bool isEmpty = empty_UNLOCKED();
    return isEmpty;
}
/**
 * Find out how many items are still remaining from backfill: the sum of
 * the fetched results, the queued fetches, the fetch jobs in flight and
 * the cached queue size.
 */
size_t getBackfillRemaining_UNLOCKED() {
    const size_t remaining = bgResultSize + bgQueueSize
        + (bgJobIssued - bgJobCompleted) + queueSize;
    return remaining;
}
/**
 * Locking variant of getBackfillRemaining_UNLOCKED(): takes queueLock.
 */
size_t getBackfillRemaining() {
    LockHolder guard(queueLock);
    const size_t remaining = getBackfillRemaining_UNLOCKED();
    return remaining;
}
/**
 * @return the cached size of the live queue, read under queueLock
 */
size_t getQueueSize() {
    LockHolder guard(queueLock);
    const size_t cached = queueSize;
    return cached;
}
// Current value of queueMemSize, the counter that the queue-modifying
// methods adjust. Read without taking queueLock.
size_t getQueueMemory() {
return queueMemSize;
}
/**
 * @return the number of queued background fetches plus the fetch jobs
 *         still in flight, read under queueLock
 */
size_t getRemaingOnDisk() {
    LockHolder guard(queueLock);
    const size_t pending = bgQueueSize + (bgJobIssued - bgJobCompleted);
    return pending;
}
// Current value of the queueFill counter (maintained outside the visible
// part of this header). Read without taking queueLock.
size_t getQueueFillTotal() {
return queueFill;
}
// Current value of the queueDrain counter (maintained outside the visible
// part of this header). Read without taking queueLock.
size_t getQueueDrainTotal() {
return queueDrain;
}
// Current value of the numTapNack counter (maintained outside the visible
// part of this header). Read without taking queueLock.
size_t getQueueBackoff() {
return numTapNack;
}
/**
 * Decide whether the connection should be notified.
 *
 * @return false while a notification is already pending; otherwise true
 *         if a disconnect is requested or the connection is paused and
 *         not empty
 */
bool shouldNotify() {
    // Don't notify if we've got a pending notification
    if (notifySent) {
        return false;
    }
    // Always notify for disconnects, otherwise only when we're paused
    // and got data to send
    return doDisconnect() || (paused && !empty());
}
/**
* Get the total number of remaining items from all checkpoints.
*/
size_t getRemainingOnCheckpoints();
bool hasNextFromCheckpoints_UNLOCKED();
/**
 * Locking variant of hasNextFromCheckpoints_UNLOCKED(): takes queueLock.
 */
bool hasNextFromCheckpoints() {
    LockHolder guard(queueLock);
    const bool hasNext = hasNextFromCheckpoints_UNLOCKED();
    return hasNext;
}
Item* nextFetchedItem();
void flush() {
LockHolder lh(queueLock);
pendingFlush = true;
/* No point of keeping the rep queue when someone wants to flush it */
queue->clear();
queueSize = 0;
queueMemSize = 0;
// Clear bg-fetched items.
while (!backfilledItems.empty()) {
Item *i(backfilledItems.front());
assert(i);
delete i;
backfilledItems.pop();
}
bgResultSize = 0;
// Clear the checkpoint message queue as well
while (!checkpointMsgs.empty()) {
checkpointMsgs.pop();
}
}
/**
 * Report whether a flush is pending and clear the pending flag.
 * NOTE(review): unlike flush(), this does not take queueLock -- confirm
 * that callers provide the necessary synchronization.
 *
 * @return the value of the pending flag before it was cleared
 */
bool shouldFlush() {
    const bool wasPending = pendingFlush;
    pendingFlush = false;
    return wasPending;
}
/**
 * Move every item of q to the end of the live queue and update the
 * cached queue size and memory accounting. q is left empty.
 * Takes queueLock.
 *
 * This method is called while holding the tapNotifySync lock.
 *
 * @param q the items to append
 */
void appendQueue(std::list<queued_item> *q) {
    LockHolder lh(queueLock);
    // Account for the memory BEFORE splicing: splice() moves all nodes out
    // of *q, so walking *q afterwards would visit nothing.
    // NOTE(review): addEvent_UNLOCKED() accounts sizeof(queued_item) per
    // item whereas this uses the item's size() -- confirm which is intended.
    for (std::list<queued_item>::iterator i = q->begin(); i != q->end(); ++i) {
        queueMemSize.incr((*i)->size());
    }
    queue->splice(queue->end(), *q);
    queueSize = queue->size();
}
/**
 * @return true while at least one disk backfill is outstanding, read
 *         under queueLock
 */
bool isPendingDiskBackfill() {
    LockHolder guard(queueLock);
    const bool pending = diskBackfillCounter > 0;
    return pending;
}
/**
 * A backfill is pending if the backfill thread is still running: a
 * backfill is scheduled to run, or a backfill or disk backfill is still
 * counted as outstanding.
 */
bool isPendingBackfill_UNLOCKED() {
    if (doRunBackfill) {
        return true;
    }
    return pendingBackfillCounter > 0 || diskBackfillCounter > 0;
}
/**
 * Locking variant of isPendingBackfill_UNLOCKED(): takes queueLock.
 */
bool isPendingBackfill() {
    LockHolder guard(queueLock);
    const bool pending = isPendingBackfill_UNLOCKED();
    return pending;
}
/**
 * Items from backfill are all successfully transmitted to the destination?
 *
 * Returns the backfillCompleted flag as is; takes no lock itself (the
 * locking wrapper isBackfillCompleted() acquires queueLock first).
 */
bool isBackfillCompleted_UNLOCKED() {
return backfillCompleted;
}
/**
 * Locking variant of isBackfillCompleted_UNLOCKED(): takes queueLock.
 */
bool isBackfillCompleted() {
    LockHolder guard(queueLock);
    const bool completed = isBackfillCompleted_UNLOCKED();
    return completed;
}
void scheduleBackfill_UNLOCKED(const std::vector<uint16_t> &vblist);
/**
 * Locking variant of scheduleBackfill_UNLOCKED(): takes queueLock and
 * schedules a backfill for the given vbuckets.
 *
 * @param vblist the vbuckets to backfill
 */
void scheduleBackfill(const std::vector<uint16_t> &vblist) {
    LockHolder guard(queueLock);
    scheduleBackfill_UNLOCKED(vblist);
}
/**
 * Claim a scheduled backfill, if there is one. Takes queueLock.
 *
 * When a backfill is scheduled, the schedule flag is cleared, the pending
 * backfill counter is incremented and vbFilter receives the backfill
 * vbucket filter. Otherwise vbFilter is left untouched.
 *
 * @param vbFilter receives the vbucket filter to backfill
 * @return true if a backfill was scheduled and has been claimed
 */
bool runBackfill(VBucketFilter &vbFilter) {
    LockHolder guard(queueLock);
    if (!doRunBackfill) {
        return false;
    }
    doRunBackfill = false;
    ++pendingBackfillCounter; // Will be decremented when each backfill thread is completed
    vbFilter = backFillVBucketFilter;
    return true;
}
/**
 * A TapProducer is complete when it has nothing to transmit and
 * a disconnect was requested at the end: it is a dump or takeover
 * connection, its backfill is completed and it is empty.
 */
bool complete_UNLOCKED(void) {
    if (!(dumpQueue || doTakeOver)) {
        return false;
    }
    return isBackfillCompleted_UNLOCKED() && empty_UNLOCKED();
}
/**
 * Locking variant of complete_UNLOCKED(): takes queueLock.
 */
bool complete(void) {
    LockHolder guard(queueLock);
    const bool isComplete = complete_UNLOCKED();
    return isComplete;
}
/**
* Queue an item to be background fetched.
*
* @param key the item's key
* @param id the disk id of the item to fetch
* @param vb the vbucket ID
* @param vbv the vbucket version
*/
void queueBGFetch(const std::string &key, uint64_t id, uint16_t vb, uint16_t vbv);
/**
* Run some background fetch jobs.
*/
void runBGFetch(Dispatcher *dispatcher, const void *cookie);
TapProducer(EventuallyPersistentEngine &theEngine,
const void *cookie,
const std::string &n,
uint32_t f);
/**
 * Release the live queue. The producer is expected to be fully cleaned
 * up and no longer reserved by the time it is destroyed.
 */
~TapProducer() {
    // Call cleanSome() outside of assert() so that the call is not compiled
    // out when NDEBUG is defined.
    const bool cleaned = cleanSome();
    assert(cleaned);
    (void)cleaned;
    delete queue;
    assert(!isReserved());
}
ENGINE_ERROR_CODE processAck(uint32_t seqno, uint16_t status, const std::string &msg);
/**
* Is the tap ack window full?
* @return true if the window is full and no more items should be sent
*/
bool windowIsFull();
/**
* Should we request a TAP ack for this message?
* @param event the event type for this message
* @param vbucket the vbucket Id for this message
* @return true if we should request a tap ack (and start a new sequence)
*/
bool requestAck(tap_event_t event, uint16_t vbucket);
/**
 * Get the current tap sequence number.
 *
 * @return the value of seqno, read without taking queueLock
 */
uint32_t getSeqno() {
return seqno;
}
/**
* Rollback the tap stream to the last ack
*/
void rollback();
void encodeVBucketStateTransition(const TapVBucketEvent &ev, void **es,
uint16_t *nes, uint16_t *vbucket) const;
void evaluateFlags();
bool waitForBackfill();
bool waitForCheckpointMsgAck();
void setRegisteredClient(bool isRegisteredClient);
void setClosedCheckpointOnlyFlag(bool isClosedCheckpointOnly);
bool SetCursorToOpenCheckpoint(uint16_t vbucket);
// Store the takeover completion phase flag. The flag is written without
// taking queueLock.
void setTakeOverCompletionPhase(bool completionPhase) {
takeOverCompletionPhase = completionPhase;
}
bool checkBackfillCompletion_UNLOCKED();
/**
 * Locking variant of checkBackfillCompletion_UNLOCKED(): takes queueLock.
 */
bool checkBackfillCompletion() {
    LockHolder guard(queueLock);
    const bool result = checkBackfillCompletion_UNLOCKED();
    return result;
}
void setBackfillAge(uint64_t age, bool reconnect);
void setVBucketFilter(const std::vector<uint16_t> &vbuckets);
/**
 * Get the vbucket filter of this producer.
 * NOTE(review): queueLock is only held while the reference is formed; the
 * returned reference is used by the caller after the lock is released --
 * confirm that this is safe for the callers.
 *
 * @return a reference to the producer's own filter
 */
const VBucketFilter &getVBucketFilter() {
    LockHolder guard(queueLock);
    const VBucketFilter &filter = vbucketFilter;
    return filter;
}
/**
 * Evaluate the vbucket filter for one vbucket under queueLock.
 *
 * @param vbucket the vbucket to test
 * @return the filter's verdict for the vbucket
 */
bool checkVBucketFilter(uint16_t vbucket) {
    LockHolder guard(queueLock);
    const bool accepted = vbucketFilter(vbucket);
    return accepted;
}
/**
* Register the unified queue cursor for this TAP producer.
*/
void registerTAPCursor(std::map<uint16_t, uint64_t> &lastCheckpointIds);
/**
 * @return true while the tap log holds at least one element, read under
 *         queueLock
 */
bool hasPendingAcks() {
    LockHolder guard(queueLock);
    const bool outstanding = !tapLog.empty();
    return outstanding;
}
/**
 * @return the number of elements in the tap log, read under queueLock
 */
size_t getTapAckLogSize(void) {
    LockHolder guard(queueLock);
    const size_t logged = tapLog.size();
    return logged;
}
/**
 * Remove the most recently added element from the tap log, if any.
 * Takes queueLock.
 *
 * The log only receives elements when the connection supports acks, so
 * it may well be empty here; pop_back() on an empty list is undefined
 * behavior, hence the guard.
 */
void popTapLog(void) {
    LockHolder lh(queueLock);
    if (!tapLog.empty()) {
        tapLog.pop_back();
    }
}
void reschedule_UNLOCKED(const std::list<TapLogElement>::iterator &iter);
//! Lock held during queue operations.
Mutex queueLock;
/**
* The queue of keys that needs to be sent (this is the "live stream")
*/
std::list<queued_item> *queue;
/**
* Calling size() on a list is a heavy operation (it will traverse
* the list to determine the size).. During tap backfill we're calling
* this for every message we want to send to determine if we should
* require a tap ack or not. Let's cache the value to stop eating up
* the CPU :-)