Skip to content

Commit

Permalink
PostgresIndexer uses bulk insert
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Boldyrev <miboldyrev@gmail.com>
  • Loading branch information
MBoldyrev committed Jan 20, 2020
1 parent 16dbbe6 commit f26975a
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 32 deletions.
104 changes: 76 additions & 28 deletions irohad/ametsuchi/impl/postgres_indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,14 @@ PostgresIndexer::PostgresIndexer(soci::session &sql) : sql_(sql) {}

void PostgresIndexer::txHashPosition(const HashType &hash,
TxPosition position) {
boost::format base(
"INSERT INTO position_by_hash"
"(hash, height, index) VALUES "
"('%s', '%s', '%s');\n");
statements_.append(
(base % hash.hex() % position.height % position.index).str());
tx_hash_position_.hash.emplace_back(hash.hex());
tx_hash_position_.height.emplace_back(position.height);
tx_hash_position_.index.emplace_back(position.index);
}

void PostgresIndexer::txHashStatus(const HashType &rejected_tx_hash,
bool is_committed) {
boost::format base(
"INSERT INTO tx_status_by_hash"
"(hash, status) VALUES "
"('%s', '%s');\n");
statements_.append(
(base % rejected_tx_hash.hex() % (is_committed ? "TRUE" : "FALSE"))
.str());
void PostgresIndexer::txHashStatus(const HashType &tx_hash, bool is_committed) {
tx_hash_status_.hash.emplace_back(tx_hash.hex());
tx_hash_status_.status.emplace_back(is_committed ? "TRUE" : "FALSE");
}

void PostgresIndexer::committedTxHash(const HashType &committed_tx_hash) {
Expand All @@ -45,28 +36,85 @@ void PostgresIndexer::rejectedTxHash(const HashType &rejected_tx_hash) {

void PostgresIndexer::txPositionByCreator(const AccountIdType creator,
TxPosition position) {
boost::format base(
"INSERT INTO tx_position_by_creator"
"(creator_id, height, index) VALUES "
"('%s', '%s', '%s');\n");
statements_.append((base % creator % position.height % position.index).str());
tx_position_by_creator_.creator.emplace_back(creator);
tx_position_by_creator_.height.emplace_back(position.height);
tx_position_by_creator_.index.emplace_back(position.index);
}

void PostgresIndexer::accountAssetTxPosition(const AccountIdType &account_id,
const AssetIdType &asset_id,
TxPosition position) {
boost::format base(
"INSERT INTO position_by_account_asset"
"(account_id, asset_id, height, index) VALUES "
"('%s', '%s', '%s', '%s');\n");
statements_.append(
(base % account_id % asset_id % position.height % position.index).str());
account_asset_tx_position_.account_id.emplace_back(account_id);
account_asset_tx_position_.asset_id.emplace_back(asset_id);
account_asset_tx_position_.height.emplace_back(position.height);
account_asset_tx_position_.index.emplace_back(position.index);
}

iroha::expected::Result<void, std::string> PostgresIndexer::flush() {
try {
sql_ << statements_;
statements_.clear();
assert(tx_hash_position_.hash.size() == tx_hash_position_.height.size());
assert(tx_hash_position_.hash.size() == tx_hash_position_.index.size());
if (not tx_hash_position_.hash.empty()) {
sql_ << "INSERT INTO position_by_hash"
"(hash, height, index) VALUES "
"(:hash, :height, :index);",
soci::use(tx_hash_position_.hash),
soci::use(tx_hash_position_.height),
soci::use(tx_hash_position_.index);

tx_hash_position_.hash.clear();
tx_hash_position_.height.clear();
tx_hash_position_.index.clear();
}

assert(tx_hash_status_.hash.size() == tx_hash_status_.status.size());
if (not tx_hash_status_.hash.empty()) {
sql_ << "INSERT INTO tx_status_by_hash"
"(hash, status) VALUES "
"(:hash, :status);",
soci::use(tx_hash_status_.hash), soci::use(tx_hash_status_.status);

tx_hash_status_.hash.clear();
tx_hash_status_.status.clear();
}

assert(tx_position_by_creator_.creator.size()
== tx_position_by_creator_.height.size());
assert(tx_position_by_creator_.creator.size()
== tx_position_by_creator_.index.size());
if (not tx_position_by_creator_.creator.empty()) {
sql_ << "INSERT INTO tx_position_by_creator"
"(creator_id, height, index) VALUES "
"(:creator_id, :height, :index);",
soci::use(tx_position_by_creator_.creator),
soci::use(tx_position_by_creator_.height),
soci::use(tx_position_by_creator_.index);

tx_position_by_creator_.creator.clear();
tx_position_by_creator_.height.clear();
tx_position_by_creator_.index.clear();
}

assert(account_asset_tx_position_.account_id.size()
== account_asset_tx_position_.asset_id.size());
assert(account_asset_tx_position_.account_id.size()
== account_asset_tx_position_.height.size());
assert(account_asset_tx_position_.account_id.size()
== account_asset_tx_position_.index.size());
if (not account_asset_tx_position_.account_id.empty()) {
sql_ << "INSERT INTO position_by_account_asset"
"(account_id, asset_id, height, index) VALUES "
"(:account_id, :asset_id, :height, :index);",
soci::use(account_asset_tx_position_.account_id),
soci::use(account_asset_tx_position_.asset_id),
soci::use(account_asset_tx_position_.height),
soci::use(account_asset_tx_position_.index);

account_asset_tx_position_.account_id.clear();
account_asset_tx_position_.asset_id.clear();
account_asset_tx_position_.height.clear();
account_asset_tx_position_.index.clear();
}
} catch (const std::exception &e) {
return e.what();
}
Expand Down
33 changes: 29 additions & 4 deletions irohad/ametsuchi/impl/postgres_indexer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

#include "ametsuchi/indexer.hpp"

#include <string>
#include <vector>

namespace soci {
class session;
}
Expand Down Expand Up @@ -40,13 +43,35 @@ namespace iroha {
iroha::expected::Result<void, std::string> flush() override;

private:
struct TxHashPosition {
std::vector<std::string> hash;
std::vector<size_t> height;
std::vector<size_t> index;
} tx_hash_position_;

struct TxHashStatus {
std::vector<std::string> hash;
std::vector<std::string> status;
} tx_hash_status_;

struct TxPositionByCreator {
std::vector<std::string> creator;
std::vector<size_t> height;
std::vector<size_t> index;
} tx_position_by_creator_;

struct AccountAssetTxPosition {
std::vector<std::string> account_id;
std::vector<std::string> asset_id;
std::vector<size_t> height;
std::vector<size_t> index;
} account_asset_tx_position_;

/// Index tx status by its hash.
void txHashStatus(
const shared_model::interface::types::HashType &rejected_tx_hash,
bool is_committed);
void txHashStatus(const shared_model::interface::types::HashType &tx_hash,
bool is_committed);

soci::session &sql_;
std::string statements_; ///< A bunch of SQL to be committed on flush().
};

} // namespace ametsuchi
Expand Down

0 comments on commit f26975a

Please sign in to comment.