Skip to content

Commit

Permalink
fix & refactor PostgresBlockStorage
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Boldyrev <miboldyrev@gmail.com>
  • Loading branch information
MBoldyrev committed Aug 27, 2019
1 parent 1efd68e commit 0a37feb
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 103 deletions.
176 changes: 79 additions & 97 deletions irohad/ametsuchi/impl/postgres_block_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

using namespace iroha::ametsuchi;

using shared_model::interface::types::HeightType;

PostgresBlockStorage::PostgresBlockStorage(
std::shared_ptr<PoolWrapper> pool_wrapper,
std::shared_ptr<BlockTransportFactory> block_factory,
Expand All @@ -22,102 +24,83 @@ PostgresBlockStorage::PostgresBlockStorage(

bool PostgresBlockStorage::insert(
std::shared_ptr<const shared_model::interface::Block> block) {
shared_model::interface::types::HeightType last_block = 0;
using T = boost::tuple<shared_model::interface::types::HeightType>;
soci::session sql(*pool_wrapper_->connection_pool_);
auto result_last_block = execute<T>(
[&] { return (sql.prepare << "SELECT MAX(height) FROM " << table_); });
try {
last_block =
flatMapValue<
boost::optional<shared_model::interface::types::HeightType>>(
result_last_block,
[](auto &last) { return boost::make_optional(last); })
.value_or(0);
} catch (const std::exception &e) {
log_->warn("Problem with a query result parsing: {}", e.what());
}

if (last_block != 0) {
if (block->height() != last_block + 1) {
log_->warn(
"Only blocks with sequential heights could be inserted. Last block "
"height: {}, inserting: {}",
last_block,
block->height());
return false;
}
auto inserted_height = block->height();

auto opt_range = getBlockHeightsRange();
if (opt_range and inserted_height != opt_range->max + 1) {
log_->warn(
"Only blocks with sequential heights could be inserted. "
"Last block height: {}, inserting: {}",
opt_range->max,
inserted_height);
return false;
}

auto h = block->height();
auto b = block->blob().hex();

soci::session sql(*pool_wrapper_->connection_pool_);
soci::statement st = (sql.prepare << "INSERT INTO " << table_
<< " (height, block_data) VALUES(:height, "
":block_data)",
soci::use(h),
soci::use(inserted_height),
soci::use(b));
log_->debug("insert block {}: {}", h, b);
log_->debug("insert block {}: {}", inserted_height, b);
try {
st.execute(true);
return true;
} catch (const std::exception &e) {
log_->warn("Failed to insert block {}, reason {}", h, e.what());
log_->warn(
"Failed to insert block {}, reason {}", inserted_height, e.what());
return false;
}
}

boost::optional<std::shared_ptr<const shared_model::interface::Block>>
PostgresBlockStorage::fetch(
shared_model::interface::types::HeightType height) const {
using T = boost::tuple<std::string>;
PostgresBlockStorage::fetch(HeightType height) const {
soci::session sql(*pool_wrapper_->connection_pool_);
auto result = execute<T>([&] {
return (sql.prepare << "SELECT block_data FROM " << table_
<< " WHERE height = :height",
soci::use(height));
});
return flatMapValue<
boost::optional<std::shared_ptr<const shared_model::interface::Block>>>(
result, [&](auto &block_data) {
log_->debug("fetched: {}", block_data);
auto byte_block = iroha::hexstringToBytestring(block_data);
if (not byte_block) {
return boost::optional<
std::shared_ptr<const shared_model::interface::Block>>(
boost::none);
}

iroha::protocol::Block_v1 b1;
b1.ParseFromString(*byte_block);
iroha::protocol::Block block;
*block.mutable_block_v1() = b1;
return block_factory_->createBlock(std::move(block))
.match(
[&](auto &&v) {
return boost::make_optional(
std::shared_ptr<const shared_model::interface::Block>(
std::move(v.value)));
},
[&](const auto &e)
-> boost::optional<
std::shared_ptr<const shared_model::interface::Block>> {
log_->error("Could not build block at height {}: {}",
height,
e.error);
return boost::none;
});
});
using QueryTuple = boost::tuple<boost::optional<std::string>>;
QueryTuple row;
try {
sql << "SELECT block_data FROM " << table_ << " WHERE height = :height",
soci::use(height), soci::into(row);
} catch (const std::exception &e) {
log_->error("Failed to execute query: {}", e.what());
return boost::none;
}
return rebind(viewQuery<QueryTuple>(row)) | [&, this](auto row) {
return apply(row, [&, this](auto &block_data) {
log_->debug("fetched: {}", block_data);
return iroha::hexstringToBytestring(block_data) |
[&, this](auto byte_block) {
iroha::protocol::Block_v1 b1;
b1.ParseFromString(byte_block);
iroha::protocol::Block block;
*block.mutable_block_v1() = b1;
return block_factory_->createBlock(std::move(block))
.match(
[&](auto &&v) {
return boost::make_optional(
std::shared_ptr<const shared_model::interface::Block>(
std::move(v.value)));
},
[&](const auto &e)
-> boost::optional<std::shared_ptr<
const shared_model::interface::Block>> {
log_->error("Could not build block at height {}: {}",
height,
e.error);
return boost::none;
});
};
});
};
}

size_t PostgresBlockStorage::size() const {
using T = boost::tuple<shared_model::interface::types::HeightType>;
soci::session sql(*pool_wrapper_->connection_pool_);
auto result = execute<T>(
[&] { return (sql.prepare << "SELECT COUNT(*) FROM " << table_); });
return flatMapValue<
boost::optional<shared_model::interface::types::HeightType>>(
result, [](auto &count) { return boost::make_optional(count); })
return (getBlockHeightsRange() |
[](auto range) {
return boost::make_optional(range.max - range.min + 1);
})
.value_or(0);
}

Expand All @@ -133,36 +116,35 @@ void PostgresBlockStorage::clear() {

void PostgresBlockStorage::forEach(
iroha::ametsuchi::BlockStorage::FunctionType function) const {
using T = boost::tuple<shared_model::interface::types::HeightType>;
soci::session sql(*pool_wrapper_->connection_pool_);
// TODO: IR-577 Add caching if it will gain a performance boost
// luckychess 29.06.2019
auto result_min = execute<T>(
[&] { return (sql.prepare << "SELECT MIN(height) FROM " << table_); });
auto min =
flatMapValue<boost::optional<shared_model::interface::types::HeightType>>(
result_min, [](auto &min) { return boost::make_optional(min); })
.value_or(0);
auto result_max = execute<T>(
[&] { return (sql.prepare << "SELECT MAX(height) FROM " << table_); });
auto max =
flatMapValue<boost::optional<shared_model::interface::types::HeightType>>(
result_max, [](auto &max) { return boost::make_optional(max); })
.value_or(0);
while (min <= max) {
function(*fetch(min));
++min;
}
getBlockHeightsRange() | [this, &function](auto range) {
while (range.min <= range.max) {
function(*this->fetch(range.min));
++range.min;
}
};
}

template <typename T, typename F>
boost::optional<soci::rowset<T>> PostgresBlockStorage::execute(F &&f) const {
boost::optional<PostgresBlockStorage::HeightRange>
PostgresBlockStorage::getBlockHeightsRange() const {
// TODO: IR-577 Add caching if it will gain a performance boost
// luckychess 29.06.2019
soci::session sql(*pool_wrapper_->connection_pool_);
using QueryTuple =
boost::tuple<boost::optional<size_t>, boost::optional<size_t>>;
QueryTuple row;
try {
return soci::rowset<T>{std::forward<F>(f)()};
sql << "SELECT MIN(height), MAX(height) FROM " << table_, soci::into(row);
} catch (const std::exception &e) {
log_->error("Failed to execute query: {}", e.what());
return boost::none;
}
return rebind(viewQuery<QueryTuple>(row)) | [](auto row) {
return apply(row, [](size_t min, size_t max) {
assert(max >= min);
return boost::make_optional(HeightRange{min, max});
});
};
}

PostgresTemporaryBlockStorage::PostgresTemporaryBlockStorage(
Expand Down
13 changes: 7 additions & 6 deletions irohad/ametsuchi/impl/postgres_block_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ namespace iroha {
void forEach(FunctionType function) const override;

private:
/**
* Executes given lambda of type F, catches exceptions if any, logs the
* message, and returns an optional rowset<T>
*/
template <typename T, typename F>
boost::optional<soci::rowset<T>> execute(F &&f) const;
struct HeightRange {
shared_model::interface::types::HeightType min;
shared_model::interface::types::HeightType max;
};

/// Get the range of stored block heights.
boost::optional<HeightRange> getBlockHeightsRange() const;

protected:
std::shared_ptr<PoolWrapper> pool_wrapper_;
Expand Down

0 comments on commit 0a37feb

Please sign in to comment.