Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Feature/tx status bus (#1575)
Browse files Browse the repository at this point in the history
- Add StatusBusImpl
- Add MockStatusBus
- Introduce boost::thread for boost::barrier & update dockerfiles
- Use proper synchronization for recieving stateless statuses in
ITF::sendTx
- Fix cache issue in CommandService
- Refactor CommandService with makeResponse

Signed-off-by: Kitsu <mail@kitsu.me>
  • Loading branch information
l4l committed Jul 26, 2018
1 parent f77fe60 commit 19f3cd6
Show file tree
Hide file tree
Showing 21 changed files with 304 additions and 228 deletions.
1 change: 1 addition & 0 deletions cmake/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ find_package(Boost 1.65.0 REQUIRED
COMPONENTS
filesystem
system
thread
)
add_library(boost INTERFACE IMPORTED)
set_target_properties(boost PROPERTIES
Expand Down
2 changes: 1 addition & 1 deletion docker/dependencies/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ RUN set -e; \
git clone https://github.com/boostorg/boost /tmp/boost; \
(cd /tmp/boost ; git checkout 436ad1dfcfc7e0246141beddd11c8a4e9c10b146); \
(cd /tmp/boost ; git submodule update --init --recursive); \
(cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem); \
(cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem,thread); \
(cd /tmp/boost ; /tmp/boost/b2 headers); \
(cd /tmp/boost ; /tmp/boost/b2 cxxflags="-std=c++14" -j ${PARALLELISM} install --prefix=/opt/dependencies/boost); \
ldconfig; \
Expand Down
2 changes: 1 addition & 1 deletion docker/develop/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ RUN set -e; \
git clone https://github.com/boostorg/boost /tmp/boost; \
(cd /tmp/boost ; git checkout 436ad1dfcfc7e0246141beddd11c8a4e9c10b146); \
(cd /tmp/boost ; git submodule update --init --recursive); \
(cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem); \
(cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem,thread); \
(cd /tmp/boost ; /tmp/boost/b2 headers); \
(cd /tmp/boost ; /tmp/boost/b2 cxxflags="-std=c++14" -j ${PARALLELISM} install); \
ldconfig; \
Expand Down
21 changes: 16 additions & 5 deletions irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "multi_sig_transactions/mst_time_provider_impl.hpp"
#include "multi_sig_transactions/storage/mst_storage_impl.hpp"
#include "multi_sig_transactions/transport/mst_transport_grpc.hpp"
#include "torii/impl/status_bus_impl.hpp"
#include "validators/field_validator.hpp"

using namespace iroha;
Expand Down Expand Up @@ -74,6 +75,7 @@ void Irohad::init() {
initConsensusGate();
initSynchronizer();
initPeerCommunicationService();
initStatusBus();
initMstProcessor();

// Torii
Expand Down Expand Up @@ -233,6 +235,11 @@ void Irohad::initPeerCommunicationService() {
log_->info("[Init] => pcs");
}

void Irohad::initStatusBus() {
status_bus_ = std::make_shared<StatusBusImpl>();
log_->info("[Init] => Tx status bus");
}

void Irohad::initMstProcessor() {
if (is_mst_supported_) {
auto mst_transport = std::make_shared<MstTransportGrpc>();
Expand All @@ -257,11 +264,15 @@ void Irohad::initMstProcessor() {
* Initializing transaction command service
*/
void Irohad::initTransactionCommandService() {
auto tx_processor =
std::make_shared<TransactionProcessorImpl>(pcs, mst_processor);

command_service = std::make_shared<::torii::CommandService>(
tx_processor, storage, std::chrono::seconds(1), 2 * proposal_delay_);
auto tx_processor = std::make_shared<TransactionProcessorImpl>(
pcs, mst_processor, status_bus_);

command_service =
std::make_shared<::torii::CommandService>(tx_processor,
storage,
status_bus_,
std::chrono::seconds(1),
2 * proposal_delay_);

log_->info("[Init] => command service");
}
Expand Down
4 changes: 4 additions & 0 deletions irohad/main/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ class Irohad {

virtual void initPeerCommunicationService();

virtual void initStatusBus();

virtual void initMstProcessor();

virtual void initTransactionCommandService();
Expand Down Expand Up @@ -188,6 +190,8 @@ class Irohad {
// mst
std::shared_ptr<iroha::MstProcessor> mst_processor;

std::shared_ptr<iroha::torii::StatusBus> status_bus_;

// transaction service
std::shared_ptr<torii::CommandService> command_service;

Expand Down
8 changes: 8 additions & 0 deletions irohad/torii/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,11 @@ target_link_libraries(torii_service
shared_model_stateless_validation
processors
)

add_library(status_bus
impl/status_bus_impl.cpp
)
target_link_libraries(status_bus
rxcpp
shared_model_interfaces
)
51 changes: 15 additions & 36 deletions irohad/torii/command_service.hpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
/**
* Copyright Soramitsu Co., Ltd. 2018 All Rights Reserved.
* http://soramitsu.co.jp
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef TORII_COMMAND_SERVICE_HPP
Expand All @@ -29,6 +17,7 @@
#include "endpoint.pb.h"
#include "logger/logger.hpp"
#include "torii/processor/transaction_processor.hpp"
#include "torii/status_bus.hpp"

namespace torii {
/**
Expand All @@ -40,12 +29,14 @@ namespace torii {
* Creates a new instance of CommandService
* @param tx_processor - processor of received transactions
* @param storage - to query transactions outside the cache
* @param status_bus is a common notifier for tx statuses
* @param initial_timeout - streaming timeout when tx is not received
* @param nonfinal_timeout - streaming timeout when tx is being processed
*/
CommandService(
std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor,
std::shared_ptr<iroha::ametsuchi::Storage> storage,
std::shared_ptr<iroha::torii::StatusBus> status_bus,
std::chrono::milliseconds initial_timeout,
std::chrono::milliseconds nonfinal_timeout);

Expand Down Expand Up @@ -150,9 +141,15 @@ namespace torii {
inline void handleEvents(rxcpp::composite_subscription &subscription,
rxcpp::schedulers::run_loop &run_loop);

void addTxToCacheAndLog(const std::string &who,
const shared_model::crypto::Hash &hash,
const iroha::protocol::ToriiResponse &response);
/**
* Share tx status and log it
* @param who identifier for the logging
* @param hash of the tx
* @param response to be pushed
*/
void pushStatus(const std::string &who,
const shared_model::crypto::Hash &hash,
const iroha::protocol::ToriiResponse &response);

private:
using CacheType = iroha::cache::Cache<shared_model::crypto::Hash,
Expand All @@ -161,29 +158,11 @@ namespace torii {

std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor_;
std::shared_ptr<iroha::ametsuchi::Storage> storage_;
std::shared_ptr<iroha::torii::StatusBus> status_bus_;
std::chrono::milliseconds initial_timeout_;
std::chrono::milliseconds nonfinal_timeout_;
std::shared_ptr<CacheType> cache_;

/**
* Mutex for propagating stateless validation status
*/
std::mutex stateless_tx_status_notifier_mutex_;

/**
* Subject with stateless validation statuses
*/
rxcpp::subjects::subject<
std::shared_ptr<shared_model::interface::TransactionResponse>>
stateless_notifier_;

/**
* Observable with all transaction statuses
*/
rxcpp::observable<
std::shared_ptr<shared_model::interface::TransactionResponse>>
responses_;

logger::Logger log_;
};

Expand Down
85 changes: 39 additions & 46 deletions irohad/torii/impl/command_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,18 @@ namespace torii {
CommandService::CommandService(
std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor,
std::shared_ptr<iroha::ametsuchi::Storage> storage,
std::shared_ptr<iroha::torii::StatusBus> status_bus,
std::chrono::milliseconds initial_timeout,
std::chrono::milliseconds nonfinal_timeout)
: tx_processor_(tx_processor),
storage_(storage),
status_bus_(status_bus),
initial_timeout_(initial_timeout),
nonfinal_timeout_(nonfinal_timeout),
cache_(std::make_shared<CacheType>()),
// merge with mutex, since notifications can be made from different
// threads
// TODO 11.07.2018 andrei rework status handling with event bus IR-1517
responses_(tx_processor_->transactionNotifier().merge(
rxcpp::serialize_one_worker(
rxcpp::schedulers::make_current_thread()),
stateless_notifier_.get_observable())),
log_(logger::log("CommandService")) {
// Notifier for all clients
responses_.subscribe([this](auto iroha_response) {
status_bus_->statuses().subscribe([this](auto iroha_response) {
// find response for this tx in cache; if status of received response
// isn't "greater" than cached one, dismiss received one
auto proto_response =
Expand All @@ -69,6 +64,17 @@ namespace torii {
});
}

namespace {
iroha::protocol::ToriiResponse makeResponse(
const shared_model::crypto::Hash &h,
const iroha::protocol::TxStatus &status) {
iroha::protocol::ToriiResponse response;
response.set_tx_hash(shared_model::crypto::toBinaryString(h));
response.set_tx_status(status);
return response;
}
} // namespace

void CommandService::Torii(const iroha::protocol::Transaction &request) {
shared_model::proto::TransportBuilder<
shared_model::proto::Transaction,
Expand All @@ -86,20 +92,17 @@ namespace torii {
return;
}

// setting response
iroha::protocol::ToriiResponse response;
response.set_tx_hash(
shared_model::crypto::toBinaryString(tx_hash));
response.set_tx_status(
iroha::protocol::TxStatus::STATELESS_VALIDATION_SUCCESS);

// Send transaction to iroha
tx_processor_->transactionHandle(
std::make_shared<shared_model::proto::Transaction>(
std::move(iroha_tx.value)));

this->addTxToCacheAndLog(
"Torii", std::move(tx_hash), std::move(response));
this->pushStatus(
"Torii",
std::move(tx_hash),
makeResponse(
tx_hash,
iroha::protocol::TxStatus::STATELESS_VALIDATION_SUCCESS));
},
[this, &request](const auto &error) {
// getting hash from invalid transaction
Expand All @@ -113,14 +116,12 @@ namespace torii {
tx_hash.hex());

// setting response
iroha::protocol::ToriiResponse response;
response.set_tx_hash(
shared_model::crypto::toBinaryString(tx_hash));
response.set_tx_status(
auto response = makeResponse(
tx_hash,
iroha::protocol::TxStatus::STATELESS_VALIDATION_FAILED);
response.set_error_message(std::move(error.error));

this->addTxToCacheAndLog(
this->pushStatus(
"Torii", std::move(tx_hash), std::move(response));
});
}
Expand All @@ -145,18 +146,15 @@ namespace torii {
return;
}

// setting response
iroha::protocol::ToriiResponse response;
response.set_tx_hash(
shared_model::crypto::toBinaryString(tx_hash));
response.set_tx_status(
iroha::protocol::TxStatus::STATELESS_VALIDATION_SUCCESS);

// Send transaction to iroha
tx_processor_->transactionHandle(tx);

this->addTxToCacheAndLog(
"ToriiList", std::move(tx_hash), std::move(response));
this->pushStatus(
"ToriiList",
std::move(tx_hash),
makeResponse(tx_hash,
iroha::protocol::TxStatus::
STATELESS_VALIDATION_SUCCESS));
});
},
[this, &tx_list](auto &error) {
Expand Down Expand Up @@ -189,14 +187,12 @@ namespace torii {
shared_model::crypto::DefaultHashProvider::makeHash(
shared_model::proto::makeBlob(tx.payload()));

iroha::protocol::ToriiResponse response;
response.set_tx_hash(
shared_model::crypto::toBinaryString(hash));
response.set_tx_status(
auto response = makeResponse(
hash,
iroha::protocol::TxStatus::STATELESS_VALIDATION_FAILED);
response.set_error_message(sequence_error);

this->addTxToCacheAndLog(
this->pushStatus(
"ToriiList", std::move(hash), std::move(response));
});
});
Expand Down Expand Up @@ -225,16 +221,15 @@ namespace torii {
response.CopyFrom(*resp);
} else {
response.set_tx_hash(request.tx_hash());
if (storage_->getBlockQuery()->hasTxWithHash(
shared_model::crypto::Hash(request.tx_hash()))) {
auto hash = shared_model::crypto::Hash(request.tx_hash());
if (storage_->getBlockQuery()->hasTxWithHash(hash)) {
response.set_tx_status(iroha::protocol::TxStatus::COMMITTED);
cache_->addItem(std::move(hash), response);
} else {
log_->warn("Asked non-existing tx: {}",
iroha::bytestringToHexstring(request.tx_hash()));
response.set_tx_status(iroha::protocol::TxStatus::NOT_RECEIVED);
}
this->addTxToCacheAndLog(
"Status", std::move(tx_hash), std::move(response));
}
}

Expand Down Expand Up @@ -274,7 +269,8 @@ namespace torii {
.build()
.getTransport();
}())));
return responses_
return status_bus_
->statuses()
// prepend initial status
.start_with(initial_status)
// select statuses with requested hash
Expand Down Expand Up @@ -368,18 +364,15 @@ namespace torii {
}
}

void CommandService::addTxToCacheAndLog(
void CommandService::pushStatus(
const std::string &who,
const shared_model::crypto::Hash &hash,
const iroha::protocol::ToriiResponse &response) {
log_->debug("{}: adding item to cache: {}, status {} ",
who,
hash.hex(),
response.tx_status());
// transactions can be handled from multiple threads, therefore a lock is
// required
std::lock_guard<std::mutex> lock(stateless_tx_status_notifier_mutex_);
stateless_notifier_.get_subscriber().on_next(
status_bus_->publish(
std::make_shared<shared_model::proto::TransactionResponse>(
std::move(response)));
}
Expand Down
Loading

0 comments on commit 19f3cd6

Please sign in to comment.