Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding BlobFailureInjection workload (Cherry-Pick #9833 to snowflake/release-71.3) #9919

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions fdbclient/BackupContainerLocalDirectory.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "fdbclient/BackupContainerLocalDirectory.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "flow/IAsyncFile.h"
#include "flow/FaultInjection.h"
#include "flow/Platform.actor.h"
#include "flow/Platform.h"
#include "fdbrpc/simulator.h"
Expand Down Expand Up @@ -67,6 +68,8 @@ class BackupFile : public IBackupFile, ReferenceCounted<BackupFile> {
Future<Void> r = uncancellable(holdWhile(old, m_file->write(old.begin(), size, m_writeOffset)));
m_writeOffset += size;

INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::flush");

return r;
}

Expand All @@ -77,6 +80,9 @@ class BackupFile : public IBackupFile, ReferenceCounted<BackupFile> {
std::string name = f->m_file->getFilename();
f->m_file.clear();
wait(IAsyncFileSystem::filesystem()->renameFile(name, f->m_finalFullPath));

INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::finish");

return Void();
}

Expand Down Expand Up @@ -116,6 +122,8 @@ ACTOR static Future<BackupContainerFileSystem::FilesAndSizesT> listFiles_impl(st
results.push_back({ f.substr(m_path.size() + 1), ::fileSize(f) });
}

INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::listFiles");

return results;
}

