Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Replay check to ordering gate #1868

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 44 additions & 5 deletions irohad/ordering/impl/on_demand_ordering_gate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

#include "ordering/impl/on_demand_ordering_gate.hpp"

#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/empty.hpp>
#include "ametsuchi/tx_presence_cache.hpp"
#include "common/visitor.hpp"
#include "interfaces/iroha_internal/proposal.hpp"
#include "interfaces/iroha_internal/transaction_batch.hpp"
Expand All @@ -17,6 +20,7 @@ OnDemandOrderingGate::OnDemandOrderingGate(
std::shared_ptr<transport::OdOsNotification> network_client,
rxcpp::observable<BlockRoundEventType> events,
std::unique_ptr<shared_model::interface::UnsafeProposalFactory> factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round)
: ordering_service_(std::move(ordering_service)),
network_client_(std::move(network_client)),
Expand All @@ -41,14 +45,12 @@ OnDemandOrderingGate::OnDemandOrderingGate(
// request proposal for the current round
auto proposal = network_client_->onRequestProposal(current_round_);

auto final_proposal = this->processProposalRequest(std::move(proposal));
// vote for the object received from the network
proposal_notifier_.get_subscriber().on_next(
std::move(proposal).value_or_eval([&] {
return proposal_factory_->unsafeCreateProposal(
current_round_.block_round, current_round_.reject_round, {});
}));
proposal_notifier_.get_subscriber().on_next(std::move(final_proposal));
})),
proposal_factory_(std::move(factory)),
tx_cache_(std::move(tx_cache)),
current_round_(initial_round) {}

