Skip to content

Commit

Permalink
prefix-based colocation feature
Browse files Browse the repository at this point in the history
Summary:
This will allow us to configure a prefix-based colocation, for example:
```
...
    "bucketize": True,
    "total_buckets": "%total-buckets%",
    "buckets_colocation": {
        "asdf": 1000,
        "qwert": 2000,
        "zxcv": 3000
    },
    "bucketization_keyspace": "main"
...
```

The colocation works like this:
1. If the key matches any of the configured prefixes, do the following:
   - **bucket** = hash the key to a number [0, configured_buckets_for_prefix).
   -  construct a string like this: <**prefix**><**bucket**> and hash it to a **bucket2**.
   - use **bucket2** as a routing key.
2. If there is no match, hash the key to a bucket same as it was before.

Reviewed By: udippant

Differential Revision: D53232349

fbshipit-source-id: 6ba56a91e52c77017d468cec78e17a521425b43b
  • Loading branch information
Lenar Fatikhov authored and facebook-github-bot committed Feb 15, 2024
1 parent fb23451 commit fa880c5
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 24 deletions.
37 changes: 37 additions & 0 deletions mcrouter/routes/McBucketRoute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,22 @@
*/

#include "mcrouter/routes/McBucketRoute.h"
#include <cstdint>

