Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

On demand OS connection manager #1645

Merged
merged 4 commits into from
Aug 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions irohad/ordering/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,11 @@ target_link_libraries(on_demand_ordering_service_transport_grpc
logger
ordering_grpc
)

add_library(on_demand_connection_manager
impl/on_demand_connection_manager.cpp
)
target_link_libraries(on_demand_connection_manager
shared_model_interfaces
rxcpp
)
52 changes: 52 additions & 0 deletions irohad/ordering/impl/on_demand_connection_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "ordering/impl/on_demand_connection_manager.hpp"

#include "interfaces/iroha_internal/proposal.hpp"

using namespace iroha::ordering;

OnDemandConnectionManager::OnDemandConnectionManager(
std::shared_ptr<transport::OdOsNotificationFactory> factory,
CurrentPeers initial_peers,
rxcpp::observable<CurrentPeers> peers)
: factory_(std::move(factory)),
subscription_(peers.subscribe([this](const auto &peers) {
// exclusive lock
std::lock_guard<std::shared_timed_mutex> lock(mutex_);

this->initializeConnections(peers);
})) {
// using start_with(initial_peers) results in deadlock
initializeConnections(initial_peers);
}

void OnDemandConnectionManager::onTransactions(CollectionType transactions) {
// shared lock
std::shared_lock<std::shared_timed_mutex> lock(mutex_);

connections_.current_consumer->onTransactions(transactions);
connections_.previous_consumer->onTransactions(transactions);
}

boost::optional<OnDemandConnectionManager::ProposalType>
OnDemandConnectionManager::onRequestProposal(transport::RoundType round) {
// shared lock
std::shared_lock<std::shared_timed_mutex> lock(mutex_);

return connections_.issuer->onRequestProposal(round);
}

void OnDemandConnectionManager::initializeConnections(
const CurrentPeers &peers) {
auto create_assign = [this](auto &ptr, auto &peer) {
ptr = factory_->create(*peer);
};

create_assign(connections_.issuer, peers.issuer);
create_assign(connections_.current_consumer, peers.current_consumer);
create_assign(connections_.previous_consumer, peers.previous_consumer);
}
71 changes: 71 additions & 0 deletions irohad/ordering/impl/on_demand_connection_manager.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_ON_DEMAND_CONNECTION_MANAGER_HPP
#define IROHA_ON_DEMAND_CONNECTION_MANAGER_HPP

#include "ordering/on_demand_os_transport.hpp"

#include <shared_mutex>

#include <rxcpp/rx-observable.hpp>

namespace iroha {
namespace ordering {

/**
* Proxy class which redirects requests to appropriate peers
*/
class OnDemandConnectionManager : public transport::OdOsNotification {
public:
/**
* Current peers to send transactions and request proposals
* Transactions are sent to two ordering services:
* current and previous consumers
* Proposal is requested from current ordering service: issuer
*/
struct CurrentPeers {
std::shared_ptr<shared_model::interface::Peer> issuer, current_consumer,
previous_consumer;
};

/**
* Corresponding connections created by OdOsNotificationFactory
* @see CurrentPeers for individual descriptions
*/
struct CurrentConnections {
std::unique_ptr<transport::OdOsNotification> issuer, current_consumer,
previous_consumer;
};

OnDemandConnectionManager(
std::shared_ptr<transport::OdOsNotificationFactory> factory,
CurrentPeers initial_peers,
rxcpp::observable<CurrentPeers> peers);

void onTransactions(CollectionType transactions) override;

boost::optional<ProposalType> onRequestProposal(
transport::RoundType round) override;

private:
/**
* Initialize corresponding peers in connections_ using factory_
* @param peers to initialize connections with
*/
void initializeConnections(const CurrentPeers &peers);

std::shared_ptr<transport::OdOsNotificationFactory> factory_;
rxcpp::composite_subscription subscription_;

CurrentConnections connections_;

std::shared_timed_mutex mutex_;
};

} // namespace ordering
} // namespace iroha

