Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple CAS Stress Test #8582

Open
wants to merge 1 commit into
base: next
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion llvm/include/llvm/CAS/ObjectStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ class ObjectStore {
virtual Expected<bool> isMaterialized(ObjectRef Ref) const = 0;

/// Validate the underlying object referred by CASID.
virtual Error validate(const CASID &ID) = 0;
virtual Error validateObject(const CASID &ID) = 0;

/// Validate the entire CAS and maybe attempting to recover from bad state.
virtual Error validate(bool Shallow = true) = 0;

protected:
/// Load the object referenced by \p Ref.
Expand Down
3 changes: 3 additions & 0 deletions llvm/include/llvm/CAS/OnDiskGraphDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ class OnDiskGraphDB {
return make_range(Refs.begin(), Refs.end());
}

/// Validate the CAS
Error validate(bool Shallow);

/// \returns Total size of stored objects.
///
/// NOTE: There's a possibility that the returned size is not including a
Expand Down
2 changes: 1 addition & 1 deletion llvm/lib/CAS/BuiltinCAS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Expected<ObjectRef> BuiltinCAS::store(ArrayRef<ObjectRef> Refs,
Refs, Data);
}

Error BuiltinCAS::validate(const CASID &ID) {
Error BuiltinCAS::validateObject(const CASID &ID) {
auto Ref = getReference(ID);
if (!Ref)
return createUnknownObjectError(ID);
Expand Down
2 changes: 1 addition & 1 deletion llvm/lib/CAS/BuiltinCAS.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class BuiltinCAS : public ObjectStore {
"corrupt storage");
}

Error validate(const CASID &ID) final;
Error validateObject(const CASID &ID) final;
};