void OnDemandOrderingGate::propagateBatch(
Expand All @@ -68,3 +70,40 @@ void OnDemandOrderingGate::setPcs(
throw std::logic_error(
"Method is deprecated. PCS observable should be set in ctor");
}

std::unique_ptr<shared_model::interface::Proposal>
OnDemandOrderingGate::processProposalRequest(
boost::optional<OnDemandOrderingService::ProposalType> &&proposal) const {
if (not proposal) {
return proposal_factory_->unsafeCreateProposal(
current_round_.block_round, current_round_.reject_round, {});
}
// no need to check empty proposal
if (boost::empty(proposal.value()->transactions())) {
return std::move(proposal.value());
}
return removeReplays(std::move(**std::move(proposal)));
}

std::unique_ptr<shared_model::interface::Proposal>
OnDemandOrderingGate::removeReplays(
shared_model::interface::Proposal &&proposal) const {
auto tx_is_not_processed = [this](const auto &tx) {
auto tx_result = tx_cache_->check(tx.hash());
return iroha::visit_in_place(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check if it creates a visitor struct on each call. If it does, it is better to replace this construction with boost::get.

tx_result,
[](const ametsuchi::tx_cache_status_responses::Missing &) {
return true;
},
[](const auto &status) {
// TODO nickaleks 21.11.18: IR-1887 log replayed transactions
// when log is added
return false;
});
};
auto unprocessed_txs =
boost::adaptors::filter(proposal.transactions(), tx_is_not_processed);

return proposal_factory_->unsafeCreateProposal(
proposal.height(), proposal.createdTime(), unprocessed_txs);
}
19 changes: 19 additions & 0 deletions irohad/ordering/impl/on_demand_ordering_gate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#include "ordering/on_demand_ordering_service.hpp"

namespace iroha {
namespace ametsuchi {
class TxPresenceCache;
}

namespace ordering {

/**
Expand All @@ -44,6 +48,7 @@ namespace iroha {
rxcpp::observable<BlockRoundEventType> events,
std::unique_ptr<shared_model::interface::UnsafeProposalFactory>
factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round);

void propagateBatch(
Expand All @@ -57,11 +62,25 @@ namespace iroha {
const iroha::network::PeerCommunicationService &pcs) override;

private:
/**
* Handle an incoming proposal from ordering service
*/
std::unique_ptr<shared_model::interface::Proposal> processProposalRequest(
boost::optional<OnDemandOrderingService::ProposalType>
&&proposal) const;

/**
* remove already processed transactions from proposal
*/
std::unique_ptr<shared_model::interface::Proposal> removeReplays(
shared_model::interface::Proposal &&proposal) const;

std::shared_ptr<OnDemandOrderingService> ordering_service_;
std::shared_ptr<transport::OdOsNotification> network_client_;
rxcpp::composite_subscription events_subscription_;
std::unique_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory_;
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache_;

consensus::Round current_round_;
rxcpp::subjects::subject<
Expand Down
4 changes: 2 additions & 2 deletions shared_model/backend/protobuf/proto_proposal_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace shared_model {
std::unique_ptr<interface::Proposal> unsafeCreateProposal(
interface::types::HeightType height,
interface::types::TimestampType created_time,
const UnsafeTransactionsCollectionType &transactions) override {
UnsafeTransactionsCollectionType transactions) override {
return std::make_unique<Proposal>(
createProtoProposal(height, created_time, transactions));
}
Expand All @@ -52,7 +52,7 @@ namespace shared_model {
iroha::protocol::Proposal createProtoProposal(
interface::types::HeightType height,
interface::types::TimestampType created_time,
const UnsafeTransactionsCollectionType &transactions) {
UnsafeTransactionsCollectionType transactions) {
iroha::protocol::Proposal proposal;

proposal.set_height(height);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace shared_model {
virtual std::unique_ptr<Proposal> unsafeCreateProposal(
types::HeightType height,
types::TimestampType created_time,
const TransactionsCollectionType &transactions) = 0;
TransactionsCollectionType transactions) = 0;

virtual ~UnsafeProposalFactory() = default;
};
Expand Down
67 changes: 64 additions & 3 deletions test/module/irohad/ordering/on_demand_ordering_gate_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
#include "ordering/impl/on_demand_ordering_gate.hpp"

#include <gtest/gtest.h>
#include <boost/range/adaptor/indirected.hpp>
#include "framework/test_subscriber.hpp"
#include "interfaces/iroha_internal/transaction_batch_impl.hpp"
#include "module/irohad/ametsuchi/ametsuchi_mocks.hpp"
#include "module/irohad/ordering/ordering_mocks.hpp"
#include "module/shared_model/interface_mocks.hpp"

Expand All @@ -19,27 +21,35 @@ using namespace framework::test_subscriber;

using ::testing::_;
using ::testing::ByMove;
using ::testing::NiceMock;
using ::testing::Return;
using ::testing::Truly;

struct OnDemandOrderingGateTest : public ::testing::Test {
void SetUp() override {
ordering_service = std::make_shared<MockOnDemandOrderingService>();
notification = std::make_shared<MockOdOsNotification>();
auto ufactory = std::make_unique<MockUnsafeProposalFactory>();
auto ufactory = std::make_unique<NiceMock<MockUnsafeProposalFactory>>();
factory = ufactory.get();
tx_cache = std::make_shared<ametsuchi::MockTxPresenceCache>();
ON_CALL(*tx_cache,
check(testing::Matcher<const shared_model::crypto::Hash &>(_)))
.WillByDefault(
Return(iroha::ametsuchi::tx_cache_status_responses::Missing()));
ordering_gate =
std::make_shared<OnDemandOrderingGate>(ordering_service,
notification,
rounds.get_observable(),
std::move(ufactory),
tx_cache,
initial_round);
}

rxcpp::subjects::subject<OnDemandOrderingGate::BlockRoundEventType> rounds;
std::shared_ptr<MockOnDemandOrderingService> ordering_service;
std::shared_ptr<MockOdOsNotification> notification;
MockUnsafeProposalFactory *factory;
std::shared_ptr<ametsuchi::MockTxPresenceCache> tx_cache;
std::shared_ptr<OnDemandOrderingGate> ordering_gate;

const consensus::Round initial_round = {2, 1};
Expand Down Expand Up @@ -71,7 +81,8 @@ TEST_F(OnDemandOrderingGateTest, BlockEvent) {
OnDemandOrderingGate::BlockEvent event = {block};
consensus::Round round{event->height(), 1};

boost::optional<OdOsNotification::ProposalType> oproposal(nullptr);
boost::optional<OdOsNotification::ProposalType> oproposal(
std::make_unique<MockProposal>());
auto proposal = oproposal.value().get();

EXPECT_CALL(*ordering_service, onCollaborationOutcome(round)).Times(1);
Expand All @@ -98,7 +109,8 @@ TEST_F(OnDemandOrderingGateTest, EmptyEvent) {
consensus::Round round{initial_round.block_round,
initial_round.reject_round + 1};

boost::optional<OdOsNotification::ProposalType> oproposal(nullptr);
boost::optional<OdOsNotification::ProposalType> oproposal(
std::make_unique<MockProposal>());
auto proposal = oproposal.value().get();

EXPECT_CALL(*ordering_service, onCollaborationOutcome(round)).Times(1);
Expand Down Expand Up @@ -178,3 +190,52 @@ TEST_F(OnDemandOrderingGateTest, EmptyEventNoProposal) {

ASSERT_TRUE(gate_wrapper.validate());
}

/**
* @given initialized ordering gate
* @when new proposal arrives and the transaction was already committed
* @then the resulting proposal emitted by ordering gate does not contain
* this transaction
*/
TEST_F(OnDemandOrderingGateTest, ReplayedTransactionInProposal) {
auto block = std::make_shared<MockBlock>();
EXPECT_CALL(*block, height()).WillRepeatedly(Return(2));
OnDemandOrderingGate::BlockEvent event = {block};

// initialize mock transaction
auto tx1 = std::make_shared<NiceMock<MockTransaction>>();
auto hash = shared_model::crypto::Hash("mock code is readable");
ON_CALL(*tx1, hash()).WillByDefault(testing::ReturnRef(testing::Const(hash)));
std::vector<decltype(tx1)> txs{tx1};
auto tx_range = txs | boost::adaptors::indirected;

// initialize mock proposal
auto proposal = std::make_unique<NiceMock<MockProposal>>();
ON_CALL(*proposal, transactions()).WillByDefault(Return(tx_range));
boost::optional<OdOsNotification::ProposalType> arriving_proposal =
std::unique_ptr<shared_model::interface::Proposal>(std::move(proposal));

// set expectations for ordering service
EXPECT_CALL(*ordering_service, onCollaborationOutcome(initial_round))
.Times(1);
EXPECT_CALL(*notification, onRequestProposal(initial_round))
.WillOnce(Return(ByMove(std::move(arriving_proposal))));
EXPECT_CALL(*tx_cache,
check(testing::Matcher<const shared_model::crypto::Hash &>(_)))
.WillOnce(
Return(iroha::ametsuchi::tx_cache_status_responses::Committed()));
// expect proposal to be created without any transactions because it was
// removed by tx cache
EXPECT_CALL(
*factory,
unsafeCreateProposal(
_, _, MockUnsafeProposalFactory::TransactionsCollectionType()))
.Times(1);

auto gate_wrapper =
make_test_subscriber<CallExact>(ordering_gate->on_proposal(), 1);
gate_wrapper.subscribe([&](auto proposal) {});
rounds.get_subscriber().on_next(event);

ASSERT_TRUE(gate_wrapper.validate());
}
2 changes: 1 addition & 1 deletion test/module/shared_model/interface_mocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ struct MockUnsafeProposalFactory
std::unique_ptr<shared_model::interface::Proposal>(
shared_model::interface::types::HeightType,
shared_model::interface::types::TimestampType,
const TransactionsCollectionType &));
TransactionsCollectionType));
};

struct MockCommonObjectsFactory
Expand Down