Skip to content

Commit

Permalink
Merge pull request #4782 from sfc-gh-ljoswiak/fixes/global-config-pro…
Browse files Browse the repository at this point in the history
…cesses

Fix global config not triggering changes on server processes
  • Loading branch information
sfc-gh-mpilman committed May 8, 2021
2 parents 2fdadc8 + 3f6ef14 commit ccfc48d
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 83 deletions.
22 changes: 11 additions & 11 deletions fdbclient/ActorLineageProfiler.cpp
Expand Up @@ -296,17 +296,6 @@ boost::asio::io_context& ActorLineageProfilerT::context() {

SampleIngestor::~SampleIngestor() {}

// Callback used to update the sampling profilers run frequency whenever the
// frequency changes.
void samplingProfilerUpdateFrequency(std::optional<std::any> freq) {
double frequency = 0;
if (freq.has_value()) {
frequency = std::any_cast<double>(freq.value());
}
TraceEvent(SevInfo, "SamplingProfilerUpdateFrequency").detail("Frequency", frequency);
ActorLineageProfiler::instance().setFrequency(frequency);
}

void ProfilerConfigT::reset(std::map<std::string, std::string> const& config) {
bool expectNoMore = false, useFluentD = false, useTCP = false;
std::string endpoint;
Expand Down Expand Up @@ -370,6 +359,17 @@ std::map<std::string, std::string> ProfilerConfigT::getConfig() const {
return res;
}

// Callback used to update the sampling profilers run frequency whenever the
// frequency changes.
void samplingProfilerUpdateFrequency(std::optional<std::any> freq) {
double frequency = 0;
if (freq.has_value()) {
frequency = std::any_cast<double>(freq.value());
}
TraceEvent(SevInfo, "SamplingProfilerUpdateFrequency").detail("Frequency", frequency);
ActorLineageProfiler::instance().setFrequency(frequency);
}

// Callback used to update the sample collector window size.
void samplingProfilerUpdateWindow(std::optional<std::any> window) {
double duration = 0;
Expand Down
75 changes: 32 additions & 43 deletions fdbclient/GlobalConfig.actor.cpp
Expand Up @@ -39,26 +39,12 @@ const KeyRef samplingWindow = LiteralStringRef("visibility/sampling/window");

GlobalConfig::GlobalConfig() : lastUpdate(0) {}

void GlobalConfig::create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
auto config = new GlobalConfig{};
config->cx = Database(cx);
config->dbInfo = dbInfo;
g_network->setGlobal(INetwork::enGlobalConfig, config);
config->_updater = updater(config);
}
}

GlobalConfig& GlobalConfig::globalConfig() {
void* res = g_network->global(INetwork::enGlobalConfig);
ASSERT(res);
return *reinterpret_cast<GlobalConfig*>(res);
}

void GlobalConfig::updateDBInfo(Reference<AsyncVar<ClientDBInfo>> dbInfo) {
// this->dbInfo = dbInfo;
}

Key GlobalConfig::prefixedKey(KeyRef key) {
return key.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG).begin);
}
Expand Down Expand Up @@ -120,7 +106,7 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) {
callbacks[stableKey](data[stableKey]->value);
}
} catch (Error& e) {
TraceEvent("GlobalConfigTupleParseError").detail("What", e.what());
TraceEvent(SevWarn, "GlobalConfigTupleParseError").detail("What", e.what());
}
}

Expand Down Expand Up @@ -159,29 +145,32 @@ ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
state Optional<Value> sampleRate = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
state Optional<Value> sizeLimit = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));

