Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-582 Add & remove writer API
Browse files Browse the repository at this point in the history
  • Loading branch information
ebruck committed Dec 5, 2018
1 parent bf95b9c commit 3d46ec1
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 23 deletions.
213 changes: 194 additions & 19 deletions crud/crud.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,21 @@ crud::crud(std::shared_ptr<bzn::storage_base> storage, std::shared_ptr<bzn::subs
: storage(std::move(storage))
, subscription_manager(std::move(subscription_manager))
, message_handlers{
{database_msg::kCreate, std::bind(&crud::handle_create, this, _1, _2, _3)},
{database_msg::kRead, std::bind(&crud::handle_read, this, _1, _2, _3)},
{database_msg::kUpdate, std::bind(&crud::handle_update, this, _1, _2, _3)},
{database_msg::kDelete, std::bind(&crud::handle_delete, this, _1, _2, _3)},
{database_msg::kHas, std::bind(&crud::handle_has, this, _1, _2, _3)},
{database_msg::kKeys, std::bind(&crud::handle_keys, this, _1, _2, _3)},
{database_msg::kSize, std::bind(&crud::handle_size, this, _1, _2, _3)},
{database_msg::kSubscribe, std::bind(&crud::handle_subscribe, this, _1, _2, _3)},
{database_msg::kUnsubscribe, std::bind(&crud::handle_unsubscribe, this, _1, _2, _3)},
{database_msg::kCreateDb, std::bind(&crud::handle_create_db, this, _1, _2, _3)},
{database_msg::kDeleteDb, std::bind(&crud::handle_delete_db, this, _1, _2, _3)},
{database_msg::kHasDb, std::bind(&crud::handle_has_db, this, _1, _2, _3)}}
{database_msg::kCreate, std::bind(&crud::handle_create, this, _1, _2, _3)},
{database_msg::kRead, std::bind(&crud::handle_read, this, _1, _2, _3)},
{database_msg::kUpdate, std::bind(&crud::handle_update, this, _1, _2, _3)},
{database_msg::kDelete, std::bind(&crud::handle_delete, this, _1, _2, _3)},
{database_msg::kHas, std::bind(&crud::handle_has, this, _1, _2, _3)},
{database_msg::kKeys, std::bind(&crud::handle_keys, this, _1, _2, _3)},
{database_msg::kSize, std::bind(&crud::handle_size, this, _1, _2, _3)},
{database_msg::kSubscribe, std::bind(&crud::handle_subscribe, this, _1, _2, _3)},
{database_msg::kUnsubscribe, std::bind(&crud::handle_unsubscribe, this, _1, _2, _3)},
{database_msg::kCreateDb, std::bind(&crud::handle_create_db, this, _1, _2, _3)},
{database_msg::kDeleteDb, std::bind(&crud::handle_delete_db, this, _1, _2, _3)},
{database_msg::kHasDb, std::bind(&crud::handle_has_db, this, _1, _2, _3)},
{database_msg::kWriters, std::bind(&crud::handle_writers, this, _1, _2, _3)},
{database_msg::kAddWriters, std::bind(&crud::handle_add_writers, this, _1, _2, _3)},
{database_msg::kRemoveWriters, std::bind(&crud::handle_remove_writers, this, _1, _2, _3)}}
{
}

