Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-485, KEP-612 Finalize protobuf response changes and update daemon…
Browse files Browse the repository at this point in the history
…, crud script etc. to support it
  • Loading branch information
ebruck committed Sep 10, 2018
1 parent b78f4ff commit 0b29dc6
Show file tree
Hide file tree
Showing 21 changed files with 464 additions and 222 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ header {
------------------------------------------------------------
```
### Subscribe
#### Subscribe
```text
$ ./crud -n localhost:50000 subscribe -u myuuid -k mykey
Sending:
Expand Down
39 changes: 23 additions & 16 deletions crud/crud.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ crud::do_raft_task_routing(const bzn::message& msg, const database_msg& request,
}
else
{
response.mutable_resp()->set_error(bzn::MSG_INVALID_RAFT_STATE);

response.mutable_error()->set_message(bzn::MSG_INVALID_RAFT_STATE);
}
}
break;
Expand All @@ -128,13 +129,14 @@ crud::handle_create(const bzn::message& msg, const database_msg& request, databa
{
if (this->validate_value_size(request.create().value().size()))
{
response.mutable_resp()->set_error(bzn::MSG_VALUE_SIZE_TOO_LARGE);
response.mutable_error()->set_message(bzn::MSG_VALUE_SIZE_TOO_LARGE);
return;
}

if (this->storage->has(request.header().db_uuid(), request.create().key()))
{
response.mutable_resp()->set_error(bzn::MSG_RECORD_EXISTS);

response.mutable_error()->set_message(bzn::MSG_RECORD_EXISTS);
return;
}

Expand All @@ -151,13 +153,14 @@ crud::handle_read(const bzn::message& /*msg*/, const database_msg& request, data
{
if (auto record = this->storage->read(request.header().db_uuid(), request.read().key()); record)
{
response.mutable_resp()->set_value(record->value);
response.mutable_read()->set_key(request.read().key());
response.mutable_read()->set_value(record->value);
return;
}

if (this->raft->get_state() == bzn::raft_state::leader)
{
response.mutable_resp()->set_error(bzn::MSG_RECORD_NOT_FOUND);
response.mutable_error()->set_message(bzn::MSG_RECORD_NOT_FOUND);
return;
}

Expand All @@ -170,13 +173,13 @@ crud::handle_update(const bzn::message& msg, const database_msg& request, databa
{
if (this->validate_value_size(request.update().value().size()))
{
response.mutable_resp()->set_error(bzn::MSG_VALUE_SIZE_TOO_LARGE);
response.mutable_error()->set_message(bzn::MSG_VALUE_SIZE_TOO_LARGE);
return;
}

if (!this->storage->has(request.header().db_uuid(), request.update().key()))
{
response.mutable_resp()->set_error(bzn::MSG_RECORD_NOT_FOUND);
response.mutable_error()->set_message(bzn::MSG_RECORD_NOT_FOUND);
return;
}

Expand All @@ -196,6 +199,8 @@ crud::handle_delete(const bzn::message& msg, const database_msg& request, databa
if (this->raft->get_state() != bzn::raft_state::leader)
{
this->set_leader_info(response);

// copy original request...
return;
}

Expand All @@ -205,7 +210,7 @@ crud::handle_delete(const bzn::message& msg, const database_msg& request, databa
return;
}

response.mutable_resp()->set_error(bzn::MSG_RECORD_NOT_FOUND);
response.mutable_error()->set_message(bzn::MSG_RECORD_NOT_FOUND);
}


Expand All @@ -217,28 +222,30 @@ crud::handle_get_keys(const bzn::message& /*msg*/, const database_msg& request,
if (keys.empty())
{
// ensure we at least create the empty response...
response.mutable_resp();
response.mutable_keys();
return;
}

for (const auto& key : keys)
{
response.mutable_resp()->add_keys(key);
response.mutable_keys()->add_keys(key);
}
}


void
crud::handle_has(const bzn::message& /*msg*/, const database_msg& request, database_response& response)
{
response.mutable_resp()->set_has(this->storage->has(request.header().db_uuid(), request.has().key()));
response.mutable_has()->set_key(request.has().key());
response.mutable_has()->set_has(this->storage->has(request.header().db_uuid(), request.has().key()));
}


void
crud::handle_size(const bzn::message& /*msg*/, const database_msg& request, database_response& response)
{
response.mutable_resp()->set_size(this->storage->get_size(request.header().db_uuid()));
response.mutable_size()->set_keys(0); // todo: add number of keys
response.mutable_size()->set_bytes(this->storage->get_size(request.header().db_uuid()));
}


Expand Down Expand Up @@ -290,23 +297,23 @@ crud::handle_ws_crud_messages(const bzn::message& ws_msg, std::shared_ptr<bzn::s
if (!ws_msg.isMember("msg"))
{
LOG(error) << "Invalid message: " << ws_msg.toStyledString().substr(0,MAX_MESSAGE_SIZE) << "...";
response.mutable_resp()->set_error(bzn::MSG_INVALID_CRUD_COMMAND);
response.mutable_error()->set_message(bzn::MSG_INVALID_CRUD_COMMAND);
session->send_message(std::make_shared<std::string>(response.SerializeAsString()), true);
return;
}

if (!msg.ParseFromString(boost::beast::detail::base64_decode(ws_msg["msg"].asString())))
{
LOG(error) << "Failed to decode message: " << ws_msg.toStyledString().substr(0,MAX_MESSAGE_SIZE) << "...";
response.mutable_resp()->set_error(bzn::MSG_INVALID_CRUD_COMMAND);
response.mutable_error()->set_message(bzn::MSG_INVALID_CRUD_COMMAND);
session->send_message(std::make_shared<std::string>(response.SerializeAsString()), true);
return;
}

if (msg.msg_case() != msg.kDb)
{
LOG(error) << "Invalid message type: " << msg.msg_case();
response.mutable_resp()->set_error(bzn::MSG_INVALID_ARGUMENTS);
response.mutable_error()->set_message(bzn::MSG_INVALID_ARGUMENTS);
session->send_message(std::make_shared<std::string>(response.SerializeAsString()), true);
return;
}
Expand All @@ -322,7 +329,7 @@ crud::handle_ws_crud_messages(const bzn::message& ws_msg, std::shared_ptr<bzn::s
void
crud::do_candidate_tasks(const bzn::message& /*msg*/, const database_msg& /*request*/, database_response& response)
{
response.mutable_resp()->set_error(bzn::MSG_ELECTION_IN_PROGRESS);
response.mutable_error()->set_message(bzn::MSG_ELECTION_IN_PROGRESS);
}


Expand Down

0 comments on commit 0b29dc6

Please sign in to comment.