@@ -112,18 +112,9 @@ public <Q> Q extension(SearchQueryExtension<Q, H> extension) {
 
     @Override
     public ElasticsearchSearchResult<H> fetch(Integer offset, Integer limit) {
-        SearchWorkBuilder<ElasticsearchLoadableSearchResult<H>> builder =
-                workFactory.search( payload, searchResultExtractor );
-        for ( ElasticsearchSearchIndexContext index : searchContext.indexes().elements() ) {
-            builder.index( index.names().getRead() );
-        }
-        builder.paging( defaultedLimit( limit, offset ), offset )
-                .routingKeys( routingKeys )
-                .timeout( timeoutValue, timeoutUnit, exceptionOnTimeout )
-                .requestTransformer(
-                        ElasticsearchSearchRequestTransformerContextImpl.createTransformerFunction( requestTransformer )
-                )
-        NonBulkableWork<ElasticsearchLoadableSearchResult<H>> work = builder.build();
+        NonBulkableWork<ElasticsearchLoadableSearchResult<H>> work = searchWorkBuilder()
+                .paging( defaultedLimit( limit, offset ), offset )
+                .build();
 
         return Futures.unwrappedExceptionJoin( queryOrchestrator.submit( work ) )
                 /*
@@ -159,16 +150,11 @@ public long fetchTotalHitCount() {
     }
 
     @Override
-    public SearchScroll<H> scroll(Integer pageSize) {
+    public SearchScroll<H> scroll(int chunkSize) {
         String scrollTimeoutString = this.scrollTimeout + "s";
 
-        NonBulkableWork<ElasticsearchLoadableSearchResult<H>> firstScroll = workFactory.search( payload, searchResultExtractor )
-                .routingKeys( routingKeys )
-                .timeout( timeoutValue, timeoutUnit, exceptionOnTimeout )
-                .requestTransformer(
-                        ElasticsearchSearchRequestTransformerContextImpl.createTransformerFunction( requestTransformer )
-                )
-                .scrolling( pageSize, scrollTimeoutString )
+        NonBulkableWork<ElasticsearchLoadableSearchResult<H>> firstScroll = searchWorkBuilder()
+                .scrolling( chunkSize, scrollTimeoutString )
                 .build();
 
         return new ElasticsearchSearchScroll<>( queryOrchestrator, workFactory, searchResultExtractor, scrollTimeoutString, firstScroll );
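For context, the scroll keep-alive built above is simply the configured number of seconds rendered in Elasticsearch's time-value syntax; it is sent with the initial search request and renewed on each subsequent scroll call. A trivial illustration, with an assumed value:

    int scrollTimeout = 60;                           // from hibernate.search.backend.scroll_timeout
    String scrollTimeoutString = scrollTimeout + "s"; // "60s", the Elasticsearch time-value format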
@@ -202,6 +188,21 @@ public JsonObject explain(String typeName, Object id) {
         return doExplain( index, id );
     }
 
+    private SearchWorkBuilder<ElasticsearchLoadableSearchResult<H>> searchWorkBuilder() {
+        SearchWorkBuilder<ElasticsearchLoadableSearchResult<H>> builder =
+                workFactory.search( payload, searchResultExtractor );
+        for ( ElasticsearchSearchIndexContext index : searchContext.indexes().elements() ) {
+            builder.index( index.names().getRead() );
+        }
+        builder
+                .routingKeys( routingKeys )
+                .timeout( timeoutValue, timeoutUnit, exceptionOnTimeout )
+                .requestTransformer(
+                        ElasticsearchSearchRequestTransformerContextImpl.createTransformerFunction( requestTransformer )
+                );
+        return builder;
+    }
+
     private Integer defaultedLimit(Integer limit, Integer offset) {
         /*
          * If the user has given a 'size' value, take it as is, let ES itself complain if it's too high;
@@ -119,11 +119,12 @@ public long fetchTotalHitCount() {
     }
 
     @Override
-    public SearchScroll<H> scroll(Integer pageSize) {
+    public SearchScroll<H> scroll(int chunkSize) {
         Set<String> indexNames = searchContext.indexes().indexNames();
         HibernateSearchMultiReader indexReader = HibernateSearchMultiReader.open(
                 indexNames, searchContext.indexes().elements(), routingKeys );
-        return new LuceneSearchScroll<>( queryOrchestrator, workFactory, searchContext, routingKeys, timeoutManager, searcher, indexReader, pageSize );
+        return new LuceneSearchScroll<>( queryOrchestrator, workFactory, searchContext, routingKeys, timeoutManager,
+                searcher, indexReader, chunkSize );
     }
 
     @Override
@@ -41,7 +41,7 @@ public class LuceneSearchScroll<H> implements SearchScroll<H> {
 
     // specific to this scroll instance:
     private final HibernateSearchMultiReader indexReader;
-    private final int pageSize;
+    private final int chunkSize;
 
     private int scrollIndex = 0;
     private int queryFetchSize;
@@ -52,17 +52,16 @@ public LuceneSearchScroll(LuceneSyncWorkOrchestrator queryOrchestrator,
             Set<String> routingKeys,
             TimeoutManager timeoutManager,
             LuceneSearcher<LuceneLoadableSearchResult<H>, LuceneExtractableSearchResult<H>> searcher,
-            HibernateSearchMultiReader indexReader, int pageSize
-    ) {
+            HibernateSearchMultiReader indexReader, int chunkSize) {
         this.queryOrchestrator = queryOrchestrator;
         this.workFactory = workFactory;
         this.searchContext = searchContext;
         this.routingKeys = routingKeys;
         this.timeoutManager = timeoutManager;
         this.searcher = searcher;
         this.indexReader = indexReader;
-        this.pageSize = pageSize;
-        this.queryFetchSize = pageSize * 4; // Will fetch the topdocs for the first 4 pages initially
+        this.chunkSize = chunkSize;
+        this.queryFetchSize = chunkSize * 4; // Will fetch the topdocs for the first 4 pages initially
     }
 
     @Override
@@ -79,7 +78,7 @@ public void close() {
     public SearchScrollResult<H> next() {
         timeoutManager.start();
 
-        if ( search == null || scrollIndex + pageSize > queryFetchSize ) {
+        if ( search == null || scrollIndex + chunkSize > queryFetchSize ) {
             if ( search != null ) {
                 queryFetchSize *= 2;
             }
@@ -91,7 +90,7 @@ public SearchScrollResult<H> next() {
             return new SimpleSearchScrollResult<>( false, Collections.emptyList(), Duration.ZERO, false );
         }
 
-        int endIndexExclusive = scrollIndex + pageSize;
+        int endIndexExclusive = scrollIndex + chunkSize;
 
         LuceneLoadableSearchResult<H> loadableSearchResult;
         try {
@@ -114,7 +113,7 @@ public SearchScrollResult<H> next() {
         timeoutManager.stop();
 
         // increasing the index for further next(s)
-        scrollIndex += pageSize;
+        scrollIndex += chunkSize;
         return new SimpleSearchScrollResult<>( true, result.hits(), result.took(), result.timedOut() );
     }
 
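A side note on the fetch-window arithmetic in this file: the scroll initially fetches four chunks' worth of top docs (`chunkSize * 4` in the constructor), and whenever the next chunk would run past that window, `next()` doubles `queryFetchSize` and re-executes the query. A standalone sketch of the progression, with assumed values (illustrative only, not the backend's actual code):

    // With chunkSize = 20 the window grows 80 -> 160 -> 320 -> ...,
    // so the query is re-executed only a logarithmic number of times.
    int chunkSize = 20;
    int queryFetchSize = chunkSize * 4; // initial window: 80 top docs
    boolean queryExecuted = false;
    for ( int scrollIndex = 0; scrollIndex < 500; scrollIndex += chunkSize ) {
        if ( !queryExecuted || scrollIndex + chunkSize > queryFetchSize ) {
            if ( queryExecuted ) {
                queryFetchSize *= 2; // double the window before re-running the query
            }
            queryExecuted = true; // here the real code re-executes the Lucene query
        }
        // hits [scrollIndex, scrollIndex + chunkSize) are served from the current window
    }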
@@ -880,6 +880,27 @@ By default, the index reader is refreshed every second,
 but this can be customized on the Elasticsearch side through index settings:
 see the `refresh_interval` setting on link:{elasticsearchDocUrl}/index-modules.html[this page].
 
+[[backend-elasticsearch-search]]
+== Searching
+
+Searching with the Elasticsearch backend relies on the <<search-dsl,same APIs as any other backend>>.
+
+This section details Elasticsearch-specific configuration related to searching.
+
+[[backend-elasticsearch-search-scroll-timeout]]
+=== Scroll timeout
+
+With the Elasticsearch backend, <<search-dsl-query-fetching-results-scrolling,scrolls>> are subject to timeout.
+If `next()` is not called for a long period of time (default: 60 seconds),
+the scroll will be closed automatically and the next call to `next()` will fail.
+
+Use the following configuration property at the backend level to configure the timeout (in seconds):
+
+[source]
+----
+hibernate.search.backend.scroll_timeout = 60 (default)
+----
+
 [[backend-elasticsearch-access-client]]
 == Retrieving the REST client
 // Search 5 anchors backward compatibility
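A minimal sketch of setting this property when bootstrapping through JPA. The property key comes from the documentation above; the persistence-unit name and the surrounding setup code are assumptions:

    import java.util.HashMap;
    import java.util.Map;
    import javax.persistence.EntityManagerFactory;
    import javax.persistence.Persistence;

    public class ScrollTimeoutSetup {
        public static EntityManagerFactory create() {
            Map<String, Object> properties = new HashMap<>();
            // Keep scroll contexts alive for 120 seconds instead of the default 60.
            properties.put( "hibernate.search.backend.scroll_timeout", 120 );
            return Persistence.createEntityManagerFactory( "my-persistence-unit", properties ); // unit name is hypothetical
        }
    }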
@@ -247,14 +247,57 @@ include::{sourcedir}/org/hibernate/search/documentation/search/query/QueryDslIT.
 <1> Set the offset to `40` and the limit to `20`.
 ====
 
+[NOTE]
+====
+The index may be modified between the retrieval of two pages.
+As a result of that modification, it is possible that some hits change position,
+and end up being present on two subsequent pages.
+
+If you're running a batch process and want to avoid this, use <<search-dsl-query-fetching-results-scrolling>>.
+====
+
 [[search-dsl-query-fetching-results-scrolling]]
 === Scrolling
 // Search 5 anchors backward compatibility
 [[_performance_considerations]]
 
-include::todo-placeholder.asciidoc[]
+Scrolling is the concept of keeping a cursor on the search query at the lowest level,
+and advancing that cursor progressively to collect subsequent "chunks" of search hits.
+
+Scrolling relies on the internal state of the cursor (which must be closed at some point),
+and thus is not appropriate for stateless operations such as displaying a page of results to a user in a webpage.
+However, thanks to this internal state, scrolling is able to guarantee that all returned hits are consistent:
+there is absolutely no way for a given hit to appear twice.
+
+Scrolling is therefore most useful when processing a large result set as small chunks.
+
+Below is an example of using scrolling in Hibernate Search.
+
+CAUTION: `SearchScroll` exposes a `close()` method that *must* be called to avoid resource leaks.
+
+// TODO https://docs.jboss.org/hibernate/search/5.11/reference/en-US/html_single/#_performance_considerations
+[NOTE]
+====
+With the Elasticsearch backend, scrolls can time out and become unusable after some time;
+see <<backend-elasticsearch-search-scroll-timeout,here>> for more information.
+====
+
+.Scrolling to retrieve search results in small chunks
+====
+[source, JAVA, indent=0, subs="+callouts"]
+----
+include::{sourcedir}/org/hibernate/search/documentation/search/query/QueryDslIT.java[tags=fetching-scrolling]
+----
+<1> Start a scroll that will return chunks of `20` hits.
+Note the scroll is used in a `try-with-resources` block to avoid resource leaks.
+<2> Retrieve the first chunk by calling `next()`.
+Each chunk will include at most 20 hits, since that was the selected chunk size.
+<3> Detect the end of the scroll by calling `hasHits()` on the last retrieved chunk,
+and retrieve the next chunk by calling `next()` again on the scroll.
+<4> Retrieve the hits of a chunk.
+<5> Optionally, if using Hibernate ORM and retrieving entities,
+you might want to use the link:{hibernateDocUrl}#batch-session-batch-insert[periodic "flush-clear" pattern]
+to ensure entities don't stay in the session taking more and more memory.
+====
 
 [[search-dsl-query-routing]]
 == Routing
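To make the pagination caveat in the note above concrete, here is a minimal sketch of the offset-based loop it warns about, reusing the `fetch(offset, limit)` method from this PR's first file; the `searchSession` and `Book` names are borrowed from the PR's own examples. Each iteration is an independent query, so a concurrent index update can shift hits across pages:

    // Illustrative only: offset paging re-runs the query for every page.
    int offset = 0;
    int limit = 20;
    SearchResult<Book> result;
    do {
        result = searchSession.search( Book.class )
                .where( f -> f.matchAll() )
                .fetch( offset, limit );
        for ( Book hit : result.hits() ) {
            // ... a hit may already have been seen on a previous page ...
        }
        offset += limit;
    } while ( !result.hits().isEmpty() );

A scroll, by contrast, keeps cursor state alive (a server-side search context on Elasticsearch, an open index reader on Lucene, as this PR shows), which is what makes the no-duplicates guarantee possible.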
@@ -7,6 +7,7 @@
 package org.hibernate.search.documentation.search.projection;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.hibernate.search.util.impl.integrationtest.common.assertion.SearchHitsAssert.assertThatHits;
 
 import java.util.Arrays;
 import java.util.List;
@@ -28,7 +29,6 @@
 import org.hibernate.search.mapper.orm.common.impl.EntityReferenceImpl;
 import org.hibernate.search.mapper.orm.scope.SearchScope;
 import org.hibernate.search.mapper.orm.session.SearchSession;
-import org.hibernate.search.util.impl.integrationtest.common.assertion.SearchHitsAssert;
 import org.hibernate.search.util.impl.integrationtest.mapper.orm.OrmUtils;
 
 import org.junit.Before;
@@ -106,7 +106,7 @@ public void documentReference() {
                 .where( f -> f.matchAll() )
                 .fetchHits( 20 );
         // end::documentReference[]
-        SearchHitsAssert.assertThat( hits ).hasDocRefHitsAnyOrder(
+        assertThatHits( hits ).hasDocRefHitsAnyOrder(
                 BOOK_INDEX_NAME,
                 String.valueOf( BOOK1_ID ),
                 String.valueOf( BOOK2_ID ),
@@ -10,6 +10,7 @@
 import static org.hibernate.search.util.impl.integrationtest.mapper.orm.ManagedAssert.assertThatManaged;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -26,6 +27,8 @@
 import org.hibernate.search.documentation.testsupport.DocumentationSetupHelper;
 import org.hibernate.search.engine.search.query.SearchQuery;
 import org.hibernate.search.engine.search.query.SearchResult;
+import org.hibernate.search.engine.search.query.SearchScroll;
+import org.hibernate.search.engine.search.query.SearchScrollResult;
 import org.hibernate.search.mapper.orm.Search;
 import org.hibernate.search.mapper.orm.search.loading.EntityLoadingCacheLookupStrategy;
 import org.hibernate.search.mapper.orm.session.SearchSession;
@@ -247,6 +250,32 @@ public void pagination() {
         } );
     }
 
+    @Test
+    public void scrolling() {
+        OrmUtils.withinJPATransaction( entityManagerFactory, entityManager -> {
+            SearchSession searchSession = Search.session( entityManager );
+            List<Integer> collectedIds = new ArrayList<>();
+            // tag::fetching-scrolling[]
+            try ( SearchScroll<Book> scroll = searchSession.search( Book.class )
+                    .where( f -> f.matchAll() )
+                    .scroll( 20 ) ) { // <1>
+                for ( SearchScrollResult<Book> chunk = scroll.next(); // <2>
+                        chunk.hasHits(); chunk = scroll.next() ) { // <3>
+                    for ( Book hit : chunk.hits() ) { // <4>
+                        // ... do something with the hits ...
+                        // end::fetching-scrolling[]
+                        collectedIds.add( hit.getId() );
+                        // tag::fetching-scrolling[]
+                    }
+                    entityManager.flush(); // <5>
+                    entityManager.clear(); // <5>
+                }
+            }
+            // end::fetching-scrolling[]
+            assertThat( collectedIds ).hasSize( 4 );
+        } );
+    }
+
     @Test
     public void searchQuery() {
         OrmUtils.withinJPATransaction( entityManagerFactory, entityManager -> {
@@ -145,10 +145,10 @@ public interface SearchFetchable<H> {
      * <p>
      * Useful to process large datasets.
      *
-     * @param pageSize The maximum number of hits to be returned for each call to {@link SearchScroll#next()}
+     * @param chunkSize The maximum number of hits to be returned for each call to {@link SearchScroll#next()}
      * @return The {@link SearchScroll}.
-     * @throws IllegalArgumentException if passed 0 or less for pageSize.
+     * @throws IllegalArgumentException if passed 0 or less for {@code chunkSize}.
      */
-    SearchScroll<H> scroll(Integer pageSize);
+    SearchScroll<H> scroll(int chunkSize);
 
 }
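A quick illustration of the documented contract; this helper and its name are hypothetical, since the real validation lives inside the engine's query implementations:

    // Hypothetical sketch of what "0 or less" means for callers of scroll(int):
    static void checkChunkSize(int chunkSize) {
        if ( chunkSize <= 0 ) {
            throw new IllegalArgumentException( "chunkSize must be strictly positive, but was: " + chunkSize );
        }
    }

Note the parameter type also changed from `Integer` to `int`: a null chunk size, previously a possible runtime failure, is now ruled out at compile time.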
@@ -27,12 +27,12 @@ public interface SearchScroll<H> extends AutoCloseable {
     void close();
 
     /**
-     * Returns the next page, with at most {@code pageSize} hits.
+     * Returns the next chunk, with at most {@code chunkSize} hits.
      * <p>
-     * May return a result with less than {@code pageSize} elements if only that many hits are left.
+     * May return a result with less than {@code chunkSize} elements if only that many hits are left.
      *
      * @return The next {@link SearchScrollResult}.
-     * @see SearchFetchable#scroll(Integer)
+     * @see SearchFetchable#scroll(int)
      */
     SearchScrollResult<H> next();
 
@@ -202,8 +202,8 @@ public long fetchTotalHitCount() {
     }
 
     @Override
-    public SearchScroll<H> scroll(Integer pageSize) {
-        return toQuery().scroll( pageSize );
+    public SearchScroll<H> scroll(int chunkSize) {
+        return toQuery().scroll( chunkSize );
     }
 
     private void contribute(SearchPredicateBuilderFactory<? super C> factory, SearchPredicate predicate) {