Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Replay prevention on proposal generation #1857

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions irohad/ordering/impl/on_demand_ordering_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@

#include <unordered_set>

#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/indirected.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/for_each.hpp>
#include <boost/range/adaptor/indirected.hpp>
#include <boost/range/size.hpp>
#include "ametsuchi/tx_presence_cache.hpp"
#include "common/visitor.hpp"
#include "datetime/time.hpp"
#include "interfaces/iroha_internal/proposal.hpp"
#include "interfaces/iroha_internal/transaction_batch.hpp"
Expand All @@ -25,12 +29,14 @@ const iroha::consensus::RejectRoundType kFirstRound = 1;
OnDemandOrderingServiceImpl::OnDemandOrderingServiceImpl(
size_t transaction_limit,
std::unique_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
size_t number_of_proposals,
const consensus::Round &initial_round)
: transaction_limit_(transaction_limit),
number_of_proposals_(number_of_proposals),
proposal_factory_(std::move(proposal_factory)),
tx_cache_(std::move(tx_cache)),
log_(logger::log("OnDemandOrderingServiceImpl")) {
onCollaborationOutcome(initial_round);
}
Expand Down Expand Up @@ -61,11 +67,17 @@ void OnDemandOrderingServiceImpl::onBatches(consensus::Round round,
round.block_round,
round.reject_round);

auto unprocessed_batches =
boost::adaptors::filter(batches, [this](const auto &batch) {
log_->info("check batch {} for already processed transactions",
batch->reducedHash().hex());
return not this->batchAlreadyProcessed(*batch);
});
Akvinikym marked this conversation as resolved.
Show resolved Hide resolved
auto it = current_proposals_.find(round);
if (it != current_proposals_.end()) {
std::for_each(batches.begin(), batches.end(), [&it](auto &obj) {
it->second.push(std::move(obj));
});
std::for_each(unprocessed_batches.begin(),
unprocessed_batches.end(),
[&it](auto &obj) { it->second.push(std::move(obj)); });
log_->info("onTransactions => collection is inserted");
}
}
Expand Down Expand Up @@ -189,3 +201,21 @@ void OnDemandOrderingServiceImpl::tryErase() {
round_queue_.pop();
}
}
// Returns true when the peer has already seen this batch to completion, i.e.
// at least one of its transactions has a final status in the presence cache.
// Used for replay prevention before a batch is queued for a proposal.
bool OnDemandOrderingServiceImpl::batchAlreadyProcessed(
const shared_model::interface::TransactionBatch &batch) {
auto tx_statuses = tx_cache_->check(batch);
// if any transaction is committed or rejected, the batch was already processed
// Note: any_of returns false for an empty sequence
return std::any_of(
tx_statuses.begin(), tx_statuses.end(), [this](const auto &batch_result) {
return iroha::visit_in_place(
batch_result,
[](const ametsuchi::tx_cache_status_responses::Missing &) {
// not in the cache: this transaction was never processed here
return false;
},
[this](const auto &status) {
// any non-Missing status means the transaction is finalized
log_->warn("Duplicate transaction: {}", status.hash.hex());
return true;
});
});
}
16 changes: 15 additions & 1 deletion irohad/ordering/impl/on_demand_ordering_service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
#include <unordered_map>

#include <tbb/concurrent_queue.h>

#include "interfaces/iroha_internal/unsafe_proposal_factory.hpp"
#include "logger/logger.hpp"

