Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

elasticsearch history api #1682 #1725

Merged
merged 16 commits into from Aug 14, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -13,7 +13,7 @@ add_library( graphene_app
)

# need to link graphene_debug_witness because plugins aren't sufficiently isolated #246
target_link_libraries( graphene_app graphene_market_history graphene_account_history graphene_grouped_orders graphene_chain fc graphene_db graphene_net graphene_utilities graphene_debug_witness )
target_link_libraries( graphene_app graphene_market_history graphene_account_history graphene_elasticsearch graphene_grouped_orders graphene_chain fc graphene_db graphene_net graphene_utilities graphene_debug_witness )
target_include_directories( graphene_app
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include"
"${CMAKE_CURRENT_SOURCE_DIR}/../egenesis/include" )
@@ -328,6 +328,10 @@ namespace graphene { namespace app {
start = node.operation_id;
} catch(...) { return result; }

auto es = _app.get_plugin<elasticsearch::elasticsearch_plugin>("elasticsearch");
This conversation was marked as resolved by pmconrad

This comment has been minimized.

Copy link
@pmconrad

pmconrad Apr 20, 2019

Contributor

This will throw if the plugin is not enabled, that's probably not what you want. I suppose that's also the reason why the travis tests failed.

This comment has been minimized.

Copy link
@oxarbitrage

oxarbitrage Apr 20, 2019

Author Member

What about having the get_plugin option in a try block and do the normal non elasticsearch code in the catch?

I will of course prefer something else but i cant find a way to check this without this problem.

This comment has been minimized.

Copy link
@pmconrad

pmconrad Apr 23, 2019

Contributor

Doing try/catch for this feels dirty. Maybe add an is_plugin_enabled call to application?

if(es)
This conversation was marked as resolved by pmconrad

This comment has been minimized.

Copy link
@pmconrad

pmconrad Apr 20, 2019

Contributor

Need to check if _elasticsearch_operation_string is enabled too, because otherwise we can't return proper history entries from ES.

What we actually need to check is if the entries we're looking up have the op field, i. e. if _elasticsearch_operation_string has always been enabled on this database. We can't know that from current settings, therefore I think it would be better to use an explicit config flag that says if we want to use ES for queries or not.

That said, I think an important use-case is to run a node that queries ES but does not write into it (an ES cluster that is written by only one node can be used by many for queries). So perhaps you should move ES queries into a separate plugin.

This comment has been minimized.

Copy link
@oxarbitrage

oxarbitrage Apr 20, 2019

Author Member

What about having a mode enum field with 3 options:

  • only_save - Will connect to apply block, elasticsearch-operation-string can be on or off, same as elasticsearch-operation-object.
  • only_query - Will skip connection to apply_block altogether, saving options as elasticsearch-operation-string and elasticsearch-operation-object will be ignored.
  • both - Will connect to apply_block, elasticsearch-operation-string will be forced to be on, elasticsearch-operation-object will remain optional.

A new plugin just for the queries is still a doable option however i am not sure if it will be the best to setup. I am adding @clockworkgr and @sschiessl-bcp to the discussion as they have public ES nodes.

This comment has been minimized.

Copy link
@pmconrad

pmconrad Apr 23, 2019

Contributor

That would work as well, unless you want different ES server URLs for reading and writing.

return es->get_account_history(account, stop, limit, start);
This conversation was marked as resolved by pmconrad

This comment has been minimized.

Copy link
@pmconrad

pmconrad Apr 20, 2019

Contributor

Problem: this works synchronously, inside the database thread. It is important to decouple this.
The ES query plugin should set up a separate thread for executing the queries and parsing the responses. The database thread should then make the call to the ES thread and wait for the result to return. Waiting will yield, which frees up the DB thread for other things.

This comment has been minimized.

Copy link
@oxarbitrage

oxarbitrage Apr 20, 2019

Author Member

I understand the importance of this.

Can you point me in the right direction on how to make it following the graphene/bitshares style. Example in the codebase, functions to use, etc will be of great help.

Thank you.

This comment has been minimized.

Copy link
@pmconrad

pmconrad Apr 23, 2019

Contributor

We don't have many examples where threading is used.

Check the network node.

_async returns a future. Waiting for the future result will yield, i. e. other tasks within the current thread will be executed in parallel. Not sure what that means in the context of an API call, you'll have to experiment there.


const auto& hist_idx = db.get_index_type<account_transaction_history_index>();
const auto& by_op_idx = hist_idx.indices().get<by_op>();
auto index_start = by_op_idx.begin();
@@ -32,6 +32,8 @@

#include <graphene/grouped_orders/grouped_orders_plugin.hpp>

#include <graphene/elasticsearch/elasticsearch_plugin.hpp>

#include <graphene/debug_witness/debug_api.hpp>

#include <graphene/net/node.hpp>
@@ -26,7 +26,6 @@
#include <graphene/chain/impacted.hpp>
#include <graphene/chain/account_evaluator.hpp>
#include <curl/curl.h>
#include <graphene/utilities/elasticsearch.hpp>

namespace graphene { namespace elasticsearch {

@@ -59,6 +58,7 @@ class elasticsearch_plugin_impl
std::string _elasticsearch_index_prefix = "bitshares-";
bool _elasticsearch_operation_object = false;
uint32_t _elasticsearch_start_es_after_block = 0;
bool _elasticsearch_operation_string = true;
CURL *curl; // curl handler
vector <string> bulk_lines; // vector of op lines
vector<std::string> prepare;
@@ -223,9 +223,8 @@ void elasticsearch_plugin_impl::doOperationHistory(const optional <operation_his
adaptor_struct adaptor;
os.op_object = adaptor.adapt(os.op_object.get_object());
}
else
if(_elasticsearch_operation_string)
os.op = fc::json::to_string(oho->op);

}

void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_block& b)
@@ -436,6 +435,7 @@ void elasticsearch_plugin::plugin_set_program_options(
("elasticsearch-index-prefix", boost::program_options::value<std::string>(), "Add a prefix to the index(bitshares-)")
("elasticsearch-operation-object", boost::program_options::value<bool>(), "Save operation as object(false)")
("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(), "Start doing ES job after block(0)")
("elasticsearch-operation-string", boost::program_options::value<bool>(), "Save operation as string. Needed to serve history api calls(true)")
;
cfg.add(cli);
}
@@ -473,7 +473,10 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia
}
if (options.count("elasticsearch-start-es-after-block")) {
my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as<uint32_t>();
}
}
if (options.count("elasticsearch-operation-string")) {
my->_elasticsearch_operation_string = options["elasticsearch-operation-string"].as<bool>();
}
}

