From 9b40cb022701263442cb7b509921eeb1a59e6fad Mon Sep 17 00:00:00 2001 From: Kitsu Date: Wed, 18 Jul 2018 02:31:33 +0300 Subject: [PATCH 1/2] Introduce lockless StatusBus - Add StatusBusImpl - Add MockStatusBus - Introduce boost::thread - Use proper synchronization for recieving stateless statuses in ITF::sendTx - Fix cache issue in CommandService - Update dockerfiles - Refactor CommandService with makeResponse Signed-off-by: Kitsu --- cmake/dependencies.cmake | 1 + docker/dependencies/Dockerfile | 2 +- docker/develop/Dockerfile | 2 +- irohad/main/application.cpp | 21 +++- irohad/main/application.hpp | 4 + irohad/torii/CMakeLists.txt | 8 ++ irohad/torii/command_service.hpp | 51 +++------ irohad/torii/impl/command_service.cpp | 85 +++++++-------- irohad/torii/impl/status_bus_impl.cpp | 21 ++++ irohad/torii/impl/status_bus_impl.hpp | 32 ++++++ irohad/torii/processor/CMakeLists.txt | 1 + .../impl/transaction_processor_impl.cpp | 43 +++----- .../torii/processor/transaction_processor.hpp | 8 -- .../processor/transaction_processor_impl.hpp | 11 +- irohad/torii/status_bus.hpp | 40 +++++++ .../integration_test_framework.cpp | 22 ++++ .../integration_framework/test_irohad.hpp | 4 + test/module/iroha-cli/client_test.cpp | 12 +- .../processor/transaction_processor_test.cpp | 103 ++++++++---------- test/module/irohad/torii/torii_mocks.hpp | 29 +++-- .../irohad/torii/torii_service_test.cpp | 31 +++--- 21 files changed, 303 insertions(+), 228 deletions(-) create mode 100644 irohad/torii/impl/status_bus_impl.cpp create mode 100644 irohad/torii/impl/status_bus_impl.hpp create mode 100644 irohad/torii/status_bus.hpp diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index f849b4e5ec..cc869ff288 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -71,6 +71,7 @@ find_package(Boost 1.65.0 REQUIRED COMPONENTS filesystem system + thread ) add_library(boost INTERFACE IMPORTED) set_target_properties(boost PROPERTIES diff --git a/docker/dependencies/Dockerfile b/docker/dependencies/Dockerfile index 040e8be607..d9abf7bfb7 100644 --- a/docker/dependencies/Dockerfile +++ b/docker/dependencies/Dockerfile @@ -49,7 +49,7 @@ RUN set -e; \ git clone https://github.com/boostorg/boost /tmp/boost; \ (cd /tmp/boost ; git checkout 436ad1dfcfc7e0246141beddd11c8a4e9c10b146); \ (cd /tmp/boost ; git submodule update --init --recursive); \ - (cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem); \ + (cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem,thread); \ (cd /tmp/boost ; /tmp/boost/b2 headers); \ (cd /tmp/boost ; /tmp/boost/b2 cxxflags="-std=c++14" -j ${PARALLELISM} install --prefix=/opt/dependencies/boost); \ ldconfig; \ diff --git a/docker/develop/Dockerfile b/docker/develop/Dockerfile index ab85deeca7..0f584feeaa 100644 --- a/docker/develop/Dockerfile +++ b/docker/develop/Dockerfile @@ -49,7 +49,7 @@ RUN set -e; \ git clone https://github.com/boostorg/boost /tmp/boost; \ (cd /tmp/boost ; git checkout 436ad1dfcfc7e0246141beddd11c8a4e9c10b146); \ (cd /tmp/boost ; git submodule update --init --recursive); \ - (cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem); \ + (cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem,thread); \ (cd /tmp/boost ; /tmp/boost/b2 headers); \ (cd /tmp/boost ; /tmp/boost/b2 cxxflags="-std=c++14" -j ${PARALLELISM} install); \ ldconfig; \ diff --git a/irohad/main/application.cpp b/irohad/main/application.cpp index dc44bfc0f2..4a0f56ac2c 100644 --- a/irohad/main/application.cpp +++ b/irohad/main/application.cpp @@ -15,6 +15,7 @@ #include "multi_sig_transactions/mst_time_provider_impl.hpp" #include "multi_sig_transactions/storage/mst_storage_impl.hpp" #include "multi_sig_transactions/transport/mst_transport_grpc.hpp" +#include "torii/impl/status_bus_impl.hpp" #include "validators/field_validator.hpp" using namespace iroha; @@ -74,6 +75,7 @@ void Irohad::init() { initConsensusGate(); initSynchronizer(); initPeerCommunicationService(); + initStatusBus(); initMstProcessor(); // Torii @@ -233,6 +235,11 @@ void Irohad::initPeerCommunicationService() { log_->info("[Init] => pcs"); } +void Irohad::initStatusBus() { + status_bus_ = std::make_shared(); + log_->info("[Init] => Tx status bus"); +} + void Irohad::initMstProcessor() { if (is_mst_supported_) { auto mst_transport = std::make_shared(); @@ -257,11 +264,15 @@ void Irohad::initMstProcessor() { * Initializing transaction command service */ void Irohad::initTransactionCommandService() { - auto tx_processor = - std::make_shared(pcs, mst_processor); - - command_service = std::make_shared<::torii::CommandService>( - tx_processor, storage, std::chrono::seconds(1), 2 * proposal_delay_); + auto tx_processor = std::make_shared( + pcs, mst_processor, status_bus_); + + command_service = + std::make_shared<::torii::CommandService>(tx_processor, + storage, + status_bus_, + std::chrono::seconds(1), + 2 * proposal_delay_); log_->info("[Init] => command service"); } diff --git a/irohad/main/application.hpp b/irohad/main/application.hpp index bcd4e99429..9cafb7ca8d 100644 --- a/irohad/main/application.hpp +++ b/irohad/main/application.hpp @@ -133,6 +133,8 @@ class Irohad { virtual void initPeerCommunicationService(); + virtual void initStatusBus(); + virtual void initMstProcessor(); virtual void initTransactionCommandService(); @@ -188,6 +190,8 @@ class Irohad { // mst std::shared_ptr mst_processor; + std::shared_ptr status_bus_; + // transaction service std::shared_ptr command_service; diff --git a/irohad/torii/CMakeLists.txt b/irohad/torii/CMakeLists.txt index 6f5ad86cd6..88099d588d 100644 --- a/irohad/torii/CMakeLists.txt +++ b/irohad/torii/CMakeLists.txt @@ -38,3 +38,11 @@ target_link_libraries(torii_service shared_model_stateless_validation processors ) + +add_library(status_bus + impl/status_bus_impl.cpp + ) +target_link_libraries(status_bus + rxcpp + shared_model_interfaces + ) diff --git a/irohad/torii/command_service.hpp b/irohad/torii/command_service.hpp index 4c6da721e8..03c2233272 100644 --- a/irohad/torii/command_service.hpp +++ b/irohad/torii/command_service.hpp @@ -1,18 +1,6 @@ /** - * Copyright Soramitsu Co., Ltd. 2018 All Rights Reserved. - * http://soramitsu.co.jp - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 */ #ifndef TORII_COMMAND_SERVICE_HPP @@ -29,6 +17,7 @@ #include "endpoint.pb.h" #include "logger/logger.hpp" #include "torii/processor/transaction_processor.hpp" +#include "torii/status_bus.hpp" namespace torii { /** @@ -40,12 +29,14 @@ namespace torii { * Creates a new instance of CommandService * @param tx_processor - processor of received transactions * @param storage - to query transactions outside the cache + * @param status_bus is a common notifier for tx statuses * @param initial_timeout - streaming timeout when tx is not received * @param nonfinal_timeout - streaming timeout when tx is being processed */ CommandService( std::shared_ptr tx_processor, std::shared_ptr storage, + std::shared_ptr status_bus, std::chrono::milliseconds initial_timeout, std::chrono::milliseconds nonfinal_timeout); @@ -150,9 +141,15 @@ namespace torii { inline void handleEvents(rxcpp::composite_subscription &subscription, rxcpp::schedulers::run_loop &run_loop); - void addTxToCacheAndLog(const std::string &who, - const shared_model::crypto::Hash &hash, - const iroha::protocol::ToriiResponse &response); + /** + * Share tx status and log it + * @param who identifier for the logging + * @param hash of the tx + * @param response to be pushed + */ + void pushStatus(const std::string &who, + const shared_model::crypto::Hash &hash, + const iroha::protocol::ToriiResponse &response); private: using CacheType = iroha::cache::Cache tx_processor_; std::shared_ptr storage_; + std::shared_ptr status_bus_; std::chrono::milliseconds initial_timeout_; std::chrono::milliseconds nonfinal_timeout_; std::shared_ptr cache_; - /** - * Mutex for propagating stateless validation status - */ - std::mutex stateless_tx_status_notifier_mutex_; - - /** - * Subject with stateless validation statuses - */ - rxcpp::subjects::subject< - std::shared_ptr> - stateless_notifier_; - - /** - * Observable with all transaction statuses - */ - rxcpp::observable< - std::shared_ptr> - responses_; - logger::Logger log_; }; diff --git a/irohad/torii/impl/command_service.cpp b/irohad/torii/impl/command_service.cpp index 101e237b86..42e109d6ce 100644 --- a/irohad/torii/impl/command_service.cpp +++ b/irohad/torii/impl/command_service.cpp @@ -36,23 +36,18 @@ namespace torii { CommandService::CommandService( std::shared_ptr tx_processor, std::shared_ptr storage, + std::shared_ptr status_bus, std::chrono::milliseconds initial_timeout, std::chrono::milliseconds nonfinal_timeout) : tx_processor_(tx_processor), storage_(storage), + status_bus_(status_bus), initial_timeout_(initial_timeout), nonfinal_timeout_(nonfinal_timeout), cache_(std::make_shared()), - // merge with mutex, since notifications can be made from different - // threads - // TODO 11.07.2018 andrei rework status handling with event bus IR-1517 - responses_(tx_processor_->transactionNotifier().merge( - rxcpp::serialize_one_worker( - rxcpp::schedulers::make_current_thread()), - stateless_notifier_.get_observable())), log_(logger::log("CommandService")) { // Notifier for all clients - responses_.subscribe([this](auto iroha_response) { + status_bus_->statuses().subscribe([this](auto iroha_response) { // find response for this tx in cache; if status of received response // isn't "greater" than cached one, dismiss received one auto proto_response = @@ -69,6 +64,17 @@ namespace torii { }); } + namespace { + iroha::protocol::ToriiResponse makeResponse( + const shared_model::crypto::Hash &h, + const iroha::protocol::TxStatus &status) { + iroha::protocol::ToriiResponse response; + response.set_tx_hash(shared_model::crypto::toBinaryString(h)); + response.set_tx_status(status); + return response; + } + } // namespace + void CommandService::Torii(const iroha::protocol::Transaction &request) { shared_model::proto::TransportBuilder< shared_model::proto::Transaction, @@ -86,20 +92,17 @@ namespace torii { return; } - // setting response - iroha::protocol::ToriiResponse response; - response.set_tx_hash( - shared_model::crypto::toBinaryString(tx_hash)); - response.set_tx_status( - iroha::protocol::TxStatus::STATELESS_VALIDATION_SUCCESS); - // Send transaction to iroha tx_processor_->transactionHandle( std::make_shared( std::move(iroha_tx.value))); - this->addTxToCacheAndLog( - "Torii", std::move(tx_hash), std::move(response)); + this->pushStatus( + "Torii", + std::move(tx_hash), + makeResponse( + tx_hash, + iroha::protocol::TxStatus::STATELESS_VALIDATION_SUCCESS)); }, [this, &request](const auto &error) { // getting hash from invalid transaction @@ -113,14 +116,12 @@ namespace torii { tx_hash.hex()); // setting response - iroha::protocol::ToriiResponse response; - response.set_tx_hash( - shared_model::crypto::toBinaryString(tx_hash)); - response.set_tx_status( + auto response = makeResponse( + tx_hash, iroha::protocol::TxStatus::STATELESS_VALIDATION_FAILED); response.set_error_message(std::move(error.error)); - this->addTxToCacheAndLog( + this->pushStatus( "Torii", std::move(tx_hash), std::move(response)); }); } @@ -145,18 +146,15 @@ namespace torii { return; } - // setting response - iroha::protocol::ToriiResponse response; - response.set_tx_hash( - shared_model::crypto::toBinaryString(tx_hash)); - response.set_tx_status( - iroha::protocol::TxStatus::STATELESS_VALIDATION_SUCCESS); - // Send transaction to iroha tx_processor_->transactionHandle(tx); - this->addTxToCacheAndLog( - "ToriiList", std::move(tx_hash), std::move(response)); + this->pushStatus( + "ToriiList", + std::move(tx_hash), + makeResponse(tx_hash, + iroha::protocol::TxStatus:: + STATELESS_VALIDATION_SUCCESS)); }); }, [this, &tx_list](auto &error) { @@ -189,14 +187,12 @@ namespace torii { shared_model::crypto::DefaultHashProvider::makeHash( shared_model::proto::makeBlob(tx.payload())); - iroha::protocol::ToriiResponse response; - response.set_tx_hash( - shared_model::crypto::toBinaryString(hash)); - response.set_tx_status( + auto response = makeResponse( + hash, iroha::protocol::TxStatus::STATELESS_VALIDATION_FAILED); response.set_error_message(sequence_error); - this->addTxToCacheAndLog( + this->pushStatus( "ToriiList", std::move(hash), std::move(response)); }); }); @@ -225,16 +221,15 @@ namespace torii { response.CopyFrom(*resp); } else { response.set_tx_hash(request.tx_hash()); - if (storage_->getBlockQuery()->hasTxWithHash( - shared_model::crypto::Hash(request.tx_hash()))) { + auto hash = shared_model::crypto::Hash(request.tx_hash()); + if (storage_->getBlockQuery()->hasTxWithHash(hash)) { response.set_tx_status(iroha::protocol::TxStatus::COMMITTED); + cache_->addItem(std::move(hash), response); } else { log_->warn("Asked non-existing tx: {}", iroha::bytestringToHexstring(request.tx_hash())); response.set_tx_status(iroha::protocol::TxStatus::NOT_RECEIVED); } - this->addTxToCacheAndLog( - "Status", std::move(tx_hash), std::move(response)); } } @@ -274,7 +269,8 @@ namespace torii { .build() .getTransport(); }()))); - return responses_ + return status_bus_ + ->statuses() // prepend initial status .start_with(initial_status) // select statuses with requested hash @@ -368,7 +364,7 @@ namespace torii { } } - void CommandService::addTxToCacheAndLog( + void CommandService::pushStatus( const std::string &who, const shared_model::crypto::Hash &hash, const iroha::protocol::ToriiResponse &response) { @@ -376,10 +372,7 @@ namespace torii { who, hash.hex(), response.tx_status()); - // transactions can be handled from multiple threads, therefore a lock is - // required - std::lock_guard lock(stateless_tx_status_notifier_mutex_); - stateless_notifier_.get_subscriber().on_next( + status_bus_->publish( std::make_shared( std::move(response))); } diff --git a/irohad/torii/impl/status_bus_impl.cpp b/irohad/torii/impl/status_bus_impl.cpp new file mode 100644 index 0000000000..e24a2edf8e --- /dev/null +++ b/irohad/torii/impl/status_bus_impl.cpp @@ -0,0 +1,21 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "torii/impl/status_bus_impl.hpp" + +namespace iroha { + namespace torii { + StatusBusImpl::StatusBusImpl(rxcpp::observe_on_one_worker worker) + : worker_(worker), subject_(worker_) {} + + void StatusBusImpl::publish(StatusBus::Objects resp) { + subject_.get_subscriber().on_next(resp); + } + + rxcpp::observable StatusBusImpl::statuses() { + return subject_.get_observable(); + } + } // namespace torii +} // namespace iroha diff --git a/irohad/torii/impl/status_bus_impl.hpp b/irohad/torii/impl/status_bus_impl.hpp new file mode 100644 index 0000000000..7bb4ed37d3 --- /dev/null +++ b/irohad/torii/impl/status_bus_impl.hpp @@ -0,0 +1,32 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef TORII_STATUS_BUS_IMPL +#define TORII_STATUS_BUS_IMPL + +#include "torii/status_bus.hpp" + +namespace iroha { + namespace torii { + /** + * StatusBus implementation + */ + class StatusBusImpl : public StatusBus { + public: + StatusBusImpl( + rxcpp::observe_on_one_worker worker = rxcpp::observe_on_new_thread()); + + void publish(StatusBus::Objects) override; + /// Subscribers will be invoked in separate thread + rxcpp::observable statuses() override; + + rxcpp::observe_on_one_worker worker_; + rxcpp::subjects::synchronize + subject_; + }; + } // namespace torii +} // namespace iroha + +#endif // TORII_STATUS_BUS_IMPL diff --git a/irohad/torii/processor/CMakeLists.txt b/irohad/torii/processor/CMakeLists.txt index 7dc7685717..d87aba6fbc 100644 --- a/irohad/torii/processor/CMakeLists.txt +++ b/irohad/torii/processor/CMakeLists.txt @@ -10,4 +10,5 @@ target_link_libraries(processors PUBLIC mst_processor shared_model_proto_builders query_execution + status_bus ) diff --git a/irohad/torii/processor/impl/transaction_processor_impl.cpp b/irohad/torii/processor/impl/transaction_processor_impl.cpp index 332bec52de..5762d0f37b 100644 --- a/irohad/torii/processor/impl/transaction_processor_impl.cpp +++ b/irohad/torii/processor/impl/transaction_processor_impl.cpp @@ -1,18 +1,6 @@ /** - * Copyright Soramitsu Co., Ltd. 2017 All Rights Reserved. - * http://soramitsu.co.jp - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 */ #include "torii/processor/transaction_processor_impl.hpp" @@ -50,10 +38,12 @@ namespace iroha { TransactionProcessorImpl::TransactionProcessorImpl( std::shared_ptr pcs, - std::shared_ptr mst_processor) - : pcs_(std::move(pcs)), mst_processor_(std::move(mst_processor)) { - log_ = logger::log("TxProcessor"); - + std::shared_ptr mst_processor, + std::shared_ptr status_bus) + : pcs_(std::move(pcs)), + mst_processor_(std::move(mst_processor)), + status_bus_(std::move(status_bus)), + log_(logger::log("TxProcessor")) { // notify about stateless success pcs_->on_proposal().subscribe([this](auto model_proposal) { for (const auto &tx : model_proposal->transactions()) { @@ -61,8 +51,7 @@ namespace iroha { log_->info("on proposal stateless success: {}", hash.hex()); // different on_next() calls (this one and below) can happen in // different threads and we don't expect emitting them concurrently - std::lock_guard lock(notifier_mutex_); - notifier_.get_subscriber().on_next( + status_bus_->publish( shared_model::builder::DefaultTransactionStatusBuilder() .statelessValidationSuccess() .txHash(hash) @@ -80,7 +69,7 @@ namespace iroha { for (const auto &tx_error : errors) { auto error_msg = composeErrorMessage(tx_error); log_->info(error_msg); - notifier_.get_subscriber().on_next( + status_bus_->publish( shared_model::builder::DefaultTransactionStatusBuilder() .statefulValidationFailed() .txHash(tx_error.second) @@ -92,7 +81,7 @@ namespace iroha { proposal_and_errors->first->transactions()) { log_->info("on stateful validation success: {}", successful_tx.hash().hex()); - notifier_.get_subscriber().on_next( + status_bus_->publish( shared_model::builder::DefaultTransactionStatusBuilder() .statefulValidationSuccess() .txHash(successful_tx.hash()) @@ -119,7 +108,7 @@ namespace iroha { std::lock_guard lock(notifier_mutex_); for (const auto &tx_hash : current_txs_hashes_) { log_->info("on commit committed: {}", tx_hash.hex()); - notifier_.get_subscriber().on_next( + status_bus_->publish( shared_model::builder::DefaultTransactionStatusBuilder() .committed() .txHash(tx_hash) @@ -137,7 +126,7 @@ namespace iroha { mst_processor_->onExpiredTransactions().subscribe([this](auto &&tx) { log_->info("MST tx expired"); std::lock_guard lock(notifier_mutex_); - this->notifier_.get_subscriber().on_next( + this->status_bus_->publish( shared_model::builder::DefaultTransactionStatusBuilder() .mstExpired() .txHash(tx->hash()) @@ -176,11 +165,5 @@ namespace iroha { } } - rxcpp::observable< - std::shared_ptr> - TransactionProcessorImpl::transactionNotifier() { - return notifier_.get_observable(); - } - } // namespace torii } // namespace iroha diff --git a/irohad/torii/processor/transaction_processor.hpp b/irohad/torii/processor/transaction_processor.hpp index e40b9dbfc1..2f48408c49 100644 --- a/irohad/torii/processor/transaction_processor.hpp +++ b/irohad/torii/processor/transaction_processor.hpp @@ -54,14 +54,6 @@ namespace iroha { const shared_model::interface::TransactionSequence &transaction_sequence) const = 0; - /** - * Subscribers will be notified with transaction status - * @return observable for subscribing - */ - virtual rxcpp::observable< - std::shared_ptr> - transactionNotifier() = 0; - virtual ~TransactionProcessor() = default; }; } // namespace torii diff --git a/irohad/torii/processor/transaction_processor_impl.hpp b/irohad/torii/processor/transaction_processor_impl.hpp index 7a2ff0c53e..4a249f573c 100644 --- a/irohad/torii/processor/transaction_processor_impl.hpp +++ b/irohad/torii/processor/transaction_processor_impl.hpp @@ -25,6 +25,7 @@ #include "multi_sig_transactions/mst_processor.hpp" #include "network/peer_communication_service.hpp" #include "torii/processor/transaction_processor.hpp" +#include "torii/status_bus.hpp" namespace iroha { namespace torii { @@ -33,10 +34,12 @@ namespace iroha { /** * @param pcs - provide information proposals and commits * @param mst_processor is a handler for multisignature transactions + * @param status_bus is a common notifier for tx statuses */ TransactionProcessorImpl( std::shared_ptr pcs, - std::shared_ptr mst_processor); + std::shared_ptr mst_processor, + std::shared_ptr status_bus); void transactionHandle( std::shared_ptr transaction) @@ -46,10 +49,6 @@ namespace iroha { const shared_model::interface::TransactionSequence &transaction_sequence) const override; - rxcpp::observable< - std::shared_ptr> - transactionNotifier() override; - private: // connections std::shared_ptr pcs_; @@ -58,6 +57,8 @@ namespace iroha { std::shared_ptr mst_processor_; std::vector current_txs_hashes_; + std::shared_ptr status_bus_; + // internal rxcpp::subjects::subject< std::shared_ptr> diff --git a/irohad/torii/status_bus.hpp b/irohad/torii/status_bus.hpp new file mode 100644 index 0000000000..35f8c41bba --- /dev/null +++ b/irohad/torii/status_bus.hpp @@ -0,0 +1,40 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef TORII_STATUS_BUS +#define TORII_STATUS_BUS + +#include +#include "interfaces/transaction_responses/tx_response.hpp" + +namespace iroha { + namespace torii { + /** + * Interface of bus for transaction statuses + */ + class StatusBus { + public: + virtual ~StatusBus() = default; + + /// Objects that represent status to operate with + using Objects = + std::shared_ptr; + + /** + * Shares object among the bus subscribers + * @param object to share + * note: guaranteed to be non-blocking call + */ + virtual void publish(Objects) = 0; + + /** + * @return observable over objects in bus + */ + virtual rxcpp::observable statuses() = 0; + }; + } // namespace torii +} // namespace iroha + +#endif // TORII_STATUS_BUS diff --git a/test/framework/integration_framework/integration_test_framework.cpp b/test/framework/integration_framework/integration_test_framework.cpp index c89171a81a..412102cb3b 100644 --- a/test/framework/integration_framework/integration_test_framework.cpp +++ b/test/framework/integration_framework/integration_test_framework.cpp @@ -19,6 +19,8 @@ #include +#include + #include "backend/protobuf/block.hpp" #include "backend/protobuf/queries/proto_query.hpp" #include "backend/protobuf/query_responses/proto_query_response.hpp" @@ -179,10 +181,30 @@ namespace integration_framework { std::function validation) { log_->info("send transaction"); + + // Required for StatusBus synchronization + boost::barrier bar1(2); + auto bar2 = std::make_shared(2); + iroha_instance_->instance_->getStatusBus() + ->statuses() + .filter([&](auto s) { return s->transactionHash() == tx.hash(); }) + .take(1) + .subscribe([&bar1, b2 = std::weak_ptr(bar2)](auto s) { + bar1.wait(); + if (auto lock = b2.lock()) { + lock->wait(); + } + }); + iroha_instance_->getIrohaInstance()->getCommandService()->Torii( tx.getTransport()); + // make sure that the first (stateless) status is come + bar1.wait(); // fetch status of transaction shared_model::proto::TransactionResponse status = getTxStatus(tx.hash()); + // make sure that the following statuses (stateful/commited) + // isn't reached the bus yet + bar2->wait(); // check validation function validation(status); diff --git a/test/framework/integration_framework/test_irohad.hpp b/test/framework/integration_framework/test_irohad.hpp index 25c324d9d8..214a7a1735 100644 --- a/test/framework/integration_framework/test_irohad.hpp +++ b/test/framework/integration_framework/test_irohad.hpp @@ -64,6 +64,10 @@ namespace integration_framework { return crypto_signer_; } + auto getStatusBus() { + return status_bus_; + } + void run() override { internal_server = std::make_unique( "127.0.0.1:" + std::to_string(internal_port_)); diff --git a/test/module/iroha-cli/client_test.cpp b/test/module/iroha-cli/client_test.cpp index fe522e5902..0e08a1f9b8 100644 --- a/test/module/iroha-cli/client_test.cpp +++ b/test/module/iroha-cli/client_test.cpp @@ -19,6 +19,7 @@ #include "execution/query_execution_impl.hpp" #include "main/server_runner.hpp" #include "torii/command_service.hpp" +#include "torii/impl/status_bus_impl.hpp" #include "torii/processor/query_processor_impl.hpp" #include "torii/processor/transaction_processor_impl.hpp" #include "torii/query_service.hpp" @@ -74,8 +75,10 @@ class ClientServerTest : public testing::Test { EXPECT_CALL(*mst, onExpiredTransactionsImpl()) .WillRepeatedly(Return(mst_expired_notifier.get_observable())); + auto status_bus = std::make_shared(); auto tx_processor = - std::make_shared(pcsMock, mst); + std::make_shared( + pcsMock, mst, status_bus); auto pb_tx_factory = std::make_shared(); @@ -89,8 +92,11 @@ class ClientServerTest : public testing::Test { //----------- Server run ---------------- runner - ->append(std::make_unique( - tx_processor, storage, initial_timeout, nonfinal_timeout)) + ->append(std::make_unique(tx_processor, + storage, + status_bus, + initial_timeout, + nonfinal_timeout)) .append(std::make_unique(qpi)) .run() .match( diff --git a/test/module/irohad/torii/processor/transaction_processor_test.cpp b/test/module/irohad/torii/processor/transaction_processor_test.cpp index 4765b570a0..69a3d4c15b 100644 --- a/test/module/irohad/torii/processor/transaction_processor_test.cpp +++ b/test/module/irohad/torii/processor/transaction_processor_test.cpp @@ -13,9 +13,11 @@ #include "interfaces/iroha_internal/transaction_sequence.hpp" #include "module/irohad/multi_sig_transactions/mst_mocks.hpp" #include "module/irohad/network/network_mocks.hpp" +#include "module/irohad/torii/torii_mocks.hpp" #include "module/shared_model/builders/protobuf/test_block_builder.hpp" #include "module/shared_model/builders/protobuf/test_proposal_builder.hpp" #include "module/shared_model/builders/protobuf/test_transaction_builder.hpp" +#include "torii/impl/status_bus_impl.hpp" #include "torii/processor/transaction_processor_impl.hpp" using namespace iroha; @@ -45,7 +47,8 @@ class TransactionProcessorTest : public ::testing::Test { EXPECT_CALL(*mp, onExpiredTransactionsImpl()) .WillRepeatedly(Return(mst_expired_notifier.get_observable())); - tp = std::make_shared(pcs, mp); + status_bus = std::make_shared(); + tp = std::make_shared(pcs, mp, status_bus); } auto base_tx() { @@ -82,6 +85,7 @@ class TransactionProcessorTest : public ::testing::Test { rxcpp::subjects::subject mst_expired_notifier; std::shared_ptr pcs; + std::shared_ptr status_bus; std::shared_ptr tp; std::shared_ptr mp; @@ -115,11 +119,11 @@ TEST_F(TransactionProcessorTest, TransactionProcessorOnProposalTest) { status_builder.notReceived().txHash(tx.hash()).build(); } - auto wrapper = - make_test_subscriber(tp->transactionNotifier(), proposal_size); - wrapper.subscribe([this](auto response) { - status_map[response->transactionHash()] = response; - }); + EXPECT_CALL(*status_bus, publish(_)) + .Times(proposal_size) + .WillRepeatedly(testing::Invoke([this](auto response) { + status_map[response->transactionHash()] = response; + })); EXPECT_CALL(*mp, propagateTransactionImpl(_)).Times(0); EXPECT_CALL(*pcs, propagate_transaction(_)).Times(txs.size()); @@ -136,8 +140,6 @@ TEST_F(TransactionProcessorTest, TransactionProcessorOnProposalTest) { prop_notifier.get_subscriber().on_next(proposal); prop_notifier.get_subscriber().on_completed(); - ASSERT_TRUE(wrapper.validate()); - SCOPED_TRACE("Stateless valid status verification"); validateStatuses(txs); } @@ -160,11 +162,11 @@ TEST_F(TransactionProcessorTest, TransactionProcessorOnProposalBatchTest) { auto transactions = framework::batch::createValidBatch(proposal_size).transactions(); - auto wrapper = - make_test_subscriber(tp->transactionNotifier(), proposal_size); - wrapper.subscribe([this](auto response) { - status_map[response->transactionHash()] = response; - }); + EXPECT_CALL(*status_bus, publish(_)) + .Times(proposal_size) + .WillRepeatedly(testing::Invoke([this](auto response) { + status_map[response->transactionHash()] = response; + })); auto transaction_sequence_result = shared_model::interface::TransactionSequence::createTransactionSequence( @@ -195,8 +197,6 @@ TEST_F(TransactionProcessorTest, TransactionProcessorOnProposalBatchTest) { prop_notifier.get_subscriber().on_next(proposal); prop_notifier.get_subscriber().on_completed(); - ASSERT_TRUE(wrapper.validate()); - SCOPED_TRACE("Stateless valid status verification"); validateStatuses( proto_transactions); @@ -217,13 +217,11 @@ TEST_F(TransactionProcessorTest, TransactionProcessorBlockCreatedTest) { status_builder.notReceived().txHash(tx.hash()).build(); } - auto wrapper = make_test_subscriber( - tp->transactionNotifier(), - txs.size() * 2); // every transaction is notified that it is stateless - // valid and then stateful valid - wrapper.subscribe([this](auto response) { - status_map[response->transactionHash()] = response; - }); + EXPECT_CALL(*status_bus, publish(_)) + .Times(txs.size() * 2) + .WillRepeatedly(testing::Invoke([this](auto response) { + status_map[response->transactionHash()] = response; + })); EXPECT_CALL(*mp, propagateTransactionImpl(_)).Times(0); EXPECT_CALL(*pcs, propagate_transaction(_)).Times(txs.size()); @@ -258,8 +256,6 @@ TEST_F(TransactionProcessorTest, TransactionProcessorBlockCreatedTest) { // Note blocks_notifier hasn't invoked on_completed, so // transactions are not commited - ASSERT_TRUE(wrapper.validate()); - SCOPED_TRACE("Stateful valid status verification"); validateStatuses(txs); } @@ -280,14 +276,11 @@ TEST_F(TransactionProcessorTest, TransactionProcessorOnCommitTest) { status_builder.notReceived().txHash(tx.hash()).build(); } - auto wrapper = make_test_subscriber( - tp->transactionNotifier(), - txs.size() * 3); // evey transaction is notified that it is first - // stateless valid, then stateful valid and - // eventually committed - wrapper.subscribe([this](auto response) { - status_map[response->transactionHash()] = response; - }); + EXPECT_CALL(*status_bus, publish(_)) + .Times(txs.size() * 3) + .WillRepeatedly(testing::Invoke([this](auto response) { + status_map[response->transactionHash()] = response; + })); EXPECT_CALL(*mp, propagateTransactionImpl(_)).Times(0); EXPECT_CALL(*pcs, propagate_transaction(_)).Times(txs.size()); @@ -316,8 +309,6 @@ TEST_F(TransactionProcessorTest, TransactionProcessorOnCommitTest) { std::shared_ptr(clone(block))); commit_notifier.get_subscriber().on_next(single_commit); - ASSERT_TRUE(wrapper.validate()); - SCOPED_TRACE("Committed status verification"); validateStatuses(txs); } @@ -349,20 +340,18 @@ TEST_F(TransactionProcessorTest, TransactionProcessorInvalidTxsTest) { status_builder.notReceived().txHash(tx.hash()).build(); } - auto wrapper = make_test_subscriber( - tp->transactionNotifier(), - proposal_size * 2 - + block_size); // For all transactions from proposal - // transaction notifier will notified - // twice (first that they are stateless - // valid and second that they either - // passed or not stateful validation) - // Plus all transactions from block will - // be committed and corresponding status will be sent - - wrapper.subscribe([this](auto response) { - status_map[response->transactionHash()] = response; - }); + // For all transactions from proposal + // transaction will be published twice + // (first that they are stateless + // valid and second that they either + // passed or not stateful validation) + // Plus all transactions from block will + // be committed and corresponding status will be sent + EXPECT_CALL(*status_bus, publish(_)) + .Times(proposal_size * 2 + block_size) + .WillRepeatedly(testing::Invoke([this](auto response) { + status_map[response->transactionHash()] = response; + })); auto proposal = std::make_shared( TestProposalBuilder() @@ -391,7 +380,6 @@ TEST_F(TransactionProcessorTest, TransactionProcessorInvalidTxsTest) { Commit single_commit = rxcpp::observable<>::just( std::shared_ptr(clone(block))); commit_notifier.get_subscriber().on_next(single_commit); - ASSERT_TRUE(wrapper.validate()); { SCOPED_TRACE("Stateful invalid status verification"); @@ -462,16 +450,13 @@ TEST_F(TransactionProcessorTest, MultisigExpired) { shared_model::crypto::DefaultCryptoAlgorithmType:: generateKeypair()) .finish()); - - auto wrapper = make_test_subscriber(tp->transactionNotifier(), 1); - wrapper.subscribe([](auto response) { - ASSERT_NO_THROW( - boost::apply_visitor(framework::SpecifiedVisitor< - shared_model::interface::MstExpiredResponse>(), - response->get())); - }); + EXPECT_CALL(*status_bus, publish(_)) + .WillOnce(testing::Invoke([](auto response) { + ASSERT_NO_THROW(boost::apply_visitor( + framework::SpecifiedVisitor< + shared_model::interface::MstExpiredResponse>(), + response->get())); + })); tp->transactionHandle(tx); mst_expired_notifier.get_subscriber().on_next(tx); - - ASSERT_TRUE(wrapper.validate()); } diff --git a/test/module/irohad/torii/torii_mocks.hpp b/test/module/irohad/torii/torii_mocks.hpp index b30968fbee..cdc4a72123 100644 --- a/test/module/irohad/torii/torii_mocks.hpp +++ b/test/module/irohad/torii/torii_mocks.hpp @@ -1,27 +1,18 @@ /** - * Copyright Soramitsu Co., Ltd. 2017 All Rights Reserved. - * http://soramitsu.co.jp - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 */ #ifndef IROHA_TORII_MOCKS_HPP #define IROHA_TORII_MOCKS_HPP -#include "torii/processor/query_processor.hpp" - #include +#include "interfaces/query_responses/block_query_response.hpp" +#include "interfaces/query_responses/query_response.hpp" +#include "torii/processor/query_processor.hpp" +#include "torii/status_bus.hpp" + namespace iroha { namespace torii { @@ -36,6 +27,12 @@ namespace iroha { std::shared_ptr>( const shared_model::interface::BlocksQuery &)); }; + + class MockStatusBus : public StatusBus { + public: + MOCK_METHOD1(publish, void(StatusBus::Objects)); + MOCK_METHOD0(statuses, rxcpp::observable()); + }; } // namespace torii } // namespace iroha diff --git a/test/module/irohad/torii/torii_service_test.cpp b/test/module/irohad/torii/torii_service_test.cpp index c28deb1aa1..eda4d68ff1 100644 --- a/test/module/irohad/torii/torii_service_test.cpp +++ b/test/module/irohad/torii/torii_service_test.cpp @@ -1,18 +1,7 @@ -/* -Copyright Soramitsu Co., Ltd. 2017 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ #include "builders/protobuf/block.hpp" #include "builders/protobuf/proposal.hpp" @@ -28,6 +17,7 @@ limitations under the License. #include "module/shared_model/builders/protobuf/test_transaction_builder.hpp" #include "torii/command_client.hpp" #include "torii/command_service.hpp" +#include "torii/impl/status_bus_impl.hpp" #include "torii/processor/transaction_processor_impl.hpp" constexpr size_t TimesToriiBlocking = 5; @@ -108,8 +98,10 @@ class ToriiServiceTest : public testing::Test { EXPECT_CALL(*mst, onExpiredTransactionsImpl()) .WillRepeatedly(Return(mst_expired_notifier.get_observable())); + auto status_bus = std::make_shared(); auto tx_processor = - std::make_shared(pcsMock, mst); + std::make_shared( + pcsMock, mst, status_bus); EXPECT_CALL(*block_query, getTxByHashSync(_)) .WillRepeatedly(Return(boost::none)); @@ -117,8 +109,11 @@ class ToriiServiceTest : public testing::Test { //----------- Server run ---------------- runner - ->append(std::make_unique( - tx_processor, storage, initial_timeout, nonfinal_timeout)) + ->append(std::make_unique(tx_processor, + storage, + status_bus, + initial_timeout, + nonfinal_timeout)) .run() .match( [this](iroha::expected::Value port) { From 68f2238d9294e8d16bd5559ae2f109c2cfe7ce2f Mon Sep 17 00:00:00 2001 From: Kitsu Date: Thu, 26 Jul 2018 12:22:59 +0300 Subject: [PATCH 2/2] Add comment about worker_ member purpose Signed-off-by: Kitsu --- irohad/torii/impl/status_bus_impl.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/irohad/torii/impl/status_bus_impl.hpp b/irohad/torii/impl/status_bus_impl.hpp index 7bb4ed37d3..f56e99405a 100644 --- a/irohad/torii/impl/status_bus_impl.hpp +++ b/irohad/torii/impl/status_bus_impl.hpp @@ -22,6 +22,7 @@ namespace iroha { /// Subscribers will be invoked in separate thread rxcpp::observable statuses() override; + // Need to create once, otherwise will create thread for each subscriber rxcpp::observe_on_one_worker worker_; rxcpp::subjects::synchronize subject_;