Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-329: Primary should only send out preprepares once for each reque…
Browse files Browse the repository at this point in the history
…st even if the request is duplicated
  • Loading branch information
paularchard committed Nov 19, 2018
1 parent 85c545c commit dcffa3c
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 9 deletions.
68 changes: 60 additions & 8 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <iterator>
#include <crud/crud_base.hpp>
#include <random>
#include <chrono>

using namespace bzn;

Expand Down Expand Up @@ -245,10 +246,11 @@ pbft::preliminary_filter_msg(const pbft_msg& msg)
}

std::shared_ptr<pbft_operation>
pbft::setup_request_operation(const bzn::encoded_message& request, const std::shared_ptr<session_base>& session)
pbft::setup_request_operation(const bzn::encoded_message& request, const request_hash_t& hash,
const std::shared_ptr<session_base>& session)
{
const uint64_t request_seq = this->next_issued_sequence_number++;
auto op = this->find_operation(this->view, request_seq, this->crypto->hash(request));
auto op = this->find_operation(this->view, request_seq, hash);
op->record_request(request);

if (session)
Expand All @@ -260,7 +262,7 @@ pbft::setup_request_operation(const bzn::encoded_message& request, const std::sh
}

void
pbft::handle_request(const pbft_request& /*msg*/, const bzn::json_message& original_msg, const std::shared_ptr<session_base>& session)
pbft::handle_request(const pbft_request& msg, const bzn::json_message& original_msg, const std::shared_ptr<session_base>& session)
{
if (!this->is_primary())
{
Expand All @@ -269,10 +271,26 @@ pbft::handle_request(const pbft_request& /*msg*/, const bzn::json_message& origi
return;
}

//TODO: conditionally discard based on timestamp - KEP-328
if (msg.timestamp() < (this->now() - MAX_REQUEST_AGE_MS))
{
// TODO: send error message to client
LOG(info) << "Rejecting old request: " << original_msg.toStyledString();
return;
}

auto smsg = original_msg.toStyledString();
auto hash = this->crypto->hash(smsg);

//TODO: keep track of what requests we've seen based on timestamp and only send preprepares once - KEP-329
auto op = setup_request_operation(original_msg.toStyledString(), session);
// keep track of what requests we've seen based on timestamp and only send preprepares once
if (this->already_seen_request(msg, hash))
{
// TODO: send error message to client
LOG(info) << "Rejecting duplicate request: " << original_msg.toStyledString();
return;
}
this->saw_request(msg, hash);

auto op = setup_request_operation(smsg, hash, session);
this->do_preprepare(op);
}

Expand Down Expand Up @@ -784,6 +802,10 @@ pbft::stabilize_checkpoint(const checkpoint_t& cp)
cp.first + std::lround(HIGH_WATER_INTERVAL_IN_CHECKPOINTS * CHECKPOINT_INTERVAL));

this->service->consolidate_log(cp.first);

// remove seen requests older than our time threshold
this->recent_requests.erase(this->recent_requests.begin(),
this->recent_requests.upper_bound(this->now() - MAX_REQUEST_AGE_MS));
}

void
Expand Down Expand Up @@ -919,7 +941,7 @@ pbft::handle_database_message(const bzn::json_message& json, std::shared_ptr<bzn

pbft_request req;
*req.mutable_operation() = msg.db();
req.set_timestamp(0); //TODO: KEP-611
req.set_timestamp(this->now()); //TODO: the timestamp needs to come from the client

this->handle_request(req, json, session);

Expand Down Expand Up @@ -1051,7 +1073,8 @@ pbft::broadcast_new_configuration(pbft_configuration::shared_const_ptr config)
cfg_msg->set_configuration(config->to_string());
req.set_allocated_config(cfg_msg);

auto op = this->setup_request_operation(req.SerializeAsString());
auto smsg = req.SerializeAsString();
auto op = this->setup_request_operation(smsg, this->crypto->hash(smsg));
this->do_preprepare(op);
}

