Skip to content

Commit

Permalink
Add log to debug missing client metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
hfu94 committed Mar 28, 2024
1 parent f9a42ee commit b52c389
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 5 deletions.
2 changes: 2 additions & 0 deletions fdbclient/ClientKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ void ClientKnobs::initialize(Randomize randomize) {
init( RESTORE_RANGES_READ_BATCH, 10000 );
init( BLOB_GRANULE_RESTORE_CHECK_INTERVAL, 10 );
init( BACKUP_CONTAINER_LOCAL_ALLOW_RELATIVE_PATH, false );
// FIXME Remove this when client latency issue is resolved
init(WRITE_CLIENT_LATENCY_TRACEEVENT, false);

// Configuration
init( DEFAULT_AUTO_COMMIT_PROXIES, 3 );
Expand Down
44 changes: 40 additions & 4 deletions fdbclient/MultiVersionTransaction.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,10 @@ void DLApi::selectApiVersion(int apiVersion) {
// External clients must support at least this version
// Versions newer than what we understand are rejected in the C bindings
headerVersion = std::max(apiVersion, 400);

if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "DLApiSelectAPIVersion " << apiVersion << std::endl;
TraceEvent("DLApiSelectAPIVersion").detail("Version", apiVersion).log();
}
init();
throwIfError(api->selectApiVersion(apiVersion, headerVersion));
throwIfError(api->setNetworkOption(static_cast<FDBNetworkOption>(FDBNetworkOptions::EXTERNAL_CLIENT), nullptr, 0));
Expand Down Expand Up @@ -2808,7 +2811,10 @@ void MultiVersionApi::disableMultiVersionClientApi() {
if (networkStartSetup || localClientDisabled || disableBypass) {
throw invalid_option();
}

if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "DisableMultiVersionClientApi" << std::endl;
TraceEvent("DisableMultiVersionClientApi").log();
}
bypassMultiClientApi = true;
}

Expand Down Expand Up @@ -2838,7 +2844,9 @@ void MultiVersionApi::addExternalLibrary(std::string path, bool useFutureVersion
threadCount = std::max(threadCount, 1);

if (externalClientDescriptions.count(filename) == 0) {
TraceEvent("AddingExternalClient").detail("LibraryPath", filename).detail("UseFutureVersion", useFutureVersion);
TraceEvent("AddingExternalClientLibrary")
.detail("LibraryPath", filename)
.detail("UseFutureVersion", useFutureVersion);
externalClientDescriptions.emplace(std::make_pair(filename, ClientDesc(path, true, useFutureVersion)));
}
}
Expand All @@ -2859,7 +2867,7 @@ void MultiVersionApi::addExternalLibraryDirectory(std::string path) {
for (auto filename : files) {
std::string lib = abspath(joinPath(path, filename));
if (externalClientDescriptions.count(filename) == 0) {
TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
TraceEvent("AddingExternalClientLibraryThroughDirectory").detail("LibraryPath", filename);
externalClientDescriptions.emplace(std::make_pair(filename, ClientDesc(lib, true, false)));
}
}
Expand Down Expand Up @@ -2997,9 +3005,17 @@ void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option,
validateOption(value, false, true);
setCallbacksOnExternalThreads();
} else if (option == FDBNetworkOptions::EXTERNAL_CLIENT_LIBRARY) {
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "Using EXTERNAL_CLIENT_LIBRARY" << std::endl;
TraceEvent("UsingEXTERNAL_CLIENT_LIBRARY").log();
}
validateOption(value, true, false, false);
addExternalLibrary(abspath(value.get().toString()), false);
} else if (option == FDBNetworkOptions::EXTERNAL_CLIENT_DIRECTORY) {
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "Using EXTERNAL_CLIENT_DIRECTORY" << std::endl;
TraceEvent("UsingEXTERNAL_CLIENT_DIRECTORY").log();
}
validateOption(value, true, false, false);
addExternalLibraryDirectory(value.get().toString());
} else if (option == FDBNetworkOptions::DISABLE_LOCAL_CLIENT) {
Expand All @@ -3009,6 +3025,10 @@ void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option,
ASSERT(value.present());
setSupportedClientVersions(value.get());
} else if (option == FDBNetworkOptions::EXTERNAL_CLIENT) {
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "Using EXTERNAL_CLIENT" << std::endl;
TraceEvent("UsingEXTERNAL_CLIENT").log();
}
MutexHolder holder(lock);
ASSERT(!value.present() && !networkStartSetup);
externalClient = true;
Expand Down Expand Up @@ -3106,6 +3126,10 @@ void MultiVersionApi::setupNetwork() {
}

