Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Feature/tx status bus #1575

Merged
merged 2 commits into from
Jul 26, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmake/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ find_package(Boost 1.65.0 REQUIRED
COMPONENTS
filesystem
system
thread
)
add_library(boost INTERFACE IMPORTED)
set_target_properties(boost PROPERTIES
Expand Down
2 changes: 1 addition & 1 deletion docker/dependencies/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ RUN set -e; \
git clone https://github.com/boostorg/boost /tmp/boost; \
(cd /tmp/boost ; git checkout 436ad1dfcfc7e0246141beddd11c8a4e9c10b146); \
(cd /tmp/boost ; git submodule update --init --recursive); \
(cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem); \
(cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem,thread); \
(cd /tmp/boost ; /tmp/boost/b2 headers); \
(cd /tmp/boost ; /tmp/boost/b2 cxxflags="-std=c++14" -j ${PARALLELISM} install --prefix=/opt/dependencies/boost); \
ldconfig; \
Expand Down
2 changes: 1 addition & 1 deletion docker/develop/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ RUN set -e; \
git clone https://github.com/boostorg/boost /tmp/boost; \
(cd /tmp/boost ; git checkout 436ad1dfcfc7e0246141beddd11c8a4e9c10b146); \
(cd /tmp/boost ; git submodule update --init --recursive); \
(cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem); \
(cd /tmp/boost ; /tmp/boost/bootstrap.sh --with-libraries=system,filesystem,thread); \
(cd /tmp/boost ; /tmp/boost/b2 headers); \
(cd /tmp/boost ; /tmp/boost/b2 cxxflags="-std=c++14" -j ${PARALLELISM} install); \
ldconfig; \
Expand Down
21 changes: 16 additions & 5 deletions irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "multi_sig_transactions/mst_time_provider_impl.hpp"
#include "multi_sig_transactions/storage/mst_storage_impl.hpp"
#include "multi_sig_transactions/transport/mst_transport_grpc.hpp"
#include "torii/impl/status_bus_impl.hpp"
#include "validators/field_validator.hpp"

using namespace iroha;
Expand Down Expand Up @@ -74,6 +75,7 @@ void Irohad::init() {
initConsensusGate();
initSynchronizer();
initPeerCommunicationService();
initStatusBus();
initMstProcessor();

// Torii
Expand Down Expand Up @@ -233,6 +235,11 @@ void Irohad::initPeerCommunicationService() {
log_->info("[Init] => pcs");
}

void Irohad::initStatusBus() {
status_bus_ = std::make_shared<StatusBusImpl>();
log_->info("[Init] => Tx status bus");
}

void Irohad::initMstProcessor() {
if (is_mst_supported_) {
auto mst_transport = std::make_shared<MstTransportGrpc>();
Expand All @@ -257,11 +264,15 @@ void Irohad::initMstProcessor() {
* Initializing transaction command service
*/
void Irohad::initTransactionCommandService() {
auto tx_processor =
std::make_shared<TransactionProcessorImpl>(pcs, mst_processor);

command_service = std::make_shared<::torii::CommandService>(
tx_processor, storage, std::chrono::seconds(1), 2 * proposal_delay_);
auto tx_processor = std::make_shared<TransactionProcessorImpl>(
pcs, mst_processor, status_bus_);

command_service =
std::make_shared<::torii::CommandService>(tx_processor,
storage,
status_bus_,
std::chrono::seconds(1),
2 * proposal_delay_);

log_->info("[Init] => command service");
}
Expand Down
4 changes: 4 additions & 0 deletions irohad/main/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ class Irohad {

virtual void initPeerCommunicationService();

virtual void initStatusBus();

virtual void initMstProcessor();

virtual void initTransactionCommandService();
Expand Down Expand Up @@ -188,6 +190,8 @@ class Irohad {
// mst
std::shared_ptr<iroha::MstProcessor> mst_processor;

std::shared_ptr<iroha::torii::StatusBus> status_bus_;

// transaction service
std::shared_ptr<torii::CommandService> command_service;

Expand Down
8 changes: 8 additions & 0 deletions irohad/torii/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,11 @@ target_link_libraries(torii_service
shared_model_stateless_validation
processors
)

add_library(status_bus
impl/status_bus_impl.cpp
)
target_link_libraries(status_bus
rxcpp
shared_model_interfaces
)
51 changes: 15 additions & 36 deletions irohad/torii/command_service.hpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
/**
* Copyright Soramitsu Co., Ltd. 2018 All Rights Reserved.
* http://soramitsu.co.jp
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef TORII_COMMAND_SERVICE_HPP
Expand All @@ -29,6 +17,7 @@
#include "endpoint.pb.h"
#include "logger/logger.hpp"
#include "torii/processor/transaction_processor.hpp"
#include "torii/status_bus.hpp"

namespace torii {
/**
Expand All @@ -40,12 +29,14 @@ namespace torii {
* Creates a new instance of CommandService
* @param tx_processor - processor of received transactions
* @param storage - to query transactions outside the cache
* @param status_bus is a common notifier for tx statuses
* @param initial_timeout - streaming timeout when tx is not received
* @param nonfinal_timeout - streaming timeout when tx is being processed
*/
CommandService(
std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor,
std::shared_ptr<iroha::ametsuchi::Storage> storage,
std::shared_ptr<iroha::torii::StatusBus> status_bus,
std::chrono::milliseconds initial_timeout,
std::chrono::milliseconds nonfinal_timeout);

Expand Down Expand Up @@ -150,9 +141,15 @@ namespace torii {
inline void handleEvents(rxcpp::composite_subscription &subscription,
rxcpp::schedulers::run_loop &run_loop);

void addTxToCacheAndLog(const std::string &who,
const shared_model::crypto::Hash &hash,
const iroha::protocol::ToriiResponse &response);
/**
* Share tx status and log it
* @param who identifier for the logging
* @param hash of the tx
* @param response to be pushed
*/
void pushStatus(const std::string &who,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment please

const shared_model::crypto::Hash &hash,
const iroha::protocol::ToriiResponse &response);

private:
using CacheType = iroha::cache::Cache<shared_model::crypto::Hash,
Expand All @@ -161,29 +158,11 @@ namespace torii {

std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor_;
std::shared_ptr<iroha::ametsuchi::Storage> storage_;
std::shared_ptr<iroha::torii::StatusBus> status_bus_;
std::chrono::milliseconds initial_timeout_;
std::chrono::milliseconds nonfinal_timeout_;
std::shared_ptr<CacheType> cache_;

/**
* Mutex for propagating stateless validation status
*/
std::mutex stateless_tx_status_notifier_mutex_;

/**
* Subject with stateless validation statuses
*/
rxcpp::subjects::subject<
std::shared_ptr<shared_model::interface::TransactionResponse>>
stateless_notifier_;

/**
* Observable with all transaction statuses
*/
rxcpp::observable<
std::shared_ptr<shared_model::interface::TransactionResponse>>
responses_;

logger::Logger log_;
};

Expand Down
85 changes: 39 additions & 46 deletions irohad/torii/impl/command_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,18 @@ namespace torii {
CommandService::CommandService(
std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor,
std::shared_ptr<iroha::ametsuchi::Storage> storage,
std::shared_ptr<iroha::torii::StatusBus> status_bus,
std::chrono::milliseconds initial_timeout,
std::chrono::milliseconds nonfinal_timeout)
: tx_processor_(tx_processor),
storage_(storage),
status_bus_(status_bus),
initial_timeout_(initial_timeout),
nonfinal_timeout_(nonfinal_timeout),
cache_(std::make_shared<CacheType>()),
// merge with mutex, since notifications can be made from different
// threads
// TODO 11.07.2018 andrei rework status handling with event bus IR-1517
responses_(tx_processor_->transactionNotifier().merge(
rxcpp::serialize_one_worker(
rxcpp::schedulers::make_current_thread()),
stateless_notifier_.get_observable())),
log_(logger::log("CommandService")) {
// Notifier for all clients
responses_.subscribe([this](auto iroha_response) {
status_bus_->statuses().subscribe([this](auto iroha_response) {
// find response for this tx in cache; if status of received response
// isn't "greater" than cached one, dismiss received one
auto proto_response =
Expand All @@ -69,6 +64,17 @@ namespace torii {
});
}

namespace {
iroha::protocol::ToriiResponse makeResponse(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add documentation for the method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it self-explanatory?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the purpose is hidden.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anonymous namespace make a clue that it's for internal usage
And there's nothing fancy, it just make response with the passed params

const shared_model::crypto::Hash &h,
const iroha::protocol::TxStatus &status) {
iroha::protocol::ToriiResponse response;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using raw proto instead of the model?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it used for grpc

response.set_tx_hash(shared_model::crypto::toBinaryString(h));
response.set_tx_status(status);
return response;
}
} // namespace

void CommandService::Torii(const iroha::protocol::Transaction &request) {
shared_model::proto::TransportBuilder<
shared_model::proto::Transaction,
Expand All @@ -86,20 +92,17 @@ namespace torii {
return;
}

// setting response
iroha::protocol::ToriiResponse response;
response.set_tx_hash(
shared_model::crypto::toBinaryString(tx_hash));
response.set_tx_status(
iroha::protocol::TxStatus::STATELESS_VALIDATION_SUCCESS);

// Send transaction to iroha
tx_processor_->transactionHandle(
std::make_shared<shared_model::proto::Transaction>(
std::move(iroha_tx.value)));

this->addTxToCacheAndLog(
"Torii", std::move(tx_hash), std::move(response));
this->pushStatus(
"Torii",
std::move(tx_hash),
makeResponse(
tx_hash,
iroha::protocol::TxStatus::STATELESS_VALIDATION_SUCCESS));
},
[this, &request](const auto &error) {
// getting hash from invalid transaction
Expand All @@ -113,14 +116,12 @@ namespace torii {
tx_hash.hex());

// setting response
iroha::protocol::ToriiResponse response;
response.set_tx_hash(
shared_model::crypto::toBinaryString(tx_hash));
response.set_tx_status(
auto response = makeResponse(
tx_hash,
iroha::protocol::TxStatus::STATELESS_VALIDATION_FAILED);
response.set_error_message(std::move(error.error));

this->addTxToCacheAndLog(
this->pushStatus(
"Torii", std::move(tx_hash), std::move(response));
});
}
Expand All @@ -145,18 +146,15 @@ namespace torii {
return;
}

// setting response
iroha::protocol::ToriiResponse response;
response.set_tx_hash(
shared_model::crypto::toBinaryString(tx_hash));
response.set_tx_status(
iroha::protocol::TxStatus::STATELESS_VALIDATION_SUCCESS);

// Send transaction to iroha
tx_processor_->transactionHandle(tx);

this->addTxToCacheAndLog(
"ToriiList", std::move(tx_hash), std::move(response));
this->pushStatus(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and above there is a slight duplication, maybe put it in a function?

"ToriiList",
std::move(tx_hash),
makeResponse(tx_hash,
iroha::protocol::TxStatus::
STATELESS_VALIDATION_SUCCESS));
});
},
[this, &tx_list](auto &error) {
Expand Down Expand Up @@ -189,14 +187,12 @@ namespace torii {
shared_model::crypto::DefaultHashProvider::makeHash(
shared_model::proto::makeBlob(tx.payload()));

iroha::protocol::ToriiResponse response;
response.set_tx_hash(
shared_model::crypto::toBinaryString(hash));
response.set_tx_status(
auto response = makeResponse(
hash,
iroha::protocol::TxStatus::STATELESS_VALIDATION_FAILED);
response.set_error_message(sequence_error);

this->addTxToCacheAndLog(
this->pushStatus(
"ToriiList", std::move(hash), std::move(response));
});
});
Expand Down Expand Up @@ -225,16 +221,15 @@ namespace torii {
response.CopyFrom(*resp);
} else {
response.set_tx_hash(request.tx_hash());
if (storage_->getBlockQuery()->hasTxWithHash(
shared_model::crypto::Hash(request.tx_hash()))) {
auto hash = shared_model::crypto::Hash(request.tx_hash());
if (storage_->getBlockQuery()->hasTxWithHash(hash)) {
response.set_tx_status(iroha::protocol::TxStatus::COMMITTED);
cache_->addItem(std::move(hash), response);
} else {
log_->warn("Asked non-existing tx: {}",
iroha::bytestringToHexstring(request.tx_hash()));
response.set_tx_status(iroha::protocol::TxStatus::NOT_RECEIVED);
}
this->addTxToCacheAndLog(
"Status", std::move(tx_hash), std::move(response));
}
}

Expand Down Expand Up @@ -274,7 +269,8 @@ namespace torii {
.build()
.getTransport();
}())));
return responses_
return status_bus_
->statuses()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this code is well-formed with clang-format?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

// prepend initial status
.start_with(initial_status)
// select statuses with requested hash
Expand Down Expand Up @@ -368,18 +364,15 @@ namespace torii {
}
}

void CommandService::addTxToCacheAndLog(
void CommandService::pushStatus(
const std::string &who,
const shared_model::crypto::Hash &hash,
const iroha::protocol::ToriiResponse &response) {
log_->debug("{}: adding item to cache: {}, status {} ",
who,
hash.hex(),
response.tx_status());
// transactions can be handled from multiple threads, therefore a lock is
// required
std::lock_guard<std::mutex> lock(stateless_tx_status_notifier_mutex_);
stateless_notifier_.get_subscriber().on_next(
status_bus_->publish(
std::make_shared<shared_model::proto::TransactionResponse>(
std::move(response)));
}
Expand Down
Loading