Expand Down Expand Up @@ -1095,3 +1118,32 @@ pbft::proposed_config_is_acceptable(std::shared_ptr<pbft_configuration> /* confi
{
return true;
}

timestamp_t
pbft::now() const
{
return static_cast<timestamp_t>(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count());
}

void
pbft::saw_request(const pbft_request& req, const request_hash_t& hash)
{
this->recent_requests.insert(std::make_pair(req.timestamp(),
std::make_pair(req.client(), hash)));
}

bool
pbft::already_seen_request(const pbft_request& req, const request_hash_t& hash) const
{
auto range = this->recent_requests.equal_range(req.timestamp());
for (auto r = range.first; r != range.second; r++)
{
if (r->second.first == req.client() && r->second.second == hash)
{
return true;
}
}

return false;
}
11 changes: 10 additions & 1 deletion pbft/pbft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ namespace
const std::string INITIAL_CHECKPOINT_HASH = "<null db state>";
const uint64_t CHECKPOINT_INTERVAL = 100; //TODO: KEP-574
const double HIGH_WATER_INTERVAL_IN_CHECKPOINTS = 2.0; //TODO: KEP-574
const uint64_t MAX_REQUEST_AGE_MS = 300000; // 5 minutes
}

namespace bzn
{
using request_hash_t = std::string;
using checkpoint_t = std::pair<uint64_t, bzn::hash_t>;
using timestamp_t = uint64_t;

class pbft final : public bzn::pbft_base, public bzn::status_provider_base, public std::enable_shared_from_this<pbft>
{
Expand Down Expand Up @@ -115,7 +117,7 @@ namespace bzn

pbft_msg common_message_setup(const std::shared_ptr<pbft_operation>& op, pbft_msg_type type);
std::shared_ptr<pbft_operation> setup_request_operation(const bzn::encoded_message& msg
, const std::shared_ptr<session_base>& session = nullptr);
, const request_hash_t& hash, const std::shared_ptr<session_base>& session = nullptr);

void broadcast(const bzn::encoded_message& message);

Expand Down Expand Up @@ -149,6 +151,11 @@ namespace bzn

void maybe_record_request(const pbft_msg& msg, const std::shared_ptr<pbft_operation>& op);

timestamp_t now() const;
bool already_seen_request(const pbft_request& msg, const request_hash_t& hash) const;
void saw_request(const pbft_request& msg, const request_hash_t& hash);


// Using 1 as first value here to distinguish from default value of 0 in protobuf
uint64_t view = 1;
uint64_t next_issued_sequence_number = 1;
Expand Down Expand Up @@ -182,6 +189,8 @@ namespace bzn
std::map<checkpoint_t, std::unordered_map<uuid_t, std::string>> unstable_checkpoint_proofs;
pbft_config_store configurations;

std::multimap<timestamp_t, std::pair<bzn::uuid_t, request_hash_t>> recent_requests;

FRIEND_TEST(pbft_test, join_request_generates_new_config_preprepare);
FRIEND_TEST(pbft_test, valid_leave_request_test);
FRIEND_TEST(pbft_test, invalid_leave_request_test);
Expand Down
1 change: 1 addition & 0 deletions pbft/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ set(test_srcs
pbft_join_leave_test.cpp
pbft_proto_test.cpp
pbft_catchup_test.cpp
pbft_timestamp_test.cpp
database_pbft_service_test.cpp)
set(test_libs pbft crypto options ${Protobuf_LIBRARIES} bootstrap storage)

Expand Down
1 change: 1 addition & 0 deletions pbft/test/pbft_proto_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace bzn

auto request = new pbft_request();
request->set_type(PBFT_REQ_DATABASE);
request->set_timestamp(this->now());
auto dmsg = new database_msg;
auto create = new database_create;
create->set_key(std::string("key_" + std::to_string(++this->index)));
Expand Down
8 changes: 8 additions & 0 deletions pbft/test/pbft_proto_test.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ namespace bzn

// current view
uint64_t view = 1;

// get the current time in milliseconds
uint64_t now()
{ return this->pbft->now(); }

// send request to pbft
void handle_request(const pbft_request& msg, const bzn::json_message& original_msg, const std::shared_ptr<session_base>& session = nullptr)
{ this->pbft->handle_request(msg, original_msg, session); }
};
}

190 changes: 190 additions & 0 deletions pbft/test/pbft_timestamp_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright (C) 2018 Bluzelle
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License

#include <pbft/test/pbft_test_common.hpp>
#include <pbft/test/pbft_proto_test.hpp>
#include <utils/make_endpoint.hpp>
#include <chrono>

using namespace ::testing;

namespace bzn
{
using namespace test;

TEST_F(pbft_proto_test, repeated_request_doesnt_generate_preprepare)
{
this->build_pbft();

auto create = new database_create;
create->set_key(std::string("key"));
create->set_value(std::string("value"));

auto dmsg = new database_msg;
dmsg->set_allocated_create(create);

auto request = new pbft_request();
request->set_type(PBFT_REQ_DATABASE);
request->set_allocated_operation(dmsg);
request->set_timestamp(this->now());
request->set_client(TEST_NODE_UUID);

bzn::json_message json_msg;
json_msg["msg"] = request->SerializeAsString();

// the first time we should get pre-prepare messages
EXPECT_CALL(*this->mock_node, send_message_str(_, ResultOf(test::is_preprepare, Eq(true))))
.Times(Exactly(TEST_PEER_LIST.size()));
this->handle_request(*request, json_msg);

auto request2 = new pbft_request(*request);
auto json_msg2 = json_msg;

// this time no pre-prepare should be issued
EXPECT_CALL(*this->mock_node, send_message_str(_, ResultOf(test::is_preprepare, Eq(true))))
.Times(Exactly(0));
this->handle_request(*request2, json_msg2);
}

TEST_F(pbft_proto_test, similar_request_generates_preprepare)
{
this->build_pbft();

// send an initial message
auto create = new database_create;
create->set_key(std::string("key"));
create->set_value(std::string("value"));

auto dmsg = new database_msg;
dmsg->set_allocated_create(create);

auto request = new pbft_request();
request->set_type(PBFT_REQ_DATABASE);
request->set_allocated_operation(dmsg);
request->set_timestamp(this->now());
request->set_client(TEST_NODE_UUID);

bzn::json_message json_msg;
json_msg["msg"] = request->SerializeAsString();

// we should get pre-prepare messages
EXPECT_CALL(*this->mock_node, send_message_str(_, ResultOf(test::is_preprepare, Eq(true))))
.Times(Exactly(TEST_PEER_LIST.size()));
this->handle_request(*request, json_msg);


// send a second message the same as first but with a slightly different timestamp
auto request2 = new pbft_request(*request);
request2->set_timestamp(request2->timestamp() + 1);
bzn::json_message json_msg2;
json_msg2["msg"] = request2->SerializeAsString();

// again we should get pre-prepare messages
EXPECT_CALL(*this->mock_node, send_message_str(_, ResultOf(test::is_preprepare, Eq(true))))
.Times(Exactly(TEST_PEER_LIST.size()));
this->handle_request(*request2, json_msg2);


// send a third message the same as first but with same timestamp and different operation
auto create3 = new database_create;
create3->set_key(std::string("key3"));
create3->set_value(std::string("value3"));

auto dmsg3 = new database_msg;
dmsg3->set_allocated_create(create3);
auto request3 = new pbft_request(*request);
request3->set_allocated_operation(dmsg3);
bzn::json_message json_msg3;
json_msg3["msg"] = request3->SerializeAsString();

// again we should get pre-prepare messages
EXPECT_CALL(*this->mock_node, send_message_str(_, ResultOf(test::is_preprepare, Eq(true))))
.Times(Exactly(TEST_PEER_LIST.size()));
this->handle_request(*request3, json_msg3);
}

TEST_F(pbft_proto_test, same_request_from_different_client_generates_preprepare)
{
this->build_pbft();

// send an initial message
auto create = new database_create;
create->set_key(std::string("key"));
create->set_value(std::string("value"));

auto dmsg = new database_msg;
dmsg->set_allocated_create(create);

auto request = new pbft_request();
request->set_type(PBFT_REQ_DATABASE);
request->set_allocated_operation(dmsg);
request->set_timestamp(this->now());
request->set_client(TEST_NODE_UUID);

bzn::json_message json_msg;
json_msg["msg"] = request->SerializeAsString();

// we should get pre-prepare messages
EXPECT_CALL(*this->mock_node, send_message_str(_, ResultOf(test::is_preprepare, Eq(true))))
.Times(Exactly(TEST_PEER_LIST.size()));
this->handle_request(*request, json_msg);

// send a second message the same as first but from different client
auto request2 = new pbft_request(*request);
request2->set_client(SECOND_NODE_UUID);
bzn::json_message json_msg2;
json_msg2["msg"] = request2->SerializeAsString();

// again we should get pre-prepare messages
EXPECT_CALL(*this->mock_node, send_message_str(_, ResultOf(test::is_preprepare, Eq(true))))
.Times(Exactly(TEST_PEER_LIST.size()));
this->handle_request(*request2, json_msg2);
}

TEST_F(pbft_proto_test, old_request_is_rejected)
{
this->build_pbft();

auto create = new database_create;
create->set_key(std::string("key"));
create->set_value(std::string("value"));

auto dmsg = new database_msg;
dmsg->set_allocated_create(create);

auto request = new pbft_request();
request->set_type(PBFT_REQ_DATABASE);
request->set_allocated_operation(dmsg);
request->set_timestamp(this->now() - MAX_REQUEST_AGE_MS - 1);
request->set_client(TEST_NODE_UUID);

bzn::json_message json_msg;
json_msg["msg"] = request->SerializeAsString();

// we should NOT get pre-prepare messages since this is an old request
EXPECT_CALL(*this->mock_node, send_message_str(_, ResultOf(test::is_preprepare, Eq(true))))
.Times(Exactly(0));
this->handle_request(*request, json_msg);
}

TEST_F(pbft_proto_test, range_test)
{
std::map<int, std::string> m;
m.insert(std::make_pair(2, "2"));
m.insert(std::make_pair(3, "3"));
m.insert(std::make_pair(4, "4"));

auto r = m.equal_range(1);
EXPECT_EQ(r.first, r.second);
}
}

0 comments on commit dcffa3c

Please sign in to comment.