namespace facebook {
namespace memcache {
namespace mcrouter {
namespace detail {
std::vector<std::pair<std::string, Ch3HashFunc>> buildPrefixMap(
const std::unordered_map<std::string, uint64_t>& map) {
std::vector<std::pair<std::string, Ch3HashFunc>> result;
for (const auto& [prefix, bucket] : map) {
result.emplace_back(prefix, Ch3HashFunc(bucket));
}
return result;
}
} // namespace detail

McBucketRouteSettings parseMcBucketRouteSettings(const folly::dynamic& json) {
McBucketRouteSettings settings;
checkLogic(
Expand All @@ -20,6 +32,31 @@ McBucketRouteSettings parseMcBucketRouteSettings(const folly::dynamic& json) {
1,
std::numeric_limits<int64_t>::max());

auto jBucketsByPrefix = json.get_ptr("total_buckets_by_prefix");
if (jBucketsByPrefix && jBucketsByPrefix->isObject()) {
std::unordered_map<std::string, uint64_t> prefixToBuckets;
for (const auto& it : jBucketsByPrefix->items()) {
checkLogic(
it.first.isString(),
"{} expected string, found {}",
it.first,
it.first.typeName());
checkLogic(
it.second.isInt(),
"{} expected int, found {}",
it.second,
it.second.typeName());
auto buckets = it.second.asInt();
checkLogic(
0 < buckets && buckets <= totalBuckets,
"Bucket count should be in range (0, {}], got {}",
totalBuckets,
buckets);
prefixToBuckets[it.first.asString()] = it.second.asInt();
}
settings.prefixToBuckets = std::move(prefixToBuckets);
}

auto* bucketizationKeyspacePtr = json.get_ptr("bucketization_keyspace");
checkLogic(
bucketizationKeyspacePtr && bucketizationKeyspacePtr->isString(),
Expand Down
62 changes: 44 additions & 18 deletions mcrouter/routes/McBucketRoute.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,34 @@
#include "mcrouter/lib/Ch3HashFunc.h"
#include "mcrouter/lib/Reply.h"
#include "mcrouter/lib/RouteHandleTraverser.h"
#include "mcrouter/lib/fbi/cpp/LowerBoundPrefixMap.h"
#include "mcrouter/routes/RoutingUtils.h"

namespace facebook::memcache::mcrouter {

namespace detail {
std::vector<std::pair<std::string, Ch3HashFunc>> buildPrefixMap(
const std::unordered_map<std::string, uint64_t>& map);

FOLLY_ALWAYS_INLINE size_t getBucket(
const folly::StringPiece key,
const memcache::LowerBoundPrefixMap<Ch3HashFunc>& prefixMap,
const Ch3HashFunc& defaultCh3) {
if (!prefixMap.empty()) {
auto it = prefixMap.findPrefix(key);
if (it != prefixMap.end()) {
return defaultCh3(folly::to<std::string>(it->key(), it->value()(key)));
}
}
return defaultCh3(key);
}
} // namespace detail

struct McBucketRouteSettings {
size_t totalBuckets;
std::string salt;
std::string bucketizationKeyspace;
std::unordered_map<std::string, uint64_t> prefixToBuckets;
};

/**
Expand All @@ -30,6 +50,19 @@ struct McBucketRouteSettings {
* construct the routing key based on the resulted bucket and route the request
* based on this key.
*
* Additional feature is co-location of keys with common prefixes into a small
* set of buckets to improve performance - can be enabled by adding a
* "total_buckets_by_prefix" config like this:
*
* "bucketize": True,
* "total_buckets": 1000000,
* "total_buckets_by_prefix": {
* "asdf": 1000,
* "qwert": 2000,
* "zxcv": 3000
* },
* "bucketization_keyspace": "main",
*
* This particular route handle only calculates the bucket id and adds it
* to the fiber context.
* The downstream route's responsibility is to fetch the bucket id from the
Expand All @@ -38,8 +71,9 @@ struct McBucketRouteSettings {
* Config:
* - bucketize(bool) - enable the bucketization
* - total_buckets(int) - total number of buckets
* - bucketize_until(int) - enable the handle for buckets until (exclusive)
* this number. Must be less than total_buckets. Needed for gradual migration.
* - total_buckets_by_prefix(object) - map of prefixes to total_buckets. Used to
* co-locate keys under a prefix into a smaller set of buckets to increase
* batching.
*/
template <class RouterInfo>
class McBucketRoute {
Expand All @@ -51,16 +85,16 @@ class McBucketRoute {
McBucketRoute(RouteHandlePtr rh, McBucketRouteSettings& settings)
: rh_(std::move(rh)),
totalBuckets_(settings.totalBuckets),
salt_(settings.salt),
ch3_(totalBuckets_),
bucketizationKeyspace_(settings.bucketizationKeyspace) {}
bucketizationKeyspace_(settings.bucketizationKeyspace),
prefixMap_(detail::buildPrefixMap(settings.prefixToBuckets)) {}

std::string routeName() const {
return fmt::format(
"bucketize|total_buckets={}|salt={}|bucketization_keyspace={}",
"bucketize|total_buckets={}|bucketization_keyspace={}|prefix_map_enabled={}",
totalBuckets_,
salt_,
bucketizationKeyspace_);
bucketizationKeyspace_,
prefixMap_.empty() ? "false" : "true");
}

template <class Request>
Expand All @@ -73,11 +107,7 @@ class McBucketRoute {
return t(*rh_, req);
}
auto bucketId = folly::fibers::runInMainContext([this, &req]() {
if (this->salt_.empty()) {
return ch3_(getRoutingKey<Request>(req));
} else {
return ch3_(getRoutingKey<Request>(req, this->salt_));
}
return detail::getBucket(getRoutingKey<Request>(req), prefixMap_, ch3_);
});
if (auto* ctx = fiber_local<RouterInfo>::getTraverseCtx()) {
ctx->recordBucketizationData(
Expand All @@ -94,11 +124,7 @@ class McBucketRoute {
template <class Request>
ReplyT<Request> route(const Request& req) const {
auto bucketId = folly::fibers::runInMainContext([this, &req]() {
if (this->salt_.empty()) {
return ch3_(getRoutingKey<Request>(req));
} else {
return ch3_(getRoutingKey<Request>(req, this->salt_));
}
return detail::getBucket(getRoutingKey<Request>(req), prefixMap_, ch3_);
});
auto& ctx = fiber_local<RouterInfo>::getSharedCtx();

Expand All @@ -125,9 +151,9 @@ class McBucketRoute {
private:
const RouteHandlePtr rh_;
const size_t totalBuckets_{0};
const std::string salt_;
const Ch3HashFunc ch3_;
const std::string bucketizationKeyspace_;
const memcache::LowerBoundPrefixMap<Ch3HashFunc> prefixMap_;
};

McBucketRouteSettings parseMcBucketRouteSettings(const folly::dynamic& json);
Expand Down
52 changes: 50 additions & 2 deletions mcrouter/routes/test/McBucketRouteTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ TEST(McBucketRouteTest, checkParams) {
EXPECT_TRUE(params.size() == 4);
EXPECT_EQ(params[0], "bucketize");
EXPECT_EQ(params[1], folly::to<std::string>("total_buckets=", total));
EXPECT_EQ(params[2], folly::to<std::string>("salt="));
EXPECT_EQ(
params[3], folly::to<std::string>("bucketization_keyspace=", keyspace));
params[2], folly::to<std::string>("bucketization_keyspace=", keyspace));
EXPECT_EQ(params[3], "prefix_map_enabled=false");
}

TEST(McBucketRouteTest, recordBucketizationData) {
Expand Down Expand Up @@ -146,4 +146,52 @@ TEST(McBucketRouteTest, GetRoutingKey) {
EXPECT_EQ(getRoutingKey(req, std::string()), "getReq");
}

TEST(McBucketRouteTest, bucketIdWithColocation) {
std::vector<std::shared_ptr<TestHandle>> srHandleVec{
std::make_shared<TestHandle>(
GetRouteTestData(carbon::Result::FOUND, "a")),
};
auto mockSrHandle = get_route_handles(srHandleVec)[0];

constexpr folly::StringPiece kMcBucketRouteConfig = R"(
{
"bucketize": true,
"total_buckets": 1000000,
"total_buckets_by_prefix": {
"asdf": 1,
"qwert": 20,
"zxcv": 25
},
"bucketization_keyspace": "testRegion:testPool"
}
)";

auto rh = makeMcBucketRoute<MemcacheRouterInfo>(
mockSrHandle, folly::parseJson(kMcBucketRouteConfig));
ASSERT_TRUE(rh);
mockFiberContext();
rh->route(McGetRequest("getReq")); // not colocated, bucketId == 755248
EXPECT_FALSE(srHandleVec[0]->sawBucketIds.empty());
EXPECT_EQ(755248, srHandleVec[0]->sawBucketIds[0]);
rh->route(McGetRequest("asdf:getReq")); // colocated, bucketId == 304678
EXPECT_EQ(304678, srHandleVec[0]->sawBucketIds[1]);
rh->route(McGetRequest("asdf:getReq2")); // colocated, bucketId == 304678
EXPECT_EQ(304678, srHandleVec[0]->sawBucketIds[2]);
rh->route(McGetRequest("asdf:getReq3")); // colocated, bucketId == 304678
EXPECT_EQ(304678, srHandleVec[0]->sawBucketIds[3]);
rh->route(
McGetRequest("asdfgetReqanything")); // colocated, bucketId == 304678
EXPECT_EQ(304678, srHandleVec[0]->sawBucketIds[4]);
rh->route(McGetRequest("asd:getReq")); // not colocated, bucketId == 973109
EXPECT_EQ(973109, srHandleVec[0]->sawBucketIds[5]);
rh->route(McGetRequest("qwert:getReq")); // colocated, bucketId == 454705
EXPECT_EQ(454705, srHandleVec[0]->sawBucketIds[6]);
rh->route(McGetRequest("zxcvgetReq")); // colocated, bucketId == 97898
EXPECT_EQ(97898, srHandleVec[0]->sawBucketIds[7]);
rh->route(McGetRequest("zxcv")); // colocated, bucketId == 418469
EXPECT_EQ(418469, srHandleVec[0]->sawBucketIds[8]);
rh->route(McGetRequest("zxc")); // not colocated, bucketId == 839734
EXPECT_EQ(839734, srHandleVec[0]->sawBucketIds[9]);
}

} // namespace facebook::memcache::mcrouter
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ TEST(McRouteHandleProvider, bucketized_sr_route_and_mcreplay_asynclogRoutes) {
auto rh = setup.getRoute(kBucketizedSRRoute);
EXPECT_TRUE(rh != nullptr);
EXPECT_EQ(
"bucketize|total_buckets=1000|salt=|bucketization_keyspace=tst",
"bucketize|total_buckets=1000|bucketization_keyspace=tst|prefix_map_enabled=false",
rh->routeName());
auto asynclogRoutes = setup.provider().releaseAsyncLogRoutes();
EXPECT_EQ(1, asynclogRoutes.size());
EXPECT_EQ(
"bucketize|total_buckets=1000|salt=|bucketization_keyspace=tst",
"bucketize|total_buckets=1000|bucketization_keyspace=tst|prefix_map_enabled=false",
asynclogRoutes["test.asynclog"]->routeName());
}

Expand All @@ -168,11 +168,11 @@ TEST(McRouteHandleProvider, bucketized_pool_route_and_mcreplay_asynclogRoutes) {
auto rh = setup.getRoute(kBucketizedPoolRoute);
EXPECT_TRUE(rh != nullptr);
EXPECT_EQ(
"bucketize|total_buckets=1000|salt=|bucketization_keyspace=tst",
"bucketize|total_buckets=1000|bucketization_keyspace=tst|prefix_map_enabled=false",
rh->routeName());
auto asynclogRoutes = setup.provider().releaseAsyncLogRoutes();
EXPECT_EQ(1, asynclogRoutes.size());
EXPECT_EQ(
"bucketize|total_buckets=1000|salt=|bucketization_keyspace=tst",
"bucketize|total_buckets=1000|bucketization_keyspace=tst|prefix_map_enabled=false",
asynclogRoutes["test.asynclog"]->routeName());
}

0 comments on commit fa880c5

Please sign in to comment.