Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-1012 pbft_operation_manager existential queries
Browse files Browse the repository at this point in the history
  • Loading branch information
paularchard committed Jan 31, 2019
1 parent 5475f3d commit f9470c4
Show file tree
Hide file tree
Showing 12 changed files with 547 additions and 55 deletions.
40 changes: 24 additions & 16 deletions mocks/mock_storage_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,43 @@ namespace bzn {
class Mockstorage_base : public storage_base {
public:
MOCK_METHOD3(create,
bzn::storage_result(const bzn::uuid_t& uuid, const std::string& key, const std::string& value));
bzn::storage_result(const bzn::uuid_t& uuid, const std::string& key, const std::string& value));
MOCK_METHOD2(read,
std::optional<bzn::value_t> (const bzn::uuid_t& uuid, const std::string& key));
std::optional<bzn::value_t> (const bzn::uuid_t& uuid, const std::string& key));
MOCK_METHOD3(update,
bzn::storage_result(const bzn::uuid_t& uuid, const std::string& key, const std::string& value));
bzn::storage_result(const bzn::uuid_t& uuid, const std::string& key, const std::string& value));
MOCK_METHOD2(remove,
bzn::storage_result(const bzn::uuid_t& uuid, const std::string& key));
bzn::storage_result(const bzn::uuid_t& uuid, const std::string& key));
MOCK_METHOD0(start,
bzn::storage_result());
bzn::storage_result());
MOCK_METHOD1(save,
bzn::storage_result(const std::string& path));
bzn::storage_result(const std::string& path));
MOCK_METHOD1(load,
bzn::storage_result(const std::string& path));
bzn::storage_result(const std::string& path));
MOCK_METHOD1(error_msg,
std::string(bzn::storage_result error_id));
std::string(bzn::storage_result error_id));
MOCK_METHOD1(get_keys,
std::vector<std::string>(const bzn::uuid_t& uuid));
std::vector<std::string>(const bzn::uuid_t& uuid));
MOCK_METHOD2(has,
bool(const bzn::uuid_t& uuid, const std::string& key));
bool(const bzn::uuid_t& uuid, const std::string& key));
MOCK_METHOD1(get_size,
std::pair<std::size_t, std::size_t>(const bzn::uuid_t& uuid));
std::pair<std::size_t, std::size_t>(const bzn::uuid_t& uuid));
MOCK_METHOD1(remove,
bzn::storage_result(const bzn::uuid_t& uuid));
bzn::storage_result(const bzn::uuid_t& uuid));
MOCK_METHOD0(create_snapshot,
bool());
bool());
MOCK_METHOD0(get_snapshot,
std::shared_ptr<std::string>());
std::shared_ptr<std::string>());
MOCK_METHOD1(load_snapshot,
bool(const std::string&));
};
bool(const std::string&));
MOCK_METHOD3(remove_range,
void(const bzn::uuid_t& uuid, const std::string&, const std::string&));
MOCK_METHOD4(get_keys_if,
std::vector<bzn::key_t>(const bzn::uuid_t&, const std::string&
, const std::string&, std::optional<std::function<bool(const bzn::key_t&, const bzn::value_t&)>>));
MOCK_METHOD4(read_if,
std::vector<std::pair<bzn::key_t, bzn::value_t>>(const bzn::uuid_t&, const std::string&
, const std::string&, std::optional<std::function<bool(const bzn::key_t&, const bzn::value_t&)>>));
};

} // namespace bzn
33 changes: 19 additions & 14 deletions pbft/operations/pbft_operation_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ pbft_operation_manager::delete_operations_until(uint64_t sequence)

if (this->storage)
{
LOG(warning) << "cleaning up operation state from storage not implemented (KEP-909)";
LOG(debug) << "cleaning up operation state from storage";
pbft_persistent_operation::remove_range(this->storage.value(), 0, sequence);
}
}

Expand All @@ -108,26 +109,30 @@ pbft_operation_manager::prepared_operations_since(uint64_t sequence)
std::map<uint64_t, std::shared_ptr<pbft_operation>> result;
const auto maybe_store = [&](const std::shared_ptr<pbft_operation>& op)
{
if (op->get_sequence() > sequence && op->is_prepared())
const auto search = result.find(op->get_sequence());
if (search == result.end() || result[op->get_sequence()]->get_view() < op->get_view())
{
const auto search = result.find(op->get_sequence());
if (search == result.end() || result[op->get_sequence()]->get_view() < op->get_view())
{
result[op->get_sequence()] = op;
}
result[op->get_sequence()] = op;
}
};