if (externalClientDescriptions.empty() && !disableBypass) {
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "ExternalClientDescriptionsEmpty" << std::endl;
TraceEvent("ExternalClientDescriptionsEmpty").log();
}
bypassMultiClientApi = true; // SOMEDAY: we won't be able to set this option once it becomes possible to
// add clients after setupNetwork is called
}
Expand Down Expand Up @@ -3294,6 +3318,14 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void*

// Creates an IDatabase object that represents a connection to the cluster
Reference<IDatabase> MultiVersionApi::createDatabase(ClusterConnectionRecord const& connectionRecord) {
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "MultiVersionApiCreateDB disabled=" << localClientDisabled
<< " bypassMultiClientApi=" << bypassMultiClientApi << std::endl;
TraceEvent("MultiVersionApiCreateDB")
.detail("LocalClientDisabled", localClientDisabled ? "True" : "False")
.detail("BypassMultiClientApi", bypassMultiClientApi ? "True" : "False")
.log();
}
lock.enter();
if (!networkSetup) {
lock.leave();
Expand All @@ -3319,6 +3351,10 @@ Reference<IDatabase> MultiVersionApi::createDatabase(ClusterConnectionRecord con
if (bypassMultiClientApi) {
return localDb;
} else {
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "ConfirmingMultiVersionTxn" << std::endl;
TraceEvent("ConfirmingMultiVersionTxn").log();
}
return Reference<IDatabase>(
new MultiVersionDatabase(this, 0, connectionRecord, Reference<IDatabase>(), localDb));
}
Expand Down
54 changes: 53 additions & 1 deletion fdbclient/NativeAPI.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -840,9 +840,17 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
state std::vector<TrInfoChunk> commitQ;
state int txBytes = 0;

if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "ClientStatusUpdateActor BeforeLoop" << std::endl;
TraceEvent("ClientStatusUpdateActorBeforeLoop").log();
}
loop {
// Need to make sure that we eventually destroy tr. We can't rely on getting cancelled to do this because of
// the cyclic reference to self.
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "ClientStatusUpdateActor refreshTransaction" << std::endl;
TraceEvent("ClientStatusUpdateActorRefreshTransaction").log();
}
wait(refreshTransaction(cx, &tr));
try {
ASSERT(cx->clientStatusUpdater.outStatusQ.empty());
Expand Down Expand Up @@ -881,6 +889,7 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
BUGGIFY ? deterministicRandom()->randomInt(200e3, 1.5 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT)
: 0.8 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT;
state std::vector<TrInfoChunk>::iterator tracking_iter = trChunksQ.begin();
state int32_t numItemsCommitted = 0;
ASSERT(commitQ.empty() && (txBytes == 0));
loop {
state std::vector<TrInfoChunk>::iterator iter = tracking_iter;
Expand All @@ -889,6 +898,10 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
try {
while (iter != trChunksQ.end()) {
if (iter->value.size() + iter->key.size() + txBytes > dataSizeLimit) {
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "Commit trloginfochunk" << std::endl;
TraceEvent("CommitChunk").log();
}
wait(transactionInfoCommitActor(&tr, &commitQ));
tracking_iter = iter;
commitQ.clear();
Expand All @@ -897,8 +910,13 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
commitQ.push_back(*iter);
txBytes += iter->value.size() + iter->key.size();
++iter;
++numItemsCommitted;
}
if (!commitQ.empty()) {
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "Commit trloginfochunkNonEmpty" << std::endl;
TraceEvent("CommitChunkNonEmpty").log();
}
wait(transactionInfoCommitActor(&tr, &commitQ));
commitQ.clear();
txBytes = 0;
Expand All @@ -916,6 +934,10 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
}
}
}
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "ClientStatusUpdateActor FinishCommit " << numItemsCommitted << std::endl;
TraceEvent("ClientStatusUpdateActorFinishCommit").log();
}
cx->clientStatusUpdater.outStatusQ.clear();
wait(cx->globalConfig->onInitialized());
double sampleRate =
Expand All @@ -928,6 +950,10 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
wait(delExcessClntTxnEntriesActor(&tr, clientTxnInfoSizeLimit));