void elasticsearch_plugin::plugin_startup()
@@ -488,4 +491,145 @@ void elasticsearch_plugin::plugin_startup()
ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin");
}

operation_history_object elasticsearch_plugin::get_operation_by_id(operation_history_id_type id)
{
const string operation_id_string = idToString(id);

const string query = R"(
{
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "account_history.operation_id: )" + operation_id_string + R"("
}
},
{
"range": {
"block_data.block_time": {
"gte": "now-20y",

This comment has been minimized.

Copy link
@pmconrad

pmconrad Jul 26, 2019

Contributor

Put a TODO in your calendar for the year 2034: bump this. ;-)

Can't you leave out this part of the query?

This comment has been minimized.

Copy link
@sschiessl-bcp

sschiessl-bcp Jul 29, 2019

I agree that the range could be omitted here.

Also I am wondering why you do query_string and not

"query": {
        "match" : {
            "account_history.operation_id" : "operation_id_string"
        }
    }

This comment has been minimized.

Copy link
@oxarbitrage

oxarbitrage Jul 31, 2019

Author Member

my apologies, this was not a production ready query but just a proof of concept probably pasted from kibana. i had changed this at 1f6ac14

This comment has been minimized.

Copy link
@oxarbitrage

oxarbitrage Jul 31, 2019

Author Member

for the second query i removed the range but query_string needs to stay as we are making a lucene query there and not just matching a field. 63f7aff

"lte": "now"
}
}
}
]
}
}
}
)";

auto es = prepareHistoryQuery(query);
const auto response = graphene::utilities::simpleQuery(es);
variant variant_response = fc::json::from_string(response);
const auto source = variant_response["hits"]["hits"][size_t(0)]["_source"];
return fromEStoOperation(source);
}

vector<operation_history_object> elasticsearch_plugin::get_account_history(
const account_id_type account_id,
operation_history_id_type stop = operation_history_id_type(),
unsigned limit = 100,
operation_history_id_type start = operation_history_id_type())
{
const string account_id_string = idToString(account_id);

const auto stop_number = stop.instance.value;
const auto start_number = start.instance.value;

string range = "";
if(stop_number == 0)
range = " AND operation_id_num: ["+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";
else if(stop_number > 0)
range = " AND operation_id_num: {"+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";

const string query = R"(
{
"size": )" + fc::to_string(limit) + R"(,
"sort" : [{ "operation_id_num" : {"order" : "desc"}}],
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "account_history.account: )" + account_id_string + range + R"("
}
},
{
"range": {
"block_data.block_time": {
"gte": "now-20y",
"lte": "now"
}
}
}
]
}
}
}
)";

auto es = prepareHistoryQuery(query);

vector<operation_history_object> result;

if(!graphene::utilities::checkES(es))
return result;

const auto response = graphene::utilities::simpleQuery(es);
variant variant_response = fc::json::from_string(response);

const auto hits = variant_response["hits"]["total"];
const auto size = std::min(static_cast<uint32_t>(hits.as_uint64()), limit);

for(unsigned i=0; i<size; i++)
{
const auto source = variant_response["hits"]["hits"][size_t(i)]["_source"];
result.push_back(fromEStoOperation(source));
}
return result;
}