loop {
try {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// The value doesn't matter too much, as long as the key is set.
tr->set(migratedKey.contents(), "1"_sr);
if (sampleRate.present()) {
const double sampleRateDbl =
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
Tuple rate = Tuple().appendDouble(sampleRateDbl);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
}
if (sizeLimit.present()) {
const int64_t sizeLimitInt =
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
Tuple size = Tuple().append(sizeLimitInt);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
}

wait(tr->commit());
return Void();
} catch (Error& e) {
throw;
try {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// The value doesn't matter too much, as long as the key is set.
tr->set(migratedKey.contents(), "1"_sr);
if (sampleRate.present()) {
const double sampleRateDbl =
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
Tuple rate = Tuple().appendDouble(sampleRateDbl);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
}
if (sizeLimit.present()) {
const int64_t sizeLimitInt =
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
Tuple size = Tuple().append(sizeLimitInt);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
}

wait(tr->commit());
return Void();
} catch (Error& e) {
// If multiple fdbserver processes are started at once, they will all
// attempt this migration at the same time, sometimes resulting in
// aborts due to conflicts. Purposefully avoid retrying, making this
// migration best-effort.
TraceEvent(SevInfo, "GlobalConfigMigrationError").detail("What", e.what());
throw;
}
}

Expand All @@ -201,18 +190,18 @@ ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {

// Applies updates to the local copy of the global configuration when this
// process receives an updated history.
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self) {
// wait(self->cx->onConnected());
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo* dbInfo) {
wait(self->cx->onConnected());
wait(self->migrate(self));

wait(self->refresh(self));
self->initialized.send(Void());

loop {
try {
wait(self->dbInfo->onChange());
wait(self->dbInfoChanged.onTrigger());

auto& history = self->dbInfo->get().history;
auto& history = dbInfo->history;
if (history.size() == 0) {
continue;
}
Expand All @@ -222,8 +211,8 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self) {
// history updates or the protocol version changed, so it
// must re-read the entire configuration range.
wait(self->refresh(self));
if (self->dbInfo->get().history.size() > 0) {
self->lastUpdate = self->dbInfo->get().history.back().version;
if (dbInfo->history.size() > 0) {
self->lastUpdate = dbInfo->history.back().version;
}
} else {
// Apply history in order, from lowest version to highest
Expand Down
34 changes: 21 additions & 13 deletions fdbclient/GlobalConfig.actor.h
Expand Up @@ -67,23 +67,31 @@ struct ConfigValue : ReferenceCounted<ConfigValue> {

class GlobalConfig : NonCopyable {
public:
// Creates a GlobalConfig singleton, accessed by calling GlobalConfig().
// This function should only be called once by each process (however, it is
// idempotent and calling it multiple times will have no effect).
static void create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo);
// Creates a GlobalConfig singleton, accessed by calling globalConfig().
// This function requires a database context object to allow global
// configuration to run transactions on the database, and an AsyncVar
// object to watch for changes on. The ClientDBInfo pointer should point to
// a ClientDBInfo object which will contain the updated global
// configuration history when the given AsyncVar changes. This function
// should only be called once (however, it is idempotent and calling it
// multiple times will have no effect).
template <class T>
static void create(DatabaseContext* cx, Reference<AsyncVar<T>> db, const ClientDBInfo* dbInfo) {
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
auto config = new GlobalConfig{};
config->cx = Database(cx);
g_network->setGlobal(INetwork::enGlobalConfig, config);
config->_updater = updater(config, dbInfo);
// Bind changes in `db` to the `dbInfoChanged` AsyncTrigger.
forward(db, std::addressof(config->dbInfoChanged));
}
}

// Returns a reference to the global GlobalConfig object. Clients should
// call this function whenever they need to read a value out of the global
// configuration.
static GlobalConfig& globalConfig();

// Updates the ClientDBInfo object used by global configuration to read new
// data. For server processes, this value needs to be set by the cluster
// controller, but global config is initialized before the cluster
// controller is, so this function provides a mechanism to update the
// object after initialization.
void updateDBInfo(Reference<AsyncVar<ClientDBInfo>> dbInfo);

// Use this function to turn a global configuration key defined above into
// the full path needed to set the value in the database.
//
Expand Down Expand Up @@ -156,10 +164,10 @@ class GlobalConfig : NonCopyable {

ACTOR static Future<Void> migrate(GlobalConfig* self);
ACTOR static Future<Void> refresh(GlobalConfig* self);
ACTOR static Future<Void> updater(GlobalConfig* self);
ACTOR static Future<Void> updater(GlobalConfig* self, const ClientDBInfo* dbInfo);

Database cx;
Reference<AsyncVar<ClientDBInfo>> dbInfo;
AsyncTrigger dbInfoChanged;
Future<Void> _updater;
Promise<Void> initialized;
AsyncTrigger configChanged;
Expand Down
7 changes: 3 additions & 4 deletions fdbclient/NativeAPI.actor.cpp
Expand Up @@ -962,10 +962,6 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));

GlobalConfig::create(this, clientInfo);
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
GlobalConfig::globalConfig().trigger(samplingWindow, samplingProfilerUpdateWindow);

monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
cacheListMonitor = monitorCacheList(this);
Expand Down Expand Up @@ -1568,6 +1564,9 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
/*switchable*/ true);
}

GlobalConfig::create(db, clientInfo, std::addressof(clientInfo->get()));
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
GlobalConfig::globalConfig().trigger(samplingWindow, samplingProfilerUpdateWindow);
return Database(db);
}

Expand Down
15 changes: 11 additions & 4 deletions fdbserver/ClusterController.actor.cpp
Expand Up @@ -135,9 +135,7 @@ class ClusterControllerData {
true,
TaskPriority::DefaultEndpoint,
true)) // SOMEDAY: Locality!
{
GlobalConfig::globalConfig().updateDBInfo(clientInfo);
}
{}