wait(delay(CLIENT_KNOBS->CSI_STATUS_DELAY));
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "ClientStatusUpdateActor finishUpdateRound" << std::endl;
TraceEvent("ClientStatusUpdateActorFinishUpdateRound").log();
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
Expand Down Expand Up @@ -2676,8 +2702,14 @@ void setupNetwork(uint64_t transportId, UseMetrics useMetrics) {
if (g_network)
throw network_already_setup();

if (!networkOptions.logClientInfo.present())
if (!networkOptions.logClientInfo.present()) {
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "EnableLogClient" << std::endl;
TraceEvent("EnableLogClient").log();
}
// here it enables log client anyway for all txn, if disabled, no longer see CP has it
networkOptions.logClientInfo = true;
}

setupGlobalKnobs();
g_network = newNet2(tlsConfig, false, useMetrics || networkOptions.traceDirectory.present());
Expand Down Expand Up @@ -5482,6 +5514,10 @@ void Transaction::operator=(Transaction&& r) noexcept {

void Transaction::flushTrLogsIfEnabled() {
if (trState && trState->trLogInfo && trState->trLogInfo->logsAdded && trState->trLogInfo->trLogWriter.getData()) {
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "FlushTrLogsIfEnabled, flushed=" << trState->trLogInfo->flushed << std::endl;
TraceEvent("Test0226FlushTrLogsIfEnabled").detail("Flushed", trState->trLogInfo->flushed).log();
}
ASSERT(trState->trLogInfo->flushed == false);
trState->cx->clientStatusUpdater.inStatusQ.push_back(
{ trState->trLogInfo->identifier, std::move(trState->trLogInfo->trLogWriter) });
Expand Down Expand Up @@ -8966,6 +9002,22 @@ Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(cons
if (!cx->isError()) {
double clientSamplingProbability =
cx->globalConfig->get<double>(fdbClientInfoTxnSampleRate, CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY);
if (CLIENT_KNOBS->WRITE_CLIENT_LATENCY_TRACEEVENT) {
std::cout << "Using clientSamplingProbability=" << clientSamplingProbability << std::endl
<< "networkOptions.logClientInfo.present()=" << networkOptions.logClientInfo.present()
<< std::endl
<< "BUGGIFY=" << BUGGIFY << std::endl;
TraceEvent("ClientSamplingProbabilityLogger")
.detail("ClientSamplingProbability", clientSamplingProbability)
.detail("NetworkPresent", networkOptions.logClientInfo.present())
.log();
if (networkOptions.logClientInfo.present()) {
TraceEvent("ClientSamplingProbabilityNetworkInfo")
.detail("NetworkInfo", networkOptions.logClientInfo.get())
.log();
std::cout << "networkOptions.logClientInfo.get()=" << networkOptions.logClientInfo.get() << std::endl;
}
}
if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) &&
deterministicRandom()->random01() < clientSamplingProbability &&
(!g_network->isSimulated() || !g_simulator->speedUpSimulation)) {
Expand Down
1 change: 1 addition & 0 deletions fdbclient/include/fdbclient/ClientKnobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ class ClientKnobs : public KnobsImpl<ClientKnobs> {
int RESTORE_RANGES_READ_BATCH;
int BLOB_GRANULE_RESTORE_CHECK_INTERVAL;
bool BACKUP_CONTAINER_LOCAL_ALLOW_RELATIVE_PATH;
int WRITE_CLIENT_LATENCY_TRACEEVENT;

// Configuration
int32_t DEFAULT_AUTO_COMMIT_PROXIES;
Expand Down

0 comments on commit b52c389

Please sign in to comment.