Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
Change-Id: I7f8cdc4a7969de512882ea3d384693a1fdc88282
  • Loading branch information
abhijpes committed Jan 20, 2022
2 parents 0308037 + ad0ce7a commit eeaeb64
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 14 deletions.
3 changes: 3 additions & 0 deletions common/common.go
Expand Up @@ -265,6 +265,7 @@ type EventingProducer interface {
SetTrapEvent(value bool)
TimerDebugStats() map[int]map[string]interface{}
UndeployHandler(skipMetaCleanup bool)
UpdateEncryptionLevel(enforceTLS, encryptOn bool)
UpdateMemoryQuota(quota int64)
UsingTimer() bool
VbDcpEventsRemainingToProcess() map[int]int64
Expand Down Expand Up @@ -325,6 +326,7 @@ type EventingConsumer interface {
String() string
TimerDebugStats() map[int]map[string]interface{}
NotifyPrepareTopologyChange(keepNodes, ejectNodes []string)
UpdateEncryptionLevel(enforceTLS, encryptOn bool)
UpdateWorkerQueueMemCap(quota int64)
VbDcpEventsRemainingToProcess() map[int]int64
VbEventingNodeAssignMapUpdate(map[uint16]string)
Expand Down Expand Up @@ -392,6 +394,7 @@ type EventingSuperSup interface {
SpanBlobDump(appName string) (interface{}, error)
StopProducer(appName string, skipMetaCleanup bool, updateMetakv bool)
TimerDebugStats(appName string) (map[int]map[string]interface{}, error)
UpdateEncryptionLevel(enforceTLS, encryptOn bool)
VbDcpEventsRemainingToProcess(appName string) map[int]int64
VbDistributionStatsFromMetadata(appName string) map[string]map[string]string
VbSeqnoStats(appName string) (map[int][]map[string]interface{}, error)
Expand Down
7 changes: 7 additions & 0 deletions consumer/exported_functions.go
Expand Up @@ -822,6 +822,13 @@ func (c *Consumer) PauseConsumer() {
c.WorkerVbMapUpdate(nil)
}

func (c *Consumer) UpdateEncryptionLevel(enforceTLS, encryptOn bool) {
logPrefix := "Consumer::UpdateEncryptionLevel"
logging.Infof("%s [%s:%s:%d] Received notification to update encryption level to: enforceTLS: %v encryptOn: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), enforceTLS, encryptOn)
c.sendUpdateEncryptionLevel(enforceTLS, encryptOn)
}

func (c *Consumer) NotifyWorker() {
atomic.StoreUint32(&c.notifyWorker, 1)
}
Expand Down
18 changes: 18 additions & 0 deletions consumer/handle_messages.go
Expand Up @@ -575,6 +575,24 @@ func (c *Consumer) sendPauseConsumer() {
logPrefix, c.workerName, c.tcpPort, c.Pid())
}

func (c *Consumer) sendUpdateEncryptionLevel(enforceTLS, encryptOn bool) {
logPrefix := "Consumer::sendUpdateEncryptionLevel"
level := c.getEncryptionLevelName(enforceTLS, encryptOn)
encryptHeader, hBuilder := c.makeHeader(configChange, updateEncryptionLevel, 0, level)
msg := &msgToTransmit{
msg: &message{
Header: encryptHeader,
},
sendToDebugger: false,
prioritize: true,
headerBuilder: hBuilder,
}

c.sendMessage(msg)
logging.Infof("%s [%s:%s:%d] Sending encryption change message to consumer",
logPrefix, c.workerName, c.tcpPort, c.Pid())
}

func (c *Consumer) sendUpdateProcessedSeqNo(vb uint16, seqNo uint64) {
logPrefix := "Consumer::sendUpdateProcessedSeqNo"

Expand Down
20 changes: 14 additions & 6 deletions consumer/protocol.go
Expand Up @@ -81,6 +81,7 @@ const (
const (
configOpcode int8 = iota
updateFeatureMatrix
updateEncryptionLevel
)

// message and opcode types for interpreting messages from C++ To Go
Expand Down Expand Up @@ -153,6 +154,7 @@ func (c *Consumer) makeVbFilterHeader(partition int16, meta string) ([]byte, *fl
func (c *Consumer) makePauseConsumerHeader() ([]byte, *flatbuffers.Builder) {
return c.makeHeader(pauseConsumer, 0, 0, "")
}

func (c *Consumer) makeProcessedSeqNoHeader(partition int16, meta string) ([]byte, *flatbuffers.Builder) {
return c.filterEventHeader(processedSeqNo, partition, meta)
}
Expand Down Expand Up @@ -334,16 +336,21 @@ func (c *Consumer) makeV8InitPayload(appName, debuggerPort, currHost, eventingDi
n1qlConsistency := builder.CreateString(c.n1qlConsistency)
languageCompatibility := builder.CreateString(c.languageCompatibility)
certFile := builder.CreateString("")
encryptionLevel := builder.CreateString("control_or_off")

var securitySetting *common.SecuritySetting
if c.superSup != nil {
securitySetting = c.superSup.GetSecuritySetting()
if securitySetting != nil && securitySetting.EncryptData == true {
if len(securitySetting.CAFile) > 0 {
certFile = builder.CreateString(securitySetting.CAFile)
} else {
certFile = builder.CreateString(securitySetting.CertFile)
if securitySetting != nil {
if securitySetting.EncryptData {
if len(securitySetting.CAFile) > 0 {
certFile = builder.CreateString(securitySetting.CAFile)
} else {
certFile = builder.CreateString(securitySetting.CertFile)
}
}

level := c.getEncryptionLevelName(securitySetting.DisableNonSSLPorts, securitySetting.EncryptData)
encryptionLevel = builder.CreateString(level)
}
}

Expand Down Expand Up @@ -384,6 +391,7 @@ func (c *Consumer) makeV8InitPayload(appName, debuggerPort, currHost, eventingDi
payload.PayloadAddBucketCacheSize(builder, c.bucketCacheSize)
payload.PayloadAddBucketCacheAge(builder, c.bucketCacheAge)
payload.PayloadAddCertFile(builder, certFile)
payload.PayloadAddEncryptionLevel(builder, encryptionLevel)

if c.n1qlPrepareAll {
payload.PayloadAddN1qlPrepareAll(builder, 0x1)
Expand Down
11 changes: 11 additions & 0 deletions consumer/util.go
Expand Up @@ -117,3 +117,14 @@ func (c *Consumer) checkBinaryDocAllowed() bool {
binDocSupportVersion := common.CouchbaseVerMap["6.6.2"]
return langCompatibility.Compare(binDocSupportVersion)
}

func (c *Consumer) getEncryptionLevelName(enforceTLS, encryptOn bool) string {
encryptionLevel := "control_or_off"
if encryptOn {
encryptionLevel = "all"
if enforceTLS {
encryptionLevel = "strict"
}
}
return encryptionLevel
}
8 changes: 3 additions & 5 deletions features/include/lcb_utils.h
Expand Up @@ -86,21 +86,19 @@ RetryLcbCommand(lcb_INSTANCE *instance, CmdType &cmd, int max_retry_count,
int retry_count = 1;
std::pair<lcb_STATUS, Result> result;
auto start = GetUnixTime();

while (true) {
result = callable(instance, cmd);

if ((result.first == LCB_SUCCESS && result.second.rc == LCB_SUCCESS) ||
(!IsRetriable(result.first) && !IsRetriable(result.second.rc)) ||
(max_retry_count && retry_count >= max_retry_count))
break;
(max_retry_count && retry_count >= max_retry_count)) {
break;
}

if (max_retry_secs > 0) {
auto now = GetUnixTime();
if (now - start >= max_retry_secs)
break;
}

++retry_count;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
Expand Down
49 changes: 49 additions & 0 deletions features/include/lcbinstance.h
@@ -0,0 +1,49 @@
#ifndef LCBINSTANCE_H
#define LCBINSTANCE_H

#include "utils.h"
#include "lcb_utils.h"
#include <libcouchbase/couchbase.h>

class LcbInstance {
public:
LcbInstance &operator=(const LcbInstance &) = delete;
LcbInstance(LcbInstance &conn) = delete;
LcbInstance() { skip_lcb_ops_.store(false); }

LcbInstance &operator=(LcbInstance &&conn) {
if (this != &conn) {
std::swap(handle_, conn.handle_);
}
return *this;
}

LcbInstance(LcbInstance &&conn) { std::swap(handle_, conn.handle_); }

~LcbInstance() {
if (handle_ != nullptr) {
lcb_destroy(handle_);
}
}

void SetSkipLcbOps() {
skip_lcb_ops_.store(true);
}

void ResetSkipLcbOps() {
skip_lcb_ops_.store(false);
}

template <typename CmdType, typename Callable>
std::pair<lcb_STATUS, Result> Execute(CmdType &cmd, Callable &&callable, int max_retry_count, uint32_t max_retry_secs) {
if (skip_lcb_ops_.load() == true || handle_ == nullptr) {
return {LCB_ERR_GENERIC, Result()};
}
return RetryLcbCommand(handle_, *cmd, max_retry_count, max_retry_secs, callable);
}
lcb_INSTANCE *handle_{nullptr};
private:
std::atomic<bool> skip_lcb_ops_{false};
};

#endif
1 change: 1 addition & 0 deletions flatbuf/payload.fbs
Expand Up @@ -58,6 +58,7 @@ table Payload {
curl_max_allowed_resp_size:int64; // max allowed size of curl response
lcb_timeout:int;
certFile:string; // TLS certFile, null string if encryption is disabled
encryption_level:string; // encryption level, at the time this consumer is being created
}

root_type Payload;
6 changes: 6 additions & 0 deletions producer/exported_functions.go
Expand Up @@ -861,6 +861,12 @@ func (p *Producer) cleanupMetadataImpl(id int, vbsToCleanup []uint16, undeployWG
return nil
}

func (p *Producer) UpdateEncryptionLevel(enforceTLS, encryptOn bool) {
for _, c := range p.getConsumers() {
go c.UpdateEncryptionLevel(enforceTLS, encryptOn)
}
}

// UpdateMemoryQuota allows tuning of memory quota for Eventing
func (p *Producer) UpdateMemoryQuota(quota int64) {
logPrefix := "Producer::UpdateMemoryQuota"
Expand Down
1 change: 1 addition & 0 deletions service_manager/manager.go
Expand Up @@ -389,6 +389,7 @@ func (m *ServiceMgr) initService() {
RootCAs: rootCertPool}
m.configMutex.RUnlock()
util.SetSecurityConfig(setting)
// Pushes encryption level updates to consumer
m.superSup.SetSecuritySetting(setting)
return nil
}
Expand Down
14 changes: 13 additions & 1 deletion supervisor/exported_functions.go
Expand Up @@ -679,8 +679,14 @@ func (s *SuperSupervisor) CheckLifeCycleOpsDuringRebalance() bool {

// SetSecuritySetting Sets the new security settings and returns whether reload is required or not
func (s *SuperSupervisor) SetSecuritySetting(setting *common.SecuritySetting) bool {
defer func() {
s.securityMutex.Unlock()
if setting != nil {
// Push encryption change notifications to all consumers of all producers
s.UpdateEncryptionLevel(setting.DisableNonSSLPorts, setting.EncryptData)
}
}()
s.securityMutex.Lock()
defer s.securityMutex.Unlock()
if s.securitySetting != nil {
// TODO: 7.0.1 Change return value based on EncryptData and DisableNonSSLPorts since both can change
if s.securitySetting.EncryptData == false && setting.EncryptData == false {
Expand Down Expand Up @@ -750,3 +756,9 @@ func (s *SuperSupervisor) GetBSCSnapshot() (map[string]map[string][]string, erro
}
return snapshot, nil
}

func (s *SuperSupervisor) UpdateEncryptionLevel(enforceTLS, encryptOn bool) {
for _, p := range s.runningFns() {
p.UpdateEncryptionLevel(enforceTLS, encryptOn)
}
}
2 changes: 1 addition & 1 deletion v8_consumer/include/commands.h
Expand Up @@ -71,7 +71,7 @@ enum timer_opcode { oTimer, oCronTimer, Timer_Opcode_Unknown };

enum debugger_opcode { oDebuggerStart, oDebuggerStop, Debugger_Opcode_Unknown };

enum config_opcode { oUpdateDisableFeatureList, Config_Opcode_Unknown };
enum config_opcode { oUpdateDisableFeatureList, oUpdateEncryptionLevel, Config_Opcode_Unknown };

event_type getEvent(int8_t event);
v8_worker_opcode getV8WorkerOpcode(int8_t opcode);
Expand Down
16 changes: 15 additions & 1 deletion v8_consumer/include/timer_store.h
Expand Up @@ -24,6 +24,7 @@
#include "timer_iterator.h"
#include "utils.h"
#include "v8worker.h"
#include "lcbinstance.h"

namespace timer {
class TimerStore {
Expand Down Expand Up @@ -53,6 +54,18 @@ class TimerStore {

lcb_INSTANCE *GetTimerStoreHandle() const;

void SetFailFastTimerScans() {
if (crud_handle_ != nullptr && connected_.load()) {
crud_handle_->SetSkipLcbOps();
}
}

void ResetFailFastTimerScans() {
if (crud_handle_ != nullptr && connected_.load()) {
crud_handle_->ResetSkipLcbOps();
}
}

private:
void Connect();
std::pair<bool, lcb_STATUS> InitSpan(int partition);
Expand Down Expand Up @@ -108,7 +121,8 @@ class TimerStore {
std::string metadata_scope_;
std::string metadata_collection_;
size_t metadata_collection_length_, metadata_scope_length_;
lcb_INSTANCE *crud_handle_{nullptr};
std::unique_ptr<LcbInstance> crud_handle_{nullptr};
std::atomic<bool> connected_{false};
std::mutex store_lock_;
int32_t num_vbuckets_{1024};
int32_t timer_reduction_ratio_{1};
Expand Down
4 changes: 4 additions & 0 deletions v8_consumer/include/v8worker.h
Expand Up @@ -294,6 +294,10 @@ class V8Worker {

void StopTimerScan();

void SetFailFastTimerScans();

void ResetFailFastTimerScans();

void UpdatePartitions(const std::unordered_set<int64_t> &vbuckets);

std::unordered_set<int64_t> GetPartitions() const;
Expand Down
28 changes: 28 additions & 0 deletions v8_consumer/src/client.cc
Expand Up @@ -13,6 +13,7 @@
#include <string>
#include <thread>

#include "lcb_utils.h"
#include "breakpad.h"
#include "bucket_cache.h"
#include "client.h"
Expand All @@ -22,6 +23,7 @@
uint64_t timer_responses_sent(0);
uint64_t messages_parsed(0);

std::string curr_encryption_level("control_or_off");
std::atomic<int64_t> e_app_worker_setting_lost = {0};
std::atomic<int64_t> e_dcp_lost = {0};
std::atomic<int64_t> e_debugger_lost = {0};
Expand Down Expand Up @@ -561,6 +563,7 @@ void AppWorker::RouteMessageWithResponse(
payload->curr_eventing_sslport()->str());
server_settings->host_addr.assign(payload->curr_host()->str());
server_settings->certFile.assign(payload->certFile()->str());
curr_encryption_level = payload->encryption_level()->str();

handler_instance_id = payload->function_instance_id()->str();
handler_config->curl_max_allowed_resp_size =
Expand Down Expand Up @@ -772,6 +775,7 @@ void AppWorker::RouteMessageWithResponse(
case ePauseConsumer: {
pause_consumer_.store(true);
std::unordered_map<int64_t, uint64_t> lps_map;
LOG(logInfo) << "Received pause event from Go" << std::endl;
for (int16_t idx = 0; idx < thr_count_; ++idx) {
auto worker = workers_[idx];
auto lck = worker->GetAndLockBucketOpsLock();
Expand All @@ -784,6 +788,7 @@ void AppWorker::RouteMessageWithResponse(
lps_map[vb] = lps;
}
}
LOG(logInfo) << "Pause event processing complete. Sending ack" << std::endl;
SendPauseAck(lps_map);
} break;
case eApp_Worker_Setting:
Expand Down Expand Up @@ -925,6 +930,29 @@ void AppWorker::RouteMessageWithResponse(
v8_worker.second->PushFront(std::move(msg));
}
break;
case oUpdateEncryptionLevel: {
auto new_encryption_level = worker_msg->header.metadata;
if (curr_encryption_level != new_encryption_level) {
LOG(logInfo) << "Encryption level changed from " << curr_encryption_level
<< " to " << new_encryption_level << std::endl;
if (new_encryption_level == "strict" && curr_encryption_level == "control_or_off") {
// No point in allowing timer store lcb handles to attempt store operations
LOG(logInfo) << "Disabling timer store lcb handle ops" << std::endl;
for (auto &v8_worker : workers_)
v8_worker.second->SetFailFastTimerScans();
} else if (curr_encryption_level == "strict") {
// New level will definitely be more relaxed.
// Allow timer store lcb handles to attempt store operations
// This is a noop if handles are already in their correct state
LOG(logInfo) << "Re-enabling timer store lcb handle ops if disabled before" << std::endl;
for (auto &v8_worker : workers_)
v8_worker.second->ResetFailFastTimerScans();
}
}
// TODO : Use the flags to repair lcb handles
curr_encryption_level = new_encryption_level;
break;
}
default:
LOG(logError) << "Received invalid debugger opcode" << std::endl;
break;
Expand Down
2 changes: 2 additions & 0 deletions v8_consumer/src/commands.cc
Expand Up @@ -114,5 +114,7 @@ debugger_opcode getDebuggerOpcode(int8_t opcode) {
config_opcode getConfigOpcode(int8_t opcode) {
if (opcode == 1)
return oUpdateDisableFeatureList;
if (opcode == 2)
return oUpdateEncryptionLevel;
return Config_Opcode_Unknown;
}

0 comments on commit eeaeb64

Please sign in to comment.