Expand Down Expand Up @@ -362,7 +365,7 @@ crud::handle_delete_db(const bzn::caller_id_t& caller_id, const database_msg& re

if (db_exists)
{
if (!is_caller_owner(caller_id, perms))
if (!this->is_caller_owner(caller_id, perms))
{
result = bzn::storage_result::access_denied;
}
Expand Down Expand Up @@ -404,6 +407,122 @@ crud::handle_has_db(const bzn::caller_id_t& /*caller_id*/, const database_msg& r
}


void
crud::handle_writers(const bzn::caller_id_t& /*caller_id*/, const database_msg& request, std::shared_ptr<bzn::session_base> session)
{
if (session)
{
bzn::storage_result result{bzn::storage_result::not_found};

std::shared_lock<std::shared_mutex> lock(this->lock); // lock for read access

const auto [db_exists, perms] = this->get_database_permissions(request.header().db_uuid());

if (db_exists)
{
database_response resp;

resp.mutable_writers()->set_owner(perms[OWNER_KEY].asString());

for(const auto& writer : perms[WRITERS_KEY])
{
resp.mutable_writers()->add_writers(writer.asString());
}

this->send_response(request, bzn::storage_result::ok, std::move(resp), session);

return;
}
else
{
this->send_response(request, result, database_response(), session);
}

return;
}

LOG(warning) << "session no longer available. WRITERS not executed.";
}


void
crud::handle_add_writers(const bzn::caller_id_t& caller_id, const database_msg& request, std::shared_ptr<bzn::session_base> session)
{
bzn::storage_result result{bzn::storage_result::db_not_found};

std::lock_guard<std::shared_mutex> lock(this->lock); // lock for write access

auto [db_exists, perms] = this->get_database_permissions(request.header().db_uuid());

if (db_exists)
{
if (!this->is_caller_owner(caller_id, perms))
{
result = bzn::storage_result::access_denied;
}
else
{
this->add_writers(request, perms);

LOG(debug) << "updating db perms: " << perms.toStyledString().substr(0, MAX_MESSAGE_SIZE) << "...";

if (result = this->storage->update(PERMISSION_UUID, request.header().db_uuid(), perms.toStyledString()); result != bzn::storage_result::ok)
{
throw std::runtime_error("Failed to update database permissions: " + bzn::storage_result_msg.at(result));
}
}
}

if (session)
{
this->send_response(request, result, database_response(), session);

return;
}

LOG(warning) << "session no longer available. ADD_WRITERS response not sent,";
}


void
crud::handle_remove_writers(const bzn::caller_id_t& caller_id, const database_msg& request, std::shared_ptr<bzn::session_base> session)
{
bzn::storage_result result{bzn::storage_result::db_not_found};

std::lock_guard<std::shared_mutex> lock(this->lock); // lock for write access

auto [db_exists, perms] = this->get_database_permissions(request.header().db_uuid());

if (db_exists)
{
if (!this->is_caller_owner(caller_id, perms))
{
result = bzn::storage_result::access_denied;
}
else
{
this->remove_writers(request, perms);

LOG(debug) << "updating db perms: " << perms.toStyledString().substr(0, MAX_MESSAGE_SIZE) << "...";

if (result = this->storage->update(PERMISSION_UUID, request.header().db_uuid(), perms.toStyledString()); result != bzn::storage_result::ok)
{
throw std::runtime_error("Failed to update database permissions: " + bzn::storage_result_msg.at(result));
}
}
}

if (session)
{
this->send_response(request, result, database_response(), session);

return;
}

LOG(warning) << "session no longer available. REMOVE_WRITERS response not sent,";
}


std::pair<bool, Json::Value>
crud::get_database_permissions(const bzn::uuid_t& uuid) const
{
Expand Down Expand Up @@ -438,7 +557,7 @@ crud::create_permission_data(const bzn::caller_id_t& caller_id) const
Json::Value json;

json[OWNER_KEY] = caller_id;
json[WRITERS_KEY] = Json::Value();
json[WRITERS_KEY] = Json::Value(Json::arrayValue);

LOG(debug) << "created db perms: " << json.toStyledString();

Expand All @@ -447,24 +566,78 @@ crud::create_permission_data(const bzn::caller_id_t& caller_id) const


bool
crud::is_caller_owner(const bzn::caller_id_t& caller_id, const Json::Value& json) const
crud::is_caller_owner(const bzn::caller_id_t& caller_id, const Json::Value& perms) const
{
return json[OWNER_KEY] == caller_id;
return perms[OWNER_KEY] == caller_id;
}


bool
crud::is_caller_a_writer(const bzn::caller_id_t& caller_id, const Json::Value& json) const
crud::is_caller_a_writer(const bzn::caller_id_t& caller_id, const Json::Value& perms) const
{
for(const auto& writer_id : json[WRITERS_KEY])
for(const auto& writer_id : perms[WRITERS_KEY])
{
if (writer_id == caller_id)
{
return true;
}
}

return this->is_caller_owner(caller_id, json);
return this->is_caller_owner(caller_id, perms);
}


void
crud::add_writers(const database_msg& request, Json::Value& perms)
{
const std::string owner = perms[OWNER_KEY].asString();

std::set<std::string> current_writers;

for (auto& writer : perms[WRITERS_KEY])
{
current_writers.emplace(writer.asString());
}

for (const auto& writer : request.add_writers().writers())
{
// owner never should be in the writers list...
if (writer != owner)
{
current_writers.insert(writer);
}
}

perms[WRITERS_KEY].clear();

for (auto&& writer : current_writers)
{
perms[WRITERS_KEY].append(std::move(writer));
}
}


void
crud::remove_writers(const database_msg& request, Json::Value& perms)
{
std::set<std::string> current_writers;

for (auto& writer : perms[WRITERS_KEY])
{
current_writers.emplace(writer.asString());
}

for (const auto& writer : request.remove_writers().writers())
{
current_writers.erase(writer);
}

perms[WRITERS_KEY].clear();

for (auto&& writer : current_writers)
{
perms[WRITERS_KEY].append(std::move(writer));
}
}


Expand All @@ -480,6 +653,8 @@ crud::save_state()
std::shared_ptr<std::string>
crud::get_saved_state()
{
std::shared_lock<std::shared_mutex> lock(this->lock); // lock for read access

return this->storage->get_snapshot();
}

Expand Down
14 changes: 12 additions & 2 deletions crud/crud.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ namespace bzn

void handle_unsubscribe(const bzn::caller_id_t& caller_id, const database_msg& request, std::shared_ptr<bzn::session_base> session);

void handle_writers(const bzn::caller_id_t& caller_id, const database_msg& request, std::shared_ptr<bzn::session_base> session);

void handle_add_writers(const bzn::caller_id_t& caller_id, const database_msg& request, std::shared_ptr<bzn::session_base> session);

void handle_remove_writers(const bzn::caller_id_t& caller_id, const database_msg& request, std::shared_ptr<bzn::session_base> session);

void send_response(const database_msg& request, bzn::storage_result result, database_response&& response,
std::shared_ptr<bzn::session_base>& session);

Expand All @@ -73,9 +79,13 @@ namespace bzn

bzn::value_t create_permission_data(const bzn::caller_id_t& caller_id) const;

bool is_caller_owner(const bzn::caller_id_t& caller_id, const Json::Value& json) const;
bool is_caller_owner(const bzn::caller_id_t& caller_id, const Json::Value& perms) const;

bool is_caller_a_writer(const bzn::caller_id_t& caller_id, const Json::Value& perms) const;

void add_writers(const database_msg& request, Json::Value& perms);

bool is_caller_a_writer(const bzn::caller_id_t& caller_id, const Json::Value& json) const;
void remove_writers(const database_msg& request, Json::Value& perms);

std::shared_ptr<bzn::storage_base> storage;
std::shared_ptr<bzn::subscription_manager_base> subscription_manager;
Expand Down

0 comments on commit 3d46ec1

Please sign in to comment.