Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Feature/tx status bus #1575

Merged
merged 2 commits into from
Jul 26, 2018
Merged

Feature/tx status bus #1575

merged 2 commits into from
Jul 26, 2018

Conversation

l4l
Copy link
Contributor

@l4l l4l commented Jul 18, 2018

Description of the Change

  • Add StatusBus interface
  • Add StatusBusImpl based on manual working with std::thread
  • Use StatusBus for sharing tx statuses among CommandService and TransactionProcessor
  • Introduce boost::thread dependency for boost::barrier
  • ITF::sendTx synchronize is based on StatusBus

Benefits

Cleaner architecture, some work delegate to other unit, lesser RC issues in tests

Possible Drawbacks

None

@l4l l4l added the needs-review pr awaits review from maintainers label Jul 18, 2018
@@ -29,6 +29,7 @@
#include "endpoint.pb.h"
#include "logger/logger.hpp"
#include "torii/processor/transaction_processor.hpp"
#include "torii/status_bus_impl.hpp"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

status_bus.hpp?

void addTxToCacheAndLog(const std::string &who,
const shared_model::crypto::Hash &hash,
const iroha::protocol::ToriiResponse &response);
void pushStatus(const std::string &who,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment please

}

void StatusBusImpl::update() {
while (not q_.empty() and q_.try_pop(obj_)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You do not need to check for emptiness, try_pop returns false if the queue is empty.

std::shared_ptr<iroha::torii::StatusBus> status_bus)
: pcs_(std::move(pcs)),
mst_processor_(std::move(mst_processor)),
status_bus_(status_bus) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not move here?

#include <mutex>
#include <thread>

#include <tbb/concurrent_queue.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add empty line after include please

@l4l l4l requested a review from nickaleks July 23, 2018 08:43
@@ -155,7 +150,7 @@ namespace torii {
// Send transaction to iroha
tx_processor_->transactionHandle(tx);

this->addTxToCacheAndLog(
this->pushStatus(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and above there is a slight duplication, maybe put it in a function?

status_map[response->transactionHash()] = response;
});
EXPECT_CALL(*status_bus, publish(_))
// evey transaction is notified that it is first
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

@@ -19,6 +19,7 @@

#include <memory>

#include <boost/thread/barrier.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add whitespace please


command_service = std::make_shared<::torii::CommandService>(
tx_processor, storage, std::chrono::seconds(1), 2 * proposal_delay_);
status_bus_ = std::make_shared<StatusBusImpl>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move status bus as a separate method? I think it is will be more clear to a reader of the code. And, also, we will introduce status bus as a dependency in other modules.

@@ -69,6 +64,16 @@ namespace torii {
});
}

namespace {
iroha::protocol::ToriiResponse makeResponse(
shared_model::crypto::Hash h, iroha::protocol::TxStatus status) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point to pass parameters as value instead of const ref?

@@ -69,6 +64,16 @@ namespace torii {
});
}

namespace {
iroha::protocol::ToriiResponse makeResponse(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add documentation for the method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it self-explanatory?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the purpose is hidden.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anonymous namespace make a clue that it's for internal usage
And there's nothing fancy, it just make response with the passed params

namespace {
iroha::protocol::ToriiResponse makeResponse(
shared_model::crypto::Hash h, iroha::protocol::TxStatus status) {
iroha::protocol::ToriiResponse response;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using raw proto instead of the model?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it used for grpc

@@ -270,7 +264,8 @@ namespace torii {
.build()
.getTransport();
}())));
return responses_
return status_bus_
->statuses()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this code is well-formed with clang-format?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

public:
virtual ~StatusBus() = default;

using Objects =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add doc for using

/**
* Shares object among the bus users
* @param object to share
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add guarantees of interface about multi-thread interaction. This documentation may be moved to impl class doc.

/**
* @return observable over objects in bus
*/
virtual rxcpp::observable<Objects> statuses() = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add information about the thread where subscribes will be invoked. This documentation may be moved to impl class doc.

#include "interfaces/transaction_responses/tx_response.hpp"

namespace iroha {
namespace torii {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the namespace torii? Mabe move it class to libs?

@@ -179,10 +180,29 @@ namespace integration_framework {
std::function<void(const shared_model::proto::TransactionResponse &)>
validation) {
log_->info("send transaction");

boost::barrier bar1(2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment with explanation of barriers here.

@l4l l4l force-pushed the develop branch 8 times, most recently from 4750870 to b2a5906 Compare July 25, 2018 17:15
- Add StatusBusImpl
- Add MockStatusBus
- Introduce boost::thread
- Use proper synchronization for recieving stateless statuses in
ITF::sendTx
- Fix cache issue in CommandService
- Update dockerfiles
- Refactor CommandService with makeResponse

Signed-off-by: Kitsu <mail@kitsu.me>
namespace iroha {
namespace torii {
StatusBusImpl::StatusBusImpl(rxcpp::observe_on_one_worker worker)
: worker_(worker), subject_(worker_) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point to hold worker_ field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it passed directly to the subject_ that will create a new thread for each subscriber

Signed-off-by: Kitsu <mail@kitsu.me>
@l4l l4l removed the needs-review pr awaits review from maintainers label Jul 26, 2018
@l4l l4l merged commit 19f3cd6 into develop Jul 26, 2018
@l4l l4l deleted the feature/tx_status_bus branch July 26, 2018 10:35
@dram
Copy link

dram commented Aug 21, 2018

As boost::thread dependency is introduced, I think that docs related to boost installation also need to be updated, i.e. https://github.com/hyperledger/iroha/blame/v1.0.0_beta-4/docs/source/guides/build.rst#L78-L82

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants