-
Notifications
You must be signed in to change notification settings - Fork 297
Conversation
Signed-off-by: Andrei Lebedev <lebdron@gmail.com>
Signed-off-by: Andrei Lebedev <lebdron@gmail.com> # Conflicts: # irohad/torii/impl/command_service.cpp
Signed-off-by: Andrei Lebedev <lebdron@gmail.com>
response_writer.Write(resp_sub); | ||
} | ||
rxcpp::observable< | ||
std::shared_ptr<shared_model::interface::TransactionResponse>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This type used multiple times, maybe create alias, e.g using Response = std::shared_ptr<shared_model::interface::TransactionResponse>;
: 2 * proposal_delay_; | ||
}, | ||
current_thread)) | ||
.take_while([=](const auto &) { return not context->IsCancelled(); }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be useful to add logging here as well
irohad/torii/impl/timeout.hpp
Outdated
/** | ||
* Copyright Soramitsu Co., Ltd. All Rights Reserved. | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess there should be one more license
irohad/torii/impl/timeout.hpp
Outdated
typedef typename coordination_type::coordinator_type coordinator_type; | ||
typedef rxcpp::util::decay_t<Selector> select_type; | ||
typedef decltype( | ||
(*(select_type *)nullptr)(*(source_value_type *)nullptr)) value_type; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it used?
Also declval might be applicable
irohad/torii/impl/timeout.hpp
Outdated
if (selectedWork.empty()) { | ||
return; | ||
} | ||
localState->worker.schedule(selectedWork.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe invert if
above and move this line up?
irohad/torii/command_service.hpp
Outdated
std::mutex notifier_mutex_; | ||
rxcpp::subjects::subject< | ||
std::shared_ptr<shared_model::interface::TransactionResponse>> | ||
notifier_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name is not so meaningful, maybe rename or add some documentation.
@@ -44,9 +46,15 @@ namespace torii { | |||
proposal_delay_(proposal_delay), | |||
start_tx_processing_duration_(1s), | |||
cache_(std::make_shared<CacheType>()), | |||
// merge with mutex, since notifications can be made from different | |||
// threads | |||
responses_(tx_processor_->transactionNotifier().merge( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pls, add TODO about event bus rework.
return responses_.start_with(initial_status) | ||
.filter( | ||
[&](auto response) { return response->transactionHash() == hash; }) | ||
.lift<std::shared_ptr<shared_model::interface::TransactionResponse>>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, better to add some comments about what happened here.
return response.tx_status() | ||
== iroha::protocol::TxStatus::NOT_RECEIVED | ||
? start_tx_processing_duration_ | ||
: 2 * proposal_delay_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe move out of here configuration logic?
.subscribe( | ||
subscription, | ||
[&](iroha::protocol::ToriiResponse response) { | ||
log_->debug("writing status"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move the debug after Write
method.
request_hash->hex()); | ||
// run loop while subscription is active or there are pending events in the | ||
// queue | ||
while (subscription.is_subscribed() or not rl.empty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to wrap this function with an inline method for the improving readability.
irohad/torii/impl/timeout.hpp
Outdated
namespace torii { | ||
|
||
/** | ||
* Return an observable that terminates with timeout_error if a particular |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe better to write here about "This class is copypaste of ..." for reducing misleading usages of code as an example.
irohad/torii/impl/timeout.hpp
Outdated
|
||
#include <rxcpp/operators/rx-timeout.hpp> | ||
|
||
namespace torii { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe move this file to lib for sharing usage?
Signed-off-by: Andrei Lebedev <lebdron@gmail.com>
Signed-off-by: Andrei Lebedev <lebdron@gmail.com>
.take_while([=](const auto &) { | ||
auto is_cancelled = context->IsCancelled(); | ||
if (is_cancelled) { | ||
log_->debug("client unsubscribed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe specify here some identify information about a client?
log_->debug("writing status"); | ||
response_writer->Write(response); | ||
if (response_writer->Write(response)) { | ||
log_->debug("status written"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
Signed-off-by: Andrei Lebedev <lebdron@gmail.com>
* Rewrite StatusStream with rxcpp * Minor fixes for types and logger calls * Review fixes; fix irohad_test using wrong example path * Fix tx hash setter in stateless valid case * Add client id to debug log Signed-off-by: Andrei Lebedev <lebdron@gmail.com>
* Rewrite StatusStream with rxcpp * Minor fixes for types and logger calls * Review fixes; fix irohad_test using wrong example path * Fix tx hash setter in stateless valid case * Add client id to debug log Signed-off-by: Andrei Lebedev <lebdron@gmail.com>
* Rewrite StatusStream with rxcpp * Minor fixes for types and logger calls * Review fixes; fix irohad_test using wrong example path * Fix tx hash setter in stateless valid case * Add client id to debug log Signed-off-by: Andrei Lebedev <lebdron@gmail.com>
* Rewrite StatusStream with rxcpp * Minor fixes for types and logger calls * Review fixes; fix irohad_test using wrong example path * Fix tx hash setter in stateless valid case * Add client id to debug log Signed-off-by: Andrei Lebedev <lebdron@gmail.com>
* Rewrite StatusStream with rxcpp * Minor fixes for types and logger calls * Review fixes; fix irohad_test using wrong example path * Fix tx hash setter in stateless valid case * Add client id to debug log Signed-off-by: Andrei Lebedev <lebdron@gmail.com>
Signed-off-by: Andrei Lebedev lebdron@gmail.com
Description of the Change
Rewrite StatusStream method without manual synchronization with condition variables and mutexes using rxcpp
Benefits
Separate status mapping and transport logic
Possible Drawbacks
Custom rxcpp timeout operator