Skip to content

Commit

Permalink
[feature](inverted index) String type inverted index match function c…
Browse files Browse the repository at this point in the history
…ompletion (apache#38170)

1. Inverted index of string type supports match_phrase_prefix and
match_regexp.
  • Loading branch information
zzzxl1993 committed Jul 31, 2024
1 parent 7c4cdcf commit 6fdce0d
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 76 deletions.
21 changes: 11 additions & 10 deletions be/src/olap/match_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ Status MatchPredicate::evaluate(const vectorized::IndexFieldNameAndTypePair& nam
if (iterator == nullptr) {
return Status::OK();
}
if (_skip_evaluate(iterator)) {
return Status::Error<ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED>(
"match predicate evaluate skipped.");
if (_check_evaluate(iterator)) {
return Status::Error<ErrorCode::INVERTED_INDEX_INVALID_PARAMETERS>(
"phrase queries require setting support_phrase = true");
}
auto type = name_with_type.second;
const std::string& name = name_with_type.first;
Expand Down Expand Up @@ -122,13 +122,14 @@ InvertedIndexQueryType MatchPredicate::_to_inverted_index_query_type(MatchType m
return ret;
}

bool MatchPredicate::_skip_evaluate(InvertedIndexIterator* iterator) const {
if ((_match_type == MatchType::MATCH_PHRASE || _match_type == MatchType::MATCH_PHRASE_PREFIX ||
_match_type == MatchType::MATCH_PHRASE_EDGE) &&
iterator->get_inverted_index_reader_type() == InvertedIndexReaderType::FULLTEXT &&
get_parser_phrase_support_string_from_properties(iterator->get_index_properties()) ==
INVERTED_INDEX_PARSER_PHRASE_SUPPORT_NO) {
return true;
bool MatchPredicate::_check_evaluate(InvertedIndexIterator* iterator) const {
if (_match_type == MatchType::MATCH_PHRASE || _match_type == MatchType::MATCH_PHRASE_PREFIX ||
_match_type == MatchType::MATCH_PHRASE_EDGE) {
if (iterator->get_inverted_index_reader_type() == InvertedIndexReaderType::FULLTEXT &&
get_parser_phrase_support_string_from_properties(iterator->get_index_properties()) ==
INVERTED_INDEX_PARSER_PHRASE_SUPPORT_NO) {
return true;
}
}
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/match_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class MatchPredicate : public ColumnPredicate {
std::string info = "MatchPredicate";
return info;
}
bool _skip_evaluate(InvertedIndexIterator* iterator) const;
bool _check_evaluate(InvertedIndexIterator* iterator) const;

private:
std::string _value;
Expand Down
103 changes: 47 additions & 56 deletions be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,27 @@ Status InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir,
return Status::OK();
};

Status InvertedIndexReader::match_index_search(
OlapReaderStatistics* stats, RuntimeState* runtime_state, InvertedIndexQueryType query_type,
const InvertedIndexQueryInfo& query_info, const FulltextIndexSearcherPtr& index_searcher,
const std::shared_ptr<roaring::Roaring>& term_match_bitmap) {
TQueryOptions queryOptions = runtime_state->query_options();
try {
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_timer);
auto query = QueryFactory::create(query_type, index_searcher, queryOptions);
if (!query) {
return Status::Error<ErrorCode::INVERTED_INDEX_INVALID_PARAMETERS>(
"query type " + query_type_to_string(query_type) + ", query is nullptr");
}
query->add(query_info);
query->search(*term_match_bitmap);
} catch (const CLuceneError& e) {
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}",
e.what());
}
return Status::OK();
}

Status FullTextIndexReader::new_iterator(OlapReaderStatistics* stats, RuntimeState* runtime_state,
std::unique_ptr<InvertedIndexIterator>* iterator) {
*iterator = InvertedIndexIterator::create_unique(stats, runtime_state, shared_from_this());
Expand Down Expand Up @@ -384,27 +405,6 @@ Status FullTextIndexReader::query(OlapReaderStatistics* stats, RuntimeState* run
}
}

Status FullTextIndexReader::match_index_search(
OlapReaderStatistics* stats, RuntimeState* runtime_state, InvertedIndexQueryType query_type,
const InvertedIndexQueryInfo& query_info, const FulltextIndexSearcherPtr& index_searcher,
const std::shared_ptr<roaring::Roaring>& term_match_bitmap) {
TQueryOptions queryOptions = runtime_state->query_options();
try {
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_timer);
auto query = QueryFactory::create(query_type, index_searcher, queryOptions);
if (!query) {
return Status::Error<ErrorCode::INVERTED_INDEX_INVALID_PARAMETERS>(
"query type " + query_type_to_string(query_type) + ", query is nullptr");
}
query->add(query_info);
query->search(*term_match_bitmap);
} catch (const CLuceneError& e) {
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}",
e.what());
}
return Status::OK();
}