operation_history_object elasticsearch_plugin::fromEStoOperation(variant source)
{
operation_history_object result;

const auto operation_id = source["account_history"]["operation_id"];
fc::from_variant( operation_id, result.id, GRAPHENE_MAX_NESTED_OBJECTS );

const auto op = fc::json::from_string(source["operation_history"]["op"].as_string());
fc::from_variant( op, result.op, GRAPHENE_MAX_NESTED_OBJECTS );

const auto operation_result = fc::json::from_string(source["operation_history"]["operation_result"].as_string());
fc::from_variant( operation_result, result.result, GRAPHENE_MAX_NESTED_OBJECTS );

result.block_num = source["block_data"]["block_num"].as_uint64();
result.trx_in_block = source["operation_history"]["trx_in_block"].as_uint64();
result.op_in_trx = source["operation_history"]["op_in_trx"].as_uint64();
result.trx_in_block = source["operation_history"]["virtual_op"].as_uint64();

return result;
}

template<typename T>
std::string elasticsearch_plugin::idToString(T id)
{
return fc::to_string(id.space_id) + "." + fc::to_string(id.type_id) + "." + fc::to_string(id.instance.value);
This conversation was marked as resolved by pmconrad

This comment has been minimized.

Copy link
@pmconrad

pmconrad Apr 20, 2019

Contributor

This already exists as std::string(object_id_type(id))

This comment has been minimized.

Copy link
@oxarbitrage

oxarbitrage Apr 20, 2019

Author Member

thank you for this one, i knew there was a way but i could not find it. i have this crappy function in some other code as well that i will replace.

}

graphene::utilities::ES elasticsearch_plugin::prepareHistoryQuery(string query)
{
CURL *curl;
curl = curl_easy_init();

graphene::utilities::ES es;
es.curl = curl;
es.elasticsearch_url = my->_elasticsearch_node_url;
es.index_prefix = my->_elasticsearch_index_prefix;
es.endpoint = es.index_prefix + "*/data/_search";
es.query = query;

return es;
}

} }
@@ -26,6 +26,7 @@
#include <graphene/app/plugin.hpp>
#include <graphene/chain/database.hpp>
#include <graphene/chain/operation_history_object.hpp>
#include <graphene/utilities/elasticsearch.hpp>

namespace graphene { namespace elasticsearch {
using namespace chain;
@@ -63,8 +64,18 @@ class elasticsearch_plugin : public graphene::app::plugin
virtual void plugin_initialize(const boost::program_options::variables_map& options) override;
virtual void plugin_startup() override;

operation_history_object get_operation_by_id(operation_history_id_type id);
vector<operation_history_object> get_account_history(const account_id_type account_id,
operation_history_id_type stop, unsigned limit, operation_history_id_type start);

friend class detail::elasticsearch_plugin_impl;
std::unique_ptr<detail::elasticsearch_plugin_impl> my;

private:
operation_history_object fromEStoOperation(variant source);
template<typename T>
std::string idToString(T id);
graphene::utilities::ES prepareHistoryQuery(string query);
};

struct operation_visitor
@@ -186,15 +186,18 @@ database_fixture::database_fixture(const fc::time_point_sec &initial_timestamp)
boost::unit_test::framework::current_test_case().p_name.value == "track_votes_committee_disabled") {
app.chain_database()->enable_standby_votes_tracking( false );
}
if(current_test_name == "elasticsearch_account_history" || current_test_name == "elasticsearch_suite") {
if(current_test_name == "elasticsearch_account_history" || current_test_name == "elasticsearch_suite" ||
current_test_name == "elasticsearch_history_api") {
auto esplugin = app.register_plugin<graphene::elasticsearch::elasticsearch_plugin>();
esplugin->plugin_set_app(&app);

options.insert(std::make_pair("elasticsearch-node-url", boost::program_options::variable_value(string("http://localhost:9200/"), false)));
options.insert(std::make_pair("elasticsearch-bulk-replay", boost::program_options::variable_value(uint32_t(2), false)));
options.insert(std::make_pair("elasticsearch-bulk-sync", boost::program_options::variable_value(uint32_t(2), false)));
options.insert(std::make_pair("elasticsearch-visitor", boost::program_options::variable_value(true, false)));
//options.insert(std::make_pair("elasticsearch-basic-auth", boost::program_options::variable_value(string("elastic:changeme"), false)));
options.insert(std::make_pair("elasticsearch-start-es-after-block", boost::program_options::variable_value(uint32_t(0), false)));
options.insert(std::make_pair("elasticsearch-visitor", boost::program_options::variable_value(false, false)));
options.insert(std::make_pair("elasticsearch-operation-object", boost::program_options::variable_value(true, false)));
options.insert(std::make_pair("elasticsearch-operation-string", boost::program_options::variable_value(true, false)));

esplugin->plugin_initialize(options);
esplugin->plugin_startup();
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.