diff --git a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp index 2eb601fd65..ba02c4d074 100644 --- a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp +++ b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp @@ -264,20 +264,24 @@ void elasticsearch_plugin_impl::getOperationType(const optional & oho) -{ +{ try { os.trx_in_block = oho->trx_in_block; os.op_in_trx = oho->op_in_trx; os.operation_result = fc::json::to_string(oho->result); os.virtual_op = oho->virtual_op; if(_elasticsearch_operation_object) { + // op oho->op.visit(fc::from_static_variant(os.op_object, FC_PACK_MAX_DEPTH)); - adaptor_struct adaptor; - os.op_object = adaptor.adapt(os.op_object.get_object()); + os.op_object = graphene::utilities::es_data_adaptor::adapt( os.op_object.get_object() ); + // operation_result + variant v; + fc::to_variant( oho->result, v, FC_PACK_MAX_DEPTH ); + os.operation_result_object = graphene::utilities::es_data_adaptor::adapt_static_variant( v.get_array() ); } if(_elasticsearch_operation_string) os.op = fc::json::to_string(oho->op); -} +} FC_CAPTURE_LOG_AND_RETHROW( (oho) ) } void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_block& b) { @@ -289,6 +293,61 @@ void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_bloc bs.trx_id = trx_id; } +struct operation_visitor +{ + using result_type = void; + + share_type fee_amount; + asset_id_type fee_asset; + + asset_id_type transfer_asset_id; + share_type transfer_amount; + account_id_type transfer_from; + account_id_type transfer_to; + + void operator()( const graphene::chain::transfer_operation& o ) + { + fee_asset = o.fee.asset_id; + fee_amount = o.fee.amount; + + transfer_asset_id = o.amount.asset_id; + transfer_amount = o.amount.amount; + transfer_from = o.from; + transfer_to = o.to; + } + + object_id_type fill_order_id; + account_id_type fill_account_id; + asset_id_type fill_pays_asset_id; + share_type fill_pays_amount; + asset_id_type fill_receives_asset_id; + share_type fill_receives_amount; + double fill_fill_price; + bool fill_is_maker; + + void operator()( const graphene::chain::fill_order_operation& o ) + { + fee_asset = o.fee.asset_id; + fee_amount = o.fee.amount; + + fill_order_id = o.order_id; + fill_account_id = o.account_id; + fill_pays_asset_id = o.pays.asset_id; + fill_pays_amount = o.pays.amount; + fill_receives_asset_id = o.receives.asset_id; + fill_receives_amount = o.receives.amount; + fill_fill_price = o.fill_price.to_real(); + fill_is_maker = o.is_maker; + } + + template + void operator()( const T& o ) + { + fee_asset = o.fee.asset_id; + fee_amount = o.fee.amount; + } +}; + void elasticsearch_plugin_impl::doVisitor(const optional & oho) { graphene::chain::database& db = database(); diff --git a/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp b/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp index 05e24843f0..fbc012acba 100644 --- a/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp +++ b/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp @@ -80,61 +80,6 @@ class elasticsearch_plugin : public graphene::app::plugin }; -struct operation_visitor -{ - typedef void result_type; - - share_type fee_amount; - asset_id_type fee_asset; - - asset_id_type transfer_asset_id; - share_type transfer_amount; - account_id_type transfer_from; - account_id_type transfer_to; - - void operator()( const graphene::chain::transfer_operation& o ) - { - fee_asset = o.fee.asset_id; - fee_amount = o.fee.amount; - - transfer_asset_id = o.amount.asset_id; - transfer_amount = o.amount.amount; - transfer_from = o.from; - transfer_to = o.to; - } - - object_id_type fill_order_id; - account_id_type fill_account_id; - asset_id_type fill_pays_asset_id; - share_type fill_pays_amount; - asset_id_type fill_receives_asset_id; - share_type fill_receives_amount; - double fill_fill_price; - bool fill_is_maker; - - void operator()( const graphene::chain::fill_order_operation& o ) - { - fee_asset = o.fee.asset_id; - fee_amount = o.fee.amount; - - fill_order_id = o.order_id; - fill_account_id = o.account_id; - fill_pays_asset_id = o.pays.asset_id; - fill_pays_amount = o.pays.amount; - fill_receives_asset_id = o.receives.asset_id; - fill_receives_amount = o.receives.amount; - fill_fill_price = o.fill_price.to_real(); - fill_is_maker = o.is_maker; - } - - template - void operator()( const T& o ) - { - fee_asset = o.fee.asset_id; - fee_amount = o.fee.amount; - } -}; - struct operation_history_struct { int trx_in_block; int op_in_trx; @@ -142,6 +87,7 @@ struct operation_history_struct { int virtual_op; std::string op; variant op_object; + variant operation_result_object; }; struct block_struct { @@ -197,115 +143,18 @@ struct bulk_struct { optional additional_data; }; -struct adaptor_struct { - variant adapt(const variant_object& op) - { - fc::mutable_variant_object o(op); - vector keys_to_rename; - for (auto i = o.begin(); i != o.end(); ++i) - { - auto& element = (*i).value(); - if (element.is_object()) - { - const string& name = (*i).key(); - auto& vo = element.get_object(); - if (vo.contains(name.c_str())) - keys_to_rename.emplace_back(name); - element = adapt(vo); - } - else if (element.is_array()) - adapt(element.get_array()); - } - for (const auto& i : keys_to_rename) - { - string new_name = i + "_"; - o[new_name] = variant(o[i]); - o.erase(i); - } - - if (o.find("memo") != o.end()) - { - auto& memo = o["memo"]; - if (memo.is_string()) - { - o["memo_"] = o["memo"]; - o.erase("memo"); - } - else if (memo.is_object()) - { - fc::mutable_variant_object tmp(memo.get_object()); - if (tmp.find("nonce") != tmp.end()) - { - tmp["nonce"] = tmp["nonce"].as_string(); - o["memo"] = tmp; - } - } - } - if (o.find("new_parameters") != o.end()) - { - auto& tmp = o["new_parameters"]; - if (tmp.is_object()) - { - fc::mutable_variant_object tmp2(tmp.get_object()); - if (tmp2.find("current_fees") != tmp2.end()) - { - tmp2.erase("current_fees"); - o["new_parameters"] = tmp2; - } - } - } - if (o.find("owner") != o.end() && o["owner"].is_string()) - { - o["owner_"] = o["owner"].as_string(); - o.erase("owner"); - } - - vector to_string_fields = { - "proposed_ops", - "initializer", - "policy", - "predicates", - "active_special_authority", - "owner_special_authority", - "acceptable_collateral", - "acceptable_borrowers" - }; - for( const auto& name : to_string_fields ) - { - if (o.find(name) != o.end()) - { - o[name] = fc::json::to_string(o[name]); - } - } - - variant v; - fc::to_variant(o, v, FC_PACK_MAX_DEPTH); - return v; - } - - void adapt(fc::variants& v) - { - for (auto& array_element : v) - { - if (array_element.is_object()) - array_element = adapt(array_element.get_object()); - else if (array_element.is_array()) - adapt(array_element.get_array()); - else - array_element = array_element.as_string(); - } - } -}; - } } //graphene::elasticsearch FC_REFLECT_ENUM( graphene::elasticsearch::mode, (only_save)(only_query)(all) ) -FC_REFLECT( graphene::elasticsearch::operation_history_struct, (trx_in_block)(op_in_trx)(operation_result)(virtual_op)(op)(op_object) ) +FC_REFLECT( graphene::elasticsearch::operation_history_struct, + (trx_in_block)(op_in_trx)(operation_result)(virtual_op)(op)(op_object)(operation_result_object) ) FC_REFLECT( graphene::elasticsearch::block_struct, (block_num)(block_time)(trx_id) ) FC_REFLECT( graphene::elasticsearch::fee_struct, (asset)(asset_name)(amount)(amount_units) ) FC_REFLECT( graphene::elasticsearch::transfer_struct, (asset)(asset_name)(amount)(amount_units)(from)(to) ) -FC_REFLECT( graphene::elasticsearch::fill_struct, (order_id)(account_id)(pays_asset_id)(pays_asset_name)(pays_amount)(pays_amount_units) - (receives_asset_id)(receives_asset_name)(receives_amount)(receives_amount_units)(fill_price) - (fill_price_units)(is_maker)) +FC_REFLECT( graphene::elasticsearch::fill_struct, + (order_id)(account_id)(pays_asset_id)(pays_asset_name)(pays_amount)(pays_amount_units) + (receives_asset_id)(receives_asset_name)(receives_amount)(receives_amount_units)(fill_price) + (fill_price_units)(is_maker) ) FC_REFLECT( graphene::elasticsearch::visitor_struct, (fee_data)(transfer_data)(fill_data) ) -FC_REFLECT( graphene::elasticsearch::bulk_struct, (account_history)(operation_history)(operation_type)(operation_id_num)(block_data)(additional_data) ) +FC_REFLECT( graphene::elasticsearch::bulk_struct, + (account_history)(operation_history)(operation_type)(operation_id_num)(block_data)(additional_data) ) diff --git a/libraries/plugins/es_objects/es_objects.cpp b/libraries/plugins/es_objects/es_objects.cpp index 93a4687f36..3e70e28688 100644 --- a/libraries/plugins/es_objects/es_objects.cpp +++ b/libraries/plugins/es_objects/es_objects.cpp @@ -54,26 +54,37 @@ class es_objects_plugin_impl void remove_from_database(object_id_type id, std::string index); friend class graphene::es_objects::es_objects_plugin; + friend struct genesis_inserter; private: + struct plugin_options + { + std::string _es_objects_elasticsearch_url = "http://localhost:9200/"; + std::string _es_objects_auth = ""; + uint32_t _es_objects_bulk_replay = 10000; + uint32_t _es_objects_bulk_sync = 100; + bool _es_objects_proposals = true; + bool _es_objects_accounts = true; + bool _es_objects_assets = true; + bool _es_objects_balances = true; + bool _es_objects_limit_orders = false; + bool _es_objects_asset_bitasset = true; + std::string _es_objects_index_prefix = "objects-"; + uint32_t _es_objects_start_es_after_block = 0; + bool _es_objects_keep_only_current = true; + + void init(const boost::program_options::variables_map& options); + }; + es_objects_plugin& _self; - std::string _es_objects_elasticsearch_url = "http://localhost:9200/"; - std::string _es_objects_auth = ""; - uint32_t _es_objects_bulk_replay = 10000; - uint32_t _es_objects_bulk_sync = 100; - bool _es_objects_proposals = true; - bool _es_objects_accounts = true; - bool _es_objects_assets = true; - bool _es_objects_balances = true; - bool _es_objects_limit_orders = false; - bool _es_objects_asset_bitasset = true; - std::string _es_objects_index_prefix = "objects-"; - uint32_t _es_objects_start_es_after_block = 0; + plugin_options _options; + + uint32_t limit_documents = _options._es_objects_bulk_replay; + CURL *curl; // curl handler - vector bulk; + vector bulk; vector prepare; - bool _es_objects_keep_only_current = true; uint32_t block_number; fc::time_point_sec block_time; @@ -85,6 +96,30 @@ class es_objects_plugin_impl void init_program_options(const boost::program_options::variables_map& options); }; +struct genesis_inserter +{ + es_objects_plugin_impl* my; + graphene::chain::database &db; + + explicit genesis_inserter( es_objects_plugin_impl* _my ) + : my(_my), db( my->_self.database() ) + { // Nothing to do + } + + template + void insert( bool b, const string& prefix ) + { + if( !b ) + return; + + db.get_index( ObjType::space_id, ObjType::type_id ).inspect_all_objects( + [this, &prefix](const graphene::db::object &o) { + auto a = static_cast(&o); + my->prepareTemplate(*a, prefix); + }); + } +}; + bool es_objects_plugin_impl::genesis() { ilog("elasticsearch OBJECTS: inserting data from genesis"); @@ -94,40 +129,12 @@ bool es_objects_plugin_impl::genesis() block_number = db.head_block_num(); block_time = db.head_block_time(); - if (_es_objects_accounts) { - auto &index_accounts = db.get_index(1, 2); - index_accounts.inspect_all_objects([this, &db](const graphene::db::object &o) { - auto obj = db.find_object(o.id); - auto a = static_cast(obj); - prepareTemplate(*a, "account"); - }); - } - if (_es_objects_assets) { - auto &index_assets = db.get_index(1, 3); - index_assets.inspect_all_objects([this, &db](const graphene::db::object &o) { - auto obj = db.find_object(o.id); - auto a = static_cast(obj); - prepareTemplate(*a, "asset"); - }); - } - if (_es_objects_balances) { - auto &index_balances = db.get_index(2, 5); - index_balances.inspect_all_objects([this, &db](const graphene::db::object &o) { - auto obj = db.find_object(o.id); - auto b = static_cast(obj); - prepareTemplate(*b, "balance"); - }); - } + genesis_inserter inserter( this ); - graphene::utilities::ES es; - es.curl = curl; - es.bulk_lines = bulk; - es.elasticsearch_url = _es_objects_elasticsearch_url; - es.auth = _es_objects_auth; - if (!graphene::utilities::SendBulk(std::move(es))) - FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error inserting genesis data."); - else - bulk.clear(); + inserter.insert( _options._es_objects_accounts, "account" ); + inserter.insert( _options._es_objects_assets, "asset" ); + inserter.insert( _options._es_objects_asset_bitasset, "bitasset" ); + inserter.insert( _options._es_objects_balances, "balance" ); return true; } @@ -139,18 +146,17 @@ bool es_objects_plugin_impl::index_database(const vector& ids, s block_time = db.head_block_time(); block_number = db.head_block_num(); - if(block_number > _es_objects_start_es_after_block) { + if(block_number > _options._es_objects_start_es_after_block) { // check if we are in replay or in sync and change number of bulk documents accordingly - uint32_t limit_documents = 0; if ((fc::time_point::now() - block_time) < fc::seconds(30)) - limit_documents = _es_objects_bulk_sync; + limit_documents = _options._es_objects_bulk_sync; else - limit_documents = _es_objects_bulk_replay; + limit_documents = _options._es_objects_bulk_replay; for (auto const &value: ids) { - if (value.is() && _es_objects_proposals) { + if (value.is() && _options._es_objects_proposals) { auto obj = db.find_object(value); auto p = static_cast(obj); if (p != nullptr) { @@ -159,7 +165,7 @@ bool es_objects_plugin_impl::index_database(const vector& ids, s else prepareTemplate(*p, "proposal"); } - } else if (value.is() && _es_objects_accounts) { + } else if (value.is() && _options._es_objects_accounts) { auto obj = db.find_object(value); auto a = static_cast(obj); if (a != nullptr) { @@ -168,7 +174,7 @@ bool es_objects_plugin_impl::index_database(const vector& ids, s else prepareTemplate(*a, "account"); } - } else if (value.is() && _es_objects_assets) { + } else if (value.is() && _options._es_objects_assets) { auto obj = db.find_object(value); auto a = static_cast(obj); if (a != nullptr) { @@ -177,7 +183,7 @@ bool es_objects_plugin_impl::index_database(const vector& ids, s else prepareTemplate(*a, "asset"); } - } else if (value.is() && _es_objects_balances) { + } else if (value.is() && _options._es_objects_balances) { auto obj = db.find_object(value); auto b = static_cast(obj); if (b != nullptr) { @@ -186,7 +192,7 @@ bool es_objects_plugin_impl::index_database(const vector& ids, s else prepareTemplate(*b, "balance"); } - } else if (value.is() && _es_objects_limit_orders) { + } else if (value.is() && _options._es_objects_limit_orders) { auto obj = db.find_object(value); auto l = static_cast(obj); if (l != nullptr) { @@ -195,7 +201,7 @@ bool es_objects_plugin_impl::index_database(const vector& ids, s else prepareTemplate(*l, "limitorder"); } - } else if (value.is() && _es_objects_asset_bitasset) { + } else if (value.is() && _options._es_objects_asset_bitasset) { auto obj = db.find_object(value); auto ba = static_cast(obj); if (ba != nullptr) { @@ -212,8 +218,8 @@ bool es_objects_plugin_impl::index_database(const vector& ids, s graphene::utilities::ES es; es.curl = curl; es.bulk_lines = bulk; - es.elasticsearch_url = _es_objects_elasticsearch_url; - es.auth = _es_objects_auth; + es.elasticsearch_url = _options._es_objects_elasticsearch_url; + es.auth = _options._es_objects_auth; if (!graphene::utilities::SendBulk(std::move(es))) return false; @@ -227,11 +233,11 @@ bool es_objects_plugin_impl::index_database(const vector& ids, s void es_objects_plugin_impl::remove_from_database( object_id_type id, std::string index) { - if(_es_objects_keep_only_current) + if(_options._es_objects_keep_only_current) { fc::mutable_variant_object delete_line; delete_line["_id"] = string(id); - delete_line["_index"] = _es_objects_index_prefix + index; + delete_line["_index"] = _options._es_objects_index_prefix + index; if(!is_es_version_7_or_above) delete_line["_type"] = "_doc"; fc::mutable_variant_object final_delete_line; @@ -246,18 +252,17 @@ template void es_objects_plugin_impl::prepareTemplate(T blockchain_object, string index_name) { fc::mutable_variant_object bulk_header; - bulk_header["_index"] = _es_objects_index_prefix + index_name; + bulk_header["_index"] = _options._es_objects_index_prefix + index_name; if(!is_es_version_7_or_above) bulk_header["_type"] = "_doc"; - if(_es_objects_keep_only_current) + if(_options._es_objects_keep_only_current) { bulk_header["_id"] = string(blockchain_object.id); } - adaptor_struct adaptor; fc::variant blockchain_object_variant; fc::to_variant( blockchain_object, blockchain_object_variant, GRAPHENE_NET_MAX_NESTED_OBJECTS ); - fc::mutable_variant_object o = adaptor.adapt(blockchain_object_variant.get_object()); + fc::mutable_variant_object o( utilities::es_data_adaptor::adapt( blockchain_object_variant.get_object() ) ); o["object_id"] = string(blockchain_object.id); o["block_time"] = block_time; @@ -268,6 +273,19 @@ void es_objects_plugin_impl::prepareTemplate(T blockchain_object, string index_n prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); + + if( curl && bulk.size() >= limit_documents ) // send data to elasticsearch when bulk is too large + { + graphene::utilities::ES es; + es.curl = curl; + es.bulk_lines = bulk; + es.elasticsearch_url = _options._es_objects_elasticsearch_url; + es.auth = _options._es_objects_auth; + if (!graphene::utilities::SendBulk(std::move(es))) + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error sending bulk data."); + else + bulk.clear(); + } } es_objects_plugin_impl::~es_objects_plugin_impl() @@ -328,6 +346,11 @@ void es_objects_plugin::plugin_set_program_options( } void detail::es_objects_plugin_impl::init_program_options(const boost::program_options::variables_map& options) +{ + _options.init( options ); +} + +void detail::es_objects_plugin_impl::plugin_options::init(const boost::program_options::variables_map& options) { if (options.count("es-objects-elasticsearch-url") > 0) { _es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as(); @@ -375,7 +398,7 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable my->init_program_options( options ); database().applied_block.connect([this](const signed_block &b) { - if(b.block_num() == 1 && my->_es_objects_start_es_after_block == 0) { + if( 1U == b.block_num() && 0 == my->_options._es_objects_start_es_after_block ) { if (!my->genesis()) FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating genesis data."); } @@ -407,12 +430,12 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable graphene::utilities::ES es; es.curl = my->curl; - es.elasticsearch_url = my->_es_objects_elasticsearch_url; - es.auth = my->_es_objects_auth; - es.auth = my->_es_objects_index_prefix; + es.elasticsearch_url = my->_options._es_objects_elasticsearch_url; + es.auth = my->_options._es_objects_auth; + es.auth = my->_options._es_objects_index_prefix; if(!graphene::utilities::checkES(es)) - FC_THROW( "ES database is not up in url ${url}", ("url", my->_es_objects_elasticsearch_url) ); + FC_THROW( "ES database is not up in url ${url}", ("url", my->_options._es_objects_elasticsearch_url) ); graphene::utilities::checkESVersion7OrAbove(es, my->is_es_version_7_or_above); } diff --git a/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp b/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp index dff4812498..cfae63d77b 100644 --- a/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp +++ b/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp @@ -53,61 +53,4 @@ class es_objects_plugin : public graphene::app::plugin std::unique_ptr my; }; -struct adaptor_struct { - fc::mutable_variant_object adapt(const variant_object &obj) { - fc::mutable_variant_object o(obj); - vector keys_to_rename; - for (auto i = o.begin(); i != o.end(); ++i) { - auto &element = (*i).value(); - if (element.is_object()) { - const string &name = (*i).key(); - auto &vo = element.get_object(); - if (vo.contains(name.c_str())) - keys_to_rename.emplace_back(name); - element = adapt(vo); - } else if (element.is_array()) - adapt(element.get_array()); - } - for (const auto &i : keys_to_rename) { - string new_name = i + "_"; - o[new_name] = variant(o[i]); - o.erase(i); - } - if (o.find("owner") != o.end() && o["owner"].is_string()) - { - o["owner_"] = o["owner"].as_string(); - o.erase("owner"); - } - if (o.find("active_special_authority") != o.end()) - { - o["active_special_authority"] = fc::json::to_string(o["active_special_authority"]); - } - if (o.find("owner_special_authority") != o.end()) - { - o["owner_special_authority"] = fc::json::to_string(o["owner_special_authority"]); - } - if (o.find("feeds") != o.end()) - { - o["feeds"] = fc::json::to_string(o["feeds"]); - } - if (o.find("operations") != o.end()) - { - o["operations"] = fc::json::to_string(o["operations"]); - } - - return o; - } - - void adapt(fc::variants &v) { - for (auto &array_element : v) { - if (array_element.is_object()) - array_element = adapt(array_element.get_object()); - else if (array_element.is_array()) - adapt(array_element.get_array()); - else - array_element = array_element.as_string(); - } - } -}; - } } //graphene::es_objects diff --git a/libraries/utilities/elasticsearch.cpp b/libraries/utilities/elasticsearch.cpp index 9a268b766e..75c4abe5cf 100644 --- a/libraries/utilities/elasticsearch.cpp +++ b/libraries/utilities/elasticsearch.cpp @@ -26,6 +26,7 @@ #include #include #include +#include size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp) { @@ -216,4 +217,184 @@ std::string doCurl(CurlRequest& curl) return CurlReadBuffer; } +fc::variant es_data_adaptor::adapt(const fc::variant_object& op) +{ + fc::mutable_variant_object o(op); + + // Note: these fields are maps, but were stored in ES as flattened arrays + static const std::map> flattened_fields = { + { "account_auths", data_type::map_type }, + { "address_auths", data_type::map_type }, + { "key_auths", data_type::map_type } + }; + // Note: + // object arrays listed in this map are stored redundantly in ES, with one instance as a nested object and + // the other as a string for backward compatibility, + // object arrays not listed in this map are stored as nested objects only. + static const std::map> to_string_fields = { + { "parameters", data_type::array_type }, // in committee proposals, current_fees.parameters + { "op", data_type::static_variant_type }, // proposal_create_op.proposed_ops[*].op + { "proposed_ops", data_type::array_type }, + { "operations", data_type::array_type }, // proposal_object.operations + { "initializer", data_type::static_variant_type }, + { "policy", data_type::static_variant_type }, + { "predicates", data_type::array_type }, + { "active_special_authority", data_type::static_variant_type }, + { "owner_special_authority", data_type::static_variant_type }, + { "htlc_preimage_hash", data_type::static_variant_type }, + { "feeds", data_type::map_type }, // asset_bitasset_data_object.feeds + { "acceptable_collateral", data_type::map_type }, + { "acceptable_borrowers", data_type::map_type } + }; + std::map> original_arrays; + std::vector keys_to_rename; + for( auto& i : o ) + { + const std::string& name = i.key(); + auto& element = i.value(); + if( element.is_object() ) + { + const auto& vo = element.get_object(); + if( vo.contains(name.c_str()) ) // transfer_operation.amount.amount + keys_to_rename.emplace_back(name); + element = adapt(vo); + } + else if( element.is_array() ) + { + auto& array = element.get_array(); + if( to_string_fields.find(name) != to_string_fields.end() ) + { + // make a backup and convert to string + original_arrays[name] = array; + element = fc::json::to_string(element); + } + else if( flattened_fields.find(name) != flattened_fields.end() ) + { + // make a backup and adapt the original + auto backup = array; + original_arrays[name] = backup; + adapt(array); + } + else + adapt(array); + } + } + + for( const auto& i : keys_to_rename ) // transfer_operation.amount + { + std::string new_name = i + "_"; + o[new_name] = fc::variant(o[i]); + o.erase(i); + } + + if( o.find("nonce") != o.end() ) + { + o["nonce"] = o["nonce"].as_string(); + } + + if( o.find("owner") != o.end() && o["owner"].is_string() ) // vesting_balance_*_operation.owner + { + o["owner_"] = o["owner"].as_string(); + o.erase("owner"); + } + + for( const auto& pair : original_arrays ) + { + const auto& name = pair.first; + auto& value = pair.second; + auto type = data_type::map_type; + if( to_string_fields.find(name) != to_string_fields.end() ) + type = to_string_fields.at(name); + o[name + "_object"] = adapt( value, type ); + } + + fc::variant v; + fc::to_variant(o, v, FC_PACK_MAX_DEPTH); + return v; +} + +fc::variant es_data_adaptor::adapt( const fc::variants& v, data_type type ) +{ + if( data_type::static_variant_type == type ) + return adapt_static_variant(v); + + // map_type or array_type + fc::variants vs; + vs.reserve( v.size() ); + for( const auto& item : v ) + { + if( item.is_array() ) + { + if( data_type::map_type == type ) + vs.push_back( adapt_map_item( item.get_array() ) ); + else // assume it is a static_variant array + vs.push_back( adapt_static_variant( item.get_array() ) ); + } + else if( item.is_object() ) // object array + vs.push_back( adapt( item.get_object() ) ); + else + wlog( "Type of item is unexpected: ${item}", ("item", item) ); + } + + fc::variant nv; + fc::to_variant(vs, nv, FC_PACK_MAX_DEPTH); + return nv; +} + +void es_data_adaptor::extract_data_from_variant( + const fc::variant& v, fc::mutable_variant_object& mv, const std::string& prefix ) +{ + if( v.is_object() ) + mv[prefix + "_object"] = adapt( v.get_object() ); + else if( v.is_int64() || v.is_uint64() ) + mv[prefix + "_int"] = v; + else if( v.is_bool() ) + mv[prefix + "_bool"] = v; + else if( v.is_string() ) + mv[prefix + "_string"] = v.get_string(); + else + mv[prefix + "_string"] = fc::json::to_string( v ); + // Note: we don't use double or array here, and we convert null and blob to string, + // and static_variants (i.e. in custom authorities) and maps (if any) are converted to strings too. +} + +fc::variant es_data_adaptor::adapt_map_item( const fc::variants& v ) +{ + FC_ASSERT( v.size() == 2, "Internal error" ); + fc::mutable_variant_object mv; + + extract_data_from_variant( v[0], mv, "key" ); + extract_data_from_variant( v[1], mv, "data" ); + + fc::variant nv; + fc::to_variant( mv, nv, FC_PACK_MAX_DEPTH ); + return nv; +} + +fc::variant es_data_adaptor::adapt_static_variant( const fc::variants& v ) +{ + FC_ASSERT( v.size() == 2, "Internal error" ); + fc::mutable_variant_object mv; + + mv["which"] = v[0]; + extract_data_from_variant( v[1], mv, "data" ); + + fc::variant nv; + fc::to_variant( mv, nv, FC_PACK_MAX_DEPTH ); + return nv; +} + +void es_data_adaptor::adapt(fc::variants& v) +{ + for (auto& array_element : v) + { + if (array_element.is_object()) + array_element = adapt(array_element.get_object()); + else if (array_element.is_array()) + adapt(array_element.get_array()); + else + array_element = array_element.as_string(); + } +} + } } // end namespace graphene::utilities diff --git a/libraries/utilities/include/graphene/utilities/elasticsearch.hpp b/libraries/utilities/include/graphene/utilities/elasticsearch.hpp index 2fb29a0967..e64f59719d 100644 --- a/libraries/utilities/include/graphene/utilities/elasticsearch.hpp +++ b/libraries/utilities/include/graphene/utilities/elasticsearch.hpp @@ -66,4 +66,31 @@ namespace graphene { namespace utilities { std::string joinBulkLines(const std::vector& bulk); long getResponseCode(CURL *handler); +struct es_data_adaptor +{ + enum class data_type + { + static_variant_type, + map_type, + array_type // can be simple arrays, object arrays, static_variant arrays, or even nested arrays + }; + + static fc::variant adapt( const fc::variant_object& op ); + + static fc::variant adapt( const fc::variants& v, data_type type ); + + static fc::variant adapt_map_item( const fc::variants& v ); + + static fc::variant adapt_static_variant( const fc::variants& v ); + + /// In-place update + static void adapt( fc::variants& v ); + + /// Extract data from @p v into @p mv + static void extract_data_from_variant( const fc::variant& v, + fc::mutable_variant_object& mv, + const std::string& prefix ); + +}; + } } // end namespace graphene::utilities