Skip to content

Commit

Permalink
MB-36468: Fix for data mismatch in timers
Browse files Browse the repository at this point in the history
Change-Id: If70ff85c709c2fbc4708e3707b02e7b0268d0074
Reviewed-on: http://review.couchbase.org/118107
Reviewed-by: CI Bot
Reviewed-by: Jeelan Basha Poola <jeelan.poola@couchbase.com>
Tested-by: Jeelan Basha Poola <jeelan.poola@couchbase.com>
  • Loading branch information
nandsatya authored and jeelanp2003 committed Dec 2, 2019
1 parent 53dcf8d commit 8fb1b10
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 25 deletions.
28 changes: 28 additions & 0 deletions features/include/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "log.h"

#define EXCEPTION_STR_SIZE 20
#define NUM_VBUCKETS 1024

#define TO(maybe, local) \
(To((maybe), (local), __FILE__, __FUNCTION__, __LINE__))
Expand Down Expand Up @@ -131,6 +132,33 @@ class Utils {
v8::Persistent<v8::Object> global_;
};

template <typename T> class AtomicWrapper {
public:
AtomicWrapper() : data(T()) {}

explicit AtomicWrapper(T const &val) : data(val) {}

explicit AtomicWrapper(std::atomic<T> const &val) : data(val.load()) {}

AtomicWrapper(AtomicWrapper const &other) : data(other.data.load()) {}

AtomicWrapper &operator=(AtomicWrapper const &other) {
data.store(other.data.load());
return *this;
}

inline void Set(T val) { data.store(val); }

inline T Get() { return data.load(); }

private:
std::atomic<T> data;
};

using AtomicInt64 = AtomicWrapper<int64_t>;
using AtomicUint64 = AtomicWrapper<uint64_t>;
using AtomicBool = AtomicWrapper<bool>;

int WinSprintf(char **strp, const char *fmt, ...);

