diff --git a/irohad/ordering/impl/on_demand_ordering_gate.cpp b/irohad/ordering/impl/on_demand_ordering_gate.cpp index a68b89ff72..4f5ebc25c4 100644 --- a/irohad/ordering/impl/on_demand_ordering_gate.cpp +++ b/irohad/ordering/impl/on_demand_ordering_gate.cpp @@ -5,6 +5,9 @@ #include "ordering/impl/on_demand_ordering_gate.hpp" +#include +#include +#include "ametsuchi/tx_presence_cache.hpp" #include "common/visitor.hpp" #include "interfaces/iroha_internal/proposal.hpp" #include "interfaces/iroha_internal/transaction_batch.hpp" @@ -17,6 +20,7 @@ OnDemandOrderingGate::OnDemandOrderingGate( std::shared_ptr network_client, rxcpp::observable events, std::unique_ptr factory, + std::shared_ptr tx_cache, consensus::Round initial_round) : ordering_service_(std::move(ordering_service)), network_client_(std::move(network_client)), @@ -41,14 +45,12 @@ OnDemandOrderingGate::OnDemandOrderingGate( // request proposal for the current round auto proposal = network_client_->onRequestProposal(current_round_); + auto final_proposal = this->processProposalRequest(std::move(proposal)); // vote for the object received from the network - proposal_notifier_.get_subscriber().on_next( - std::move(proposal).value_or_eval([&] { - return proposal_factory_->unsafeCreateProposal( - current_round_.block_round, current_round_.reject_round, {}); - })); + proposal_notifier_.get_subscriber().on_next(std::move(final_proposal)); })), proposal_factory_(std::move(factory)), + tx_cache_(std::move(tx_cache)), current_round_(initial_round) {} void OnDemandOrderingGate::propagateBatch( @@ -68,3 +70,40 @@ void OnDemandOrderingGate::setPcs( throw std::logic_error( "Method is deprecated. PCS observable should be set in ctor"); } + +std::unique_ptr +OnDemandOrderingGate::processProposalRequest( + boost::optional &&proposal) const { + if (not proposal) { + return proposal_factory_->unsafeCreateProposal( + current_round_.block_round, current_round_.reject_round, {}); + } + // no need to check empty proposal + if (boost::empty(proposal.value()->transactions())) { + return std::move(proposal.value()); + } + return removeReplays(std::move(**std::move(proposal))); +} + +std::unique_ptr +OnDemandOrderingGate::removeReplays( + shared_model::interface::Proposal &&proposal) const { + auto tx_is_not_processed = [this](const auto &tx) { + auto tx_result = tx_cache_->check(tx.hash()); + return iroha::visit_in_place( + tx_result, + [](const ametsuchi::tx_cache_status_responses::Missing &) { + return true; + }, + [](const auto &status) { + // TODO nickaleks 21.11.18: IR-1887 log replayed transactions + // when log is added + return false; + }); + }; + auto unprocessed_txs = + boost::adaptors::filter(proposal.transactions(), tx_is_not_processed); + + return proposal_factory_->unsafeCreateProposal( + proposal.height(), proposal.createdTime(), unprocessed_txs); +} diff --git a/irohad/ordering/impl/on_demand_ordering_gate.hpp b/irohad/ordering/impl/on_demand_ordering_gate.hpp index 039491c311..27ea7ffd86 100644 --- a/irohad/ordering/impl/on_demand_ordering_gate.hpp +++ b/irohad/ordering/impl/on_demand_ordering_gate.hpp @@ -18,6 +18,10 @@ #include "ordering/on_demand_ordering_service.hpp" namespace iroha { + namespace ametsuchi { + class TxPresenceCache; + } + namespace ordering { /** @@ -44,6 +48,7 @@ namespace iroha { rxcpp::observable events, std::unique_ptr factory, + std::shared_ptr tx_cache, consensus::Round initial_round); void propagateBatch( @@ -57,11 +62,25 @@ namespace iroha { const iroha::network::PeerCommunicationService &pcs) override; private: + /** + * Handle an incoming proposal from ordering service + */ + std::unique_ptr processProposalRequest( + boost::optional + &&proposal) const; + + /** + * remove already processed transactions from proposal + */ + std::unique_ptr removeReplays( + shared_model::interface::Proposal &&proposal) const; + std::shared_ptr ordering_service_; std::shared_ptr network_client_; rxcpp::composite_subscription events_subscription_; std::unique_ptr proposal_factory_; + std::shared_ptr tx_cache_; consensus::Round current_round_; rxcpp::subjects::subject< diff --git a/shared_model/backend/protobuf/proto_proposal_factory.hpp b/shared_model/backend/protobuf/proto_proposal_factory.hpp index 6bef471cb2..3a8516e6a6 100644 --- a/shared_model/backend/protobuf/proto_proposal_factory.hpp +++ b/shared_model/backend/protobuf/proto_proposal_factory.hpp @@ -35,7 +35,7 @@ namespace shared_model { std::unique_ptr unsafeCreateProposal( interface::types::HeightType height, interface::types::TimestampType created_time, - const UnsafeTransactionsCollectionType &transactions) override { + UnsafeTransactionsCollectionType transactions) override { return std::make_unique( createProtoProposal(height, created_time, transactions)); } @@ -52,7 +52,7 @@ namespace shared_model { iroha::protocol::Proposal createProtoProposal( interface::types::HeightType height, interface::types::TimestampType created_time, - const UnsafeTransactionsCollectionType &transactions) { + UnsafeTransactionsCollectionType transactions) { iroha::protocol::Proposal proposal; proposal.set_height(height); diff --git a/shared_model/interfaces/iroha_internal/unsafe_proposal_factory.hpp b/shared_model/interfaces/iroha_internal/unsafe_proposal_factory.hpp index a9008aad3f..9390defb92 100644 --- a/shared_model/interfaces/iroha_internal/unsafe_proposal_factory.hpp +++ b/shared_model/interfaces/iroha_internal/unsafe_proposal_factory.hpp @@ -28,7 +28,7 @@ namespace shared_model { virtual std::unique_ptr unsafeCreateProposal( types::HeightType height, types::TimestampType created_time, - const TransactionsCollectionType &transactions) = 0; + TransactionsCollectionType transactions) = 0; virtual ~UnsafeProposalFactory() = default; }; diff --git a/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp b/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp index 1e71028140..2990e86ef8 100644 --- a/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp +++ b/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp @@ -6,8 +6,10 @@ #include "ordering/impl/on_demand_ordering_gate.hpp" #include +#include #include "framework/test_subscriber.hpp" #include "interfaces/iroha_internal/transaction_batch_impl.hpp" +#include "module/irohad/ametsuchi/ametsuchi_mocks.hpp" #include "module/irohad/ordering/ordering_mocks.hpp" #include "module/shared_model/interface_mocks.hpp" @@ -19,6 +21,7 @@ using namespace framework::test_subscriber; using ::testing::_; using ::testing::ByMove; +using ::testing::NiceMock; using ::testing::Return; using ::testing::Truly; @@ -26,13 +29,19 @@ struct OnDemandOrderingGateTest : public ::testing::Test { void SetUp() override { ordering_service = std::make_shared(); notification = std::make_shared(); - auto ufactory = std::make_unique(); + auto ufactory = std::make_unique>(); factory = ufactory.get(); + tx_cache = std::make_shared(); + ON_CALL(*tx_cache, + check(testing::Matcher(_))) + .WillByDefault( + Return(iroha::ametsuchi::tx_cache_status_responses::Missing())); ordering_gate = std::make_shared(ordering_service, notification, rounds.get_observable(), std::move(ufactory), + tx_cache, initial_round); } @@ -40,6 +49,7 @@ struct OnDemandOrderingGateTest : public ::testing::Test { std::shared_ptr ordering_service; std::shared_ptr notification; MockUnsafeProposalFactory *factory; + std::shared_ptr tx_cache; std::shared_ptr ordering_gate; const consensus::Round initial_round = {2, 1}; @@ -71,7 +81,8 @@ TEST_F(OnDemandOrderingGateTest, BlockEvent) { OnDemandOrderingGate::BlockEvent event = {block}; consensus::Round round{event->height(), 1}; - boost::optional oproposal(nullptr); + boost::optional oproposal( + std::make_unique()); auto proposal = oproposal.value().get(); EXPECT_CALL(*ordering_service, onCollaborationOutcome(round)).Times(1); @@ -98,7 +109,8 @@ TEST_F(OnDemandOrderingGateTest, EmptyEvent) { consensus::Round round{initial_round.block_round, initial_round.reject_round + 1}; - boost::optional oproposal(nullptr); + boost::optional oproposal( + std::make_unique()); auto proposal = oproposal.value().get(); EXPECT_CALL(*ordering_service, onCollaborationOutcome(round)).Times(1); @@ -178,3 +190,52 @@ TEST_F(OnDemandOrderingGateTest, EmptyEventNoProposal) { ASSERT_TRUE(gate_wrapper.validate()); } + +/** + * @given initialized ordering gate + * @when new proposal arrives and the transaction was already committed + * @then the resulting proposal emitted by ordering gate does not contain + * this transaction + */ +TEST_F(OnDemandOrderingGateTest, ReplayedTransactionInProposal) { + auto block = std::make_shared(); + EXPECT_CALL(*block, height()).WillRepeatedly(Return(2)); + OnDemandOrderingGate::BlockEvent event = {block}; + + // initialize mock transaction + auto tx1 = std::make_shared>(); + auto hash = shared_model::crypto::Hash("mock code is readable"); + ON_CALL(*tx1, hash()).WillByDefault(testing::ReturnRef(testing::Const(hash))); + std::vector txs{tx1}; + auto tx_range = txs | boost::adaptors::indirected; + + // initialize mock proposal + auto proposal = std::make_unique>(); + ON_CALL(*proposal, transactions()).WillByDefault(Return(tx_range)); + boost::optional arriving_proposal = + std::unique_ptr(std::move(proposal)); + + // set expectations for ordering service + EXPECT_CALL(*ordering_service, onCollaborationOutcome(initial_round)) + .Times(1); + EXPECT_CALL(*notification, onRequestProposal(initial_round)) + .WillOnce(Return(ByMove(std::move(arriving_proposal)))); + EXPECT_CALL(*tx_cache, + check(testing::Matcher(_))) + .WillOnce( + Return(iroha::ametsuchi::tx_cache_status_responses::Committed())); + // expect proposal to be created without any transactions because it was + // removed by tx cache + EXPECT_CALL( + *factory, + unsafeCreateProposal( + _, _, MockUnsafeProposalFactory::TransactionsCollectionType())) + .Times(1); + + auto gate_wrapper = + make_test_subscriber(ordering_gate->on_proposal(), 1); + gate_wrapper.subscribe([&](auto proposal) {}); + rounds.get_subscriber().on_next(event); + + ASSERT_TRUE(gate_wrapper.validate()); +} diff --git a/test/module/shared_model/interface_mocks.hpp b/test/module/shared_model/interface_mocks.hpp index 39ce35e3c6..b556da8881 100644 --- a/test/module/shared_model/interface_mocks.hpp +++ b/test/module/shared_model/interface_mocks.hpp @@ -99,7 +99,7 @@ struct MockUnsafeProposalFactory std::unique_ptr( shared_model::interface::types::HeightType, shared_model::interface::types::TimestampType, - const TransactionsCollectionType &)); + TransactionsCollectionType)); }; struct MockCommonObjectsFactory