Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-838: pbft_database_service passes the pbft_operation ptr back to
Browse files Browse the repository at this point in the history
pbft
  • Loading branch information
isabelsavannah committed Nov 27, 2018
1 parent e384b07 commit ef2a5dc
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 12 deletions.
16 changes: 12 additions & 4 deletions pbft/database_pbft_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ database_pbft_service::apply_operation(const std::shared_ptr<bzn::pbft_operation
}

// store requester session for eventual response...
this->sessions_awaiting_response[op->sequence] = op->session();
this->operations_awaiting_result[op->sequence] = op;

this->process_awaiting_operations();
}
Expand Down Expand Up @@ -91,20 +91,28 @@ database_pbft_service::process_awaiting_operations()

LOG(info) << "Executing request " << request.DebugString() << "..., sequence: " << key;

if (auto session_it = this->sessions_awaiting_response.find(this->next_request_sequence); session_it != this->sessions_awaiting_response.end())
auto op_it = this->operations_awaiting_result.find(this->next_request_sequence);

if (op_it != this->operations_awaiting_result.end() && op_it->second->has_session())
{

// session found, but is the connection still around?
auto session = session_it->second.lock();
auto session = op_it->second->session().lock();

LOG(info) << "We do not have a session for the result of this request";
this->crud->handle_request(request, (session) ? session : nullptr);
}
else
{
// session not found then this was probably loaded from the database...
LOG(info) << "We do not have a session for the result of this request";
this->crud->handle_request(request, nullptr);
}

this->io_context->post(std::bind(this->execute_handler, nullptr)); // TODO: need to find the pbft_operation here; requires pbft_operation not being an in-memory construct
if (op_it != this->operations_awaiting_result.end())
{
this->io_context->post(std::bind(this->execute_handler, (*op_it).second));
}

if (auto result = this->unstable_storage->remove(this->uuid, key); result != bzn::storage_base::result::ok)
{
Expand Down
2 changes: 1 addition & 1 deletion pbft/database_pbft_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ namespace bzn
uint64_t next_request_sequence = 1;
const bzn::uuid_t uuid;

std::unordered_map<uint64_t, std::weak_ptr<bzn::session_base>> sessions_awaiting_response;
std::unordered_map<uint64_t, std::shared_ptr<bzn::pbft_operation>> operations_awaiting_result;

bzn::execute_handler_t execute_handler;

Expand Down
7 changes: 0 additions & 7 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,6 @@ pbft::start()
[weak_this = this->weak_from_this(), fd = this->failure_detector]
(std::shared_ptr<pbft_operation> op)
{
if (!op)
{
// TODO: Get real pbft_operation pointers from pbft_service
LOG(error) << "Ignoring null operation pointer recieved from pbft_service";
return;
}

fd->request_executed(op->request_hash);

if (op->sequence % CHECKPOINT_INTERVAL == 0)
Expand Down
36 changes: 36 additions & 0 deletions pbft/test/database_pbft_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,42 @@ TEST(database_pbft_service, test_that_failed_storing_of_operation_throws)
EXPECT_THROW(dps.apply_operation(operation), std::runtime_error);
}

TEST(database_pbft_service, test_that_executed_operation_fires_callback_with_operation)
{
auto mem_storage = std::make_shared<bzn::mem_storage>();
auto mock_io_context = std::make_shared<bzn::asio::Mockio_context_base>();
auto mock_crud = std::make_shared<NiceMock<bzn::Mockcrud_base>>();

EXPECT_CALL(*mock_io_context, post(_)).WillOnce(InvokeArgument<0>());

bzn::database_pbft_service dps(mock_io_context, mem_storage, mock_crud, TEST_UUID);

auto operation = std::make_shared<bzn::pbft_operation>(0, 1, "somehash", nullptr);
bool execute_handler_called_with_operation = false;

database_msg msg;
msg.mutable_header()->set_db_uuid(TEST_UUID);
msg.mutable_header()->set_transaction_id(uint64_t(123));
msg.mutable_create()->set_key("key2");
msg.mutable_create()->set_value("value2");

bzn_envelope env;
env.set_database_msg(msg.SerializeAsString());

operation->record_request(env);

dps.register_execute_handler(
[&](const auto& operation_ptr)
{
execute_handler_called_with_operation = operation_ptr->request_hash == "somehash";
});

dps.apply_operation(operation);

EXPECT_TRUE(execute_handler_called_with_operation);

}


TEST(database_pbft_service, test_that_stored_operation_is_executed_in_order_and_registered_handler_is_scheduled)
{
Expand Down

0 comments on commit ef2a5dc

Please sign in to comment.