for (const auto& pair : this->held_operations)
if (this->storage)
{
// This is an inefficient search, but we can fix it if it matters
maybe_store(pair.second);
for (const auto& op : pbft_persistent_operation::prepared_operations_in_range(*this->storage, sequence + 1))
{
maybe_store(op);
}
}

// Now, add anything we can find from storage
if (this->storage)
else
{
LOG(warning) << "finding prepared operations from storage not implemented (KEP-908)";
for (const auto& pair : this->held_operations)
{
if (pair.second->get_sequence() > sequence && pair.second->is_prepared())
{
// This is an inefficient search, but we can fix it if it matters
maybe_store(pair.second);
}
}
}

return result;
Expand Down
153 changes: 133 additions & 20 deletions pbft/operations/pbft_persistent_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
#include <boost/format.hpp>
#include <include/bluzelle.hpp>
#include <pbft/pbft.hpp>
#include <regex>
#include <limits>

using namespace bzn;

namespace {
const std::string STAGE_KEY = "stage";
const std::string REQUEST_KEY = "request";
const std::string OPERATIONS_UUID = "pbft_operations_data";
}

std::string
Expand All @@ -32,13 +35,50 @@ pbft_persistent_operation::generate_prefix(uint64_t view, uint64_t sequence, con
return (boost::format("%020u_%s_%020u") % sequence % request_hash % view).str();
}

std::string
pbft_persistent_operation::generate_key(const std::string& prefix, const std::string& key)
{
// Integers formatted to 20 digits, which is the maximum length of a 64 bit uint- they need to have constant length
// to be sorted correctly for prefix searches and the like
return prefix + "_" + key;
}

std::string
pbft_persistent_operation::prefix_for_sequence(uint64_t sequence)
{
return (boost::format("%020u_") % sequence).str();
}

bool
pbft_persistent_operation::parse_prefix(const std::string& prefix, uint64_t& view, uint64_t& sequence, bzn::hash_t& hash)
{
auto hash_start = prefix.find_first_of('_');
auto hash_end = prefix.find_last_of('_');
if (hash_end >= (prefix.size() - 1) || hash_start >= hash_end)
{
return false;
}

sequence = std::stoul(prefix.substr(0, hash_start));
view = std::stoul(prefix.substr(hash_end + 1));
hash = prefix.substr(hash_start + 1, hash_end - hash_start - 1);
return true;
}

const std::string&
pbft_persistent_operation::get_uuid()
{
return OPERATIONS_UUID;
}

