Commit
Merge remote-tracking branch 'couchbase/unstable' into HEAD
http://ci-eventing.northscale.in/eventing-17.05.2021-12.09.pass.html
Change-Id: Id3f1608953e01a5b7140ed696097c2d72ad272ac
AnkitPrabhu committed May 18, 2021
2 parents 56fbfd8 + 1b9915e commit deeb89a
Showing 6 changed files with 66 additions and 38 deletions.
2 changes: 0 additions & 2 deletions consumer/process_events.go
@@ -284,13 +284,11 @@ func (c *Consumer) processDCPEvents() {
}
case mcd.DCP_STREAMEND:
logging.Infof("%s [%s:%s:%d] vb: %d got STREAMEND", logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket)

c.vbProcessingStats.updateVbStat(e.VBucket, "vb_stream_request_metadata_updated", false)
lastReadSeqNo := c.vbProcessingStats.getVbStat(e.VBucket, "last_read_seq_no").(uint64)
c.vbProcessingStats.updateVbStat(e.VBucket, "seq_no_at_stream_end", lastReadSeqNo)
c.vbProcessingStats.updateVbStat(e.VBucket, "timestamp", time.Now().Format(time.RFC3339))
lastSentSeqNo := c.vbProcessingStats.getVbStat(e.VBucket, "last_sent_seq_no").(uint64)

if lastSentSeqNo == 0 {
logging.Infof("STREAMEND without streaming any mutation last_read_seqno: %d last_sent_seqno: %d", lastReadSeqNo, lastSentSeqNo)
c.handleStreamEnd(e.VBucket, lastReadSeqNo)
32 changes: 32 additions & 0 deletions supervisor/super_supervisor.go
@@ -503,6 +503,7 @@ func (s *SuperSupervisor) TopologyChangeNotifCallback(path string, value []byte,
if _, ok := s.bootstrappingApps[appName]; ok {
logging.Infof("%s [%d] Function: %s already bootstrapping", logPrefix, s.runningFnsCount(), appName)
s.appListRWMutex.Unlock()
s.waitAndNotifyTopologyChange(appName, topologyChangeMsg)
continue
}

@@ -875,3 +876,34 @@ func (s *SuperSupervisor) isFnRunningFromPrimary(appName string) (bool, error) {

return false, fmt.Errorf("function not running")
}

func (s *SuperSupervisor) waitAndNotifyTopologyChange(appName string, topologyChangeMsg *common.TopologyChangeMsg) {
logPrefix := "SuperSupervisor::waitAndNotifyTopologyChange"
maxWaitTime := 2 * s.servicesNotifierRetryTm
ticker := time.NewTicker(time.Duration(1) * time.Second)
stopTicker := time.NewTicker(time.Duration(maxWaitTime) * time.Minute)
defer func() {
stopTicker.Stop()
ticker.Stop()
}()

for {
select {
case <-ticker.C:
s.appListRWMutex.RLock()
_, present := s.bootstrappingApps[appName]
s.appListRWMutex.RUnlock()
if present {
continue
}

if eventingProducer, ok := s.runningFns()[appName]; ok {
eventingProducer.NotifyTopologyChange(topologyChangeMsg)
}
return
case <-stopTicker.C:
logging.Errorf("%s: App %s not able to bootstrap in %v minutes", logPrefix, appName, maxWaitTime)
return
}
}
}
4 changes: 2 additions & 2 deletions v8_consumer/include/client.h
@@ -169,7 +169,8 @@ class AppWorker {

int32_t num_vbuckets_{1024};

std::map<int16_t, int16_t> partition_thr_map_;
std::map<int16_t, int16_t> elected_partition_thr_map_;
std::map<int16_t, int16_t> current_partition_thr_map_;

// Controls the number of virtual partitions, in order to shard work among
// worker threads
@@ -197,7 +198,6 @@ class AppWorker {
std::mutex workers_map_mutex_;

std::shared_ptr<vb_seq_map_t> vb_seq_;
std::shared_ptr<std::vector<std::vector<uint64_t>>> vbfilter_map_;
std::shared_ptr<std::vector<uint64_t>> processed_bucketops_;
std::shared_ptr<vb_lock_map_t> vb_locks_;
};
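
The header change above splits the single partition_thr_map_ into elected_partition_thr_map_ (the vbucket-to-thread assignment handed down by the thread-map and vbucket-distribution messages) and current_partition_thr_map_ (the assignment DCP traffic is actually routed with, which catches up to the elected one only once the elected thread reports a processed seq_no for that vbucket, as the client.cc changes below show). What follows is a minimal C++ sketch of that two-step handover, not code from the commit; PartitionRouter, Elect, Promote and Route are illustrative names, and only the two map members come from the header.

#include <cstdint>
#include <map>

// Sketch only: a stand-in for AppWorker's two partition-to-thread maps.
class PartitionRouter {
public:
  // Thread-map / vbucket-distribution messages update only the elected owner.
  void Elect(int16_t partition, int16_t thread_id) {
    elected_partition_thr_map_[partition] = thread_id;
  }
  // A processed-seq-no event for the partition promotes the elected owner to
  // be the current one, so new DCP traffic moves over only at that point.
  void Promote(int16_t partition) {
    current_partition_thr_map_[partition] = elected_partition_thr_map_[partition];
  }
  // DCP mutations, deletions and filter events route via the current owner.
  int16_t Route(int16_t partition) const {
    auto it = current_partition_thr_map_.find(partition);
    return it != current_partition_thr_map_.end() ? it->second : int16_t{0};
  }

private:
  std::map<int16_t, int16_t> elected_partition_thr_map_;
  std::map<int16_t, int16_t> current_partition_thr_map_;
};

Keeping the two maps separate appears intended to let a vbucket keep draining on the thread that currently owns it while a rebalance-driven reassignment is still settling.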
8 changes: 4 additions & 4 deletions v8_consumer/include/v8worker.h
@@ -233,8 +233,7 @@ class V8Worker {
const std::string &user_prefix, Histogram *latency_stats,
Histogram *curl_latency_stats, const std::string &ns_server_port,
const int32_t &num_vbuckets, vb_seq_map_t *vb_seq,
std::vector<std::vector<uint64_t>> *vbfilter_map,
std::vector<uint64_t> *processed_bucketops, vb_lock_map_t *vb_locks);
std::vector<uint64_t> *processed_bucketops, vb_lock_map_t *vb_locks, int worker_idx);
~V8Worker();

int V8WorkerLoad(std::string source_s);
@@ -278,7 +277,7 @@ class V8Worker {

uint64_t GetBucketopsSeqno(int vb_no);

std::unique_lock<std::mutex> GetAndLockFilterLock();
std::unique_lock<std::mutex> GetAndLockBucketOpsLock();

int ParseMetadata(const std::string &metadata, int &vb_no,
uint64_t &seq_no) const;
@@ -382,8 +381,9 @@ class V8Worker {

vb_seq_map_t *vb_seq_;
vb_lock_map_t *vb_locks_;
int worker_idx_;

std::vector<std::vector<uint64_t>> *vbfilter_map_;
std::vector<std::vector<uint64_t>> vbfilter_map_;
std::vector<uint64_t> *processed_bucketops_;
std::mutex bucketops_lock_;
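
Two related ownership changes show up in this header: vbfilter_map_ becomes a plain vector owned by each V8Worker (the shared vbfilter_map pointer disappears from the constructor), the worker now records its own worker_idx_, and GetAndLockFilterLock is renamed GetAndLockBucketOpsLock to match the mutex it takes. Below is a minimal sketch of the per-worker layout, not repository code; WorkerState and FiltersFor are illustrative names, and only the member types and the lock helper's behaviour come from the header.

#include <cstdint>
#include <mutex>
#include <vector>

// Sketch only: trimmed stand-in for the V8Worker members changed above.
class WorkerState {
public:
  WorkerState(int32_t num_vbuckets, int worker_idx)
      : worker_idx_(worker_idx),
        vbfilter_map_(num_vbuckets) {}  // owned per worker, no shared pointer

  // Renamed from GetAndLockFilterLock(); it locks bucketops_lock_, as before.
  std::unique_lock<std::mutex> GetAndLockBucketOpsLock() {
    return std::unique_lock<std::mutex>(bucketops_lock_);
  }

  // Pending filter seq_nos for one vbucket; the real worker serialises
  // access to these with a per-vbucket lock.
  std::vector<uint64_t> &FiltersFor(int vb_no) { return vbfilter_map_[vb_no]; }

private:
  int worker_idx_;
  std::vector<std::vector<uint64_t>> vbfilter_map_;
  std::mutex bucketops_lock_;
};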

44 changes: 21 additions & 23 deletions v8_consumer/src/client.cc
@@ -288,8 +288,6 @@ void AppWorker::InitVbMapResources() {
(*vb_seq_)[i] = atomic_ptr_t(new std::atomic<uint64_t>(0));
(*vb_locks_)[i] = new std::mutex();
}
vbfilter_map_ =
std::make_shared<std::vector<std::vector<uint64_t>>>(num_vbuckets_);
processed_bucketops_ =
std::make_shared<std::vector<uint64_t>>(num_vbuckets_, 0);
}
@@ -500,7 +498,7 @@ void AppWorker::RouteMessageWithResponse(
server_settings_t *server_settings;
handler_config_t *handler_config;

int worker_index;
int16_t worker_index;
nlohmann::json estats;
std::map<int, int64_t> agg_lcb_exceptions;
std::string handler_instance_id;
@@ -576,8 +574,8 @@ void AppWorker::RouteMessageWithResponse(
platform.release(), handler_config, server_settings,
function_name_, function_id_, handler_instance_id, user_prefix_,
&latency_stats_, &curl_latency_stats_, ns_server_port_,
num_vbuckets_, vb_seq_.get(), vbfilter_map_.get(),
processed_bucketops_.get(), vb_locks_.get());
num_vbuckets_, vb_seq_.get(),
processed_bucketops_.get(), vb_locks_.get(), i);

LOG(logInfo) << "Init index: " << i << " V8Worker: " << w
<< std::endl;
@@ -681,7 +679,7 @@ void AppWorker::RouteMessageWithResponse(
case eDCP:
switch (getDCPOpcode(worker_msg->header.opcode)) {
case oDelete:
worker_index = partition_thr_map_[worker_msg->header.partition];
worker_index = current_partition_thr_map_[worker_msg->header.partition];
if (workers_[worker_index] != nullptr) {
enqueued_dcp_delete_msg_counter++;
workers_[worker_index]->PushBack(std::move(worker_msg));
@@ -692,7 +690,7 @@
}
break;
case oMutation:
worker_index = partition_thr_map_[worker_msg->header.partition];
worker_index = current_partition_thr_map_[worker_msg->header.partition];
if (workers_[worker_index] != nullptr) {
enqueued_dcp_mutation_msg_counter++;
workers_[worker_index]->PushBack(std::move(worker_msg));
@@ -703,7 +701,7 @@
}
break;
case oNoOp:
worker_index = partition_thr_map_[worker_msg->header.partition];
worker_index = current_partition_thr_map_[worker_msg->header.partition];
if (workers_[worker_index] != nullptr) {
workers_[worker_index]->PushBack(std::move(worker_msg));
}
@@ -718,18 +716,17 @@
case eFilter:
switch (getFilterOpcode(worker_msg->header.opcode)) {
case oVbFilter: {

worker_index = partition_thr_map_[worker_msg->header.partition];
auto worker = workers_[worker_index];
if (worker != nullptr) {
worker_index = current_partition_thr_map_[worker_msg->header.partition];
if (workers_[worker_index] != nullptr) {
LOG(logInfo) << "Received filter event from Go "
<< worker_msg->header.metadata << std::endl;
auto worker = workers_[worker_index];
int vb_no = 0, skip_ack = 0;
uint64_t filter_seq_no = 0;
if (kSuccess ==
worker->ParseMetadataWithAck(worker_msg->header.metadata, vb_no,
filter_seq_no, skip_ack, true)) {
auto lck = worker->GetAndLockFilterLock();
auto lck = worker->GetAndLockBucketOpsLock();
auto last_processed_seq_no = worker->GetBucketopsSeqno(vb_no);
if (last_processed_seq_no < filter_seq_no) {
worker->UpdateVbFilter(vb_no, filter_seq_no);
@@ -745,15 +742,16 @@
}
} break;
case oProcessedSeqNo:
worker_index = partition_thr_map_[worker_msg->header.partition];
worker_index = elected_partition_thr_map_[worker_msg->header.partition];
if (workers_[worker_index] != nullptr) {
LOG(logInfo) << "Received update processed seq_no event from Go "
<< worker_msg->header.metadata << std::endl;
current_partition_thr_map_[worker_msg->header.partition] = worker_index;
int vb_no = 0;
uint64_t seq_no = 0;
if (kSuccess == workers_[worker_index]->ParseMetadata(
worker_msg->header.metadata, vb_no, seq_no)) {
auto lck = workers_[worker_index]->GetAndLockFilterLock();
auto lck = workers_[worker_index]->GetAndLockBucketOpsLock();
workers_[worker_index]->UpdateBucketopsSeqnoLocked(vb_no, seq_no);
workers_[worker_index]->AddTimerPartition(vb_no);
}
@@ -770,7 +768,7 @@
std::unordered_map<int64_t, uint64_t> lps_map;
for (int16_t idx = 0; idx < thr_count_; ++idx) {
auto worker = workers_[idx];
auto lck = worker->GetAndLockFilterLock();
auto lck = worker->GetAndLockBucketOpsLock();
worker->StopTimerScan();
auto partitions = worker->GetPartitions();
for (auto vb : partitions) {
@@ -812,7 +810,7 @@ void AppWorker::RouteMessageWithResponse(
for (unsigned int j = 0; j < thr_map->Get(i)->partitions()->size();
j++) {
auto p_id = thr_map->Get(i)->partitions()->Get(j);
partition_thr_map_[p_id] = thread_id;
elected_partition_thr_map_[p_id] = thread_id;
}
}
msg_priority_ = true;
@@ -843,10 +841,10 @@ void AppWorker::RouteMessageWithResponse(
for (int vbIdx = 0; vbIdx < numVbs;) {
auto idx = vbIdx;
for (; idx < vbIdx + vbsPerThread; idx++) {
partition_thr_map_[vbuckets[idx]] = threadId;
elected_partition_thr_map_[vbuckets[idx]] = threadId;
}
if (chuncks > 0) {
partition_thr_map_[vbuckets[idx]] = threadId;
elected_partition_thr_map_[vbuckets[idx]] = threadId;
chuncks--;
idx++;
}
@@ -884,7 +882,7 @@ void AppWorker::RouteMessageWithResponse(
case eDebugger:
switch (getDebuggerOpcode(worker_msg->header.opcode)) {
case oDebuggerStart:
worker_index = partition_thr_map_[worker_msg->header.partition];
worker_index = elected_partition_thr_map_[worker_msg->header.partition];
if (workers_[worker_index] != nullptr) {
workers_[worker_index]->PushBack(std::move(worker_msg));
msg_priority_ = true;
@@ -894,7 +892,7 @@
}
break;
case oDebuggerStop:
worker_index = partition_thr_map_[worker_msg->header.partition];
worker_index = elected_partition_thr_map_[worker_msg->header.partition];
if (workers_[worker_index] != nullptr) {
workers_[worker_index]->PushBack(std::move(worker_msg));
msg_priority_ = true;
@@ -1144,8 +1142,8 @@ std::vector<std::unordered_set<int64_t>>
AppWorker::PartitionVbuckets(const std::vector<int64_t> &vbuckets) const {
std::vector<std::unordered_set<int64_t>> partitions(thr_count_);
for (auto vb : vbuckets) {
auto it = partition_thr_map_.find(vb);
if (it != end(partition_thr_map_)) {
auto it = elected_partition_thr_map_.find(vb);
if (it != end(elected_partition_thr_map_)) {
partitions[it->second].insert(vb);
}
}
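
One detail worth spelling out from the vbucket-distribution handler earlier in this file: each thread receives numVbs / thr_count_ vbuckets, and the first numVbs % thr_count_ threads (tracked by the chuncks counter) absorb one extra each, with the assignments written into elected_partition_thr_map_. The loop body is cut off in the view above, so the sketch below fills in the loop-advance steps as an assumption; DistributeVbuckets is an illustrative free function, and only the map name and the vbsPerThread/chuncks bookkeeping come from the diff.

#include <cstdint>
#include <map>
#include <vector>

// Sketch only: how the handler spreads an incoming vbucket list over threads.
std::map<int16_t, int16_t>
DistributeVbuckets(const std::vector<int64_t> &vbuckets, int16_t thr_count) {
  std::map<int16_t, int16_t> elected_partition_thr_map;
  if (thr_count <= 0)
    return elected_partition_thr_map;
  const int numVbs = static_cast<int>(vbuckets.size());
  const int vbsPerThread = numVbs / thr_count;
  int chuncks = numVbs % thr_count;  // leftover vbuckets, one per leading thread

  int16_t threadId = 0;
  for (int vbIdx = 0; vbIdx < numVbs;) {
    int idx = vbIdx;
    for (; idx < vbIdx + vbsPerThread; idx++)
      elected_partition_thr_map[vbuckets[idx]] = threadId;
    if (chuncks > 0) {  // this thread takes one of the leftover vbuckets
      elected_partition_thr_map[vbuckets[idx]] = threadId;
      chuncks--;
      idx++;
    }
    vbIdx = idx;   // assumed loop advance (cut off in the diff view)
    ++threadId;
  }
  return elected_partition_thr_map;
}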
14 changes: 7 additions & 7 deletions v8_consumer/src/v8worker.cc
@@ -266,14 +266,13 @@ V8Worker::V8Worker(
const std::string &user_prefix, Histogram *latency_stats,
Histogram *curl_latency_stats, const std::string &ns_server_port,
const int32_t &num_vbuckets, vb_seq_map_t *vb_seq,
std::vector<std::vector<uint64_t>> *vbfilter_map,
std::vector<uint64_t> *processed_bucketops, vb_lock_map_t *vb_locks)
std::vector<uint64_t> *processed_bucketops, vb_lock_map_t *vb_locks, int worker_idx)
: app_name_(h_config->app_name), settings_(server_settings),
num_vbuckets_(num_vbuckets),
timer_reduction_ratio_(
int(num_vbuckets / h_config->num_timer_partitions)),
latency_stats_(latency_stats), curl_latency_stats_(curl_latency_stats),
vb_seq_(vb_seq), vb_locks_(vb_locks), vbfilter_map_(vbfilter_map),
vb_seq_(vb_seq), vb_locks_(vb_locks), worker_idx_(worker_idx),
processed_bucketops_(processed_bucketops), platform_(platform),
function_name_(function_name), function_id_(function_id),
user_prefix_(user_prefix), ns_server_port_(ns_server_port),
Expand All @@ -293,6 +292,7 @@ V8Worker::V8Worker(
scan_timer_.store(false);
update_v8_heap_.store(false);
run_gc_.store(false);
vbfilter_map_ = std::vector<std::vector<uint64_t>>(num_vbuckets_);

v8::Isolate::CreateParams create_params;
create_params.array_buffer_allocator =
@@ -1288,13 +1288,13 @@ int V8Worker::ParseMetadataWithAck(const std::string &metadata_str, int &vb_no,

void V8Worker::UpdateVbFilter(int vb_no, uint64_t seq_no) {
auto lock = GetAndLockVbLock(vb_no);
(*vbfilter_map_)[vb_no].push_back(seq_no);
vbfilter_map_[vb_no].push_back(seq_no);
lock.unlock();
}

uint64_t V8Worker::GetVbFilter(int vb_no) {
auto lock = GetAndLockVbLock(vb_no);
auto &filters = (*vbfilter_map_)[vb_no];
auto &filters = vbfilter_map_[vb_no];
lock.unlock();
if (filters.empty())
return 0;
@@ -1303,7 +1303,7 @@ uint64_t V8Worker::GetVbFilter(int vb_no) {

void V8Worker::EraseVbFilter(int vb_no) {
auto lock = GetAndLockVbLock(vb_no);
auto &filters = (*vbfilter_map_)[vb_no];
auto &filters = vbfilter_map_[vb_no];
lock.unlock();
if (!filters.empty()) {
filters.erase(filters.begin());
@@ -1446,7 +1446,7 @@ lcb_INSTANCE *V8Worker::GetTimerLcbHandle() const {
return timer_store_->GetTimerStoreHandle();
}

std::unique_lock<std::mutex> V8Worker::GetAndLockFilterLock() {
std::unique_lock<std::mutex> V8Worker::GetAndLockBucketOpsLock() {
return std::unique_lock<std::mutex>(bucketops_lock_);
}

