Skip to content

Commit

Permalink
REFACTORED add_peer_test
Browse files Browse the repository at this point in the history
/build all

Signed-off-by: kuvaldini <ivan@kuvaldini.pro>
  • Loading branch information
kuvaldini committed Sep 8, 2021
1 parent 85ec9b5 commit 8a99302
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 93 deletions.
149 changes: 80 additions & 69 deletions test/framework/integration_framework/integration_test_framework.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ class IntegrationTestFramework::CheckerQueue {
return obj;
}

boost::optional<T> try_pop() {
std::optional<T> try_pop() {
std::unique_lock<std::mutex> lock(queue_mutex_);
if (queue_.empty()) {
if (not cv_.wait_for(
lock, timeout_, [this] { return not queue_.empty(); })) {
return boost::none;
return std::nullopt;
}
}
T obj(std::move(queue_.front()));
Expand All @@ -154,6 +154,78 @@ class IntegrationTestFramework::CheckerQueue {
std::condition_variable cv_;
};

#include <unordered_map>

struct IntegrationTestFramework::ResponsesQueues {
public:
using HashType = shared_model::interface::types::HashType;

private:
/// maximum time of waiting before appearing next transaction response
std::chrono::milliseconds tx_response_waiting_time_ms;
std::recursive_mutex mtx;
std::unordered_map<HashType,
std::unique_ptr<CheckerQueue<TxResponseType>>,
HashType::Hasher>
map;

public:
ResponsesQueues(std::chrono::milliseconds ms)
: tx_response_waiting_time_ms(ms) {}
auto lock() {
return std::unique_lock(mtx);
}
auto findOrEmplace(HashType const &txhash) -> decltype(map)::iterator {
// assert(not mtx.try_lock()); // expecting it is locked before
auto lk = lock();
auto it = map.find(txhash);
if (it == map.end()) {
it = map.emplace(txhash,
std::make_unique<CheckerQueue<TxResponseType>>(
tx_response_waiting_time_ms))
.first;
}
return it;
}
auto find(HashType const &txhash) -> std::optional<decltype(map)::iterator> {
// assert(not mtx.try_lock()); // expecting it is locked before
auto lk = lock();
auto it = map.find(txhash);
if (it == map.end()) {
return std::nullopt;
}
return {it};
}
auto push(TxResponseType txresp) {
auto lk = lock();
return findOrEmplace(txresp->transactionHash())->second->push(txresp);
}
auto try_peek(HashType const &txhash) -> std::optional<TxResponseType> {
auto lk = lock();
auto opt_it = find(txhash);
if (!opt_it)
return std::nullopt;
auto &it = *opt_it;
auto &unique_responses_queue = it->second;
auto opt_response = unique_responses_queue->try_peek();
if (opt_response)
return *opt_response;
return std::nullopt;
}
auto try_pop(HashType const &txhash) -> std::optional<TxResponseType> {
auto lk = lock();
auto opt_it = find(txhash);
if (!opt_it)
return std::nullopt;
auto &it = *opt_it;
auto &unique_responses_queue = it->second;
auto opt_response = unique_responses_queue->try_pop();
if (opt_response)
return *opt_response;
return std::nullopt;
}
};

IntegrationTestFramework::IntegrationTestFramework(
size_t maximum_proposal_size,
iroha::StorageType db_type,
Expand Down Expand Up @@ -435,46 +507,6 @@ void IntegrationTestFramework::initPipeline(
log_->info("created pipeline");
}

auto IntegrationTestFramework::ResponsesQueues::findOrEmplace(
std::string const &hash) -> decltype(map)::iterator {
// assert(not mtx.try_lock()); // expecting it is locked before coming hre
auto lk = lock();
auto it = map.find(hash);
if (it == map.end()) {
it = map.emplace(hash,
std::make_unique<CheckerQueue<TxResponseType>>(
tx_response_waiting_time_ms))
.first;
}
return it;
}
auto IntegrationTestFramework::ResponsesQueues::find(std::string const &hash)
-> std::optional<decltype(map)::iterator> {
// assert(not mtx.try_lock()); // expecting it is locked before coming here
auto lk = lock();
auto it = map.find(hash);
if (it == map.end()) {
return std::nullopt;
}
return {it};
}
auto IntegrationTestFramework::ResponsesQueues::try_peek(std::string const &txhash) ->std::optional<TxResponseType>{
auto lk = lock();
auto opt_it = find(txhash);
if(!opt_it)
return std::nullopt;
auto&it = *opt_it;
auto&unique_responses_queue = it->second;
auto opt_response = unique_responses_queue->try_peek();
if(opt_response)
return *opt_response;
return std::nullopt;
}
IntegrationTestFramework::ResponsesQueues::ResponsesQueues(
std::chrono::milliseconds ms)
: tx_response_waiting_time_ms(ms) {}


void IntegrationTestFramework::subscribeQueuesAndRun() {
// subscribing for components

Expand Down Expand Up @@ -546,14 +578,9 @@ void IntegrationTestFramework::subscribeQueuesAndRun() {
// return;
auto log = std::shared_ptr(wlog);
auto responses_queues = std::shared_ptr(w_responses_queues);
const auto hash = response->transactionHash().hex();
log->debug("kOnTransactionResponse: hash: {}", hash);
auto lk = responses_queues->lock();
responses_queues->findOrEmplace(hash)->second->push(response);
responses_queues->push(response);
log->info("response added to status queue: {}",
response->toString());
log->info("responses_queues->...->size(): {}",
responses_queues->findOrEmplace(hash)->second->size());
});

if (fake_peers_.size() > 0) {
Expand Down Expand Up @@ -627,13 +654,8 @@ IntegrationTestFramework &IntegrationTestFramework::sendTx(
validation) {
log_->debug("sendTx()");
sendTxWithoutValidation(tx);
std::optional<TxResponseType> opt_response = responses_queues_->try_peek(tx.hash().hex());
// {
// auto lk = responses_queues_->lock();
// auto it = responses_queues_->findOrEmplace(tx.hash().hex());
// log_->debug("2. sendTx(): it->second->size(): {}", it->second->size());
// opt_response = it->second->try_peek();
// }
std::optional<TxResponseType> opt_response =
responses_queues_->try_peek(tx.hash());
if (not opt_response)
throw std::runtime_error("sendTx(): missed status for hash "
+ tx.hash().hex());
Expand Down Expand Up @@ -672,28 +694,23 @@ IntegrationTestFramework &IntegrationTestFramework::sendTxSequence(
log_->info("send transactions");
const auto &transactions = tx_sequence.transactions();

auto lk = responses_queues_->lock();

// put all transactions to the TxList and send them to iroha
iroha::protocol::TxList tx_list;
for (const auto &tx : transactions) {
auto proto_tx =
std::static_pointer_cast<shared_model::proto::Transaction>(tx)
->getTransport();
*tx_list.add_transactions() = proto_tx;
responses_queues_->findOrEmplace(tx->hash().hex());
}
command_client_->ListTorii(tx_list);

// save all stateless statuses into a vector
std::vector<shared_model::proto::TransactionResponse> observed_statuses;
for (const auto &tx : transactions) {
// fetch first response associated with the tx from related queue
boost::optional<TxResponseType> opt_response(
responses_queues_->map.find(tx->hash().hex())->second->try_peek());
auto opt_response = responses_queues_->try_peek(tx->hash());
if (not opt_response)
throw std::runtime_error("sendTxSequence: missed status");

observed_statuses.push_back(
static_cast<const shared_model::proto::TransactionResponse &>(
*opt_response.value()));
Expand Down Expand Up @@ -810,15 +827,9 @@ IntegrationTestFramework &IntegrationTestFramework::checkStatus(
std::function<void(const shared_model::proto::TransactionResponse &)>
validation) {
// fetch first response associated with the tx from related queue
boost::optional<TxResponseType> opt_response;
log_->debug("checkStatus()");
{
auto lk = responses_queues_->lock();
const auto it = responses_queues_->map.find(tx_hash.hex());
if (it != responses_queues_->map.end()) {
opt_response = it->second->try_pop();
}
}
std::optional<TxResponseType> opt_response =
responses_queues_->try_pop(tx_hash);
if (not opt_response) {
throw std::runtime_error("checkStatus(): missed status");
}
Expand Down
50 changes: 26 additions & 24 deletions test/framework/integration_framework/integration_test_framework.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
#include "main/subscription_fwd.hpp"
#include "synchronizer/synchronizer_common.hpp"

namespace google {
namespace protobuf {
class Empty;
}
} // namespace google
namespace google::protobuf {
class Empty;
} // namespace google::protobuf

namespace shared_model {
namespace crypto {
Expand Down Expand Up @@ -73,7 +71,6 @@ namespace iroha {
class GenericClientFactory;
class MstTransportGrpc;
struct OrderingEvent;
class OrderingServiceTransport;
class ServerRunner;
template <typename Response>
class AsyncGrpcClient;
Expand Down Expand Up @@ -510,23 +507,28 @@ namespace integration_framework {
block_subscription_;
std::shared_ptr<CheckerQueue<BlockType>> block_queue_;

struct ResponsesQueues {
public:
std::map<std::string, std::unique_ptr<CheckerQueue<TxResponseType>>> map;
private:
/// maximum time of waiting before appearing next transaction response
std::chrono::milliseconds tx_response_waiting_time_ms;
std::recursive_mutex mtx;
public:
ResponsesQueues(std::chrono::milliseconds ms);
auto findOrEmplace(std::string const &hash) -> decltype(map)::iterator;
auto find(std::string const &hash) -> std::optional<decltype(map)::iterator>;
auto lock() {
return std::unique_lock(mtx);
}
//push(std::string const &hash, TxResponseType txresp);
auto try_peek(std::string const &hash) ->std::optional<TxResponseType>;
};
struct ResponsesQueues;
// {
// public:
// //using HashType = shared_model::crypto::Hash;
// std::map<std::string, std::unique_ptr<CheckerQueue<TxResponseType>>>
// map;
// private:
// /// maximum time of waiting before appearing next transaction
// response std::chrono::milliseconds tx_response_waiting_time_ms;
// std::recursive_mutex mtx;
// public:
// ResponsesQueues(std::chrono::milliseconds ms);
// auto findOrEmplace(std::string const &hash) ->
// decltype(map)::iterator; auto find(std::string const &hash) ->
// std::optional<decltype(map)::iterator>; auto lock() {
// return std::unique_lock(mtx);
// }
// //push(std::string const &hash, TxResponseType txresp);
// auto try_peek(std::string const &hash)
// ->std::optional<TxResponseType>; auto try_pop(std::string const
// &hash) ->std::optional<TxResponseType>;
// };

std::shared_ptr<ResponsesQueues> responses_queues_;
std::shared_ptr<iroha::BaseSubscriber<
Expand Down Expand Up @@ -570,7 +572,7 @@ namespace integration_framework {
std::shared_ptr<iroha::network::MstTransportGrpc> mst_transport_;
std::shared_ptr<iroha::consensus::yac::YacNetwork> yac_transport_;

boost::optional<shared_model::crypto::Keypair> my_key_;
std::optional<shared_model::crypto::Keypair> my_key_;
std::shared_ptr<shared_model::interface::Peer> this_peer_;

private:
Expand Down

0 comments on commit 8a99302

Please sign in to comment.