pbft_persistent_operation::pbft_persistent_operation(uint64_t view, uint64_t sequence, const bzn::hash_t& request_hash, std::shared_ptr<bzn::storage_base> storage, size_t peers_size)
: pbft_operation(view, sequence, request_hash)
, peers_size(peers_size)
, storage(std::move(storage))
, prefix(pbft_persistent_operation::generate_prefix(view, sequence, request_hash))
{
const auto response = this->storage->create(prefix, STAGE_KEY, std::to_string(static_cast<unsigned int>(pbft_operation_stage::prepare)));
const auto response = this->storage->create(get_uuid(), generate_key(this->prefix, STAGE_KEY)
, std::to_string(static_cast<unsigned int>(pbft_operation_stage::prepare)));
switch (response)
{
case storage_result::ok:
Expand All @@ -52,6 +92,15 @@ pbft_persistent_operation::pbft_persistent_operation(uint64_t view, uint64_t seq
}
}

pbft_persistent_operation::pbft_persistent_operation(std::shared_ptr<bzn::storage_base> storage, uint64_t view, uint64_t sequence, const bzn::hash_t& request_hash)
: pbft_operation(view, sequence, request_hash)
, peers_size(1) // TODO: move peers_size out of operation
, storage(std::move(storage))
, prefix(pbft_persistent_operation::generate_prefix(view, sequence, request_hash))
{
// constructs operation already in storage without adding to storage
}

void
pbft_persistent_operation::record_pbft_msg(const pbft_msg& msg, const bzn_envelope& encoded_msg)
{
Expand All @@ -63,7 +112,9 @@ pbft_persistent_operation::record_pbft_msg(const pbft_msg& msg, const bzn_envelo
return;
}

const auto response = this->storage->create(this->typed_prefix(msg.type()), encoded_msg.sender(), encoded_msg.SerializeAsString());
const auto response = this->storage->create(get_uuid()
, generate_key(this->typed_prefix(msg.type()), encoded_msg.sender()), encoded_msg.SerializeAsString());

switch (response)
{
case storage_result::ok:
Expand All @@ -80,7 +131,7 @@ pbft_persistent_operation::record_pbft_msg(const pbft_msg& msg, const bzn_envelo
pbft_operation_stage
pbft_persistent_operation::get_stage() const
{
const auto response = this->storage->read(this->prefix, STAGE_KEY);
const auto response = this->storage->read(get_uuid(), generate_key(this->prefix, STAGE_KEY));
if (!response)
{
throw std::runtime_error("failed to read stage of pbft_operation " + this->prefix + " from storage");
Expand All @@ -96,7 +147,7 @@ pbft_persistent_operation::advance_operation_stage(pbft_operation_stage new_stag
case pbft_operation_stage::prepare :
throw std::runtime_error("cannot advance to initial stage");
case pbft_operation_stage::commit :
if (!this->is_preprepared() || this->get_stage() != pbft_operation_stage::prepare)
if (!this->is_prepared() || this->get_stage() != pbft_operation_stage::prepare)
{
throw std::runtime_error("illegal move to commit phase");
}
Expand All @@ -111,7 +162,8 @@ pbft_persistent_operation::advance_operation_stage(pbft_operation_stage new_stag
throw std::runtime_error("unknown pbft_operation_stage: " + std::to_string(static_cast<int>(new_stage)));
}

const auto response = this->storage->update(this->prefix, STAGE_KEY, std::to_string(static_cast<int>(new_stage)));
const auto response = this->storage->update(get_uuid(), generate_key(this->prefix, STAGE_KEY)
, std::to_string(static_cast<int>(new_stage)));
if (response != storage_result::ok)
{
throw std::runtime_error("failed to write operation stage update: " + storage_result_msg.at(response));
Expand All @@ -121,25 +173,24 @@ pbft_persistent_operation::advance_operation_stage(pbft_operation_stage new_stag
bool
pbft_persistent_operation::is_preprepared() const
{
size_t keys;
std::tie(keys, std::ignore) = this->storage->get_size(this->typed_prefix(pbft_msg_type::PBFT_MSG_PREPREPARE));
return keys > 0;
auto prefix = this->typed_prefix(pbft_msg_type::PBFT_MSG_PREPREPARE);
return this->storage->get_keys_if(get_uuid(), prefix, this->increment_prefix(prefix)).size() > 0;
}

bool
pbft_persistent_operation::is_prepared() const
{
size_t keys;
std::tie(keys, std::ignore) = this->storage->get_size(this->typed_prefix(pbft_msg_type::PBFT_MSG_PREPARE));
return keys >= pbft::honest_majority_size(this->peers_size) && this->is_preprepared() && this->has_request();
auto prefix = this->typed_prefix(pbft_msg_type::PBFT_MSG_PREPARE);
return this->storage->get_keys_if(get_uuid(), prefix, this->increment_prefix(prefix)).size()
>= pbft::honest_majority_size(this->peers_size) && this->is_preprepared() && this->has_request();
}

bool
pbft_persistent_operation::is_committed() const
{
size_t keys;
std::tie(keys, std::ignore) = this->storage->get_size(this->typed_prefix(pbft_msg_type::PBFT_MSG_COMMIT));
return keys >= pbft::honest_majority_size(this->peers_size) && this->is_prepared();
auto prefix = this->typed_prefix(pbft_msg_type::PBFT_MSG_COMMIT);
return this->storage->get_keys_if(get_uuid(), prefix, this->increment_prefix(prefix)).size()
>= pbft::honest_majority_size(this->peers_size) && this->is_prepared();
}

void
Expand All @@ -151,7 +202,8 @@ pbft_persistent_operation::record_request(const bzn_envelope& encoded_request)
return;
}

const auto response = this->storage->create(this->prefix, REQUEST_KEY, encoded_request.SerializeAsString());
const auto response = this->storage->create(get_uuid(), generate_key(this->prefix, REQUEST_KEY)
, encoded_request.SerializeAsString());
switch (response)
{
case storage_result::ok:
Expand Down Expand Up @@ -183,7 +235,7 @@ pbft_persistent_operation::load_transient_request() const
return;
}

const auto response = this->storage->read(this->prefix, REQUEST_KEY);
const auto response = this->storage->read(get_uuid(), generate_key(this->prefix, REQUEST_KEY));
if (!response.has_value())
{
return;
Expand Down Expand Up @@ -257,14 +309,15 @@ pbft_persistent_operation::typed_prefix(pbft_msg_type pbft_type) const
bzn_envelope
pbft_persistent_operation::get_preprepare() const
{
auto keys = this->storage->get_keys(this->typed_prefix(PBFT_MSG_PREPREPARE));
auto keys = this->storage->get_keys_if(get_uuid(), this->typed_prefix(pbft_msg_type::PBFT_MSG_PREPREPARE)
, this->typed_prefix(pbft_msg_type::PBFT_MSG_PREPARE));
if (keys.size() == 0)
{
throw std::runtime_error("tried to fetch a preprepare that we don't have for operation " + this->prefix);
}

bzn_envelope env;
if (!env.ParseFromString(this->storage->read(this->typed_prefix(PBFT_MSG_PREPREPARE), keys.at(0)).value_or("")))
if (!env.ParseFromString(this->storage->read(get_uuid(), keys.at(0)).value_or("")))
{
throw std::runtime_error("failed to parse or fetch preprepare that we supposedly have? " + this->prefix);
}
Expand All @@ -275,16 +328,76 @@ pbft_persistent_operation::get_preprepare() const
std::map<bzn::uuid_t, bzn_envelope>
pbft_persistent_operation::get_prepares() const
{
auto keys = this->storage->get_keys(this->typed_prefix(PBFT_MSG_PREPARE));
auto keys = this->storage->get_keys_if(get_uuid(), this->typed_prefix(pbft_msg_type::PBFT_MSG_PREPARE)
, this->typed_prefix(pbft_msg_type::PBFT_MSG_COMMIT));
std::map<uuid_t, bzn_envelope> result;

for (const auto& key : keys)
{
if (!result[key].ParseFromString(this->storage->read(this->typed_prefix(PBFT_MSG_PREPARE), key).value_or("")))
if (!result[key].ParseFromString(this->storage->read(get_uuid(), key).value_or("")))
{
throw std::runtime_error("failed to parse or fetch prepare that we supposedly have? " + this->prefix);
}
}

return result;
}

std::vector<std::shared_ptr<pbft_persistent_operation>>
pbft_persistent_operation::prepared_operations_in_range(std::shared_ptr<bzn::storage_base> storage, uint64_t start
, std::optional<uint64_t> end)
{
static const std::regex pattern(STAGE_KEY + "$");

auto first = (boost::format("%020u_") % start).str();
auto last = end ? (boost::format("%020u_") % *end).str() : "";
auto matches = storage->read_if(get_uuid(), first, last, [](const std::string& key, const std::string& value)->bool
{
return (value == std::to_string(static_cast<unsigned int>(pbft_operation_stage::commit))
|| value == std::to_string(static_cast<unsigned int>(pbft_operation_stage::execute)))
&& std::regex_search(key, pattern);
});

std::vector<std::shared_ptr<pbft_persistent_operation>> results;
for (const auto& m : matches)
{
auto prefix = m.first.substr(0, m.first.find("_" + STAGE_KEY));
auto key = generate_key(prefix, REQUEST_KEY);
auto res = storage->read(get_uuid(), key);
if (res)
{
uint64_t view;
uint64_t sequence;
bzn::hash_t hash;
if (parse_prefix(prefix, view, sequence, hash))
{
auto op = std::make_shared<pbft_persistent_operation>(storage, view, sequence, hash);
results.push_back(op);
}
}
}

return results;
}

void
pbft_persistent_operation::remove_range(std::shared_ptr<bzn::storage_base> storage, uint64_t first, uint64_t last)
{
storage->remove_range(get_uuid(), prefix_for_sequence(first), prefix_for_sequence(last));
}

std::string
pbft_persistent_operation::increment_prefix(const std::string& prefix) const
{
auto result = prefix;
if (result.back() < std::numeric_limits<char>::max())
{
result.back()++;
}
else
{
result += char(0x1);
}

return result;
}

0 comments on commit f9470c4

Please sign in to comment.