Expand Down Expand Up @@ -217,6 +225,7 @@ Future<Reference<IAsyncFile>> BackupContainerLocalDirectory::readFile(const std:
if (usesEncryption()) {
flags |= IAsyncFile::OPEN_ENCRYPTED;
}
INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::readFile");
// Simulation does not properly handle opening the same file from multiple machines using a shared filesystem,
// so create a symbolic link to make each file opening appear to be unique. This could also work in production
// but only if the source directory is writeable which shouldn't be required for a restore.
Expand Down Expand Up @@ -268,6 +277,7 @@ Future<Reference<IAsyncFile>> BackupContainerLocalDirectory::readFile(const std:
}

Future<Reference<IBackupFile>> BackupContainerLocalDirectory::writeFile(const std::string& path) {
INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::writeFile");
int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_CREATE |
IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE;
if (usesEncryption()) {
Expand All @@ -286,6 +296,7 @@ Future<Void> BackupContainerLocalDirectory::writeEntireFile(const std::string& p

// Deletes the file at `path`, resolved relative to this container's m_path, and returns an
// already-completed future. After the delete has been issued, the blob fault injector is given a
// chance to throw http_request_failed, so callers may see an error even though the file is gone.
Future<Void> BackupContainerLocalDirectory::deleteFile(const std::string& path) {
    const auto fullPath = joinPath(m_path, path);
    ::deleteFile(fullPath);

    // The injection point sits after the delete, as it does after the write/rename/list in the other operations.
    INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::deleteFile");

    return Void();
}

Expand Down
5 changes: 3 additions & 2 deletions fdbrpc/include/fdbrpc/SimulatorProcessInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct ProcessInfo : NonCopyable {
INetworkConnections* network;

uint64_t fault_injection_r;
double fault_injection_p1, fault_injection_p2;
double fault_injection_p1, fault_injection_p2, blob_inject_failure_rate;
bool failedDisk;

UID uid;
Expand All @@ -80,7 +80,8 @@ struct ProcessInfo : NonCopyable {
: name(name), coordinationFolder(coordinationFolder), dataFolder(dataFolder), machine(nullptr),
addresses(addresses), address(addresses.address), locality(locality), startingClass(startingClass),
failed(false), excluded(false), cleared(false), rebooting(false), drProcess(false), network(net),
fault_injection_r(0), fault_injection_p1(0), fault_injection_p2(0), failedDisk(false) {
fault_injection_r(0), fault_injection_p1(0), fault_injection_p2(0), blob_inject_failure_rate(0),
failedDisk(false) {
uid = deterministicRandom()->randomUniqueID();
}

Expand Down
2 changes: 2 additions & 0 deletions fdbrpc/include/fdbrpc/simulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class ISimulator : public INetwork {
KillType* ktFinal = nullptr) = 0;
virtual bool killAll(KillType kt, bool forceKill = false, KillType* ktFinal = nullptr) = 0;
// virtual KillType getMachineKillState( UID zoneID ) = 0;
virtual void processInjectBlobFault(ProcessInfo* machine, double failureRate) = 0;
virtual void processStopInjectBlobFault(ProcessInfo* machine) = 0;
virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses,
std::vector<ProcessInfo*> const& deadProcesses,
KillType kt,
Expand Down
36 changes: 36 additions & 0 deletions fdbrpc/sim2.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,29 @@ bool simulator_should_inject_fault(const char* context, const char* file, int li
return false;
}

// Callback installed into should_inject_blob_fault. Returns true when a blob fault should be thrown
// at the call site identified by (context, file, line); error_code is the error the caller would throw.
// A fault is injected with probability blob_inject_failure_rate of the current simulated process.
bool simulator_should_inject_blob_fault(const char* context, const char* file, int line, int error_code) {
    // Only ever inject inside simulation, and only while fault injection is activated.
    const bool injectionPossible = g_network->isSimulated() && faultInjectionActivated;
    if (!injectionPossible) {
        return false;
    }

    auto currentProcess = g_simulator->getCurrentProcess();

    // While speedUpSimulation is set nothing is injected, and no random number is drawn.
    if (g_simulator->speedUpSimulation) {
        return false;
    }
    if (!(deterministicRandom()->random01() < currentProcess->blob_inject_failure_rate)) {
        return false;
    }

    CODE_PROBE(true, "A blob fault was injected", probe::assert::simOnly, probe::context::sim2);
    CODE_PROBE(error_code == error_code_http_request_failed,
               "A failed http request was injected",
               probe::assert::simOnly,
               probe::context::sim2);
    TraceEvent("BlobFaultInjected")
        .detail("Context", context)
        .detail("File", file)
        .detail("Line", line)
        .detail("ErrorCode", error_code);
    return true;
}

// Stores `time` in disabledMap under the key `desc`, replacing any value already stored for that key.
// NOTE(review): presumably `time` is the simulation time until which `desc` stays disabled - confirm against
// the readers of disabledMap.
void ISimulator::disableFor(const std::string& desc, double time) {
disabledMap[desc] = time;
}
Expand Down Expand Up @@ -1494,6 +1517,7 @@ class Sim2 final : public ISimulator, public INetworkConnections {
// The following function determines whether a machine can be removed when it has a blob worker
bool canKillMachineWithBlobWorkers(Optional<Standalone<StringRef>> machineId, KillType kt, KillType* ktFinal) {
// Allow if no blob workers, or it's a reboot(without removing the machine)
// FIXME: this should be ||
if (!blobGranulesEnabled && kt >= KillType::RebootAndDelete) {
return true;
}
Expand Down Expand Up @@ -2339,6 +2363,18 @@ class Sim2 final : public ISimulator, public INetworkConnections {
g_clogging.unclogPair(from, to);
}

// Starts blob fault injection on `machine`: installs simulator_should_inject_blob_fault as the global
// should_inject_blob_fault hook and records `failureRate` as the process's blob_inject_failure_rate.
// Asserts that the process is not already injecting blob faults (its rate must currently be 0).
void processInjectBlobFault(ProcessInfo* machine, double failureRate) override {
CODE_PROBE(true, "Simulated process beginning blob fault", probe::context::sim2, probe::assert::simOnly);
// The hook is global, so it is installed on first use and left in place afterwards.
should_inject_blob_fault = simulator_should_inject_blob_fault;
ASSERT(machine->blob_inject_failure_rate == 0.0);
machine->blob_inject_failure_rate = failureRate;
}

// Stops blob fault injection on `machine` by resetting its blob_inject_failure_rate to 0.
// The global should_inject_blob_fault hook is left installed.
void processStopInjectBlobFault(ProcessInfo* machine) override {
CODE_PROBE(true, "Simulated process stopping blob fault", probe::context::sim2, probe::assert::simOnly);
machine->blob_inject_failure_rate = 0.0;
}

std::vector<ProcessInfo*> getAllProcesses() const override {
std::vector<ProcessInfo*> processes;
for (auto& c : machines) {
Expand Down
3 changes: 2 additions & 1 deletion fdbserver/BlobManager.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5230,7 +5230,8 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
} catch (Error& e) {
// These should not get an error that then causes a transaction retry loop. All error handling
// should be done in the purge calls
if (e.code() == error_code_operation_cancelled ||
// FIXME: retry purging if it gets blobstore errors instead of killing blob manager
if (e.code() == error_code_operation_cancelled || e.code() == error_code_http_request_failed ||
e.code() == error_code_blob_manager_replaced || e.code() == error_code_platform_error) {
throw e;
}
Expand Down
177 changes: 177 additions & 0 deletions fdbserver/workloads/BlobFailureInjection.actor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* BlobFailureInjection.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "fdbrpc/simulator.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/FaultInjection.h"
#include "flow/DeterministicRandom.h"
#include "fdbrpc/SimulatorProcessInfo.h"
#include "flow/actorcompiler.h" // This must be the last #include.

/*
* The BlobFailureInjection workload is designed to simulate blob storage becoming temporarily flaky or unavailable,
* from a single host to the whole cluster.
* TODO: add blob storage becoming permanently flaky or unavailable on a single host, to ensure the system moves work
* away accordingly. Could also handle that through attrition workload maybe?
* FIXME: make this work outside simulation. Talk to workers like DiskFailureInjection does and add S3BlobStore and
* AzureBlobStore fault injection points.
*/
struct BlobFailureInjectionWorkload : FailureInjectionWorkload {
static constexpr auto NAME = "BlobFailureInjection";

bool enabled;
double enableProbability = 0.5;
double testDuration = 10.0;

std::vector<ISimulator::ProcessInfo*> currentlyAffected;

BlobFailureInjectionWorkload(WorkloadContext const& wcx, NoOptions) : FailureInjectionWorkload(wcx) {
enabled = !clientId && g_network->isSimulated() && faultInjectionActivated;
}

BlobFailureInjectionWorkload(WorkloadContext const& wcx) : FailureInjectionWorkload(wcx) {
// only do this on the "first" client, and only when in simulation and only when fault injection is enabled
enabled = !clientId && g_network->isSimulated() && faultInjectionActivated;
enableProbability = getOption(options, "enableProbability"_sr, enableProbability);
testDuration = getOption(options, "testDuration"_sr, testDuration);
enabled = (enabled && deterministicRandom()->random01() < enableProbability);
}

Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override { return _start(cx, this); }

bool shouldInject(DeterministicRandom& random,
const WorkloadRequest& work,
const unsigned alreadyAdded) const override {
return alreadyAdded < 1 && work.useDatabase && 0.1 / (1 + alreadyAdded) > random.random01();
}

void undoFaultInjection() {
if (!currentlyAffected.empty()) {
TraceEvent("BlobFailureInjectionUnFailing").detail("Count", currentlyAffected.size());
}
for (auto& it : currentlyAffected) {
TraceEvent("BlobFailureInjectionUnFailingProcess").detail("Addr", it->address);
g_simulator->processStopInjectBlobFault(it);
}
currentlyAffected.clear();
}

ACTOR Future<Void> _start(Database cx, BlobFailureInjectionWorkload* self) {
if (!self->enabled) {
return Void();
}

CODE_PROBE(true, "Running workload with blob failure injection");
TraceEvent("BlobFailureInjectionBegin").log();

auto processes = getServers();
deterministicRandom()->randomShuffle(processes);

wait(timeout(reportErrors(self->worker(cx, self, processes), "BlobFailureInjectionWorkerError"),
self->testDuration,
Void()));

// Undo all fault injection before exiting, if worker didn't
self->undoFaultInjection();
TraceEvent("BlobFailureInjectionEnd").log();

return Void();
}

// TODO: share code with machine attrition
static std::vector<ISimulator::ProcessInfo*> getServers() {
std::vector<ISimulator::ProcessInfo*> machines;
std::vector<ISimulator::ProcessInfo*> all = g_simulator->getAllProcesses();
for (int i = 0; i < all.size(); i++)
if (!all[i]->failed && all[i]->name == std::string("Server") &&
all[i]->startingClass != ProcessClass::TesterClass)
machines.push_back(all[i]);
return machines;
}

ACTOR Future<Void> worker(Database cx,
BlobFailureInjectionWorkload* self,
std::vector<ISimulator::ProcessInfo*> processes) {
int minFailureDuration = 5;
int maxFailureDuration = std::max(10, (int)(self->testDuration / 2));

state double failureDuration =
deterministicRandom()->randomSkewedUInt32(minFailureDuration, maxFailureDuration);
// add a random amount between 0 and 1, otherwise it's a whole number
failureDuration += deterministicRandom()->random01();
state double delayBefore =
deterministicRandom()->random01() * (std::max<double>(0.0, self->testDuration - failureDuration));

wait(delay(delayBefore));

// TODO: pick one random worker, a subset of workers, or entire cluster randomly

int amountToFail = 1;
if (deterministicRandom()->coinflip()) {
if (deterministicRandom()->coinflip()) {
// fail all processes
amountToFail = processes.size();
} else if (processes.size() > 3) {
// fail a random amount of processes up to half
amountToFail = deterministicRandom()->randomInt(2, std::max<int>(3, processes.size() / 2));
}
} // fail 1 process 50% of the time
ASSERT(amountToFail <= processes.size());
ASSERT(amountToFail > 0);

double failureRate;
if (deterministicRandom()->coinflip()) {
// fail all requests - blob store is completely unreachable
failureRate = 1.0;
} else {
// fail a random percentage of requests, biasing towards low percentages.
// This is based on the intuition that failing 98% of requests is not very different than failing 99%, but
// failing 0.1% vs 1% is different
failureRate = deterministicRandom()->randomSkewedUInt32(1, 1000) / 1000.0;
}

CODE_PROBE(true, "blob failure injection killing processes");

TraceEvent("BlobFailureInjectionFailing")
.detail("Count", amountToFail)
.detail("Duration", failureDuration)
.detail("FailureRate", failureRate)
.log();
for (int i = 0; i < amountToFail; i++) {
TraceEvent("BlobFailureInjectionFailingProcess").detail("Addr", processes[i]->address);
self->currentlyAffected.push_back(processes[i]);
g_simulator->processInjectBlobFault(processes[i], failureRate);
}

wait(delay(failureDuration));

self->undoFaultInjection();

return Void();
}

Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};

WorkloadFactory<BlobFailureInjectionWorkload> BlobFailureInjectionWorkloadFactory;
// TODO enable once bugs fixed!
// FailureInjectorFactory<BlobFailureInjectionWorkload> BlobFailureInjectionFailureWorkloadFactory;
1 change: 1 addition & 0 deletions flow/FaultInjection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "flow/FaultInjection.h"

// Hook consulted by SHOULD_INJECT_FAULT; null until a fault injector is installed.
bool (*should_inject_fault)(const char* context, const char* file, int line, int error_code) = nullptr;

// Hook consulted by the blob fault injection macros; null until a blob fault injector is installed.
bool (*should_inject_blob_fault)(const char* context, const char* file, int line, int error_code) = nullptr;

// Global switch for fault injection; starts out enabled.
bool faultInjectionActivated = true;

void enableFaultInjection(bool enabled) {
Expand Down
12 changes: 12 additions & 0 deletions flow/include/flow/FaultInjection.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,23 @@

#define SHOULD_INJECT_FAULT(context) (should_inject_fault && should_inject_fault(context, __FILE__, __LINE__, 0))

// Blob fault injection point. If a should_inject_blob_fault hook is installed and it returns true for this
// call site (context, __FILE__, __LINE__, error_code_<error_type>), throws error_type() marked as an injected fault.
// `error_type` is the bare error name (e.g. http_request_failed); it is token-pasted onto `error_code_`.
// Wrapped in do { } while (0) so the macro behaves as a single statement.
#define INJECT_BLOB_FAULT(error_type, context) \
do { \
if (should_inject_blob_fault && \
should_inject_blob_fault(context, __FILE__, __LINE__, error_code_##error_type)) \
throw error_type().asInjectedFault(); \
} while (0)

// Expression form of the blob fault check: evaluates to true when a should_inject_blob_fault hook is installed
// and it asks for a fault at this call site. Passes 0 as the error code and throws nothing itself.
#define SHOULD_INJECT_BLOB_FAULT(context) \
(should_inject_blob_fault && should_inject_blob_fault(context, __FILE__, __LINE__, 0))

extern bool (*should_inject_fault)(const char* context, const char* file, int line, int error_code);
extern bool (*should_inject_blob_fault)(const char* context, const char* file, int line, int error_code);
extern bool faultInjectionActivated;
extern void enableFaultInjection(bool enabled); // Enable fault injection called from fdbserver actor main function
#else
#define INJECT_FAULT(error_type, context)
#define INJECT_BLOB_FAULT(error_type, context)
#endif

#endif
4 changes: 4 additions & 0 deletions tests/fast/BlobGranuleMoveVerifyCycle.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ testTitle = 'BlobGranuleMoveVerifyCycle'
machinesToLeave = 3
reboot = true
testDuration = 60.0

[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 60.0
4 changes: 4 additions & 0 deletions tests/fast/BlobGranuleVerifyAtomicOps.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,7 @@ testTitle = 'BlobGranuleVerifyAtomicOps'
machinesToLeave = 3
reboot = true
testDuration = 30.0

[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 30.0
4 changes: 4 additions & 0 deletions tests/fast/BlobGranuleVerifyCycle.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ testTitle = 'BlobGranuleVerifyCycle'
machinesToLeave = 3
reboot = true
testDuration = 60.0

[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 60.0
4 changes: 4 additions & 0 deletions tests/fast/BlobGranuleVerifySmall.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ testTitle = 'BlobGranuleVerifySmall'
reboot = true
testDuration = 60.0

[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 60.0

Loading