/// Create a \p UnifiedOnDiskCache instance that uses \p BLAKE3 hashing.
Expand Down
6 changes: 6 additions & 0 deletions llvm/lib/CAS/InMemoryCAS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class InMemoryCAS : public BuiltinCAS {
storeFromNullTerminatedRegion(ArrayRef<uint8_t> ComputedHash,
sys::fs::mapped_file_region Map) override;

Error validate(bool Shallow) final;

CASID getID(const InMemoryIndexValueT &I) const {
StringRef Hash = toStringRef(I.Hash);
return CASID::create(&getContext(), Hash);
Expand Down Expand Up @@ -296,6 +298,10 @@ InMemoryCAS::storeFromNullTerminatedRegion(ArrayRef<uint8_t> ComputedHash,
return toReference(Node);
}

Error InMemoryCAS::validate(bool Shallow) {
return Error::success();
}

Expected<ObjectRef> InMemoryCAS::storeImpl(ArrayRef<uint8_t> ComputedHash,
ArrayRef<ObjectRef> Refs,
ArrayRef<char> Data) {
Expand Down
2 changes: 1 addition & 1 deletion llvm/lib/CAS/ObjectStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ Error ObjectStore::validateTree(ObjectRef Root) {
auto [I, Inserted] = ValidatedRefs.insert(Ref);
if (!Inserted)
continue; // already validated.
if (Error E = validate(getID(Ref)))
if (Error E = validateObject(getID(Ref)))
return E;
Expected<ObjectHandle> Obj = load(Ref);
if (!Obj)
Expand Down
6 changes: 6 additions & 0 deletions llvm/lib/CAS/OnDiskCAS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class OnDiskCAS : public BuiltinCAS {

void print(raw_ostream &OS) const final;

Error validate(bool Shallow) final;

static Expected<std::unique_ptr<OnDiskCAS>> open(StringRef Path);

OnDiskCAS(std::shared_ptr<ondisk::UnifiedOnDiskCache> UniDB_)
Expand Down Expand Up @@ -84,6 +86,10 @@ class OnDiskCAS : public BuiltinCAS {

void OnDiskCAS::print(raw_ostream &OS) const { DB->print(OS); }

Error OnDiskCAS::validate(bool Shallow) {
return DB->validate(Shallow);
}

CASID OnDiskCAS::getID(ObjectRef Ref) const {
ArrayRef<uint8_t> Hash = DB->getDigest(convertRef(Ref));
return CASID::create(&getContext(), toStringRef(Hash));
Expand Down
26 changes: 26 additions & 0 deletions llvm/lib/CAS/OnDiskGraphDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@
#include "llvm/ADT/StringExtras.h"
#include "llvm/Support/Alignment.h"
#include "llvm/Support/Errc.h"
#include "llvm/Support/Error.h"
#include "llvm/Support/MemoryBuffer.h"
#include "llvm/Support/Path.h"
#include "llvm/Support/Process.h"
#include "llvm/Support/raw_ostream.h"

#define DEBUG_TYPE "on-disk-cas"

Expand Down Expand Up @@ -997,6 +999,30 @@ ArrayRef<char> OnDiskGraphDB::getObjectData(ObjectHandle Node) const {
return Content.Record->getData();
}

Error OnDiskGraphDB::validate(bool Shallow) {
llvm::raw_null_ostream Null;
Error Err = Error::success();
Index.print(Null, [&](ArrayRef<char> Data) {
assert(Data.size() == sizeof(TrieRecord));
assert(isAligned(Align::Of<TrieRecord>(), Data.size()));
auto *R = reinterpret_cast<const TrieRecord *>(Data.data());
TrieRecord::Data D = R->load();
if (D.SK == TrieRecord::StorageKind::Unknown) {
std::string ErrMsg;
llvm::raw_string_ostream SS(ErrMsg);
SS << "unknown record @ " << R << " offset: " << (void*) D.Offset.get();
if (Err)
Err = llvm::joinErrors(
std::move(Err),
createStringError(inconvertibleErrorCode(), SS.str()));
else
Err = createStringError(inconvertibleErrorCode(), SS.str());
}
});

return Err;
}

InternalRefArrayRef OnDiskGraphDB::getInternalRefs(ObjectHandle Node) const {
if (std::optional<DataRecordHandle> Record =
getContentFromHandle(Node).Record)
Expand Down
7 changes: 6 additions & 1 deletion llvm/lib/CAS/PluginCAS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,12 @@ class PluginObjectStore
size_t getNumRefs(ObjectHandle Node) const final;
ArrayRef<char> getData(ObjectHandle Node,
bool RequiresNullTerminator = false) const final;
Error validate(const CASID &ID) final {
Error validateObject(const CASID &ID) final {
// Not supported yet. Always return success.
return Error::success();
}

Error validate(bool Shallow) final {
// Not supported yet. Always return success.
return Error::success();
}
Expand Down
80 changes: 78 additions & 2 deletions llvm/tools/llvm-cas/llvm-cas.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
#include "llvm/Support/MemoryBuffer.h"
#include "llvm/Support/Path.h"
#include "llvm/Support/PrefixMapper.h"
#include "llvm/Support/RandomNumberGenerator.h"
#include "llvm/Support/StringSaver.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/ThreadPool.h"
#include "llvm/Support/raw_ostream.h"
#include <memory>
#include <system_error>
Expand Down Expand Up @@ -63,7 +66,9 @@ static int putCacheKey(ObjectStore &CAS, ActionCache &AC,
ArrayRef<std::string> Objects);
static int getCacheResult(ObjectStore &CAS, ActionCache &AC, const CASID &ID);
static int validateObject(ObjectStore &CAS, const CASID &ID);
static int validateCAS(ObjectStore &CAS);
static int ingestCasIDFile(cas::ObjectStore &CAS, ArrayRef<std::string> CASIDs);
static int stressTest(StringRef CASPath);

int main(int Argc, char **Argv) {
InitLLVM X(Argc, Argv);
Expand Down Expand Up @@ -102,6 +107,8 @@ int main(int Argc, char **Argv) {
PutCacheKey,
GetCacheResult,
Validate,
ValidateCAS,
StressTest,
};
cl::opt<CommandKind> Command(
cl::desc("choose command action:"),
Expand All @@ -126,7 +133,9 @@ int main(int Argc, char **Argv) {
"set a value for a cache key"),
clEnumValN(GetCacheResult, "get-cache-result",
"get the result value from a cache key"),
clEnumValN(Validate, "validate", "validate the object for CASID")),
clEnumValN(Validate, "validate", "validate the object for CASID"),
clEnumValN(ValidateCAS, "validate-cas", "validate the CAS"),
clEnumValN(StressTest, "stress-test", "stress test the CAS")),
cl::init(CommandKind::Invalid));

cl::ParseCommandLineOptions(Argc, Argv, "llvm-cas CAS tool\n");
Expand All @@ -141,6 +150,9 @@ int main(int Argc, char **Argv) {
ExitOnErr(
createStringError(inconvertibleErrorCode(), "missing --cas=<path>"));

if (Command == StressTest)
return stressTest(CASPath);

std::shared_ptr<ObjectStore> CAS;
std::shared_ptr<ActionCache> AC;
std::optional<StringRef> CASFilePath;
Expand Down Expand Up @@ -169,6 +181,9 @@ int main(int Argc, char **Argv) {
if (Command == Dump)
return dump(*CAS);

if (Command == ValidateCAS)
return validateCAS(*CAS);

if (Command == MakeBlob)
return makeBlob(*CAS, DataPath);

Expand Down Expand Up @@ -321,6 +336,12 @@ int dump(ObjectStore &CAS) {
return 0;
}

int validateCAS(ObjectStore &CAS) {
ExitOnError ExitOnErr("llvm-cas: validate-cas: ");
ExitOnErr(CAS.validate());
return 0;
}

int makeBlob(ObjectStore &CAS, StringRef DataPath) {
ExitOnError ExitOnErr("llvm-cas: make-blob: ");
std::unique_ptr<MemoryBuffer> Buffer = ExitOnErr(openBuffer(DataPath));
Expand Down Expand Up @@ -661,7 +682,62 @@ static int getCacheResult(ObjectStore &CAS, ActionCache &AC, const CASID &ID) {

int validateObject(ObjectStore &CAS, const CASID &ID) {
ExitOnError ExitOnErr("llvm-cas: validate: ");
ExitOnErr(CAS.validate(ID));
ExitOnErr(CAS.validateObject(ID));
outs() << ID << ": validated successfully\n";
return 0;
}

int stressTest(StringRef CASPath) {
ExitOnError ExitOnErr("llvm-cas: stress test: ");
// Totally size to be inserted.
const uint64_t TotalSize = 4 * 1024LL * 1024LL * 1024LL;
// Biggest size for each item.
const size_t MaxItem = 4 * 1024LL;
// Timeout for the test in second.
unsigned Timeout = 15 * 60;
unsigned Iter = 0;
std::chrono::steady_clock::time_point Start =
std::chrono::steady_clock::now();

// Run multi-thread random insert till timeout is reached. Maybe should even
// spawn sub-processes and add more possibilities for insert collision?
while (true) {
if (std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - Start)
.count() > Timeout)
break;

llvm::outs() << "Start iteration: " << Iter << "\n";
auto EC = llvm::sys::fs::remove_directories(CASPath);
if (EC)
return 1;

StdThreadPool Pool;
const uint64_t SizePerThread = TotalSize / Pool.getMaxConcurrency();
auto CAS = ExitOnErr(createOnDiskUnifiedCASDatabases(CASPath));
for (unsigned I = 0; I < Pool.getMaxConcurrency(); ++ I) {
Pool.async([&]() {
uint64_t Allocated = 0;
std::vector<char> Buf;
Buf.resize(MaxItem);
while (Allocated < SizePerThread) {
unsigned Size = random() % MaxItem;
getRandomBytes(Buf.data(), Size);
StringRef Data(Buf.data(), Size);

// Maybe add some random children too.
ExitOnErr(CAS.first->storeFromString({}, Data));
Allocated += Size;
}
});
}

Pool.wait();
llvm::outs() << "Verify CAS (size: "
<< *ExitOnErr(CAS.first->getStorageSize()) << ")\n";
ExitOnErr(CAS.first->validate());
++Iter;
}

return 0;
}
12 changes: 6 additions & 6 deletions llvm/unittests/CAS/ObjectStoreTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ multiline text multiline text multiline text multiline text multiline text)",

// Run validation on all CASIDs.
for (int I = 0, E = IDs.size(); I != E; ++I)
ASSERT_THAT_ERROR(CAS1->validate(IDs[I]), Succeeded());
ASSERT_THAT_ERROR(CAS1->validateObject(IDs[I]), Succeeded());

// Check that the blobs can be retrieved multiple times.
for (int I = 0, E = IDs.size(); I != E; ++I) {
Expand Down Expand Up @@ -125,17 +125,17 @@ TEST_P(CASTest, BlobsBig) {
Succeeded());
ASSERT_THAT_ERROR(CAS->createProxy(std::nullopt, String1).moveInto(ID2),
Succeeded());
ASSERT_THAT_ERROR(CAS->validate(*ID1), Succeeded());
ASSERT_THAT_ERROR(CAS->validate(*ID2), Succeeded());
ASSERT_THAT_ERROR(CAS->validateObject(*ID1), Succeeded());
ASSERT_THAT_ERROR(CAS->validateObject(*ID2), Succeeded());
ASSERT_EQ(ID1, ID2);

String1.append(String2);
ASSERT_THAT_ERROR(CAS->createProxy(std::nullopt, String2).moveInto(ID1),
Succeeded());
ASSERT_THAT_ERROR(CAS->createProxy(std::nullopt, String2).moveInto(ID2),
Succeeded());
ASSERT_THAT_ERROR(CAS->validate(*ID1), Succeeded());
ASSERT_THAT_ERROR(CAS->validate(*ID2), Succeeded());
ASSERT_THAT_ERROR(CAS->validateObject(*ID1), Succeeded());
ASSERT_THAT_ERROR(CAS->validateObject(*ID2), Succeeded());
ASSERT_EQ(ID1, ID2);
String2.append(String1);
}
Expand Down Expand Up @@ -275,7 +275,7 @@ TEST_P(CASTest, NodesBig) {
}

for (auto ID : CreatedNodes)
ASSERT_THAT_ERROR(CAS->validate(CAS->getID(ID)), Succeeded());
ASSERT_THAT_ERROR(CAS->validateObject(CAS->getID(ID)), Succeeded());
}

/// Common test functionality for creating blobs in parallel. You can vary which
Expand Down
4 changes: 2 additions & 2 deletions llvm/unittests/CAS/TreeSchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ TEST(TreeSchemaTest, Trees) {

// Run validation.
for (int I = 1, E = FlatIDs.size(); I != E; ++I)
ASSERT_THAT_ERROR(CAS1->validate(FlatIDs[I]), Succeeded());
ASSERT_THAT_ERROR(CAS1->validateObject(FlatIDs[I]), Succeeded());

// Confirm these trees don't exist in a fresh CAS instance. Skip the first
// tree, which is empty and could be implicitly in some CAS.
Expand Down Expand Up @@ -179,7 +179,7 @@ TEST(TreeSchemaTest, Trees) {
ASSERT_THAT_ERROR(Schema.create(NewEntries).moveInto(Tree),
Succeeded());
ASSERT_EQ(*ID, Tree->getID());
ASSERT_THAT_ERROR(CAS->validate(*ID), Succeeded());
ASSERT_THAT_ERROR(CAS->validateObject(*ID), Succeeded());
Tree.reset();
std::optional<ObjectRef> Ref = CAS->getReference(*ID);
ASSERT_TRUE(Ref);
Expand Down