Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 69 additions & 16 deletions be/src/exec/es/es_scan_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ const std::string SOURCE_SCROLL_SEARCH_FILTER_PATH =
// Filter path for doc-value scroll searches; hits.hits._score is included to
// handle the case where a field does not exist in one batch.
const std::string DOCVALUE_SCROLL_SEARCH_FILTER_PATH =
        "filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields";
// Filter path used when avg, sum, min, max and count are pushed down to ES.
const std::string AGG_SCROLL_SEARCH_FILTER_PATH =
        "filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields,aggregations,aggregations.groupby.buckets";

const std::string REQUEST_SCROLL_PATH = "_scroll";
// Bare query parameter (no leading separator): each URL builder writes the
// "?" or "&" in front of it, so the parameter can appear first or later.
const std::string REQUEST_PREFERENCE_PREFIX = "preference=_shards:";
const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll";
const std::string REQUEST_SEPARATOR = "/";

Expand All @@ -63,20 +66,37 @@ ESScanReader::ESScanReader(const std::string& target,
if (props.find(KEY_HTTP_SSL_ENABLED) != props.end()) {
std::istringstream(props.at(KEY_HTTP_SSL_ENABLED)) >> std::boolalpha >> _use_ssl_client;
}
if (props.find(KEY_AGG) != props.end()) {
_is_agg = true;
if (props.find(KEY_HAS_GROUP_BY) != props.end()) {
_has_group_by = true;
}
}

std::string batch_size_str = props.at(KEY_BATCH_SIZE);
_batch_size = atoi(batch_size_str.c_str());

std::string filter_path =
_doc_value_mode ? DOCVALUE_SCROLL_SEARCH_FILTER_PATH : SOURCE_SCROLL_SEARCH_FILTER_PATH;

if (props.find(KEY_TERMINATE_AFTER) != props.end()) {
if (_is_agg) {
filter_path = AGG_SCROLL_SEARCH_FILTER_PATH;
_exactly_once = !_has_group_by;
std::stringstream scratch;
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << filter_path;
_get_agg_url = scratch.str();
} else if (props.find(KEY_TERMINATE_AFTER) != props.end()) {
_exactly_once = true;
std::stringstream scratch;
// send a normal search request to Elasticsearch with an additional terminate_after param so the search terminates early when a limit is in effect
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "terminate_after=" << props.at(KEY_TERMINATE_AFTER) << REQUEST_PREFERENCE_PREFIX
<< _shards << "&" << filter_path;
<< "terminate_after=" << props.at(KEY_TERMINATE_AFTER)
<< "&" << REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << filter_path;
_search_url = scratch.str();
} else {
_exactly_once = false;
Expand All @@ -85,8 +105,10 @@ ESScanReader::ESScanReader(const std::string& target,
// add terminate_after to the first scroll request to avoid decompressing all postings lists
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards << "&"
<< filter_path << "&terminate_after=" << batch_size_str;
<< "scroll=" << _scroll_keep_alive
<< "&" << REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << filter_path
<< "&terminate_after=" << batch_size_str;
_init_scroll_url = scratch.str();
_next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + filter_path;
}
Expand All @@ -98,26 +120,37 @@ ESScanReader::~ESScanReader() {}
// Sends the first request to ES and caches the response for the `get_next` phase.
//
// The request URL is chosen from the reader mode:
//   - _is_agg:        _get_agg_url (single-shot when _exactly_once, otherwise the first page)
//   - _exactly_once:  _search_url
//   - otherwise:      _init_scroll_url (first scroll request)
//
// Returns Status::OK() when the POST succeeds with HTTP 200. A failure of
// _network_client.init() is propagated through RETURN_IF_ERROR; a failed POST or a
// non-200 HTTP status yields Status::InternalError.
Status ESScanReader::open() {
    _is_first = true;

    // Initialize the client exactly once, with the URL that matches the mode.
    // In aggregation mode only _get_agg_url is built, so _search_url and
    // _init_scroll_url must not be passed to init() there.
    if (_is_agg) {
        if (_exactly_once) {
            LOG(INFO) << "ES get aggregation URL: " << _get_agg_url;
        } else {
            LOG(INFO) << "ES First paged request URL: " << _get_agg_url;
        }
        RETURN_IF_ERROR(_network_client.init(_get_agg_url));
    } else if (_exactly_once) {
        LOG(INFO) << "ES search request URL: " << _search_url;
        RETURN_IF_ERROR(_network_client.init(_search_url));
    } else {
        LOG(INFO) << "ES First scroll request URL: " << _init_scroll_url;
        RETURN_IF_ERROR(_network_client.init(_init_scroll_url));
    }

    _network_client.set_basic_auth(_user_name, _passwd);
    _network_client.set_content_type("application/json");
    if (_use_ssl_client) {
        _network_client.use_untrusted_ssl();
    }

    // phase open, we cache the first response for the `get_next` phase
    VLOG_DEBUG << "ESScanReader::open() _query: " << _query;
    Status status = _network_client.execute_post_request(_query, &_cached_response);
    const long http_status = _network_client.get_http_status();
    if (!status.ok() || http_status != 200) {
        // The transport can succeed while ES rejects the request; include the HTTP
        // status so the message is not empty in that case.
        std::stringstream ss;
        ss << "Failed to connect to ES server, errmsg is: " << status.get_error_msg()
           << ", http status: " << http_status;
        LOG(WARNING) << ss.str();
        return Status::InternalError(ss.str());
    }
    VLOG_CRITICAL << "ES open _cached response: " << _cached_response;
    return Status::OK();
}

Expand All @@ -136,16 +169,31 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
if (_exactly_once) {
return Status::OK();
}
RETURN_IF_ERROR(_network_client.init(_next_scroll_url));
if (_is_agg) {
VLOG_DEBUG << "ES get_next_agg_url: " << _get_agg_url;
RETURN_IF_ERROR(_network_client.init(_get_agg_url));
} else {
VLOG_DEBUG << "ES get_next_scroll_url: " << _next_scroll_url;
RETURN_IF_ERROR(_network_client.init(_next_scroll_url));
}
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_content_type("application/json");
_network_client.set_timeout_ms(_http_timeout_ms);

if (_use_ssl_client) {
_network_client.use_untrusted_ssl();
}
RETURN_IF_ERROR(_network_client.execute_post_request(
ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive),
&response));

if (_is_agg) {
std::string paged_composite_body;
RETURN_IF_ERROR(ESScrollQueryBuilder::build_composite_page_body(_query, _after_key, paged_composite_body));
RETURN_IF_ERROR(_network_client.execute_post_request(paged_composite_body, &response));
} else {
RETURN_IF_ERROR(_network_client.execute_post_request(
ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive),
&response));
}

long status = _network_client.get_http_status();
if (status == 404) {
LOG(WARNING) << "request scroll search failure 404["
Expand Down Expand Up @@ -174,7 +222,12 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
if (_exactly_once) {
_eos = true;
} else {
_scroll_id = scroll_parser->get_scroll_id();
if (_is_agg) {
_after_key = scroll_parser->get_after_key();
} else {
_scroll_id = scroll_parser->get_scroll_id();
}

if (scroll_parser->get_size() == 0) {
_eos = true;
return Status::OK();
Expand Down
10 changes: 10 additions & 0 deletions be/src/exec/es/es_scan_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class ESScanReader {
static constexpr const char* KEY_TERMINATE_AFTER = "limit";
static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode";
static constexpr const char* KEY_HTTP_SSL_ENABLED = "http_ssl_enabled";
static constexpr const char* KEY_AGG = "agg";
static constexpr const char* KEY_HAS_GROUP_BY = "has_group_by";

ESScanReader(const std::string& target, const std::map<std::string, std::string>& props,
bool doc_value_mode);
~ESScanReader();
Expand All @@ -57,6 +60,7 @@ class ESScanReader {
std::string _user_name;
std::string _passwd;
std::string _scroll_id;
std::string _after_key;
HttpClient _network_client;
std::string _index;
std::string _type;
Expand Down Expand Up @@ -88,6 +92,12 @@ class ESScanReader {
// _search_url would go into effect when `limit` specified:
// select * from es_table limit 10 -> /es_table/doc/_search?terminate_after=10
std::string _search_url;

// _get_agg_url is used to execute pushed-down aggregations: avg, sum, count, min, max
std::string _get_agg_url;
bool _is_agg = false;
bool _has_group_by = false;

bool _eos;
int _batch_size;

Expand Down
Loading