v8::Local<v8::String> v8Str(v8::Isolate *isolate, const char *str);
Expand Down
2 changes: 1 addition & 1 deletion v8_consumer/include/timer_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Iterator {
private:
std::pair<bool, bool> GetNextTimer(TimerEvent &tevent);
TimerStore *store_;
std::unordered_set<int64_t>::iterator partn_iter_;
int64_t curr_partition_{0};
int64_t stop_;
std::string top_key_;
lcb_CAS top_cas_{0};
Expand Down
13 changes: 7 additions & 6 deletions v8_consumer/include/timer_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#include "lcb_utils.h"
#include "timer_defs.h"
#include "timer_iterator.h"
#include "utils.h"

namespace timer {

class TimerStore {
public:
explicit TimerStore(v8::Isolate *isolate, const std::string &prefix,
Expand All @@ -38,13 +38,16 @@ class TimerStore {

Iterator GetIterator();

void UpdatePartition(const std::unordered_set<int64_t> &partitions);
void AddPartition(int64_t partition);

void RemovePartition(int64_t partition);

void SyncSpan();

private:
void Connect();
std::pair<bool, lcb_error_t> SyncSpan(int partition);
std::pair<bool, lcb_error_t> SyncSpanLocked(int partition);

bool ExpandSpan(int64_t partition, int64_t point);

Expand All @@ -65,14 +68,12 @@ class TimerStore {
std::pair<lcb_error_t, Result> Get(const std::string &key);

v8::Isolate *isolate_;
std::unordered_set<int64_t> new_partions_;
std::unordered_set<int64_t> partitons_;
std::atomic_bool is_dirty_;
std::vector<bool> partitons_;
std::unordered_map<int64_t, TimerSpan> span_map_;
std::string prefix_;
std::string conn_str_;
lcb_t crud_handle_{nullptr};
std::mutex sync_lock;
std::mutex store_lock_;
friend class Iterator;
};
} // namespace timer
Expand Down
4 changes: 1 addition & 3 deletions v8_consumer/include/v8worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ typedef std::chrono::nanoseconds nsecs;

#define SECS_TO_NS 1000 * 1000 * 1000ULL

#define NUM_VBUCKETS 1024

extern int64_t timer_context_size;

using atomic_ptr_t = std::shared_ptr<std::atomic<uint64_t>>;
Expand Down Expand Up @@ -257,7 +255,7 @@ class V8Worker {

void EraseVbFilter(int vb_no);

void UpdateBucketopsSeqno(int vb_no, uint64_t seq_no);
void UpdateBucketopsSeqnoLocked(int vb_no, uint64_t seq_no);

uint64_t GetBucketopsSeqno(int vb_no);

Expand Down
6 changes: 1 addition & 5 deletions v8_consumer/src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ void AppWorker::RouteMessageWithResponse(
if (kSuccess == workers_[worker_index]->ParseMetadata(
worker_msg->header.metadata, vb_no, seq_no)) {
auto lck = workers_[worker_index]->GetAndLockFilterLock();
workers_[worker_index]->UpdateBucketopsSeqno(vb_no, seq_no);
workers_[worker_index]->UpdateBucketopsSeqnoLocked(vb_no, seq_no);
}
}
break;
Expand Down Expand Up @@ -794,10 +794,6 @@ void AppWorker::RouteMessageWithResponse(
for (size_t idx = 0; idx < thr_count_; ++idx) {
auto worker = workers_[idx];
worker->UpdatePartitions(partitions[idx]);
std::unique_ptr<WorkerMessage> msg(new WorkerMessage);
msg->header.event = eInternal + 1;
msg->header.opcode = oUpdateVbMap;
worker->PushFront(std::move(msg));
}
std::ostringstream oss;
std::copy(vbuckets.begin(), vbuckets.end(),
Expand Down
18 changes: 8 additions & 10 deletions v8_consumer/src/v8worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -495,12 +495,6 @@ void V8Worker::RouteMessage() {
}
break;
}
case oUpdateVbMap: {
if (timer_store_) {
timer_store_->SyncSpan();
}
break;
}
default:
LOG(logError) << "Received invalid internal opcode" << std::endl;
break;
Expand Down Expand Up @@ -534,7 +528,7 @@ void V8Worker::UpdateSeqNumLocked(const int vb, const uint64_t seq_num) {
currently_processed_vb_ = vb;
currently_processed_seqno_ = seq_num;
vb_seq_[vb]->store(seq_num, std::memory_order_seq_cst);
UpdateBucketopsSeqno(vb, seq_num);
processed_bucketops_[vb] = seq_num;
}

void V8Worker::HandleDeleteEvent(const std::unique_ptr<WorkerMessage> &msg) {
Expand Down Expand Up @@ -1035,6 +1029,9 @@ int V8Worker::ParseMetadataWithAck(const std::string &metadata_str, int &vb_no,

void V8Worker::UpdateVbFilter(int vb_no, uint64_t seq_no) {
vbfilter_map_[vb_no].push_back(seq_no);
if (timer_store_) {
timer_store_->RemovePartition(vb_no);
}
}

uint64_t V8Worker::GetVbFilter(int vb_no) {
Expand All @@ -1051,8 +1048,11 @@ void V8Worker::EraseVbFilter(int vb_no) {
}
}

void V8Worker::UpdateBucketopsSeqno(int vb_no, uint64_t seq_no) {
void V8Worker::UpdateBucketopsSeqnoLocked(int vb_no, uint64_t seq_no) {
processed_bucketops_[vb_no] = seq_no;
if (timer_store_) {
timer_store_->AddPartition(vb_no);
}
}

uint64_t V8Worker::GetBucketopsSeqno(int vb_no) {
Expand Down Expand Up @@ -1137,8 +1137,6 @@ void V8Worker::TaskDurationWatcher() {

void V8Worker::UpdatePartitions(const std::unordered_set<int64_t> &vbuckets) {
partitions_ = vbuckets;
if (timer_store_)
timer_store_->UpdatePartition(vbuckets);
}

std::unordered_set<int64_t> V8Worker::GetPartitions() const {
Expand Down

0 comments on commit 8fb1b10

Please sign in to comment.