Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Rewrite StatusStream with rxcpp (#1541)
Browse files Browse the repository at this point in the history
* Rewrite StatusStream with rxcpp

* Minor fixes for types and logger calls

* Review fixes; fix irohad_test using wrong example path

* Fix tx hash setter in stateless valid case

* Add client id to debug log

Signed-off-by: Andrei Lebedev <lebdron@gmail.com>
  • Loading branch information
lebdron authored and l4l committed Jul 25, 2018
1 parent 18edb18 commit 9ae5bb7
Show file tree
Hide file tree
Showing 10 changed files with 435 additions and 157 deletions.
2 changes: 1 addition & 1 deletion irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ void Irohad::initTransactionCommandService() {
std::make_shared<TransactionProcessorImpl>(pcs, mst_processor);

command_service = std::make_shared<::torii::CommandService>(
tx_processor, storage, proposal_delay_);
tx_processor, storage, std::chrono::seconds(1), 2 * proposal_delay_);

log_->info("[Init] => command service");
}
Expand Down
63 changes: 42 additions & 21 deletions irohad/torii/command_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ namespace torii {
* @param pb_factory - model->protobuf and vice versa converter
* @param tx_processor - processor of received transactions
* @param block_query - to query transactions outside the cache
* @param proposal_delay - time of a one proposal propagation.
* @param initial_timeout - streaming timeout when tx is not received
* @param nonfinal_timeout - streaming timeout when tx is being processed
*/
CommandService(
std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor,
std::shared_ptr<iroha::ametsuchi::Storage> storage,
std::chrono::milliseconds proposal_delay);
std::chrono::milliseconds initial_timeout,
std::chrono::milliseconds nonfinal_timeout);

/**
* Disable copying in any way to prevent potential issues with common
Expand Down Expand Up @@ -102,12 +104,11 @@ namespace torii {
* the some final transaction status (which cannot change anymore)
* @param request- TxStatusRequest object which identifies transaction
* uniquely
* @param response_writer - grpc::ServerWriter which can repeatedly send
* transaction statuses back to the client
* @return observable with transaction statuses
*/
void StatusStream(
iroha::protocol::TxStatusRequest const &request,
grpc::ServerWriter<iroha::protocol::ToriiResponse> &response_writer);
rxcpp::observable<
std::shared_ptr<shared_model::interface::TransactionResponse>>
StatusStream(const shared_model::crypto::Hash &hash);

/**
* StatusStream call via grpc
Expand All @@ -118,30 +119,50 @@ namespace torii {
* transaction statuses back to the client
* @return - grpc::Status
*/
virtual grpc::Status StatusStream(
grpc::ServerContext *context,
const iroha::protocol::TxStatusRequest *request,
grpc::ServerWriter<iroha::protocol::ToriiResponse> *response_writer)
override;
grpc::Status StatusStream(grpc::ServerContext *context,
const iroha::protocol::TxStatusRequest *request,
grpc::ServerWriter<iroha::protocol::ToriiResponse>
*response_writer) override;

private:
bool checkCacheAndSend(
const boost::optional<iroha::protocol::ToriiResponse> &resp,
grpc::ServerWriter<iroha::protocol::ToriiResponse> &response_writer)
const;

bool isFinalStatus(const iroha::protocol::TxStatus &status) const;
/**
* Execute events scheduled in run loop until it is not empty and the
* subscriber is active
* @param subscription - tx status subscription
* @param run_loop - gRPC thread run loop
*/
inline void handleEvents(rxcpp::composite_subscription &subscription,
rxcpp::schedulers::run_loop &run_loop);

private:
using CacheType = iroha::cache::Cache<shared_model::crypto::Hash,
iroha::protocol::ToriiResponse,
shared_model::crypto::Hash::Hasher>;

std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor_;
std::shared_ptr<iroha::ametsuchi::Storage> storage_;
std::chrono::milliseconds proposal_delay_;
std::chrono::milliseconds start_tx_processing_duration_;
std::chrono::milliseconds initial_timeout_;
std::chrono::milliseconds nonfinal_timeout_;
std::shared_ptr<CacheType> cache_;

/**
* Mutex for propagating stateless validation status
*/
std::mutex stateless_tx_status_notifier_mutex_;

/**
* Subject with stateless validation statuses
*/
rxcpp::subjects::subject<
std::shared_ptr<shared_model::interface::TransactionResponse>>
stateless_notifier_;

/**
* Observable with all transaction statuses
*/
rxcpp::observable<
std::shared_ptr<shared_model::interface::TransactionResponse>>
responses_;

logger::Logger log_;
};

Expand Down
Loading

0 comments on commit 9ae5bb7

Please sign in to comment.