Skip to content
This repository has been archived by the owner on Oct 4, 2019. It is now read-only.

Commit

Permalink
Fixed transactions formatting. Added program options. Minor bug fix. #…
Browse files Browse the repository at this point in the history
  • Loading branch information
vpavliv committed Feb 27, 2018
1 parent da36330 commit c6f0b51
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 52 deletions.
3 changes: 3 additions & 0 deletions contribution/config.ini
Expand Up @@ -111,3 +111,6 @@ appenders=stderr
[logger.p2p]
level=info
appenders=stderr

# Mongo db connection string
mongodb-uri=mongodb://127.0.0.1:27017
Expand Up @@ -3,10 +3,21 @@
#include <bsoncxx/builder/basic/kvp.hpp>
#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/stream/document.hpp>
#include <bsoncxx/builder/stream/array.hpp>
#include <bsoncxx/builder/stream/value_context.hpp>
#include <bsoncxx/json.hpp>

#include <mongocxx/client.hpp>
#include <mongocxx/instance.hpp>
#include <mongocxx/stdx.hpp>
#include <mongocxx/uri.hpp>

using bsoncxx::builder::stream::close_array;
using bsoncxx::builder::stream::close_document;
using bsoncxx::builder::stream::document;
using bsoncxx::builder::stream::finalize;
using bsoncxx::builder::stream::open_array;
using bsoncxx::builder::stream::open_document;

#include <golos/protocol/block.hpp>

Expand All @@ -18,9 +29,11 @@ namespace mongo_db {

class mongo_db_writer final {
public:
mongo_db_writer(const std::string& uri_str);
mongo_db_writer();
~mongo_db_writer();

bool initialize(const std::string& uri_str);

void on_block(const signed_block& block);

private:
Expand Down
13 changes: 6 additions & 7 deletions libraries/plugins/mongo_db/mongo_db_plugin.cpp
Expand Up @@ -15,8 +15,8 @@ namespace mongo_db {
public:
mongo_db_plugin_impl(mongo_db_plugin &plugin, const std::string& uri_str)
: _my(plugin),
_db(appbase::app().get_plugin<golos::plugins::chain::plugin>().db()),
writer(uri_str) {
_db(appbase::app().get_plugin<golos::plugins::chain::plugin>().db()) {
writer.initialize(uri_str);
}

~mongo_db_plugin_impl() = default;
Expand All @@ -34,13 +34,12 @@ namespace mongo_db {
};

void mongo_db_plugin::mongo_db_plugin_impl::on_block(const signed_block &block) {
ilog("mongo_db plugin: on_block");


writer.on_block(block);
}

// Plugin
mongo_db_plugin::mongo_db_plugin() {
ilog("mongo_db plugin: ctor");
}

mongo_db_plugin::~mongo_db_plugin() {
Expand Down Expand Up @@ -74,8 +73,8 @@ namespace mongo_db {
});

} else {
wlog("golos::mongo_db_plugin configured, but no --mongodb-uri specified.");
wlog("mongo_db_plugin disabled.");
ilog("mongo_db_plugin configured, but no --mongodb-uri specified.");
ilog("mongo_db_plugin disabled.");
}

ilog("mongo_db plugin: plugin_initialize() end");
Expand Down
102 changes: 58 additions & 44 deletions libraries/plugins/mongo_db/mongo_db_writer.cpp
@@ -1,4 +1,6 @@
#include <golos/plugins/mongo_db/mongo_db_writer.hpp>
#include <fc/log/logger.hpp>
#include <mongocxx/exception/exception.hpp>


namespace golos {
Expand All @@ -12,57 +14,69 @@ namespace mongo_db {
const std::string mongo_db_writer::blocks_col = "Blocks";
const std::string mongo_db_writer::trans_col = "Transactions";

mongo_db_writer::mongo_db_writer(const std::string& uri_str) {
mongocxx::uri uri = mongocxx::uri {uri_str};

mongo_conn = mongocxx::client {uri};
db_name = uri.database();
mongo_db_writer::mongo_db_writer() {
}

void mongo_db_writer::on_block(const signed_block& block) {

bool transactions_in_block = false;
mongocxx::options::bulk_write bulk_opts;
bulk_opts.ordered(false);

auto blocks = mongo_conn[db_name][blocks_col]; // Blocks
auto trans = mongo_conn[db_name][trans_col]; // Transactions

stream::document doc{};
const auto block_id = block.id();
const auto block_id_str = block_id.str();
const auto prev_block_id_str = block.previous.str();
auto block_num = block.block_num();


auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()});
bool mongo_db_writer::initialize(const std::string& uri_str) {
try {
mongocxx::uri uri = mongocxx::uri {uri_str};

doc << "block_num" << std::to_string(block_num)
<< "block_id" << block_id_str
<< "prev_block_id" << prev_block_id_str
<< "transaction_merkle_root" << block.transaction_merkle_root.str()
<< "createdAt" << std::to_string(now.count());

int32_t trx_num = -1;
for (const auto& trx : block.transactions) {
++trx_num;

const auto trans_id_str = trx.id().str();
doc << "transaction_id" << trans_id_str
<< "sequence_num" << std::to_string(trx_num)
<< "block_id" << block_id_str
<< "ref_block_num" << std::to_string(trx.ref_block_num)
<< "expiration" << std::to_string(now.count());
mongo_conn = mongocxx::client {uri};
db_name = uri.database().empty() ? "Golos" : uri.database();
}

if (!blocks.insert_one(doc.view())) {
ilog("Failed to insert block ${bid}", ("bid", block_id));
catch (mongocxx::exception & ex) {
ilog(ex.what());
return false;
}
return true;
}

++processed_blocks;
void mongo_db_writer::on_block(const signed_block& block) {
try {
auto blocks = mongo_conn[db_name][blocks_col]; // Blocks

auto doc = document {};

auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()});

auto doc_ctx = doc
<< "block_num" << std::to_string(block.block_num())
<< "block_id" << block.id().str()
<< "prev_block_id" << block.previous.str()
<< "merkle_root" << block.transaction_merkle_root.str()
<< "created_at" << std::to_string(now.count());

if (!block.transactions.empty()) {
auto in_array = doc << "transactions" << open_array;

int trx_num = -1;
for (const auto& trx : block.transactions) {
++trx_num;

const auto trans_id_str = trx.id().str();
in_array << open_document
<< "id" << trans_id_str
<< "sequence_num" << std::to_string(trx_num)
<< "ref_block_num" << std::to_string(trx.ref_block_num)
<< "expiration" << std::to_string(now.count())
<< close_document;
}
in_array << close_array;
}

if (!blocks.insert_one(doc.view())) {
ilog("Failed to insert block ${bid}", ("bid", block.id()));
}

++processed_blocks;
}
catch (mongocxx::exception & ex) {
ilog(ex.what());
throw;
}
}

mongo_db_writer::~mongo_db_writer() {
}
}}}
}}}

0 comments on commit c6f0b51

Please sign in to comment.