Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-781: pbft service supports get and set state at checkpoint (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
paularchard committed Nov 15, 2018
1 parent 52a0f0f commit 85c545c
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 35 deletions.
2 changes: 2 additions & 0 deletions include/bluzelle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ namespace bzn

using session_id = uint64_t;

using service_state_t = std::string;

using hash_t = std::string;

} // bzn
Expand Down
4 changes: 4 additions & 0 deletions mocks/mock_pbft_service_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ namespace bzn {
void(bzn::execute_handler_t handler));
MOCK_METHOD1(apply_operation,
void(const std::shared_ptr<pbft_operation>&));
MOCK_CONST_METHOD1(get_service_state,
bzn::service_state_t(uint64_t sequence_number));
MOCK_METHOD2(set_service_state,
bool(uint64_t sequence_number, const bzn::service_state_t& data));
};

} // namespace bzn
49 changes: 38 additions & 11 deletions pbft/database_pbft_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ database_pbft_service::process_awaiting_operations()

this->io_context->post(std::bind(this->execute_handler, nullptr)); // TODO: need to find the pbft_operation here; requires pbft_operation not being an in-memory construct

if (auto result = this->unstable_storage->remove(uuid, key); result != bzn::storage_base::result::ok)
if (auto result = this->unstable_storage->remove(this->uuid, key); result != bzn::storage_base::result::ok)
{
// these are fatal... something bad is going on.
throw std::runtime_error("Failed to remove pbft_request from database! (" + std::to_string(uint8_t(result)) + ")");
Expand All @@ -118,30 +118,57 @@ database_pbft_service::process_awaiting_operations()
}
}


void
database_pbft_service::query(const pbft_request& request, uint64_t sequence_number) const
bzn::hash_t
database_pbft_service::service_state_hash(uint64_t /*sequence_number*/) const
{
// TODO: not sure how this works...

LOG(info) << "Querying " << request.ShortDebugString()
<< " against ver " << std::min(sequence_number, this->next_request_sequence - 1);
return "";
}


bzn::hash_t
database_pbft_service::service_state_hash(uint64_t /*sequence_number*/) const
bzn::service_state_t
database_pbft_service::get_service_state(uint64_t sequence_number) const
{
// TODO: not sure how this works...
// retrieve database state at this sequence/checkpoint
/*
return this->crud->get_state(sequence_number);
*/

return "";
return std::string("state_") + std::to_string(sequence_number);
}

bool
database_pbft_service::set_service_state(uint64_t sequence_number, const bzn::service_state_t& /*data*/)
{
// initialize database state from checkpoint data
/*
if (!this->crud->set_state(sequence_number, data))
return false;
*/

// remove all backlogged requests prior to checkpoint
uint64_t seq = this->next_request_sequence;
while (seq <= sequence_number)
{
const key_t key{std::to_string(seq)};
this->unstable_storage->remove(uuid, key);
seq++;
}

this->next_request_sequence = seq;
this->process_awaiting_operations();
return true;
}

void
database_pbft_service::consolidate_log(uint64_t sequence_number)
{
LOG(info) << "TODO: consolidating log at sequence number " << sequence_number;

// tell the database to set a checkpoint
/*
this->crud->remember_state(sequence_number);
*/
}


Expand Down
12 changes: 7 additions & 5 deletions pbft/database_pbft_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@ namespace bzn

virtual ~database_pbft_service();

void apply_operation(const std::shared_ptr<bzn::pbft_operation>& op);
void apply_operation(const std::shared_ptr<bzn::pbft_operation>& op) override;

void query(const pbft_request& request, uint64_t sequence_number) const;
bzn::hash_t service_state_hash(uint64_t sequence_number) const override;

bzn::hash_t service_state_hash(uint64_t sequence_number) const;
bzn::service_state_t get_service_state(uint64_t sequence_number) const override;

void consolidate_log(uint64_t sequence_number);
bool set_service_state(uint64_t sequence_number, const bzn::service_state_t& data) override;

void register_execute_handler(bzn::execute_handler_t handler);
void consolidate_log(uint64_t sequence_number) override;

void register_execute_handler(bzn::execute_handler_t handler) override;

uint64_t applied_requests_count() const;

Expand Down
21 changes: 14 additions & 7 deletions pbft/dummy_pbft_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ dummy_pbft_service::apply_operation(const std::shared_ptr<pbft_operation>& op)
}
}

void
dummy_pbft_service::query(const pbft_request& request, uint64_t sequence_number) const
{
LOG(info) << "Querying " << request.ShortDebugString()
<< " against ver " << std::min(sequence_number, this->next_request_sequence - 1);
}

void
dummy_pbft_service::consolidate_log(uint64_t sequence_number)
{
Expand All @@ -79,6 +72,20 @@ dummy_pbft_service::service_state_hash(uint64_t sequence_number) const
return "I don't actually have a database [" + std::to_string(sequence_number) + "]";
}

bzn::service_state_t
dummy_pbft_service::get_service_state(uint64_t sequence_number) const
{
return "I don't actually have a database [" + std::to_string(sequence_number) + "]";
}

bool
dummy_pbft_service::set_service_state(uint64_t /*sequence_number*/, const bzn::service_state_t& /*data*/)
{
return true;
}



