Skip to content

Commit

Permalink
Merge 7a1ee56 into 29225f6
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Aug 20, 2021
2 parents 29225f6 + 7a1ee56 commit f05a326
Show file tree
Hide file tree
Showing 12 changed files with 598 additions and 206 deletions.
47 changes: 47 additions & 0 deletions src/buffer_organizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,51 @@ int MoveToTarget(SharedMemoryContext *context, RpcContext *rpc, BlobID blob_id,
return result;
}

void LocalAdjustFlushCount(SharedMemoryContext *context,
const std::string &vbkt_name, int adjustment) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
VBucketID id = LocalGetVBucketId(context, vbkt_name.c_str());
VBucketInfo *info = LocalGetVBucketInfoById(mdm, id);
int flush_count = info->async_flush_count.fetch_add(adjustment);
VLOG(1) << "Flush count on VBucket " << vbkt_name
<< (adjustment > 0 ? "incremented" : "decremented") << " to "
<< flush_count + adjustment << "\n";
}

void LocalIncrementFlushCount(SharedMemoryContext *context,
const std::string &vbkt_name) {
LocalAdjustFlushCount(context, vbkt_name, 1);
}

void LocalDecrementFlushCount(SharedMemoryContext *context,
const std::string &vbkt_name) {
LocalAdjustFlushCount(context, vbkt_name, -1);
}

void IncrementFlushCount(SharedMemoryContext *context, RpcContext *rpc,
const std::string &vbkt_name) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
u32 target_node = HashString(mdm, rpc, vbkt_name.c_str());

if (target_node == rpc->node_id) {
LocalIncrementFlushCount(context, vbkt_name);
} else {
RpcCall<bool>(rpc, target_node, "RemoteIncrementFlushCount",
vbkt_name);
}
}

void DecrementFlushCount(SharedMemoryContext *context, RpcContext *rpc,
const std::string &vbkt_name) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
u32 target_node = HashString(mdm, rpc, vbkt_name.c_str());

if (target_node == rpc->node_id) {
LocalDecrementFlushCount(context, vbkt_name);
} else {
RpcCall<bool>(rpc, target_node, "RemoteDecrementFlushCount",
vbkt_name);
}
}

} // namespace hermes
9 changes: 8 additions & 1 deletion src/buffer_organizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,14 @@ void BoCopy(SharedMemoryContext *context, BufferID src, TargetID dest);
void BoDelete(SharedMemoryContext *context, BufferID src);

void ShutdownBufferOrganizer(SharedMemoryContext *context);

void IncrementFlushCount(SharedMemoryContext *context, RpcContext *rpc,
const std::string &vbkt_name);
void DecrementFlushCount(SharedMemoryContext *context, RpcContext *rpc,
const std::string &vbkt_name);
void LocalIncrementFlushCount(SharedMemoryContext *context,
const std::string &vbkt_name);
void LocalDecrementFlushCount(SharedMemoryContext *context,
const std::string &vbkt_name);
} // namespace hermes

#endif // HERMES_BUFFER_ORGANIZER_H_
8 changes: 5 additions & 3 deletions src/buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,11 @@ void Finalize(SharedMemoryContext *context, CommunicationContext *comm,
}

