refactored batch promotion and added DML support
guptask committed Oct 15, 2023
1 parent 708513f commit eb23469
Showing 7 changed files with 158 additions and 31 deletions.
107 changes: 98 additions & 9 deletions cachelib/allocator/CacheAllocator-inl.h
@@ -1582,6 +1582,7 @@ void CacheAllocator<CacheTrait>::evictRegularItems(TierId tid, PoolId pid, Class
evictionData[i].candidate->toString()));
}
}

if (dmlBatchSize) {
handler = dml::submit<dml::hardware>(dml::batch, sequence);
if (!handler.valid()) {
@@ -1613,6 +1614,7 @@ void CacheAllocator<CacheTrait>::evictRegularItems(TierId tid, PoolId pid, Class
util::LatencyTracker smallItemWait{stats().evictDmlSmallItemWaitLatency_, smallBatch};
result = handler.get();
}

if (result.status != dml::status_code::ok) {
/* Re-try using CPU memmove */
for (auto i = 0U; i < dmlBatchSize; i++) {
@@ -1625,7 +1627,92 @@ void CacheAllocator<CacheTrait>::evictRegularItems(TierId tid, PoolId pid, Class

/* Complete bookkeeping for items moved successfully via the DSA-based batch move */
for (auto i = 0U; i < dmlBatchSize; i++) {
moved[i] = moveRegularItemBookKeeper(*evictionData[i].candidate, newItemHdls[i]);
moved[i] = completeAccessContainerUpdate(*evictionData[i].candidate, newItemHdls[i]);
}
}

template <typename CacheTrait>
void CacheAllocator<CacheTrait>::promoteRegularItems(TierId tid, PoolId pid, ClassId cid,
std::vector<Item*>& candidates,
std::vector<WriteHandle>& newItemHdls,
bool skipAddInMMContainer,
bool fromBgThread,
std::vector<bool>& moved) {
/* Split batch for DSA-based move */
const auto& pool = allocator_[tid]->getPool(pid);
const auto& allocSizes = pool.getAllocSizes();
auto isLarge = allocSizes[cid] >= config_.largeItemMinSize;
auto dmlBatchRatio = isLarge ? config_.largeItemBatchPromoteDsaUsageFraction :
config_.smallItemBatchPromoteDsaUsageFraction;
size_t dmlBatchSize =
(config_.dsaEnabled && candidates.size() >= config_.minBatchSizeForDsaUsage) ?
static_cast<size_t>(candidates.size() * dmlBatchRatio) : 0;
auto sequence = dml::sequence<allocator_t>(dmlBatchSize);
batch_handler_t handler{};

/* Move a calculated portion of the batch using DSA (if enabled) */
for (auto i = 0U; i < dmlBatchSize; i++) {
XDCHECK(!candidates[i]->isExpired());
XDCHECK_EQ(newItemHdls[i]->getSize(), candidates[i]->getSize());
if (candidates[i]->isNvmClean()) {
newItemHdls[i]->markNvmClean();
}
dml::const_data_view srcView = dml::make_view(
reinterpret_cast<uint8_t*>(candidates[i]->getMemory()), candidates[i]->getSize());
dml::data_view dstView = dml::make_view(
reinterpret_cast<uint8_t*>(newItemHdls[i]->getMemory()), newItemHdls[i]->getSize());
if (sequence.add(dml::mem_copy, srcView, dstView) != dml::status_code::ok) {
throw std::runtime_error(folly::sformat(
"failed to add dml::mem_copy operation to the sequence for item: {}",
candidates[i]->toString()));
}
}

if (dmlBatchSize) {
handler = dml::submit<dml::hardware>(dml::batch, sequence);
if (!handler.valid()) {
auto status = handler.get();
XDCHECK(handler.valid()) << dmlErrStr(status);
throw std::runtime_error(folly::sformat(
"Failed dml sequence hw submission: {}", dmlErrStr(status)));
}
(*stats_.promoteDmlBatchSubmits)[tid][pid][cid].inc();
}

/* Move the remaining batch using CPU memmove */
for (auto i = dmlBatchSize; i < candidates.size(); i++) {
moved[i] = moveRegularItem(*candidates[i], newItemHdls[i],
skipAddInMMContainer, fromBgThread);
}

/* If DSA batch move not in use */
if (!dmlBatchSize) {
return;
}

/* Complete the DSA based batch move */
dml::batch_result result{};
{
size_t largeBatch = isLarge ? dmlBatchSize : 0;
size_t smallBatch = dmlBatchSize - largeBatch;
util::LatencyTracker largeItemWait{stats().promoteDmlLargeItemWaitLatency_, largeBatch};
util::LatencyTracker smallItemWait{stats().promoteDmlSmallItemWaitLatency_, smallBatch};
result = handler.get();
}

if (result.status != dml::status_code::ok) {
/* Re-try using CPU memmove */
for (auto i = 0U; i < dmlBatchSize; i++) {
moved[i] = moveRegularItem(*candidates[i], newItemHdls[i],
skipAddInMMContainer, fromBgThread);
}
(*stats_.promoteDmlBatchFails)[tid][pid][cid].inc();
return;
}

/* Complete bookkeeping for items moved successfully via the DSA-based batch move */
for (auto i = 0U; i < dmlBatchSize; i++) {
moved[i] = completeAccessContainerUpdate(*candidates[i], newItemHdls[i]);
}
}

@@ -1693,11 +1780,11 @@ bool CacheAllocator<CacheTrait>::moveRegularItem(Item& oldItem,
XDCHECK(!oldItem.hasChainedItem());
XDCHECK(newItemHdl->hasChainedItem());
}
return moveRegularItemBookKeeper(oldItem, newItemHdl);
return completeAccessContainerUpdate(oldItem, newItemHdl);
}

template <typename CacheTrait>
bool CacheAllocator<CacheTrait>::moveRegularItemBookKeeper(
bool CacheAllocator<CacheTrait>::completeAccessContainerUpdate(
Item& oldItem, WriteHandle& newItemHdl) {
auto predicate = [&](const Item& item){
// we rely on moving flag being set (it should block all readers)
@@ -1921,7 +2008,7 @@ CacheAllocator<CacheTrait>::getNextCandidatesPromotion(TierId tid,
candidateHandles.push_back(std::move(candidateHandle_));
}
};

mmContainer.withPromotionIterator(iterateAndMark);

if (candidates.size() < batch) {
@@ -1934,7 +2021,7 @@ CacheAllocator<CacheTrait>::getNextCandidatesPromotion(TierId tid,
return candidates;
}
}

//1. get an item handle from a new allocation
for (int i = 0; i < candidates.size(); i++) {
Item *candidate = candidates[i];
@@ -1954,6 +2041,7 @@ CacheAllocator<CacheTrait>::getNextCandidatesPromotion(TierId tid,
folly::sformat("Was not to acquire new alloc, failed alloc {}", blankAllocs[i]));
}
}

//2. add in batch to mmContainer
auto& newMMContainer = getMMContainer(tid-1, pid, cid);
uint32_t added = newMMContainer.addBatch(newAllocs.begin(), newAllocs.end());
@@ -1963,12 +2051,15 @@ CacheAllocator<CacheTrait>::getNextCandidatesPromotion(TierId tid,
folly::sformat("Was not able to add all new items, failed item {} and handle {}",
newAllocs[added]->toString(),newHandles[added]->toString()));
}

//3. copy item data - don't need to add in mmContainer
std::vector<bool> moved(candidates.size());
promoteRegularItems(tid, pid, cid, candidates, newHandles, true, true, moved);

for (int i = 0; i < candidates.size(); i++) {
Item *candidate = candidates[i];
WriteHandle newHandle = std::move(newHandles[i]);
bool moved = moveRegularItem(*candidate,newHandle, true, true);
if (moved) {
if (moved[i]) {
XDCHECK(candidate->getKey() == newHandle->getKey());
if (markMoving) {
auto ref = candidate->unmarkMoving();
@@ -1980,7 +2071,6 @@ CacheAllocator<CacheTrait>::getNextCandidatesPromotion(TierId tid,
}
} else {
typename NvmCacheT::PutToken token{};

removeFromMMContainer(*newAllocs[i]);
auto ret = handleFailedMove(candidate,token,false,markMoving);
XDCHECK(ret);
@@ -1989,7 +2079,6 @@ CacheAllocator<CacheTrait>::getNextCandidatesPromotion(TierId tid,
releaseBackToAllocator(*candidate, RemoveContext::kNormal, false);
XDCHECK(res == ReleaseRes::kReleased);
}

}
}
return candidates;
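
Both evictRegularItems and promoteRegularItems split each batch between the DSA (DML hardware) path and the CPU memmove fallback with the same rule: the hardware path is used only when DSA is enabled and the batch clears a minimum size, and it then takes a configured fraction of the items. A minimal standalone sketch of that split follows; the helper name and the example minimum batch size are illustrative, not part of this commit.

#include <cstddef>

// Sketch of the batch-split rule above: returns how many items of a batch go
// through the DML/DSA hardware path; the remainder (or the whole batch when
// the threshold is not met) falls back to CPU memmove.
size_t dsaPortionOfBatch(size_t batchSize,
                         bool dsaEnabled,
                         size_t minBatchSizeForDsaUsage,
                         double dsaUsageFraction) {
  if (!dsaEnabled || batchSize < minBatchSizeForDsaUsage) {
    return 0;
  }
  return static_cast<size_t>(batchSize * dsaUsageFraction);
}

int main() {
  // With the default large-item fraction of 0.8 and an assumed minimum batch
  // size of 4, a 10-item batch sends 8 items through DSA and 2 through CPU.
  return dsaPortionOfBatch(10, /*dsaEnabled=*/true,
                           /*minBatchSizeForDsaUsage=*/4, 0.8) == 8 ? 0 : 1;
}
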
41 changes: 30 additions & 11 deletions cachelib/allocator/CacheAllocator.h
@@ -1693,25 +1693,44 @@ class CacheAllocator : public CacheBase {
// not exist.
FOLLY_ALWAYS_INLINE WriteHandle findFastImpl(Key key, AccessMode mode);

// Evicts a regular item to a far memory tier.
// Evicts a batch of regular items from near to far memory tier.
//
// @param tid the id of the tier to look for evictions inside
// @param pid the id of the pool to look for evictions inside
// @param tid the id of the tier to look for evictions inside
// @param pid the id of the pool to look for evictions inside
// @param cid the id of the class to look for evictions inside
// @param evictionData Reference to the vector of items being moved
// @param newItemHdls Reference to the vector of new item handles being moved into
// @param skipAddInMMContainer
// So we can tell if we should add in mmContainer or wait
// @param evictionData reference to the vector of items being moved
// @param newItemHdls reference to the vector of new item handles being moved into
// @param skipAddInMMContainer
// so we can tell if we should add in mmContainer or wait
// to do in batch
// @param fromBgThread Use memmove instead of memcopy (for DTO testing)
// @param moved Save the status of move for each item
// @param fromBgThread use memmove instead of memcpy (for DTO testing)
// @param moved save the status of move for each item
void evictRegularItems(TierId tid, PoolId pid, ClassId cid,
std::vector<EvictionData>& evictionData,
std::vector<WriteHandle>& newItemHdls,
bool skipAddInMMContainer,
bool fromBgThread,
std::vector<bool>& moved);

// Promotes a batch of regular items from far to near memory tier.
//
// @param tid the id of the tier to look for promotions inside
// @param pid the id of the pool to look for promotions inside
// @param cid the id of the class to look for promotions inside
// @param candidates reference to the vector of items being promoted
// @param newItemHdls reference to the vector of new item handles being moved into
// @param skipAddInMMContainer
// so we can tell if we should add in mmContainer or wait
// to do in batch
// @param fromBgThread use memmove instead of memcpy (for DTO testing)
// @param moved save the status of move for each item
void promoteRegularItems(TierId tid, PoolId pid, ClassId cid,
std::vector<Item*>& candidates,
std::vector<WriteHandle>& newItemHdls,
bool skipAddInMMContainer,
bool fromBgThread,
std::vector<bool>& moved);

// Moves a regular item to a different memory tier.
//
// @param oldItem Reference to the item being moved
@@ -1726,12 +1745,12 @@ class CacheAllocator : public CacheBase {
bool skipAddInMMContainer,
bool fromBgThread);

// Performs book keeping after a regular item is moved to a different memory tier.
// Updates the access container after a regular item is moved to a different memory tier.
//
// @param oldItem Reference to the item being moved
// @param newItemHdl Reference to the handle of the new item being moved into
// @return true If the containers were updated successfully.
bool moveRegularItemBookKeeper(Item& oldItem, WriteHandle& newItemHdl);
bool completeAccessContainerUpdate(Item& oldItem, WriteHandle& newItemHdl);

// Moves a chained item to a different memory tier.
//
29 changes: 21 additions & 8 deletions cachelib/allocator/CacheAllocatorConfig.h
@@ -284,8 +284,10 @@ class CacheAllocatorConfig {
CacheAllocatorConfig& enableDSA(bool useDsa,
uint64_t minBatchSize,
uint64_t largeItemSizeMin,
double largeItemBatchFrac,
double smallItemBatchFrac);
double largeItemBatchEvictFrac,
double smallItemBatchEvictFrac,
double largeItemBatchPromoFrac,
double smallItemBatchPromoFrac);

// This enables an optimization for Pool rebalancing and resizing.
// The rough idea is to ensure only the least useful items are evicted when
@@ -517,12 +519,18 @@ class CacheAllocatorConfig {
// Min item size (in Bytes) to get classified as Large
uint64_t largeItemMinSize{8192};

// Large items - 80:20 split
// Large items eviction - 80:20 split
double largeItemBatchEvictDsaUsageFraction{0.8};

// Small items - 70:30 split
// Small items eviction - 70:30 split
double smallItemBatchEvictDsaUsageFraction{0.7};

// Large items promotion - 80:20 split
double largeItemBatchPromoteDsaUsageFraction{0.8};

// Small items promotion - 70:30 split
double smallItemBatchPromoteDsaUsageFraction{0.7};

// step size for compact cache size optimization: how many percents of the
// victim to move
unsigned int ccacheOptimizeStepSizePercent{1};
@@ -1047,13 +1055,18 @@ template <typename T>
CacheAllocatorConfig<T>& CacheAllocatorConfig<T>::enableDSA(bool useDsa,
uint64_t minBatchSize,
uint64_t largeItemSizeMin,
double largeItemBatchFrac,
double smallItemBatchFrac) {
double largeItemBatchEvictFrac,
double smallItemBatchEvictFrac,
double largeItemBatchPromoFrac,
double smallItemBatchPromoFrac) {
dsaEnabled = useDsa;
minBatchSizeForDsaUsage = minBatchSize;
largeItemMinSize = largeItemSizeMin;
largeItemBatchEvictDsaUsageFraction = largeItemBatchFrac;
smallItemBatchEvictDsaUsageFraction = smallItemBatchFrac;
largeItemBatchEvictDsaUsageFraction = largeItemBatchEvictFrac;
smallItemBatchEvictDsaUsageFraction = smallItemBatchEvictFrac;
largeItemBatchPromoteDsaUsageFraction = largeItemBatchPromoFrac;
smallItemBatchPromoteDsaUsageFraction = smallItemBatchPromoFrac;

return *this;
}

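
The enableDSA setter now takes separate eviction-side and promotion-side fractions. A hedged caller-side sketch of the widened signature follows; the LruAllocator cache type and the minBatchSize value are illustrative assumptions, while the size threshold and fractions mirror the defaults shown above.

#include "cachelib/allocator/CacheAllocator.h"

using Cache = facebook::cachelib::LruAllocator; // assumed cache type, for illustration only

Cache::Config makeDsaConfig() {
  Cache::Config config;
  config.enableDSA(/*useDsa=*/true,
                   /*minBatchSize=*/8,               // assumed threshold
                   /*largeItemSizeMin=*/8192,        // default shown above
                   /*largeItemBatchEvictFrac=*/0.8,
                   /*smallItemBatchEvictFrac=*/0.7,
                   /*largeItemBatchPromoFrac=*/0.8,  // new in this commit
                   /*smallItemBatchPromoFrac=*/0.7); // new in this commit
  return config;
}
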
4 changes: 3 additions & 1 deletion cachelib/cachebench/cache/Cache-inl.h
@@ -50,7 +50,9 @@ Cache<Allocator>::Cache(const CacheConfig& config,
config_.minBatchSizeForDsaUsage,
config_.largeItemMinSize,
config_.largeItemBatchEvictDsaUsageFraction,
config_.smallItemBatchEvictDsaUsageFraction);
config_.smallItemBatchEvictDsaUsageFraction,
config_.largeItemBatchPromoteDsaUsageFraction,
config_.smallItemBatchPromoteDsaUsageFraction);
allocatorConfig_.enableBackgroundEvictor(
config_.getBackgroundEvictorStrategy(),
std::chrono::milliseconds(config_.backgroundEvictorIntervalMilSec),
2 changes: 1 addition & 1 deletion cachelib/cachebench/cache/CacheStats.h
@@ -156,7 +156,7 @@ struct Stats {
for (TierId tid = 0; tid < nTiers; tid++) {
out << folly::sformat(
"Tier {} Evict Attempts: {:,}\n"
"Tier {} Success: {:.2f}%",
"Tier {} Eviction Success: {:.2f}%",
tid, evictAttempts[tid],
tid, invertPctFn(evictAttempts[tid] - numEvictions[tid], evictAttempts[tid]))
<< std::endl;
4 changes: 3 additions & 1 deletion cachelib/cachebench/util/CacheConfig.cpp
@@ -38,6 +38,8 @@ CacheConfig::CacheConfig(const folly::dynamic& configJson) {
JSONSetVal(configJson, largeItemMinSize);
JSONSetVal(configJson, largeItemBatchEvictDsaUsageFraction);
JSONSetVal(configJson, smallItemBatchEvictDsaUsageFraction);
JSONSetVal(configJson, largeItemBatchPromoteDsaUsageFraction);
JSONSetVal(configJson, smallItemBatchPromoteDsaUsageFraction);
JSONSetVal(configJson, moveOnSlabRelease);
JSONSetVal(configJson, rebalanceStrategy);
JSONSetVal(configJson, rebalanceMinSlabs);
@@ -136,7 +138,7 @@ CacheConfig::CacheConfig(const folly::dynamic& configJson) {
// if you added new fields to the configuration, update the JSONSetVal
// to make them available for the json configs and increment the size
// below
checkCorrectSize<CacheConfig, 928>();
checkCorrectSize<CacheConfig, 944>();

if (numPools != poolSizes.size()) {
throw std::invalid_argument(folly::sformat(
2 changes: 2 additions & 0 deletions cachelib/cachebench/util/CacheConfig.h
@@ -81,6 +81,8 @@ struct CacheConfig : public JSONConfig {
uint64_t largeItemMinSize{8192};
double largeItemBatchEvictDsaUsageFraction{0.8};
double smallItemBatchEvictDsaUsageFraction{0.7};
double largeItemBatchPromoteDsaUsageFraction{0.8};
double smallItemBatchPromoteDsaUsageFraction{0.7};
uint64_t rebalanceMinSlabs{1};
double rebalanceDiffRatio{0.25};
bool moveOnSlabRelease{false};
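
The two new cachebench fields are plain doubles picked up by the JSONSetVal calls added in CacheConfig.cpp above, with the defaults shown here. A minimal sketch of supplying them programmatically through folly::dynamic follows; the helper name is illustrative, and in a real run these keys would sit alongside the rest of the cache_config entries.

#include <folly/dynamic.h>

// Build just the DSA promotion-fraction portion of a cachebench cache config.
folly::dynamic dsaPromoteFractions() {
  folly::dynamic cacheJson = folly::dynamic::object
      ("largeItemBatchPromoteDsaUsageFraction", 0.8)   // default from CacheConfig.h
      ("smallItemBatchPromoteDsaUsageFraction", 0.7);  // default from CacheConfig.h
  return cacheJson;
}
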
