Skip to content

Commit

Permalink
Merge pull request #158 from hariharan-devarajan/hariharan/vbucket_sh…
Browse files Browse the repository at this point in the history
…ared

Hariharan/vbucket shared
  • Loading branch information
ChristopherHogan committed Apr 16, 2021
2 parents 9a79e47 + fb55366 commit b9ac73e
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 51 deletions.
2 changes: 1 addition & 1 deletion adapter/src/hermes/adapter/stdio/mapper/balanced_mapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ MapperReturnType BalancedMapper::map(const FileStruct& file_op) {
: file_op.size_ - size_mapped;

file.size_ = hermes.size_;
hermes.blob_name_ = std::to_string(page_index);
hermes.blob_name_ = std::to_string(page_index + 1);
mapper_return.emplace_back(file, hermes);
size_mapped += hermes.size_;
}
Expand Down
7 changes: 3 additions & 4 deletions adapter/src/hermes/adapter/stdio/stdio.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ size_t write_internal(std::pair<AdapterStat, bool> &existing, const void *ptr,
<< std::endl;
for (const auto &item : mapping) {
hapi::Context ctx;
size_t pos = item.second.blob_name_.find(kStringDelimiter) + 1;
auto index = std::stol(item.second.blob_name_.substr(pos));
auto index = std::stol(item.second.blob_name_) - 1;
auto blob_exists =
existing.first.st_bkid->ContainsBlob(item.second.blob_name_);
unsigned char *put_data_ptr = (unsigned char *)ptr + data_offset;
Expand Down Expand Up @@ -556,7 +555,7 @@ int HERMES_DECL(fflush)(FILE *fp) {
auto offset_map = std::unordered_map<std::string, hermes::u64>();
for (const auto &blob_name : blob_names) {
file_vbucket.Link(blob_name, filename, ctx);
auto page_index = std::stol(blob_name);
auto page_index = std::stol(blob_name) - 1;
offset_map.emplace(blob_name, page_index * kPageSize);
}
auto trait = hermes::api::FileMappingTrait(filename, offset_map,
Expand Down Expand Up @@ -600,7 +599,7 @@ int HERMES_DECL(fclose)(FILE *fp) {
for (const auto &blob_name : blob_names) {
auto status = file_vbucket.Link(blob_name, filename, ctx);
if (!status.Failed()) {
auto page_index = std::stol(blob_name);
auto page_index = std::stol(blob_name) - 1;
offset_map.emplace(blob_name, page_index * kPageSize);
}
}
Expand Down
98 changes: 61 additions & 37 deletions src/api/vbucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ namespace hermes {

namespace api {

bool VBucket::IsValid() const {
return !IsNullVBucketId(id_);
}
bool VBucket::IsValid() const { return !IsNullVBucketId(id_); }

Status VBucket::Link(std::string blob_name, std::string bucket_name) {
Status result = Link(blob_name, bucket_name, ctx_);
Expand All @@ -44,7 +42,6 @@ Status VBucket::Link(std::string blob_name, std::string bucket_name,
bool blob_exists = hermes_->BucketContainsBlob(bucket_name, blob_name);
if (blob_exists) {
// inserting value by insert function
linked_blobs_.push_back(make_pair(bucket_name, blob_name));
TraitInput input;
input.bucket_name = bucket_name;
input.blob_name = blob_name;
Expand All @@ -54,6 +51,8 @@ Status VBucket::Link(std::string blob_name, std::string bucket_name,
// TODO(hari): @errorhandling Check if linking was successful
}
}
AttachBlobToVBucket(&hermes_->context_, &hermes_->rpc_, blob_name.data(),
bucket_name.data(), id_);
} else {
ret = BLOB_NOT_IN_BUCKET;
LOG(ERROR) << ret.Msg();
Expand All @@ -76,8 +75,14 @@ Status VBucket::Unlink(std::string blob_name, std::string bucket_name,
LOG(INFO) << "Unlinking blob " << blob_name << " in bucket " << bucket_name
<< " from VBucket " << name_ << '\n';
bool found = false;
for (auto ci = linked_blobs_.begin(); ci != linked_blobs_.end(); ++ci) {
if (ci->first == bucket_name && ci->second == blob_name) {
auto blob_ids =
GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_);
auto bucket_id =
GetBucketId(&hermes_->context_, &hermes_->rpc_, bucket_name.c_str());
auto selected_blob_id = GetBlobId(&hermes_->context_, &hermes_->rpc_,
blob_name.c_str(), bucket_id);
for (hermes::BlobID& blob_id : blob_ids) {
if (selected_blob_id.as_int == blob_id.as_int) {
TraitInput input;
input.bucket_name = bucket_name;
input.blob_name = blob_name;
Expand All @@ -87,7 +92,8 @@ Status VBucket::Unlink(std::string blob_name, std::string bucket_name,
// TODO(hari): @errorhandling Check if unlinking was successful
}
}
linked_blobs_.erase(ci);
RemoveBlobFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_,
blob_name.c_str(), bucket_name.c_str());
found = true;
break;
}
Expand All @@ -107,10 +113,14 @@ bool VBucket::ContainsBlob(std::string blob_name, std::string bucket_name) {
LOG(INFO) << "Checking if blob " << blob_name << " from bucket "
<< bucket_name << " is in this VBucket " << name_ << '\n';

for (auto ci = linked_blobs_.begin(); ci != linked_blobs_.end(); ++ci) {
bk_tmp = ci->first;
blob_tmp = ci->second;
if (bk_tmp == bucket_name && blob_tmp == blob_name) ret = true;
auto blob_ids =
GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_);
auto bucket_id =
GetBucketId(&hermes_->context_, &hermes_->rpc_, bucket_name.c_str());
auto selected_blob_id = GetBlobId(&hermes_->context_, &hermes_->rpc_,
blob_name.c_str(), bucket_id);
for (const auto& blob_id : blob_ids) {
if (selected_blob_id.as_int == blob_id.as_int) ret = true;
}

return ret;
Expand All @@ -128,12 +138,14 @@ Blob& VBucket::GetBlob(std::string blob_name, std::string bucket_name) {
return local_blob;
}

template <class Predicate>
std::vector<std::string> VBucket::GetLinks(Predicate pred, Context& ctx) {
std::vector<BlobID> VBucket::GetLinks(Context& ctx) {
(void)ctx;
LOG(INFO) << "Getting subset of links satisfying pred in VBucket " << name_
<< '\n';

return std::vector<std::string>();
auto blob_ids =
GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_);
// TODO(hari): add filtering
return blob_ids;
}

Status VBucket::Attach(Trait* trait) {
Expand All @@ -156,11 +168,17 @@ Status VBucket::Attach(Trait* trait, Context& ctx) {
}
}
if (!selected_trait) {
for (auto ci = linked_blobs_.begin(); ci != linked_blobs_.end(); ++ci) {
auto blob_ids =
GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_);
for (const auto& blob_id : blob_ids) {
Trait* t = static_cast<Trait*>(trait);
TraitInput input;
input.bucket_name = ci->first;
input.blob_name = ci->second;
auto bucket_id =
GetBucketIdFromBlobId(&hermes_->context_, &hermes_->rpc_, blob_id);
input.bucket_name =
GetBucketNameById(&hermes_->context_, &hermes_->rpc_, bucket_id);
input.blob_name =
GetBlobNameFromId(&hermes_->context_, &hermes_->rpc_, blob_id);
if (t->onAttachFn != nullptr) {
t->onAttachFn(input, trait);
// TODO(hari): @errorhandling Check if attach was successful
Expand Down Expand Up @@ -201,11 +219,17 @@ Status VBucket::Detach(Trait* trait, Context& ctx) {
}
}
if (selected_trait) {
for (auto ci = linked_blobs_.begin(); ci != linked_blobs_.end(); ++ci) {
auto blob_ids =
GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_);
for (const auto& blob_id : blob_ids) {
Trait* t = static_cast<Trait*>(trait);
TraitInput input;
input.bucket_name = ci->first;
input.blob_name = ci->second;
auto bucket_id =
GetBucketIdFromBlobId(&hermes_->context_, &hermes_->rpc_, blob_id);
input.bucket_name =
GetBucketNameById(&hermes_->context_, &hermes_->rpc_, bucket_id);
input.blob_name =
GetBlobNameFromId(&hermes_->context_, &hermes_->rpc_, blob_id);
if (t->onDetachFn != nullptr) {
t->onDetachFn(input, trait);
// TODO(hari): @errorhandling Check if detach was successful
Expand Down Expand Up @@ -264,12 +288,17 @@ Status VBucket::Delete(Context& ctx) {
}
}
}
for (auto ci = linked_blobs_.begin(); ci != linked_blobs_.end(); ++ci) {
auto blob_ids =
GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_);
for (const auto& blob_id : blob_ids) {
TraitInput input;
auto bucket_id =
GetBucketIdFromBlobId(&hermes_->context_, &hermes_->rpc_, blob_id);
input.bucket_name =
GetBucketNameById(&hermes_->context_, &hermes_->rpc_, bucket_id);
input.blob_name =
GetBlobNameFromId(&hermes_->context_, &hermes_->rpc_, blob_id);
if (attached_traits_.size() > 0) {
TraitInput input;
input.bucket_name = ci->first;
input.blob_name = ci->second;

if (this->persist) {
if (t->type == TraitType::FILE_MAPPING) {
FileMappingTrait* fileBackedTrait = (FileMappingTrait*)t;
Expand All @@ -278,20 +307,13 @@ Status VBucket::Delete(Context& ctx) {
fileBackedTrait->flush_cb(input, fileBackedTrait);
} else {
if (!fileBackedTrait->offset_map.empty()) {
auto iter = fileBackedTrait->offset_map.find(ci->second);
auto iter = fileBackedTrait->offset_map.find(input.blob_name);
if (iter != fileBackedTrait->offset_map.end()) {
BucketID bucket_id =
GetBucketId(&hermes_->context_, &hermes_->rpc_,
ci->first.c_str());
auto blob_id =
GetBlobId(&hermes_->context_, &hermes_->rpc_, ci->second,
bucket_id);
// TODO(hari): @errorhandling check return of StdIoPersistBlob
ret = StdIoPersistBlob(&hermes_->context_, &hermes_->rpc_,
&hermes_->trans_arena_, blob_id, file,
iter->second);
if (!ret.Succeeded()) {
LOG(ERROR) << ret.Msg();
}
if (!ret.Succeeded()) LOG(ERROR) << ret.Msg();
} else {
ret = BLOB_NOT_LINKED_IN_MAP;
LOG(ERROR) << ret.Msg();
Expand All @@ -313,6 +335,9 @@ Status VBucket::Delete(Context& ctx) {
// TODO(hari): @errorhandling Check if unlinking was successful
}
}
RemoveBlobFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_,
input.blob_name.c_str(),
input.bucket_name.c_str());
}
if (persist) {
if (file != nullptr) {
Expand All @@ -324,7 +349,6 @@ Status VBucket::Delete(Context& ctx) {
}
}
}
linked_blobs_.clear();
attached_traits_.clear();
return Status();
}
Expand Down
10 changes: 3 additions & 7 deletions src/api/vbucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class VBucket {
private:
std::string name_;
VBucketID id_;
std::list<std::pair<std::string, std::string>> linked_blobs_;
std::list<Trait *> attached_traits_;
Blob local_blob;
bool persist;
Expand All @@ -44,8 +43,7 @@ class VBucket {
VBucket(std::string initial_name, std::shared_ptr<Hermes> const &h,
bool persist, Context ctx = Context())
: name_(initial_name),
id_({0, 0}),
linked_blobs_(),
id_({{0, 0}}),
attached_traits_(),
local_blob(),
persist(persist),
Expand All @@ -66,7 +64,6 @@ class VBucket {

~VBucket() {
name_.clear();
linked_blobs_.clear();
}

bool IsValid() const;
Expand All @@ -88,10 +85,9 @@ class VBucket {
/** get a blob linked to this vbucket */
Blob &GetBlob(std::string blob_name, std::string bucket_name);

/** retrieves the subset of links satisfying pred */
/** retrieves the subset of blob links satisfying pred */
/** could return iterator */
template <class Predicate>
std::vector<std::string> GetLinks(Predicate pred, Context &ctx);
std::vector<BlobID> GetLinks(Context &ctx);

/** attach a trait to this vbucket */
Status Attach(Trait *trait, Context &ctx);
Expand Down
53 changes: 53 additions & 0 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1200,4 +1200,57 @@ GetRemainingTargetCapacities(SharedMemoryContext *context, RpcContext *rpc,
return result;
}

void AttachBlobToVBucket(SharedMemoryContext *context, RpcContext *rpc,
const char *blob_name, const char *bucket_name,
VBucketID vbucket_id) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
BucketID bucket_id = GetBucketId(context, rpc, bucket_name);
BlobID blob_id = GetBlobId(context, rpc, blob_name, bucket_id);
AddBlobIdToVBucket(mdm, rpc, blob_id, vbucket_id);
}

std::string LocalGetBucketNameById(SharedMemoryContext *context,
BucketID blob_id) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
std::string bucket_name =
ReverseGetFromStorage(mdm, blob_id.as_int, kMapType_Bucket);
return bucket_name;
}

std::vector<BlobID> GetBlobsFromVBucketInfo(SharedMemoryContext *context,
RpcContext *rpc,
VBucketID vbucket_id) {
u32 target_node = vbucket_id.bits.node_id;
if (target_node == rpc->node_id) {
return LocalGetBlobsFromVBucketInfo(context, vbucket_id);
} else {
return RpcCall<std::vector<BlobID>>(
rpc, target_node, "RemoteGetBlobsFromVBucketInfo", vbucket_id);
}
}

void RemoveBlobFromVBucketInfo(SharedMemoryContext *context, RpcContext *rpc,
VBucketID vbucket_id, const char *blob_name,
const char *bucket_name) {
BucketID bucket_id = GetBucketId(context, rpc, bucket_name);
BlobID blob_id = GetBlobId(context, rpc, blob_name, bucket_id);
u32 target_node = vbucket_id.bits.node_id;
if (target_node == rpc->node_id) {
LocalRemoveBlobFromVBucketInfo(context, vbucket_id, blob_id);
} else {
RpcCall<bool>(rpc, target_node, "RemoteRemoveBlobFromVBucketInfo",
vbucket_id, blob_id);
}
}

std::string GetBucketNameById(SharedMemoryContext *context, RpcContext *rpc,
BucketID id) {
auto target_node = id.bits.node_id;
if (target_node == rpc->node_id) {
return LocalGetBucketNameById(context, id);
} else {
return RpcCall<std::string>(rpc, target_node, "RemoteGetBucketNameById",
id);
}
}
} // namespace hermes
28 changes: 27 additions & 1 deletion src/metadata_management.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ BlobID GetBlobId(SharedMemoryContext *context, RpcContext *rpc,
/**
*
*/
std::string GetBlobNameFromId(MetadataManager *mdm, RpcContext *rpc,
std::string GetBlobNameFromId(SharedMemoryContext *context, RpcContext *rpc,
BlobID blob_id);

/**
Expand Down Expand Up @@ -336,6 +336,32 @@ void LocalBeginGlobalTicketMutex(MetadataManager *mdm);
*/
void LocalEndGlobalTicketMutex(MetadataManager *mdm);

/**
*
*/
void AttachBlobToVBucket(SharedMemoryContext *context, RpcContext *rpc,
const char *blob_name, const char *bucket_name,
VBucketID vbucket_id);

/**
*
*/
void RemoveBlobFromVBucketInfo(SharedMemoryContext *context, RpcContext *rpc,
VBucketID vbucket_id, const char *blob_name,
const char *bucket_name);

/**
*
*/
std::vector<BlobID> GetBlobsFromVBucketInfo(SharedMemoryContext *context,
RpcContext *rpc,
VBucketID vbucket_id);

/**
*
*/
std::string GetBucketNameById(SharedMemoryContext *context, RpcContext *rpc,
BucketID id);
} // namespace hermes

#endif // HERMES_METADATA_MANAGEMENT_H_
7 changes: 7 additions & 0 deletions src/metadata_management_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,12 @@ VBucketID LocalGetOrCreateVBucketId(SharedMemoryContext *context,
u64 HexStringToU64(const std::string &s);
std::string MakeInternalBlobName(const std::string &name, BucketID id);

void LocalRemoveBlobFromVBucketInfo(SharedMemoryContext *context,
VBucketID vbucket_id, BlobID blob_id);
std::vector<BlobID> LocalGetBlobsFromVBucketInfo(SharedMemoryContext *context,
VBucketID vbucket_id);
std::string LocalGetBucketNameById(SharedMemoryContext *context,
BucketID blob_id);

} // namespace hermes
#endif // HERMES_METADATA_MANAGEMENT_INTERNAL_H_
Loading

0 comments on commit b9ac73e

Please sign in to comment.