InvertedIndexReaderType FullTextIndexReader::type() {
return InvertedIndexReaderType::FULLTEXT;
}
Expand Down Expand Up @@ -461,28 +461,25 @@ Status StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats,
std::string search_str(search_query->data, act_len);
VLOG_DEBUG << "begin to query the inverted index from clucene"
<< ", column_name: " << column_name << ", search_str: " << search_str;
std::wstring column_name_ws = StringUtil::string_to_wstring(column_name);
std::wstring search_str_ws = StringUtil::string_to_wstring(search_str);
// unique_ptr with custom deleter
std::unique_ptr<lucene::index::Term, void (*)(lucene::index::Term*)> term {
_CLNEW lucene::index::Term(column_name_ws.c_str(), search_str_ws.c_str()),
[](lucene::index::Term* term) { _CLDECDELETE(term); }};
std::unique_ptr<lucene::search::Query> query;

auto index_file_key = _inverted_index_file_reader->get_index_file_cache_key(&_index_meta);

// try to get query bitmap result from cache and return immediately on cache hit
InvertedIndexQueryCache::CacheKey cache_key {index_file_key, column_name, query_type,
search_str};
auto* cache = InvertedIndexQueryCache::instance();
InvertedIndexQueryCacheHandle cache_handler;

auto cache_status = handle_query_cache(cache, cache_key, &cache_handler, stats, bit_map);
if (cache_status.ok()) {
return Status::OK();
}

roaring::Roaring result;
std::wstring column_name_ws = StringUtil::string_to_wstring(column_name);

InvertedIndexQueryInfo query_info;
query_info.field_name = column_name_ws;
query_info.terms.emplace_back(search_str);

auto result = std::make_shared<roaring::Roaring>();
FulltextIndexSearcherPtr* searcher_ptr = nullptr;
InvertedIndexCacheHandle inverted_index_cache_handle;
RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle, stats));
Expand All @@ -494,33 +491,29 @@ Status StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats,
case InvertedIndexQueryType::MATCH_ANY_QUERY:
case InvertedIndexQueryType::MATCH_ALL_QUERY:
case InvertedIndexQueryType::EQUAL_QUERY: {
query = std::make_unique<lucene::search::TermQuery>(term.get());
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_timer);
(*searcher_ptr)->_search(query.get(), [&result](DocRange* doc_range) {
if (doc_range->type_ == DocRangeType::kMany) {
result.addMany(doc_range->doc_many_size_, doc_range->doc_many->data());
} else {
result.addRange(doc_range->doc_range.first, doc_range->doc_range.second);
}
});
RETURN_IF_ERROR(match_index_search(stats, runtime_state,
InvertedIndexQueryType::MATCH_ANY_QUERY,
query_info, *searcher_ptr, result));
break;
}
case InvertedIndexQueryType::MATCH_PHRASE_QUERY: {
query = std::make_unique<lucene::search::TermQuery>(term.get());
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_timer);
(*searcher_ptr)
->_search(query.get(),
[&result](const int32_t docid, const float_t /*score*/) {
// docid equal to rowid in segment
result.add(docid);
});
case InvertedIndexQueryType::MATCH_PHRASE_QUERY:
case InvertedIndexQueryType::MATCH_PHRASE_PREFIX_QUERY:
case InvertedIndexQueryType::MATCH_REGEXP_QUERY: {
RETURN_IF_ERROR(match_index_search(stats, runtime_state, query_type, query_info,
*searcher_ptr, result));
break;
}

case InvertedIndexQueryType::LESS_THAN_QUERY:
case InvertedIndexQueryType::LESS_EQUAL_QUERY:
case InvertedIndexQueryType::GREATER_THAN_QUERY:
case InvertedIndexQueryType::GREATER_EQUAL_QUERY: {
std::wstring search_str_ws = StringUtil::string_to_wstring(search_str);
// unique_ptr with custom deleter
std::unique_ptr<lucene::index::Term, void (*)(lucene::index::Term*)> term {
_CLNEW lucene::index::Term(column_name_ws.c_str(), search_str_ws.c_str()),
[](lucene::index::Term* term) { _CLDECDELETE(term); }};
std::unique_ptr<lucene::search::Query> query;

bool include_upper = query_type == InvertedIndexQueryType::LESS_EQUAL_QUERY;
bool include_lower = query_type == InvertedIndexQueryType::GREATER_EQUAL_QUERY;

Expand All @@ -537,7 +530,7 @@ Status StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats,
(*searcher_ptr)
->_search(query.get(),
[&result](const int32_t docid, const float_t /*score*/) {
result.add(docid);
result->add(docid);
});
break;
}
Expand All @@ -560,12 +553,10 @@ Status StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats,
}

// add to cache
std::shared_ptr<roaring::Roaring> term_match_bitmap =
std::make_shared<roaring::Roaring>(result);
term_match_bitmap->runOptimize();
cache->insert(cache_key, term_match_bitmap, &cache_handler);
result->runOptimize();
cache->insert(cache_key, result, &cache_handler);