void
dummy_pbft_service::send_execute_response(const std::shared_ptr<pbft_operation>& op)
{
Expand Down
3 changes: 2 additions & 1 deletion pbft/dummy_pbft_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ namespace bzn
public:
dummy_pbft_service(std::shared_ptr<bzn::asio::io_context_base> io_context);
void apply_operation(const std::shared_ptr<pbft_operation>& op) override;
void query(const pbft_request& request, uint64_t sequence_number) const override;
void consolidate_log(uint64_t sequence_number) override;
void register_execute_handler(execute_handler_t handler) override;
bzn::hash_t service_state_hash(uint64_t sequence_number) const override;
bzn::service_state_t get_service_state(uint64_t sequence_number) const override;
bool set_service_state(uint64_t sequence_number, const bzn::service_state_t& data) override;

uint64_t applied_requests_count();

Expand Down
10 changes: 6 additions & 4 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,7 @@ pbft::stabilize_checkpoint(const checkpoint_t& cp)
this->high_water_mark = std::max(this->high_water_mark,
cp.first + std::lround(HIGH_WATER_INTERVAL_IN_CHECKPOINTS * CHECKPOINT_INTERVAL));

this->service->consolidate_log(cp.first);
}

void
Expand Down Expand Up @@ -822,16 +823,17 @@ pbft::select_peer_for_checkpoint(const checkpoint_t& cp)
std::string
pbft::get_checkpoint_state(const checkpoint_t& cp) const
{
// TODO: call service to retrieve state at this checkpoint
return std::string("state_") + cp.second;
// call service to retrieve state at this checkpoint
return this->service->get_service_state(cp.first);
}

void
pbft::set_checkpoint_state(const checkpoint_t& /*cp*/, const std::string& /*data*/)
pbft::set_checkpoint_state(const checkpoint_t& cp, const std::string& data)
{
// TODO: set the service state at the given checkpoint sequence
// set the service state at the given checkpoint sequence
// the service is expected to load the state and discard any pending operations
// prior to the sequence number, then execute any subsequent operations sequentially
this->service->set_service_state(cp.first, data);
}

void
Expand Down
17 changes: 10 additions & 7 deletions pbft/pbft_service_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,22 @@ namespace bzn
*/
virtual void apply_operation(const std::shared_ptr<pbft_operation>& op) = 0;

/*
* Apply some read-only operation to the history of the service at some particular sequence number (either the
* sequence number is >= any the service has seen before because we want the most recent version, or we are
* querying some stable checkpoint to introduce a new node).
*/
virtual void query(const pbft_request& request, uint64_t sequence_number) const = 0;

/*
* Get the hash of the database state (presumably this will be a merkle tree root, but the details don't matter
* for now)- same semantics as query
*/
virtual bzn::hash_t service_state_hash(uint64_t sequence_number) const = 0;

/*
* Get the full database state at the given sequence number, if available
*/
virtual bzn::service_state_t get_service_state(uint64_t sequence_number) const = 0;

/*
* Set the full database state at the given sequence number
*/
virtual bool set_service_state(uint64_t sequence_number, const bzn::service_state_t& data) = 0;

/*
* A checkpoint has been stabilized, so we no longer need any history from before then.
*/
Expand Down
50 changes: 50 additions & 0 deletions pbft/test/database_pbft_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,53 @@ TEST(database_pbft_service, test_that_stored_operation_is_executed_in_order_and_

ASSERT_EQ(uint64_t(3), dps.applied_requests_count());
}

namespace test
{
void do_operation(uint64_t seq, bzn::database_pbft_service &dps)
{
pbft_request msg;
msg.mutable_operation()->mutable_header()->set_db_uuid(TEST_UUID);
msg.mutable_operation()->mutable_header()->set_transaction_id(uint64_t(seq));
msg.mutable_operation()->mutable_create()->set_key("key" + std::to_string(seq));
msg.mutable_operation()->mutable_create()->set_value("value" + std::to_string(seq));

auto operation = std::make_shared<bzn::pbft_operation>(0, seq, "somehash" + std::to_string(seq), nullptr);
operation->record_request(msg.SerializeAsString());
dps.apply_operation(operation);
}

uint64_t database_msg_seq(const database_msg& msg)
{
return msg.header().transaction_id();
}
}

TEST(database_pbft_service, test_that_set_state_catches_up_backlogged_operations)
{
auto mem_storage = std::make_shared<bzn::mem_storage>();
auto mock_io_context = std::make_shared<bzn::asio::Mockio_context_base>();
auto mock_crud = std::make_shared<bzn::Mockcrud_base>();

bzn::database_pbft_service dps(mock_io_context, mem_storage, mock_crud, TEST_UUID);

test::do_operation(99, dps);
test::do_operation(100, dps);
test::do_operation(101, dps);
test::do_operation(102, dps);
ASSERT_EQ(uint64_t(0), dps.applied_requests_count());

// only the last two operations should be applied after we set the state @ 100
EXPECT_CALL(*mock_crud, handle_request(ResultOf(test::database_msg_seq, 101), _))
.Times(Exactly(1));
EXPECT_CALL(*mock_crud, handle_request(ResultOf(test::database_msg_seq, 102), _))
.Times(Exactly(1));
EXPECT_CALL(*mock_io_context, post(_))
.Times(Exactly(2));

// push state for checkpoint at sequence 100
dps.set_service_state(100, "state_at_sequence_100");

// operations applied should be caught up now
ASSERT_EQ(uint64_t(102), dps.applied_requests_count());
}

0 comments on commit 85c545c

Please sign in to comment.