-
Notifications
You must be signed in to change notification settings - Fork 297
Feature/mst batches #1642
Feature/mst batches #1642
Changes from 28 commits
b68adfa
1d7e205
ca717fc
ce2be62
891901f
d18366b
bc4aba9
5e13baf
3341cc2
8378abf
2423470
e2b2d75
ac70ae6
65c650a
ecb6821
f9eb03b
515d14b
0faa112
1448adf
dea7819
5338f46
982e194
1645f54
a4d3b51
01c65d7
9606637
f6a4ef4
25d27b3
cc56c2e
515ff36
6af0b69
7cf5f3f
10d22da
cf87a14
68beda7
7d050ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,11 +24,12 @@ namespace iroha { | |
template <typename Subject> | ||
void shareState(ConstRefState state, Subject &subject) { | ||
if (not state.isEmpty()) { | ||
auto completed_transactions = state.getTransactions(); | ||
std::for_each( | ||
completed_transactions.begin(), | ||
completed_transactions.end(), | ||
[&subject](const auto tx) { subject.get_subscriber().on_next(tx); }); | ||
auto completed_batches = state.getBatches(); | ||
std::for_each(completed_batches.begin(), | ||
completed_batches.end(), | ||
[&subject](const auto batch) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe use |
||
subject.get_subscriber().on_next(batch); | ||
}); | ||
} | ||
} | ||
|
||
|
@@ -53,9 +54,9 @@ namespace iroha { | |
|
||
// -------------------------| MstProcessor override |------------------------- | ||
|
||
auto FairMstProcessor::propagateTransactionImpl(const DataType transaction) | ||
-> decltype(propagateTransaction(transaction)) { | ||
shareState(storage_->updateOwnState(transaction), transactions_subject_); | ||
auto FairMstProcessor::propagateBatchImpl(const iroha::DataType &batch) | ||
-> decltype(propagateBatch(batch)) { | ||
shareState(storage_->updateOwnState(batch), batches_subject_); | ||
shareState( | ||
storage_->getExpiredTransactions(time_provider_->getCurrentTime()), | ||
expired_subject_); | ||
|
@@ -66,13 +67,13 @@ namespace iroha { | |
return state_subject_.get_observable(); | ||
} | ||
|
||
auto FairMstProcessor::onPreparedTransactionsImpl() const | ||
-> decltype(onPreparedTransactions()) { | ||
return transactions_subject_.get_observable(); | ||
auto FairMstProcessor::onPreparedBatchesImpl() const | ||
-> decltype(onPreparedBatches()) { | ||
return batches_subject_.get_observable(); | ||
} | ||
|
||
auto FairMstProcessor::onExpiredTransactionsImpl() const | ||
-> decltype(onExpiredTransactions()) { | ||
auto FairMstProcessor::onExpiredBatchesImpl() const | ||
-> decltype(onExpiredBatches()) { | ||
return expired_subject_.get_observable(); | ||
} | ||
|
||
|
@@ -85,18 +86,17 @@ namespace iroha { | |
auto current_time = time_provider_->getCurrentTime(); | ||
|
||
// update state | ||
// todo wrap in method | ||
auto new_transactions = | ||
auto new_batches = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [required] |
||
std::make_shared<MstState>(storage_->whatsNew(new_state)); | ||
state_subject_.get_subscriber().on_next(new_transactions); | ||
state_subject_.get_subscriber().on_next(new_batches); | ||
|
||
log_->info("New txes size: {}", new_transactions->getTransactions().size()); | ||
// completed transactions | ||
shareState(storage_->apply(from, new_state), transactions_subject_); | ||
log_->info("New batches size: {}", new_batches->getBatches().size()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [optional] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you mean? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rephrasing. amount of batches is not the same as their size. |
||
// completed batches | ||
shareState(storage_->apply(from, new_state), batches_subject_); | ||
|
||
// expired transactions | ||
auto expired_transactions = storage_->getDiffState(from, current_time); | ||
shareState(expired_transactions, this->expired_subject_); | ||
// expired batches | ||
auto expired_batches = storage_->getDiffState(from, current_time); | ||
shareState(expired_batches, this->expired_subject_); | ||
} | ||
|
||
// -----------------------------| private api |----------------------------- | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,26 +37,27 @@ namespace iroha { | |
// ---------------------------| user interface |---------------------------- | ||
|
||
/** | ||
* Propagate in network multi-signature transaction for signing by other | ||
* Propagate in network batch for signing by other | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider rephrasing, because it means 'network batch' now. |
||
* participants | ||
* @param transaction - transaction for propagation | ||
*/ | ||
void propagateTransaction(const DataType transaction); | ||
void propagateBatch(const DataType &batch); | ||
|
||
/** | ||
* Prove updating of state for handling status of signing | ||
*/ | ||
rxcpp::observable<std::shared_ptr<MstState>> onStateUpdate() const; | ||
|
||
/** | ||
* Observable emit transactions that prepared to processing in system | ||
* Observable emit batches which are prepared for further processing in | ||
* system | ||
*/ | ||
rxcpp::observable<DataType> onPreparedTransactions() const; | ||
rxcpp::observable<DataType> onPreparedBatches() const; | ||
|
||
/** | ||
* Observable emit expired by time transactions | ||
*/ | ||
rxcpp::observable<DataType> onExpiredTransactions() const; | ||
rxcpp::observable<DataType> onExpiredBatches() const; | ||
|
||
virtual ~MstProcessor() = default; | ||
|
||
|
@@ -71,8 +72,8 @@ namespace iroha { | |
/** | ||
* @see propagateTransaction method | ||
*/ | ||
virtual auto propagateTransactionImpl(DataType transaction) | ||
-> decltype(propagateTransaction(transaction)) = 0; | ||
virtual auto propagateBatchImpl(const DataType &batch) | ||
-> decltype(propagateBatch(batch)) = 0; | ||
|
||
/** | ||
* @see onStateUpdate method | ||
|
@@ -82,14 +83,14 @@ namespace iroha { | |
/** | ||
* @see onPreparedTransactions method | ||
*/ | ||
virtual auto onPreparedTransactionsImpl() const | ||
-> decltype(onPreparedTransactions()) = 0; | ||
virtual auto onPreparedBatchesImpl() const | ||
-> decltype(onPreparedBatches()) = 0; | ||
|
||
/** | ||
* @see onExpiredTransactions method | ||
*/ | ||
virtual auto onExpiredTransactionsImpl() const | ||
-> decltype(onExpiredTransactions()) = 0; | ||
virtual auto onExpiredBatchesImpl() const | ||
-> decltype(onExpiredBatches()) = 0; | ||
}; | ||
} // namespace iroha | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/isocpp/CppCoreGuidelines/blob/master/CppCoreGuidelines.md#c49-prefer-initialization-to-assignment-in-constructors