bit_map = term_match_bitmap;
bit_map = result;
}
return Status::OK();
}
Expand Down
13 changes: 6 additions & 7 deletions be/src/olap/rowset/segment_v2/inverted_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ class InvertedIndexReader : public std::enable_shared_from_this<InvertedIndexRea
InvertedIndexReaderType reader_type);

protected:
Status match_index_search(OlapReaderStatistics* stats, RuntimeState* runtime_state,
InvertedIndexQueryType query_type,
const InvertedIndexQueryInfo& query_info,
const FulltextIndexSearcherPtr& index_searcher,
const std::shared_ptr<roaring::Roaring>& term_match_bitmap);

friend class InvertedIndexIterator;
std::shared_ptr<InvertedIndexFileReader> _inverted_index_file_reader;
TabletIndex _index_meta;
Expand Down Expand Up @@ -177,13 +183,6 @@ class FullTextIndexReader : public InvertedIndexReader {
const std::map<string, string>& properties);
static void setup_analyzer_use_stopwords(std::unique_ptr<lucene::analysis::Analyzer>& analyzer,
const std::map<string, string>& properties);

private:
Status match_index_search(OlapReaderStatistics* stats, RuntimeState* runtime_state,
InvertedIndexQueryType query_type,
const InvertedIndexQueryInfo& query_info,
const FulltextIndexSearcherPtr& index_searcher,
const std::shared_ptr<roaring::Roaring>& term_match_bitmap);
};

class StringTypeInvertedIndexReader : public InvertedIndexReader {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
22

-- !sql --
22

-- !sql --
270

-- !sql --
210

-- !sql --
180

-- !sql --
875

Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


suite("test_index_complex_match", "p0"){
def indexTbName1 = "test_index_complex_match_1"
def indexTbName2 = "test_index_complex_match_2"

sql "DROP TABLE IF EXISTS ${indexTbName1}"
sql "DROP TABLE IF EXISTS ${indexTbName2}"

sql """
CREATE TABLE ${indexTbName1} (
`@timestamp` int(11) NULL COMMENT "",
`clientip` varchar(20) NULL COMMENT "",
`request` text NULL COMMENT "",
`status` int(11) NULL COMMENT "",
`size` int(11) NULL COMMENT "",
INDEX idx_1 (`clientip`) USING INVERTED COMMENT '',
INDEX idx_2 (`request`) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`@timestamp`)
COMMENT "OLAP"
DISTRIBUTED BY RANDOM BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

sql """
CREATE TABLE ${indexTbName2} (
`@timestamp` int(11) NULL COMMENT "",
`clientip` varchar(20) NULL COMMENT "",
`request` text NULL COMMENT "",
`status` int(11) NULL COMMENT "",
`size` int(11) NULL COMMENT "",
INDEX idx_1 (`clientip`) USING INVERTED COMMENT '',
INDEX idx_2 (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "false") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`@timestamp`)
COMMENT "OLAP"
DISTRIBUTED BY RANDOM BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
expected_succ_rows = -1, load_to_single_tablet = 'true' ->

// load the json data
streamLoad {
table "${table_name}"

// set http request header params
set 'label', label + "_" + UUID.randomUUID().toString()
set 'read_json_by_line', read_flag
set 'format', format_flag
file file_name // import json file
time 10000 // limit inflight 10s
if (expected_succ_rows >= 0) {
set 'max_filter_ratio', '1'
}

// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
if (ignore_failure && expected_succ_rows < 0) { return }
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
if (expected_succ_rows >= 0) {
assertEquals(json.NumberLoadedRows, expected_succ_rows)
} else {
assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}
}

try {
load_httplogs_data.call(indexTbName1, indexTbName1, 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(indexTbName2, indexTbName2, 'true', 'json', 'documents-1000.json')

sql "sync"

qt_sql """ select count() from ${indexTbName1} where clientip match_phrase '247.37.0.0'; """
qt_sql """ select count() from ${indexTbName1} where clientip match_phrase_prefix '247'; """
qt_sql """ select count() from ${indexTbName1} where clientip match_regexp '2'; """

qt_sql """ select count() from ${indexTbName1} where request match_phrase 'hm'; """
qt_sql """ select count() from ${indexTbName1} where request match_phrase_prefix 'na'; """
qt_sql """ select count() from ${indexTbName1} where request match_regexp 'ag'; """


try {
sql """ select count() from ${indexTbName2} where request match_phrase 'hm'; """
} catch (Exception e) {
log.info(e.getMessage());
assertTrue(e.getMessage().contains("phrase queries require setting support_phrase = true"))
}

try {
sql """ select count() from ${indexTbName2} where request match_phrase_prefix 'na'; """
} catch (Exception e) {
log.info(e.getMessage());
assertTrue(e.getMessage().contains("phrase queries require setting support_phrase = true"))
}

try {
sql """ select count() from ${indexTbName2} where request match_regexp 'ag'; """
} catch (Exception e) {
log.info(e.getMessage());
assertTrue(e.getMessage().contains("phrase queries require setting support_phrase = true"))
}

} finally {
//try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
Loading

0 comments on commit 6fdce0d

Please sign in to comment.