Binary request generator/replayer #307

Open · wants to merge 1 commit into base: main
22 changes: 22 additions & 0 deletions cachelib/cachebench/CMakeLists.txt
@@ -41,6 +41,25 @@ target_link_libraries(cachelib_cachebench PUBLIC
gflags
)

add_library (cachelib_binary_trace_gen
./runner/Runner.cpp
./runner/Stressor.cpp
./util/CacheConfig.cpp
./util/Config.cpp
./workload/BlockChunkCache.cpp
./workload/BlockChunkReplayGenerator.cpp
./workload/PieceWiseCache.cpp
./workload/OnlineGenerator.cpp
./workload/WorkloadGenerator.cpp
./workload/PieceWiseReplayGenerator.cpp
)
add_dependencies(cachelib_binary_trace_gen thrift_generated_files)
target_link_libraries(cachelib_binary_trace_gen PUBLIC
cachelib_datatype
cachelib_allocator
gflags
)

if ((CMAKE_SYSTEM_NAME STREQUAL Linux) AND
(CMAKE_SYSTEM_PROCESSOR STREQUAL x86_64))
else()
@@ -49,11 +68,14 @@ endif()


add_executable (cachebench main.cpp)
add_executable (binary_trace_gen binary_trace_gen.cpp)
target_link_libraries(cachebench cachelib_cachebench)
target_link_libraries(binary_trace_gen cachelib_binary_trace_gen)

install(
TARGETS
cachebench
binary_trace_gen
DESTINATION ${BIN_INSTALL_DIR}
)

72 changes: 72 additions & 0 deletions cachelib/cachebench/binary_trace_gen.cpp
@@ -0,0 +1,72 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <folly/init/Init.h>
#include <folly/io/async/EventBase.h>
#include <folly/logging/LoggerDB.h>
#include <gflags/gflags.h>

#include <iostream>
#include <memory>
#include <thread>

#include "cachelib/cachebench/workload/KVReplayGenerator.h"
#include "cachelib/common/Utils.h"

DEFINE_string(json_test_config,
"",
"path to test config. If empty, use default setting");
DEFINE_uint64(
progress,
60,
"if set, prints progress every X seconds as configured, 0 to disable");


bool checkArgsValidity() {
if (FLAGS_json_test_config.empty() ||
!facebook::cachelib::util::pathExists(FLAGS_json_test_config)) {
std::cout << "Invalid config file: " << FLAGS_json_test_config
<< ". pass a valid --json_test_config for trace generation."
<< std::endl;
return false;
}

return true;
}

int main(int argc, char** argv) {
using namespace facebook::cachelib;
using namespace facebook::cachelib::cachebench;

folly::init(&argc, &argv, true);
if (!checkArgsValidity()) {
return 1;
}

CacheBenchConfig config(FLAGS_json_test_config);
std::cout << "Binary Trace Generator" << std::endl;

try {
auto generator =
std::make_unique<KVReplayGenerator>(config.getStressorConfig());
} catch (const std::exception& e) {
std::cout << "Invalid configuration. Exception: " << e.what() << std::endl;
return 1;
}

return 0;
}
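
Note: the `progress` flag is defined above, but the `main()` shown here never reads it; presumably the generator (or a later revision) wires it up. A minimal sketch, using only the standard library, of how such a periodic progress printer could look — the names and placement are assumptions, not part of this PR:

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

int main() {
  const uint64_t progressSecs = 60;  // would come from FLAGS_progress; 0 disables it
  std::atomic<bool> done{false};

  // Heartbeat thread: prints elapsed time every progressSecs seconds until
  // the generation work signals completion.
  std::thread ticker([&] {
    if (progressSecs == 0) {
      return;
    }
    uint64_t elapsed = 0;
    while (!done.load(std::memory_order_relaxed)) {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      if (++elapsed % progressSecs == 0) {
        std::cout << elapsed << "s elapsed, still generating..." << std::endl;
      }
    }
  });

  // ... construct the generator and produce the binary trace here ...

  done.store(true, std::memory_order_relaxed);
  ticker.join();
  return 0;
}
```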
17 changes: 9 additions & 8 deletions cachelib/cachebench/cache/Cache.h
@@ -17,6 +17,7 @@
#pragma once

#include <folly/Format.h>
#include <folly/container/F14Map.h>
#include <folly/hash/Hash.h>
#include <folly/json/DynamicConverter.h>
#include <folly/json/json.h>
@@ -312,8 +313,8 @@ class Cache {
// return true if the key was previously detected to be inconsistent. This
// is useful only when consistency checking is enabled by calling
// enableConsistencyCheck()
bool isInvalidKey(const std::string& key) {
return invalidKeys_[key].load(std::memory_order_relaxed);
bool isInvalidKey(const std::string_view key) {
return invalidKeys_.find(key)->second.load(std::memory_order_relaxed);
}

// Get overall stats on the whole cache allocator
@@ -426,7 +427,7 @@
// Since this can be accessed from multiple threads, the map is initialized
// during start up and only the value is updated by flipping the bit
// atomically.
std::unordered_map<std::string, std::atomic<bool>> invalidKeys_;
folly::F14NodeMap<std::string, std::atomic<bool>> invalidKeys_;

// number of inconsistency detected so far with the operations
std::atomic<unsigned int> inconsistencyCount_{0};
@@ -835,7 +836,7 @@ void Cache<Allocator>::enableConsistencyCheck(
valueTracker_ =
std::make_unique<ValueTracker>(ValueTracker::wrapStrings(keys));
for (const std::string& key : keys) {
invalidKeys_[key] = false;
invalidKeys_.emplace(key, false);
}
}

@@ -983,7 +984,7 @@ typename Cache<Allocator>::ReadHandle Cache<Allocator>::find(Key key) {
auto opId = valueTracker_->beginGet(key);
auto it = findFn();
if (checkGet(opId, it)) {
invalidKeys_[key.str()].store(true, std::memory_order_relaxed);
invalidKeys_.find(key)->second.store(true, std::memory_order_relaxed);
}
return it;
}
@@ -1027,7 +1028,7 @@ Cache<Allocator>::asyncFind(Key key) {

if (sf.isReady()) {
if (checkGet(opId, sf.value())) {
invalidKeys_[key.str()].store(true, std::memory_order_relaxed);
invalidKeys_.find(key)->second.store(true, std::memory_order_relaxed);
}

return sf;
@@ -1038,7 +1039,7 @@ Cache<Allocator>::asyncFind(Key key) {
return std::move(sf).deferValue(
[this, opId = std::move(opId), key = std::move(key)](auto handle) {
if (checkGet(opId, handle)) {
invalidKeys_[key.str()].store(true, std::memory_order_relaxed);
invalidKeys_.find(key)->second.store(true, std::memory_order_relaxed);
}

return handle;
@@ -1070,7 +1071,7 @@ typename Cache<Allocator>::WriteHandle Cache<Allocator>::findToWrite(Key key) {
auto opId = valueTracker_->beginGet(key);
auto it = findToWriteFn();
if (checkGet(opId, it)) {
invalidKeys_[key.str()].store(true, std::memory_order_relaxed);
invalidKeys_.find(key)->second.store(true, std::memory_order_relaxed);
}
return it;
}
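
The Cache.h changes above swap `std::unordered_map<std::string, std::atomic<bool>>` for `folly::F14NodeMap` and let `isInvalidKey()` take a `std::string_view`. The node-based F14 map keeps each `std::atomic<bool>` at a stable address across rehashes, and F14's default hasher supports heterogeneous string lookup, so `find()` can take a view without building a temporary `std::string`. Note that `find(key)->second` relies on the key having been pre-populated in `enableConsistencyCheck()`. A minimal standalone sketch of the lookup pattern, assuming folly is on the include path:

```cpp
#include <folly/container/F14Map.h>

#include <atomic>
#include <iostream>
#include <string>
#include <string_view>

int main() {
  // Populated once at startup; afterwards only the atomic values are flipped,
  // so concurrent readers never see the map itself being mutated.
  folly::F14NodeMap<std::string, std::atomic<bool>> invalidKeys;
  invalidKeys.emplace("key-1", false);
  invalidKeys.emplace("key-2", false);

  // Heterogeneous lookup: the std::string_view probe is hashed directly,
  // no temporary std::string is constructed.
  std::string_view probe = "key-1";
  if (auto it = invalidKeys.find(probe); it != invalidKeys.end()) {
    it->second.store(true, std::memory_order_relaxed);
  }

  std::cout << invalidKeys.find(probe)->second.load(std::memory_order_relaxed)
            << std::endl;  // prints 1
  return 0;
}
```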
41 changes: 20 additions & 21 deletions cachelib/cachebench/runner/AsyncCacheStressor.h
@@ -222,9 +222,9 @@ class AsyncCacheStressor : public Stressor {
ThroughputStats& stats,
const Request* req,
folly::EventBase* evb,
const std::string* key) {
const std::string_view key) {
++stats.get;
auto lock = chainedItemAcquireSharedLock(*key);
auto lock = chainedItemAcquireSharedLock(key);

if (ticker_) {
ticker_->updateTimeStamp(req->timestamp);
@@ -233,8 +233,7 @@
// add a distribution over sequences of requests/access patterns
// e.g. get-no-set and set-no-get

auto onReadyFn = [&, req, key = *key,
l = std::move(lock)](auto hdl) mutable {
auto onReadyFn = [&, req, key, l = std::move(lock)](auto hdl) mutable {
auto result = OpResultType::kGetMiss;

if (hdl == nullptr) {
Expand All @@ -247,7 +246,7 @@ class AsyncCacheStressor : public Stressor {
// appropriate here)
l.unlock();
auto xlock = chainedItemAcquireUniqueLock(key);
setKey(pid, stats, &key, *(req->sizeBegin), req->ttlSecs,
setKey(pid, stats, key, *(req->sizeBegin), req->ttlSecs,
req->admFeatureMap);
}
} else {
@@ -260,8 +259,8 @@
}
};

cache_->recordAccess(*key);
auto sf = cache_->asyncFind(*key);
cache_->recordAccess(key);
auto sf = cache_->asyncFind(key);
if (sf.isReady()) {
// If the handle is ready, call onReadyFn directly to process the handle
onReadyFn(std::move(sf).value());
@@ -283,9 +282,9 @@
ThroughputStats& stats,
const Request* req,
folly::EventBase* evb,
const std::string* key) {
const std::string_view key) {
++stats.get;
auto lock = chainedItemAcquireUniqueLock(*key);
auto lock = chainedItemAcquireUniqueLock(key);

// This was moved outside the lambda, as otherwise gcc-8.x crashes with an
// internal compiler error here (suspected regression in folly).
@@ -297,7 +296,7 @@
++stats.getMiss;

++stats.set;
wHdl = cache_->allocate(pid, *key, *(req->sizeBegin), req->ttlSecs);
wHdl = cache_->allocate(pid, key, *(req->sizeBegin), req->ttlSecs);
if (!wHdl) {
++stats.setFailure;
return;
@@ -327,7 +326,7 @@
};

// Always use asyncFind as findToWrite is sync when using HybridCache
auto sf = cache_->asyncFind(*key);
auto sf = cache_->asyncFind(key);
if (sf.isReady()) {
onReadyFn(std::move(sf).value());
return;
@@ -345,10 +344,10 @@
void asyncUpdate(ThroughputStats& stats,
const Request* req,
folly::EventBase* evb,
const std::string* key) {
const std::string_view key) {
++stats.get;
++stats.update;
auto lock = chainedItemAcquireUniqueLock(*key);
auto lock = chainedItemAcquireUniqueLock(key);
if (ticker_) {
ticker_->updateTimeStamp(req->timestamp);
}
@@ -363,7 +362,7 @@
cache_->updateItemRecordVersion(wHdl);
};

auto sf = cache_->asyncFind(*key);
auto sf = cache_->asyncFind(key);
if (sf.isReady()) {
onReadyFn(std::move(sf).value());
return;
@@ -457,18 +456,18 @@
const auto pid = static_cast<PoolId>(opPoolDist(gen));
const Request& req(getReq(pid, gen, lastRequestId));
OpType op = req.getOp();
const std::string* key = &(req.key);
std::string_view key = req.key;
std::string oneHitKey;
if (op == OpType::kLoneGet || op == OpType::kLoneSet) {
oneHitKey = Request::getUniqueKey();
key = &oneHitKey;
key = oneHitKey;
}

OpResultType result(OpResultType::kNop);
switch (op) {
case OpType::kLoneSet:
case OpType::kSet: {
auto lock = chainedItemAcquireUniqueLock(*key);
auto lock = chainedItemAcquireUniqueLock(key);
result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
req.admFeatureMap);

@@ -481,8 +480,8 @@
}
case OpType::kDel: {
++stats.del;
auto lock = chainedItemAcquireUniqueLock(*key);
auto res = cache_->remove(*key);
auto lock = chainedItemAcquireUniqueLock(key);
auto res = cache_->remove(key);
if (res == CacheT::RemoveRes::kNotFoundInRam) {
++stats.delNotFound;
}
@@ -532,7 +531,7 @@
OpResultType setKey(
PoolId pid,
ThroughputStats& stats,
const std::string* key,
const std::string_view key,
size_t size,
uint32_t ttlSecs,
const std::unordered_map<std::string, std::string>& featureMap) {
@@ -543,7 +542,7 @@
}

++stats.set;
auto it = cache_->allocate(pid, *key, size, ttlSecs);
auto it = cache_->allocate(pid, key, size, ttlSecs);
if (it == nullptr) {
++stats.setFailure;
return OpResultType::kSetFailure;
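
The AsyncCacheStressor.h changes replace `const std::string* key` parameters with by-value `std::string_view`, which drops a level of indirection and lets the one-hit-key path rebind the view (`key = oneHitKey`) instead of re-pointing a pointer. One consequence worth noting: the async `onReadyFn` lambdas now capture the view itself rather than an owned copy (the old code captured `key = *key`), so the referenced buffer (`req.key` or the local `oneHitKey`) must stay alive until the callback runs. A small self-contained sketch of that difference, with hypothetical names:

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <string_view>

// Build a callback that later prints the key. The copyOut flag picks between
// capturing an owned std::string and capturing the std::string_view itself.
std::function<void()> makeCallback(std::string_view key, bool copyOut) {
  if (copyOut) {
    // Owned copy: safe even if the original buffer is gone when the callback runs.
    return [k = std::string(key)] { std::cout << "owned: " << k << '\n'; };
  }
  // View capture: cheap (pointer + length), but only valid while the
  // underlying string is still alive at invocation time.
  return [key] { std::cout << "view: " << key << '\n'; };
}

int main() {
  std::string oneHitKey = "one-hit-key";
  auto cb = makeCallback(oneHitKey, /*copyOut=*/false);
  cb();  // fine here: oneHitKey still outlives the call
  return 0;
}
```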