#endif // IROHA_ON_DEMAND_CONNECTION_MANAGER_HPP
3 changes: 1 addition & 2 deletions irohad/ordering/impl/on_demand_ordering_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ void OnDemandOrderingServiceImpl::onCollaborationOutcome(RoundOutput outcome) {

// ----------------------------| OdOsNotification |-----------------------------

void OnDemandOrderingServiceImpl::onTransactions(
CollectionType &&transactions) {
void OnDemandOrderingServiceImpl::onTransactions(CollectionType transactions) {
// read lock
std::shared_lock<std::shared_timed_mutex> guard(lock_);
log_->info("onTransactions => collections size = {}", transactions.size());
Expand Down
2 changes: 1 addition & 1 deletion irohad/ordering/impl/on_demand_ordering_service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace iroha {

// ----------------------- | OdOsNotification | --------------------------

void onTransactions(CollectionType &&transactions) override;
void onTransactions(CollectionType transactions) override;

boost::optional<ProposalType> onRequestProposal(
transport::RoundType round) override;
Expand Down
2 changes: 1 addition & 1 deletion irohad/ordering/impl/on_demand_os_client_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ OnDemandOsClientGrpc::OnDemandOsClientGrpc(
time_provider_(std::move(time_provider)),
proposal_request_timeout_(proposal_request_timeout) {}

void OnDemandOsClientGrpc::onTransactions(CollectionType &&transactions) {
void OnDemandOsClientGrpc::onTransactions(CollectionType transactions) {
proto::TransactionsCollection message;
for (auto &transaction : transactions) {
*message.add_transactions() = std::move(
Expand Down
2 changes: 1 addition & 1 deletion irohad/ordering/impl/on_demand_os_client_grpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace iroha {
std::function<TimepointType()> time_provider,
std::chrono::milliseconds proposal_request_timeout);

void onTransactions(CollectionType &&transactions) override;
void onTransactions(CollectionType transactions) override;

boost::optional<ProposalType> onRequestProposal(
transport::RoundType round) override;
Expand Down
5 changes: 3 additions & 2 deletions irohad/ordering/on_demand_os_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace iroha {
* Type of stored transactions
*/
using TransactionType =
std::unique_ptr<shared_model::interface::Transaction>;
std::shared_ptr<shared_model::interface::Transaction>;

/**
* Type of inserted collections
Expand All @@ -76,7 +76,7 @@ namespace iroha {
* Callback on receiving transactions
* @param transactions - vector of passed transactions
*/
virtual void onTransactions(CollectionType &&transactions) = 0;
virtual void onTransactions(CollectionType transactions) = 0;

/**
* Callback on request about proposal
Expand All @@ -97,6 +97,7 @@ namespace iroha {
public:
/**
* Create corresponding OdOsNotification interface for peer
* Returned pointer is guaranteed to be not equal to nullptr
* @param peer - peer to connect
* @return connection represented with OdOsNotification interface
*/
Expand Down
5 changes: 5 additions & 0 deletions test/module/irohad/ordering/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,8 @@ addtest(on_demand_os_server_grpc_test on_demand_os_server_grpc_test.cpp)
target_link_libraries(on_demand_os_server_grpc_test
on_demand_ordering_service_transport_grpc
)

addtest(on_demand_connection_manager_test on_demand_connection_manager_test.cpp)
target_link_libraries(on_demand_connection_manager_test
on_demand_connection_manager
)
117 changes: 117 additions & 0 deletions test/module/irohad/ordering/on_demand_connection_manager_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "ordering/impl/on_demand_connection_manager.hpp"

#include <gtest/gtest.h>
#include "interfaces/iroha_internal/proposal.hpp"
#include "module/irohad/ordering/ordering_mocks.hpp"
#include "module/shared_model/interface_mocks.hpp"

using namespace iroha::ordering;
using namespace iroha::ordering::transport;

using ::testing::ByMove;
using ::testing::Ref;
using ::testing::Return;

/**
* Create unique_ptr with MockOdOsNotification, save to var, and return it
*/
ACTION_P(CreateAndSave, var) {
auto result = std::make_unique<MockOdOsNotification>();
*var = result.get();
return std::unique_ptr<OdOsNotification>(std::move(result));
}

struct OnDemandConnectionManagerTest : public ::testing::Test {
void SetUp() override {
factory = std::make_shared<MockOdOsNotificationFactory>();

auto set = [this](auto &field, auto &ptr) {
field = std::make_shared<MockPeer>();

EXPECT_CALL(*factory, create(Ref(*field)))
.WillRepeatedly(CreateAndSave(&ptr));
};

set(cpeers.issuer, issuer);
set(cpeers.current_consumer, current_consumer);
set(cpeers.previous_consumer, previous_consumer);

manager = std::make_shared<OnDemandConnectionManager>(
factory, cpeers, peers.get_observable());
}

OnDemandConnectionManager::CurrentPeers cpeers;
MockOdOsNotification *issuer, *previous_consumer, *current_consumer;

rxcpp::subjects::subject<OnDemandConnectionManager::CurrentPeers> peers;
std::shared_ptr<MockOdOsNotificationFactory> factory;
std::shared_ptr<OnDemandConnectionManager> manager;
};

/**
* @given OnDemandConnectionManager
* @when peers observable is triggered
* @then new peers are requested from factory
*/
TEST_F(OnDemandConnectionManagerTest, FactoryUsed) {
ASSERT_NE(issuer, nullptr);
ASSERT_NE(previous_consumer, nullptr);
ASSERT_NE(current_consumer, nullptr);
}

/**
* @given initialized OnDemandConnectionManager
* @when onTransactions is called
* @then peers get data for propagation
*/
TEST_F(OnDemandConnectionManagerTest, onTransactions) {
OdOsNotification::CollectionType collection;
EXPECT_CALL(*previous_consumer, onTransactions(collection)).Times(1);
EXPECT_CALL(*current_consumer, onTransactions(collection)).Times(1);

manager->onTransactions(collection);
}

/**
* @given initialized OnDemandConnectionManager
* @when onRequestProposal is called
* AND proposal is returned
* @then peer is triggered
* AND return data is forwarded
*/
TEST_F(OnDemandConnectionManagerTest, onRequestProposal) {
RoundType round;
boost::optional<OnDemandConnectionManager::ProposalType> oproposal =
OnDemandConnectionManager::ProposalType{};
auto proposal = oproposal.value().get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't it be better to firstly check optional for ASSERT_TRUE(..)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because it is not created from component being tested, but explicitly in two lines above by initializing optional with unique pointer.

EXPECT_CALL(*issuer, onRequestProposal(round))
.WillOnce(Return(ByMove(std::move(oproposal))));

auto result = manager->onRequestProposal(round);

ASSERT_TRUE(result);
ASSERT_EQ(result.value().get(), proposal);
}

/**
* @given initialized OnDemandConnectionManager
* @when onRequestProposal is called
* AND no proposal is returned
* @then peer is triggered
* AND return data is forwarded
*/
TEST_F(OnDemandConnectionManagerTest, onRequestProposalNone) {
RoundType round;
boost::optional<OnDemandConnectionManager::ProposalType> oproposal;
EXPECT_CALL(*issuer, onRequestProposal(round))
.WillOnce(Return(ByMove(std::move(oproposal))));

auto result = manager->onRequestProposal(round);

ASSERT_FALSE(result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ TEST_F(OnDemandOsServerGrpcTest, SendTransactions) {
OdOsNotification::CollectionType collection;
auto creator = "test";

EXPECT_CALL(*notification, doOnTransactions(_))
EXPECT_CALL(*notification, onTransactions(_))
.WillOnce(SaveArg0Move(&collection));
proto::TransactionsCollection request;
request.add_transactions()
Expand Down
8 changes: 2 additions & 6 deletions test/module/irohad/ordering/ordering_mocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@ namespace iroha {
namespace transport {

struct MockOdOsNotification : public OdOsNotification {
MOCK_METHOD1(doOnTransactions, void(CollectionType &transactions));

void onTransactions(CollectionType &&transactions) {
doOnTransactions(transactions);
}
MOCK_METHOD1(onTransactions, void(CollectionType transactions));

MOCK_METHOD1(onRequestProposal,
boost::optional<ProposalType>(RoundType round));
};

class MockOdOsNotificationFactory : public OdOsNotificationFactory {
struct MockOdOsNotificationFactory : public OdOsNotificationFactory {
MOCK_METHOD1(create,
std::unique_ptr<OdOsNotification>(
const shared_model::interface::Peer &to));
Expand Down
7 changes: 7 additions & 0 deletions test/module/shared_model/interface_mocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define IROHA_INTERFACE_MOCKS_HPP

#include <gmock/gmock.h>
#include "interfaces/common_objects/peer.hpp"
#include "interfaces/iroha_internal/block.hpp"
#include "interfaces/iroha_internal/proposal.hpp"
#include "interfaces/transaction.hpp"
Expand Down Expand Up @@ -59,4 +60,10 @@ struct MockProposal : public iface::Proposal {
MOCK_CONST_METHOD0(clone, MockProposal *());
};

struct MockPeer : public iface::Peer {
MOCK_CONST_METHOD0(address, const iface::types::AddressType &());
MOCK_CONST_METHOD0(pubkey, const iface::types::PubkeyType &());
MOCK_CONST_METHOD0(clone, MockPeer *());
};

#endif // IROHA_INTERFACE_MOCKS_HPP