namespace iroha {
namespace ametsuchi {
class TxPresenceCache;
}
namespace ordering {
class OnDemandOrderingServiceImpl : public OnDemandOrderingService {
public:
Expand All @@ -34,6 +36,7 @@ namespace iroha {
size_t transaction_limit,
std::unique_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
size_t number_of_proposals = 3,
const consensus::Round &initial_round = {2, 1});

Expand Down Expand Up @@ -68,6 +71,12 @@ namespace iroha {
*/
ProposalType emitProposal(const consensus::Round &round);

/**
* Check if batch was already processed by the peer
*/
bool batchAlreadyProcessed(
const shared_model::interface::TransactionBatch &batch);

/**
* Max number of transaction in one proposal
*/
Expand Down Expand Up @@ -107,6 +116,11 @@ namespace iroha {
std::unique_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory_;

/**
* Processed transactions cache used for replay prevention
*/
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache_;

/**
* Logger instance
*/
Expand Down
1 change: 1 addition & 0 deletions test/module/irohad/ametsuchi/ametsuchi_mocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ namespace iroha {
};

class MockTxPresenceCache : public iroha::ametsuchi::TxPresenceCache {
public:
MOCK_CONST_METHOD1(check,
iroha::ametsuchi::TxCacheStatusType(
const shared_model::crypto::Hash &hash));
Expand Down
1 change: 1 addition & 0 deletions test/module/irohad/ordering/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ addtest(on_demand_os_test on_demand_os_test.cpp)
target_link_libraries(on_demand_os_test
on_demand_ordering_service
shared_model_default_builders
ametsuchi
)

addtest(on_demand_os_client_grpc_test on_demand_os_client_grpc_test.cpp)
Expand Down
137 changes: 129 additions & 8 deletions test/module/irohad/ordering/on_demand_os_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "builders/protobuf/transaction.hpp"
#include "datetime/time.hpp"
#include "interfaces/iroha_internal/transaction_batch_impl.hpp"
#include "module/irohad/ametsuchi/ametsuchi_mocks.hpp"
#include "module/shared_model/interface_mocks.hpp"
#include "module/shared_model/validators/validators.hpp"

Expand All @@ -22,7 +23,9 @@ using namespace iroha::ordering::transport;

using testing::_;
using testing::ByMove;
using testing::Matcher;
using testing::NiceMock;
using testing::Ref;
using testing::Return;

using shared_model::interface::Proposal;
Expand All @@ -36,13 +39,28 @@ class OnDemandOsTest : public ::testing::Test {
const uint32_t proposal_limit = 5;
const consensus::Round initial_round = {2, 1}, target_round = {4, 1},
commit_round = {3, 1}, reject_round = {2, 2};
NiceMock<iroha::ametsuchi::MockTxPresenceCache> *mock_cache;

void SetUp() override {
// TODO: nickaleks IR-1811 use mock factory
auto factory = std::make_unique<
shared_model::proto::ProtoProposalFactory<MockProposalValidator>>();
os = std::make_shared<OnDemandOrderingServiceImpl>(
transaction_limit, std::move(factory), proposal_limit, initial_round);
auto tx_cache =
std::make_unique<NiceMock<iroha::ametsuchi::MockTxPresenceCache>>();
mock_cache = tx_cache.get();
// every batch is new by default
ON_CALL(
*mock_cache,
check(
testing::Matcher<const shared_model::interface::TransactionBatch &>(
_)))
.WillByDefault(Return(std::vector<iroha::ametsuchi::TxCacheStatusType>{
iroha::ametsuchi::tx_cache_status_responses::Missing()}));
os = std::make_shared<OnDemandOrderingServiceImpl>(transaction_limit,
std::move(factory),
std::move(tx_cache),
proposal_limit,
initial_round);
}

/**
Expand All @@ -52,8 +70,14 @@ class OnDemandOsTest : public ::testing::Test {
*/
// Generates batches with timestamps in [range.first, range.second) and
// feeds them to the ordering service for the given round.
void generateTransactionsAndInsert(consensus::Round round,
std::pair<uint64_t, uint64_t> range) {
os->onBatches(round, generateTransactions(range));
}

OnDemandOrderingService::CollectionType generateTransactions(
std::pair<uint64_t, uint64_t> range) {
auto now = iroha::time::now();
OnDemandOrderingService::CollectionType collection;

for (auto i = range.first; i < range.second; ++i) {
collection.push_back(
std::make_unique<shared_model::interface::TransactionBatchImpl>(
Expand All @@ -70,7 +94,7 @@ class OnDemandOsTest : public ::testing::Test {
generateKeypair())
.finish())}));
}
os->onBatches(round, std::move(collection));
return collection;
}

std::unique_ptr<Proposal> makeMockProposal() {
Expand Down Expand Up @@ -131,14 +155,19 @@ TEST_F(OnDemandOsTest, OverflowRound) {
* @given initialized on-demand OS
* @when send transactions from different threads
* AND initiate next round
* @then check that all transactions are appeared in proposal
* @then check that all transactions appear in proposal
*/
TEST_F(OnDemandOsTest, DISABLED_ConcurrentInsert) {
auto large_tx_limit = 10000u;
auto factory = std::make_unique<
shared_model::proto::ProtoProposalFactory<MockProposalValidator>>();
os = std::make_shared<OnDemandOrderingServiceImpl>(
large_tx_limit, std::move(factory), proposal_limit, initial_round);
auto tx_cache =
std::make_unique<NiceMock<iroha::ametsuchi::MockTxPresenceCache>>();
os = std::make_shared<OnDemandOrderingServiceImpl>(large_tx_limit,
std::move(factory),
std::move(tx_cache),
proposal_limit,
initial_round);

auto call = [this](auto bounds) {
for (auto i = bounds.first; i < bounds.second; ++i) {
Expand Down Expand Up @@ -213,8 +242,13 @@ TEST_F(OnDemandOsTest, EraseReject) {
TEST_F(OnDemandOsTest, UseFactoryForProposal) {
auto factory = std::make_unique<MockUnsafeProposalFactory>();
auto mock_factory = factory.get();
os = std::make_shared<OnDemandOrderingServiceImpl>(
transaction_limit, std::move(factory), proposal_limit, initial_round);
auto tx_cache =
std::make_unique<NiceMock<iroha::ametsuchi::MockTxPresenceCache>>();
os = std::make_shared<OnDemandOrderingServiceImpl>(transaction_limit,
std::move(factory),
std::move(tx_cache),
proposal_limit,
initial_round);

EXPECT_CALL(*mock_factory, unsafeCreateProposal(_, _, _))
.WillOnce(Return(ByMove(makeMockProposal())));
Expand All @@ -225,3 +259,90 @@ TEST_F(OnDemandOsTest, UseFactoryForProposal) {

ASSERT_TRUE(os->onRequestProposal(target_round));
}

// Returns a gmock matcher that matches the given batch by identity
// (const reference); used to set expectations on the transaction
// cache's check() call for one specific batch.
auto batchRef(const shared_model::interface::TransactionBatch &batch) {
return Matcher<const shared_model::interface::TransactionBatch &>(Ref(batch));
}

/**
* @given initialized on-demand OS
* @when add a batch which was already committed
* @then the batch is not present in a proposal
*/
TEST_F(OnDemandOsTest, AlreadyProcessedProposalDiscarded) {
auto batches = generateTransactions({1, 2});
auto &batch = *batches.at(0);

// cache reports the batch's transaction as Committed, so the
// replay-prevention filter must drop the whole batch
EXPECT_CALL(*mock_cache, check(batchRef(batch)))
.WillOnce(Return(std::vector<iroha::ametsuchi::TxCacheStatusType>{
iroha::ametsuchi::tx_cache_status_responses::Committed()}));

os->onBatches(initial_round, batches);

os->onCollaborationOutcome(commit_round);

auto proposal = os->onRequestProposal(initial_round);

// no transactions survived the filter, hence no proposal at all
EXPECT_FALSE(proposal);
}

/**
* @given initialized on-demand OS
* @when add a batch with new transaction
* @then batch is present in a proposal
*/
TEST_F(OnDemandOsTest, PassMissingTransaction) {
auto batches = generateTransactions({1, 2});
auto &batch = *batches.at(0);

// cache reports Missing, i.e. the transaction was never processed,
// so the batch must pass the replay-prevention filter
EXPECT_CALL(*mock_cache, check(batchRef(batch)))
.WillOnce(Return(std::vector<iroha::ametsuchi::TxCacheStatusType>{
iroha::ametsuchi::tx_cache_status_responses::Missing()}));

os->onBatches(target_round, batches);

os->onCollaborationOutcome(commit_round);

auto proposal = os->onRequestProposal(target_round);

// since we only sent one transaction,
// if the proposal is present, there is no need to check for that specific tx
EXPECT_TRUE(proposal);
}

/**
* @given initialized on-demand OS
* @when add 3 batches, with second one being already committed
* @then 2 new batches are in a proposal and already committed batch is discarded
*/
TEST_F(OnDemandOsTest, SeveralTransactionsOneCommited) {
auto batches = generateTransactions({1, 4});
auto &batch1 = *batches.at(0);
auto &batch2 = *batches.at(1);
auto &batch3 = *batches.at(2);

// only the middle batch is already finalized; it must be filtered out
EXPECT_CALL(*mock_cache, check(batchRef(batch1)))
.WillOnce(Return(std::vector<iroha::ametsuchi::TxCacheStatusType>{
iroha::ametsuchi::tx_cache_status_responses::Missing()}));
EXPECT_CALL(*mock_cache, check(batchRef(batch2)))
.WillOnce(Return(std::vector<iroha::ametsuchi::TxCacheStatusType>{
iroha::ametsuchi::tx_cache_status_responses::Committed()}));
EXPECT_CALL(*mock_cache, check(batchRef(batch3)))
.WillOnce(Return(std::vector<iroha::ametsuchi::TxCacheStatusType>{
iroha::ametsuchi::tx_cache_status_responses::Missing()}));

os->onBatches(target_round, batches);

os->onCollaborationOutcome(commit_round);

auto proposal = os->onRequestProposal(target_round);
// must abort before dereferencing the optional below: the original test
// called proposal->get() first and would crash instead of failing cleanly
ASSERT_TRUE(proposal);

const auto &txs = proposal->get()->transactions();
auto &batch2_tx = *batch2.transactions().at(0);

EXPECT_EQ(boost::size(txs), 2);
// already processed transaction is not present in the proposal
EXPECT_TRUE(std::find(txs.begin(), txs.end(), batch2_tx) == txs.end());
}