Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http://ci-eventing.northscale.in/eventing-22.01.2020-00.41.pass.html

Change-Id: Ieb5aa8abc014188aefb3c37149e80cb44f4375c5
  • Loading branch information
jeelanp2003 committed Jan 22, 2020
2 parents 1b4facb + b70a2e5 commit d04ae06
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 12 deletions.
2 changes: 1 addition & 1 deletion consumer/exported_functions.go
Expand Up @@ -711,13 +711,13 @@ func (c *Consumer) GetAssignedVbs(workerName string) ([]uint16, error) {
// UpdateWorkerQueueMemCap revises the memory cap for cpp worker, dcp and timer queues
func (c *Consumer) UpdateWorkerQueueMemCap(quota int64) {
logPrefix := "Consumer::updateWorkerQueueMemCap"

prevWorkerMemCap := c.workerQueueMemCap
prevDCPFeedMemCap := c.aggDCPFeedMemCap

divisor := int64(2)
c.workerQueueMemCap = (quota / divisor) * 1024 * 1024
c.aggDCPFeedMemCap = (quota / divisor) * 1024 * 1024
c.sendWorkerMemQuota(quota * 1024 * 1024)

logging.Infof("%s [%s:%s:%d] Updated memory quota: %d MB previous worker quota: %d MB dcp feed quota: %d MB",
logPrefix, c.workerName, c.tcpPort, c.Pid(), c.workerQueueMemCap/(1024*1024),
Expand Down
12 changes: 12 additions & 0 deletions consumer/handle_messages.go
Expand Up @@ -126,6 +126,18 @@ func (c *Consumer) sendWorkerThrMap(thrPartitionMap map[int][]uint16, sendToDebu
c.sendMessage(m)
}

// sendWorkerMemQuota notifies the C++ worker of its memory quota.
// memSize is the quota in bytes, encoded as decimal text in the header
// metadata of a prioritized appWorkerSetting message.
func (c *Consumer) sendWorkerMemQuota(memSize int64) {
	header, builder := c.makeHeader(appWorkerSetting, workerThreadMemQuota, 0, strconv.FormatInt(memSize, 10))

	msg := &msgToTransmit{
		msg:           &message{Header: header},
		prioritize:    true,
		headerBuilder: builder,
	}

	c.sendMessage(msg)
}

func (c *Consumer) SendAssignedVbs() {
logPrefix := "Consumer::SendAssignedVbs"
vbuckets, err := c.GetAssignedVbs(c.ConsumerName())
Expand Down
1 change: 1 addition & 0 deletions consumer/protocol.go
Expand Up @@ -73,6 +73,7 @@ const (
workerThreadPartitionMap
timerContextSize
vbMap
workerThreadMemQuota
)

// message and opcode types for interpreting messages from C++ To Go
Expand Down
1 change: 1 addition & 0 deletions consumer/v8_consumer.go
Expand Up @@ -307,6 +307,7 @@ func (c *Consumer) HandleV8Worker() error {
c.sendLogLevel(c.logLevel, false)
c.sendWorkerThrMap(nil, false)
c.sendWorkerThrCount(0, false)
c.sendWorkerMemQuota(c.aggDCPFeedMemCap * int64(2))
err := util.Retry(util.NewFixedBackoff(clusterOpRetryInterval), c.retryCount, getEventingNodeAddrOpCallback, c)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
Expand Down
20 changes: 19 additions & 1 deletion service_manager/http_handlers.go
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/couchbase/cbauth"
"github.com/couchbase/cbauth/metakv"
"github.com/couchbase/eventing/audit"
"github.com/couchbase/eventing/common"
"github.com/couchbase/eventing/consumer"
Expand Down Expand Up @@ -914,11 +915,28 @@ func (m *ServiceMgr) getRebalanceProgress(w http.ResponseWriter, r *http.Request

// Report back state of rebalance on current node
// Report back state of rebalance on current node.
// A rebalance is considered in progress when any metakv rebalance token
// exists whose value is not stopRebalance; for stop tokens (or a metakv
// listing failure) we fall back to the supervisor's own status.
func (m *ServiceMgr) getRebalanceStatus(w http.ResponseWriter, r *http.Request) {
	logPrefix := "ServiceMgr::getRebalanceStatus"
	if !m.validateAuth(w, r, EventingPermissionManage) {
		return
	}

	children, err := metakv.ListAllChildren(metakvRebalanceTokenPath)
	if err != nil {
		logging.Errorf("%s failed to list children from rebalance path. error: %v", logPrefix, err)
		w.Write([]byte(strconv.FormatBool(m.superSup.RebalanceStatus())))
		return
	}

	rebalanceOngoing := false
	for _, token := range children {
		if string(token.Value) == stopRebalance {
			// Stop token: defer to the supervisor's view and keep scanning.
			rebalanceOngoing = m.superSup.RebalanceStatus()
			continue
		}
		rebalanceOngoing = true
		break
	}
	w.Write([]byte(strconv.FormatBool(rebalanceOngoing)))
}

// Report back state of bootstrap on current node
Expand Down
1 change: 1 addition & 0 deletions service_manager/manager.go
Expand Up @@ -722,6 +722,7 @@ func (m *ServiceMgr) onRebalanceDoneLocked(err error) {

m.rebalancer = nil
m.rebalanceCtx = nil
util.Retry(util.NewFixedBackoff(time.Second), nil, cleanupEventingMetaKvPath, metakvRebalanceTokenPath)

m.updateStateLocked(func(s *state) {
s.rebalanceTask = newTask
Expand Down
7 changes: 5 additions & 2 deletions v8_consumer/include/client.h
Expand Up @@ -26,6 +26,7 @@ const size_t MAX_BUF_SIZE = 65536;
const int HEADER_FRAGMENT_SIZE = 4; // uint32
const int PAYLOAD_FRAGMENT_SIZE = 4; // uint32
const int SIZEOF_UINT32 = 4;
// Per-worker ceiling on the v8 heap (~1.4 GB) before a forced GC is
// requested by the event-generation loop in client.cc. Computed in
// floating point and truncated on conversion to size_t.
const size_t MAX_V8_HEAP_SIZE = 1.4 * 1024 * 1024 * 1024;

// NOTE(review): a non-extern, non-inline global defined in a header — if
// this header is included from more than one translation unit this
// violates the ODR; confirm single inclusion or make it extern/inline.
int64_t timer_context_size;

Expand Down Expand Up @@ -81,7 +82,7 @@ class AppWorker {

void ReadStdinLoop();

void ScanTimerLoop();
void EventGenLoop();

static void StopUvLoop(uv_async_t *);

Expand All @@ -93,7 +94,9 @@ class AppWorker {
std::thread main_uv_loop_thr_;
std::thread feedback_uv_loop_thr_;
std::thread stdin_read_thr_;
std::thread scan_timer_thr_;
std::thread event_gen_thr_;

size_t memory_quota_;

protected:
void WriteResponseWithRetry(uv_stream_t *handle,
Expand Down
3 changes: 2 additions & 1 deletion v8_consumer/include/commands.h
Expand Up @@ -49,14 +49,15 @@ enum dcp_opcode { oDelete, oMutation, DCP_Opcode_Unknown };

// Opcodes for vbucket filter messages (vb ownership filtering and
// processed-seqno reporting).
enum filter_opcode { oVbFilter, oProcessedSeqNo, Filter_Opcode_Unknown };

// Opcodes for internally generated events (queued by the AppWorker event
// generation loop): timer scans, v8 heap-size refresh, and forced GC.
// The scrape retained both the pre- and post-commit enum definitions;
// only the post-commit definition is kept here (a duplicate definition
// would not compile).
enum internal_opcode { oScanTimer, oUpdateV8HeapSize, oRunGc, Internal_Opcode_Unknown };

// Settings pushed from the Go consumer to the C++ app worker.
// NOTE(review): the wire values decoded by getAppWorkerSettingOpcode()
// in commands.cc (e.g. 5 -> oVbMap, 6 -> oWorkerMemQuota) appear to be
// offset by one from these enumerator values — keep both in sync when
// adding entries; confirm the intended offset against the Go sender.
enum app_worker_setting_opcode {
oLogLevel,
oWorkerThreadCount,
oWorkerThreadMap,
oTimerContextSize,
oVbMap,
oWorkerMemQuota, // worker memory quota in bytes (see oWorkerMemQuota handling in client.cc)
App_Worker_Setting_Opcode_Unknown
};

Expand Down
3 changes: 3 additions & 0 deletions v8_consumer/include/v8worker.h
Expand Up @@ -307,6 +307,7 @@ class V8Worker {
std::thread *terminator_thr_;
BlockingDeque<std::unique_ptr<WorkerMessage>> *worker_queue_;

size_t v8_heap_size_;
std::mutex lcb_exception_mtx_;
std::map<int, int64_t> lcb_exceptions_;
IsolateData data_;
Expand All @@ -330,6 +331,8 @@ class V8Worker {
int8_t msg_type, int8_t response_opcode);
bool ExecuteScript(const v8::Local<v8::String> &script);

void UpdateV8HeapSize();
void ForceRunGarbageCollector();
Histogram *latency_stats_;
Histogram *curl_latency_stats_;

Expand Down
50 changes: 43 additions & 7 deletions v8_consumer/src/client.cc
Expand Up @@ -806,6 +806,12 @@ void AppWorker::RouteMessageWithResponse(

LOG(logInfo) << "Updating vbucket map, vbmap :" << oss.str() << std::endl;
} break;

case oWorkerMemQuota: {
memory_quota_ = std::stoll(worker_msg->header.metadata);
msg_priority_ = true;
break;
}
default:
LOG(logError) << "Opcode "
<< getAppWorkerSettingOpcode(worker_msg->header.opcode)
Expand Down Expand Up @@ -950,6 +956,10 @@ AppWorker::~AppWorker() {
feedback_uv_loop_thr_.join();
}

if (event_gen_thr_.joinable()) {
event_gen_thr_.join();
}

if (main_uv_loop_thr_.joinable()) {
main_uv_loop_thr_.join();
}
Expand Down Expand Up @@ -986,27 +996,53 @@ void AppWorker::ReadStdinLoop() {
stdin_read_thr_ = std::move(thr);
}

void AppWorker::ScanTimerLoop() {
void AppWorker::EventGenLoop() {
std::this_thread::sleep_for(std::chrono::seconds(2));
auto evt_generator = [](AppWorker *worker) {
while (!worker->thread_exit_cond_.load()) {
{
std::lock_guard<std::mutex> lck(worker->workers_map_mutex_);
if (worker->using_timer_ && worker->v8worker_init_done_ &&
!worker->pause_consumer_.load()) {

if (worker->v8worker_init_done_ && !worker->pause_consumer_.load()) {
// Scan for timers
if (worker->using_timer_) {
for (auto &v8_worker : worker->workers_) {
std::unique_ptr<WorkerMessage> msg(new WorkerMessage);
msg->header.event = eInternal + 1;
msg->header.opcode = oScanTimer;
v8_worker.second->PushFront(std::move(msg));
}
}

// Update the v8 heap size
for (auto &v8_worker : worker->workers_) {
std::unique_ptr<WorkerMessage> msg(new WorkerMessage);
msg->header.event = eInternal + 1;
msg->header.opcode = oScanTimer;
msg->header.opcode = oUpdateV8HeapSize;
v8_worker.second->PushFront(std::move(msg));
}

// Check for memory growth
int64_t approx_memory = 0;
for (const auto &v8_worker : worker->workers_) {
approx_memory += v8_worker.second->worker_queue_->GetMemory() + v8_worker.second->v8_heap_size_;
}

for (auto &v8_worker : worker->workers_) {
if (v8_worker.second->v8_heap_size_ > MAX_V8_HEAP_SIZE || approx_memory > worker->memory_quota_ * 0.8) {
std::unique_ptr<WorkerMessage> msg(new WorkerMessage);
msg->header.event = eInternal + 1;
msg->header.opcode = oRunGc;
v8_worker.second->PushFront(std::move(msg));
}
}
}
}
std::this_thread::sleep_for(std::chrono::seconds(7));
}
};
std::thread thr(evt_generator, this);
scan_timer_thr_ = std::move(thr);
event_gen_thr_ = std::move(thr);
}

void AppWorker::StopUvLoop(uv_async_t *async) {
Expand Down Expand Up @@ -1106,9 +1142,9 @@ int main(int argc, char **argv) {
}

worker->ReadStdinLoop();
worker->ScanTimerLoop();
worker->EventGenLoop();
worker->stdin_read_thr_.join();
worker->scan_timer_thr_.join();
worker->event_gen_thr_.join();
worker->main_uv_loop_thr_.join();
worker->feedback_uv_loop_thr_.join();

Expand Down
2 changes: 2 additions & 0 deletions v8_consumer/src/commands.cc
Expand Up @@ -80,6 +80,8 @@ app_worker_setting_opcode getAppWorkerSettingOpcode(int8_t opcode) {
return oTimerContextSize;
if (opcode == 5)
return oVbMap;
if (opcode == 6)
return oWorkerMemQuota;
return App_Worker_Setting_Opcode_Unknown;
}

Expand Down
20 changes: 20 additions & 0 deletions v8_consumer/src/v8worker.cc
Expand Up @@ -489,6 +489,14 @@ void V8Worker::RouteMessage() {
}
break;
}
case oUpdateV8HeapSize: {
UpdateV8HeapSize();
break;
}
case oRunGc: {
ForceRunGarbageCollector();
break;
}
default:
LOG(logError) << "Received invalid internal opcode" << std::endl;
break;
Expand Down Expand Up @@ -1176,3 +1184,15 @@ void UpdateCurlLatencyHistogram(
auto w = UnwrapData(isolate)->v8worker;
w->UpdateCurlLatencyHistogram(start);
}

void V8Worker::UpdateV8HeapSize() {
v8::HeapStatistics stats;
v8::Locker locker(isolate_);
isolate_->GetHeapStatistics(&stats);
v8_heap_size_ = stats.total_heap_size();
}

// Ask v8 to aggressively reclaim memory. LowMemoryNotification prompts
// the isolate to perform garbage collection; the Locker makes the call
// safe from outside the isolate's own thread.
void V8Worker::ForceRunGarbageCollector() {
v8::Locker locker(isolate_);
isolate_->LowMemoryNotification();
}

0 comments on commit d04ae06

Please sign in to comment.