void setDistributor(const DataDistributorInterface& interf) {
auto newInfo = serverInfo->get();
Expand Down Expand Up @@ -3771,7 +3769,7 @@ ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state Optional<Value> globalConfigVersion = wait(tr.get(globalConfigVersionKey));
state ClientDBInfo clientInfo = db->clientInfo->get();
state ClientDBInfo clientInfo = db->serverInfo->get().client;

if (globalConfigVersion.present()) {
// Since the history keys end with versionstamps, they
Expand Down Expand Up @@ -3829,6 +3827,15 @@ ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
}

clientInfo.id = deterministicRandom()->randomUniqueID();

// Update ServerDBInfo so fdbserver processes receive updated history.
ServerDBInfo serverInfo = db->serverInfo->get();
serverInfo.id = deterministicRandom()->randomUniqueID();
serverInfo.infoGeneration = ++db->dbInfoCount;
serverInfo.client = clientInfo;
db->serverInfo->set(serverInfo);

// Update ClientDBInfo so client processes receive updated history.
db->clientInfo->set(clientInfo);
}

Expand Down
18 changes: 10 additions & 8 deletions fdbserver/worker.actor.cpp
Expand Up @@ -147,12 +147,16 @@ Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db,
bool enableLocalityLoadBalance,
bool lockAware) {
auto info = makeReference<AsyncVar<ClientDBInfo>>();
return DatabaseContext::create(info,
extractClientInfo(db, info),
enableLocalityLoadBalance ? db->get().myLocality : LocalityData(),
enableLocalityLoadBalance,
taskID,
lockAware);
auto cx = DatabaseContext::create(info,
extractClientInfo(db, info),
enableLocalityLoadBalance ? db->get().myLocality : LocalityData(),
enableLocalityLoadBalance,
taskID,
lockAware);
GlobalConfig::create(cx.getPtr(), db, std::addressof(db->get().client));
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
GlobalConfig::globalConfig().trigger(samplingWindow, samplingProfilerUpdateWindow);
return cx;
}

struct ErrorInfo {
Expand Down Expand Up @@ -1049,8 +1053,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
metricsLogger = runMetrics(openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, lockAware),
KeyRef(metricsPrefix));
}

GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
}

errorForwarders.add(resetAfter(degraded,
Expand Down
10 changes: 10 additions & 0 deletions flow/genericactors.actor.h
Expand Up @@ -697,6 +697,16 @@ class AsyncTrigger : NonCopyable {
AsyncVar<Void> v;
};

// Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes
// the AsyncTrigger is triggered.
ACTOR template <class T>
void forward(Reference<AsyncVar<T>> from, AsyncTrigger* to) {
loop {
wait(from->onChange());
to->trigger();
}
}

class Debouncer : NonCopyable {
public:
explicit Debouncer(double delay) { worker = debounceWorker(this, delay); }
Expand Down

0 comments on commit ccfc48d

Please sign in to comment.