void LockBuffer(BufferHeader *header) {
bool expected = false;
while (header->locked.compare_exchange_weak(expected, true)) {
// NOTE(chogan): Spin until we get the lock
while (true) {
bool expected = false;
if (header->locked.compare_exchange_weak(expected, true)) {
break;
}
}
}

Expand Down
156 changes: 130 additions & 26 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,20 @@ void LocalPut(MetadataManager *mdm, const char *key, u64 val,
PutToStorage(mdm, key, val, map_type);
}

void LocalPut(MetadataManager *mdm, BlobID key, const BlobInfo &value) {
PutToStorage(mdm, key, value);
}

u64 LocalGet(MetadataManager *mdm, const char *key, MapType map_type) {
u64 result = GetFromStorage(mdm, key, map_type);

return result;
}

void LocalDelete(MetadataManager *mdm, BlobID key) {
DeleteFromStorage(mdm, key, false);
}

void LocalDelete(MetadataManager *mdm, const char *key, MapType map_type) {
DeleteFromStorage(mdm, key, map_type);
}
Expand Down Expand Up @@ -190,9 +198,9 @@ BlobID GetBlobId(SharedMemoryContext *context, RpcContext *rpc,
bool track_stats) {
std::string internal_name = MakeInternalBlobName(name, bucket_id);
BlobID result = {};
result.as_int = GetId(context, rpc, internal_name.c_str(), kMapType_Blob);
if (track_stats) {
IncrementBlobStatsSafely(context, rpc, internal_name, bucket_id, result);
result.as_int = GetId(context, rpc, internal_name.c_str(), kMapType_BlobId);
if (!IsNullBlobId(result) && track_stats) {
IncrementBlobStats(context, rpc, result);
}

return result;
Expand Down Expand Up @@ -231,7 +239,7 @@ void LocalPutVBucketId(MetadataManager *mdm, const std::string &name,
void PutBlobId(MetadataManager *mdm, RpcContext *rpc, const std::string &name,
BlobID id, BucketID bucket_id) {
std::string internal_name = MakeInternalBlobName(name, bucket_id);
PutId(mdm, rpc, internal_name, id.as_int, kMapType_Blob);
PutId(mdm, rpc, internal_name, id.as_int, kMapType_BlobId);
}

void DeleteId(MetadataManager *mdm, RpcContext *rpc, const std::string &name,
Expand All @@ -255,10 +263,20 @@ void DeleteVBucketId(MetadataManager *mdm, RpcContext *rpc,
DeleteId(mdm, rpc, name, kMapType_VBucket);
}

void LocalDeleteBlobInfo(MetadataManager *mdm, BlobID blob_id) {
LocalDelete(mdm, blob_id);
}

void LocalDeleteBlobId(MetadataManager *mdm, const std::string &name,
BucketID bucket_id) {
std::string internal_name = MakeInternalBlobName(name, bucket_id);
LocalDelete(mdm, internal_name.c_str(), kMapType_BlobId);
}

void DeleteBlobId(MetadataManager *mdm, RpcContext *rpc,
const std::string &name, BucketID bucket_id) {
std::string internal_name = MakeInternalBlobName(name, bucket_id);
DeleteId(mdm, rpc, internal_name, kMapType_Blob);
DeleteId(mdm, rpc, internal_name, kMapType_BlobId);
}

BucketInfo *LocalGetBucketInfoByIndex(MetadataManager *mdm, u32 index) {
Expand All @@ -272,7 +290,7 @@ std::string LocalGetBlobNameFromId(SharedMemoryContext *context,
BlobID blob_id) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
std::string blob_name = ReverseGetFromStorage(mdm, blob_id.as_int,
kMapType_Blob);
kMapType_BlobId);

std::string result;
if (blob_name.size() > kBucketIdStringSize) {
Expand Down Expand Up @@ -324,7 +342,7 @@ u64 HexStringToU64(const std::string &s) {
BucketID LocalGetBucketIdFromBlobId(SharedMemoryContext *context, BlobID id) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
std::string internal_name = ReverseGetFromStorage(mdm, id.as_int,
kMapType_Blob);
kMapType_BlobId);
BucketID result = {};
if (internal_name.size() > kBucketIdStringSize) {
result.as_int = HexStringToU64(internal_name);
Expand Down Expand Up @@ -391,7 +409,6 @@ BucketID LocalGetNextFreeBucketId(SharedMemoryContext *context,
if (!IsNullBucketId(result)) {
BucketInfo *info = LocalGetBucketInfoByIndex(mdm, result.bits.index);
info->blobs = {};
info->stats = {};
info->ref_count.store(1);
info->active = true;
mdm->first_free_bucket = info->next_free;
Expand Down Expand Up @@ -619,6 +636,26 @@ BufferIdArray GetBufferIdsFromBlobId(Arena *arena,
return result;
}

void LocalCreateBlobMetadata(MetadataManager *mdm, const std::string &blob_name,
BlobID blob_id) {
LocalPut(mdm, blob_name.c_str(), blob_id.as_int, kMapType_BlobId);
BlobInfo blob_info = {};
blob_info.stats.frequency = 1;
blob_info.stats.recency = mdm->clock++;
LocalPut(mdm, blob_id, blob_info);
}

void CreateBlobMetadata(MetadataManager *mdm, RpcContext *rpc,
const std::string &blob_name, BlobID blob_id) {
u32 target_node = GetBlobNodeId(blob_id);
if (target_node == rpc->node_id) {
LocalCreateBlobMetadata(mdm, blob_name, blob_id);
} else {
RpcCall<bool>(rpc, target_node, "RemoteCreateBlobMetadata", blob_name,
blob_id);
}
}

void AttachBlobToBucket(SharedMemoryContext *context, RpcContext *rpc,
const char *blob_name, BucketID bucket_id,
const std::vector<BufferID> &buffer_ids,
Expand Down Expand Up @@ -649,7 +686,7 @@ void AttachBlobToBucket(SharedMemoryContext *context, RpcContext *rpc,
blob_id.bits.buffer_ids_offset = AllocateBufferIdList(context, rpc,
target_node,
buffer_ids);
PutBlobId(mdm, rpc, blob_name, blob_id, bucket_id);
CreateBlobMetadata(mdm, rpc, internal_name, blob_id);
AddBlobIdToBucket(mdm, rpc, blob_id, bucket_id);
}

Expand All @@ -663,9 +700,24 @@ void FreeBufferIdList(SharedMemoryContext *context, RpcContext *rpc,
}
}

void LocalDeleteBlobMetadata(MetadataManager *mdm, const char *blob_name,
BlobID blob_id, BucketID bucket_id) {
LocalDeleteBlobId(mdm, blob_name, bucket_id);
LocalDeleteBlobInfo(mdm, blob_id);
}

void LocalDestroyBlobByName(SharedMemoryContext *context, RpcContext *rpc,
const char *blob_name, BlobID blob_id,
BucketID bucket_id) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
BlobInfo *blob_info = GetBlobInfoPtr(mdm, blob_id);
// NOTE(chogan): Holding the mdm->blob_info_map_mutex
if (blob_info) {
// NOTE(chogan): Take the Blob lock to enusre that all outstanding
// background operations on the Blob complete before it's deleted.
BeginTicketMutex(&blob_info->lock);
}

if (!BlobIsInSwap(blob_id)) {
std::vector<BufferID> buffer_ids = GetBufferIdList(context, rpc, blob_id);
ReleaseBuffers(context, rpc, buffer_ids);
Expand All @@ -675,28 +727,17 @@ void LocalDestroyBlobByName(SharedMemoryContext *context, RpcContext *rpc,

FreeBufferIdList(context, rpc, blob_id);

MetadataManager *mdm = GetMetadataManagerFromContext(context);
DeleteBlobId(mdm, rpc, blob_name, bucket_id);
LocalDeleteBlobMetadata(mdm, blob_name, blob_id, bucket_id);

ReleaseBlobInfoPtr(mdm);
}

void LocalDestroyBlobById(SharedMemoryContext *context, RpcContext *rpc,
BlobID blob_id, BucketID bucket_id) {
if (!BlobIsInSwap(blob_id)) {
std::vector<BufferID> buffer_ids = GetBufferIdList(context, rpc, blob_id);
ReleaseBuffers(context, rpc, buffer_ids);
} else {
// TODO(chogan): Invalidate swap region once we have a SwapManager
}

FreeBufferIdList(context, rpc, blob_id);

std::string blob_name = LocalGetBlobNameFromId(context, blob_id);

if (blob_name.size() > 0) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
DeleteBlobId(mdm, rpc, blob_name, bucket_id);
LocalDestroyBlobByName(context, rpc, blob_name.c_str(), blob_id, bucket_id);
} else {
// TODO(chogan): @errorhandling
DLOG(INFO) << "Expected to find blob_id " << blob_id.as_int
<< " in Map but didn't" << std::endl;
}
Expand Down Expand Up @@ -1298,11 +1339,74 @@ f32 ScoringFunction(MetadataManager *mdm, Stats *stats) {
f32 recency_weight = 0.5;
f32 frequency_weight = 0.5;
// NOTE(chogan): A high relative_recency corresponds to a "cold" Blob.
f32 relative_recency = mdm->clock - stats->bits.recency;
f32 relative_recency = mdm->clock - stats->recency;
f32 result = (relative_recency * recency_weight -
stats->bits.frequency * frequency_weight);
stats->frequency * frequency_weight);

return result;
}

int LocalGetNumOutstandingFlushingTasks(SharedMemoryContext *context,
VBucketID id) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
VBucketInfo *info = LocalGetVBucketInfoById(mdm, id);
int result = 0;
if (info) {
result = info->async_flush_count;
}

return result;
}

int GetNumOutstandingFlushingTasks(SharedMemoryContext *context,
RpcContext *rpc, VBucketID id) {
u32 target_node = id.bits.node_id;
int result = 0;

if (target_node == rpc->node_id) {
result = LocalGetNumOutstandingFlushingTasks(context, id);
} else {
result = RpcCall<int>(rpc, target_node,
"RemoteGetNumOutstandingFlushingTasks", id);
}

return result;
}

void LocalLockBlob(SharedMemoryContext *context, BlobID blob_id) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
BlobInfo *blob_info = GetBlobInfoPtr(mdm, blob_id);
if (blob_info) {
BeginTicketMutex(&blob_info->lock);
}
ReleaseBlobInfoPtr(mdm);
}

void LocalUnlockBlob(SharedMemoryContext *context, BlobID blob_id) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
BlobInfo *blob_info = GetBlobInfoPtr(mdm, blob_id);
if (blob_info) {
EndTicketMutex(&blob_info->lock);
}
ReleaseBlobInfoPtr(mdm);
}

void LockBlob(SharedMemoryContext *context, RpcContext *rpc, BlobID blob_id) {
u32 target_node = GetBlobNodeId(blob_id);
if (target_node == rpc->node_id) {
LocalLockBlob(context, blob_id);
} else {
RpcCall<bool>(rpc, target_node, "RemoteLockBlob", blob_id);
}
}

void UnlockBlob(SharedMemoryContext *context, RpcContext *rpc, BlobID blob_id) {
u32 target_node = GetBlobNodeId(blob_id);
if (target_node == rpc->node_id) {
LocalUnlockBlob(context, blob_id);
} else {
RpcCall<bool>(rpc, target_node, "RemoteUnlockBlob", blob_id);
}
}

} // namespace hermes
Loading

0 comments on commit f05a326

Please sign in to comment.