Skip to content

Commit

Permalink
MAILBOX-266 Set the size of SearchResponses to 1 in ElasticSearch tests
Browse files Browse the repository at this point in the history
This allows to ensure Scrolls are well performed
  • Loading branch information
chibenwa committed Apr 6, 2016
1 parent 6ba31a6 commit b99d0c7
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 7 deletions.
Expand Up @@ -34,17 +34,26 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;

import com.google.common.annotations.VisibleForTesting;

public class DeleteByQueryPerformer {
public static final int BATCH_SIZE = 100;
public static final int DEFAULT_BATCH_SIZE = 100;
public static final TimeValue TIMEOUT = new TimeValue(60000);

private final ClientProvider clientProvider;
private final ExecutorService executor;
private final int batchSize;

@Inject
public DeleteByQueryPerformer(ClientProvider clientProvider, @Named("AsyncExecutor") ExecutorService executor) {
this(clientProvider, executor, DEFAULT_BATCH_SIZE);
}

@VisibleForTesting
DeleteByQueryPerformer(ClientProvider clientProvider, @Named("AsyncExecutor") ExecutorService executor, int batchSize) {
this.clientProvider = clientProvider;
this.executor = executor;
this.batchSize = batchSize;
}

public void perform(QueryBuilder queryBuilder) {
Expand All @@ -59,7 +68,7 @@ private Void doDeleteByQuery(QueryBuilder queryBuilder) {
.setScroll(TIMEOUT)
.setNoFields()
.setQuery(queryBuilder)
.setSize(BATCH_SIZE));
.setSize(batchSize));
for (SearchResponse searchResponse : scrollIterable) {
deleteRetrievedIds(client, searchResponse);
}
Expand Down
Expand Up @@ -46,14 +46,22 @@
public class ElasticSearchSearcher<Id extends MailboxId> {

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class);
private static final TimeValue TIMEOUT = new TimeValue(60000);
public static final int DEFAULT_SIZE = 100;

private final ClientProvider clientProvider;
private final QueryConverter queryConverter;
private final int size;

@Inject
public ElasticSearchSearcher(ClientProvider clientProvider, QueryConverter queryConverter) {
this(clientProvider, queryConverter, DEFAULT_SIZE);
}

public ElasticSearchSearcher(ClientProvider clientProvider, QueryConverter queryConverter, int size) {
this.clientProvider = clientProvider;
this.queryConverter = queryConverter;
this.size = size;
}

public Iterator<Long> search(Mailbox<Id> mailbox, SearchQuery searchQuery) throws MailboxException {
Expand All @@ -70,9 +78,9 @@ private SearchRequestBuilder getSearchRequestBuilder(Client client, Mailbox<Id>
.reduce(
client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
.setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
.setScroll(new TimeValue(60000))
.setScroll(TIMEOUT)
.setQuery(queryConverter.from(searchQuery, mailbox.getMailboxId().serialize()))
.setSize(100),
.setSize(size),
(searchBuilder, sort) -> searchBuilder.addSort(SortConverter.convertSort(sort)),
(partialResult1, partialResult2) -> partialResult1);
}
Expand Down
Expand Up @@ -41,6 +41,7 @@

public class ElasticSearchIndexerTest {

public static final int MINIMUM_BATCH_SIZE = 1;
private TemporaryFolder temporaryFolder = new TemporaryFolder();
private EmbeddedElasticSearch embeddedElasticSearch= new EmbeddedElasticSearch(temporaryFolder);

Expand All @@ -55,7 +56,7 @@ public class ElasticSearchIndexerTest {
public void setup() throws IOException {
node = embeddedElasticSearch.getNode();
TestingClientProvider clientProvider = new TestingClientProvider(node);
deleteByQueryPerformer = new DeleteByQueryPerformer(clientProvider, Executors.newSingleThreadExecutor());
deleteByQueryPerformer = new DeleteByQueryPerformer(clientProvider, Executors.newSingleThreadExecutor(), MINIMUM_BATCH_SIZE);
testee = new ElasticSearchIndexer(clientProvider, deleteByQueryPerformer);
}

Expand Down
Expand Up @@ -62,6 +62,8 @@
public class ElasticSearchIntegrationTest {

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIntegrationTest.class);
public static final int BATCH_SIZE = 1;
public static final int SEARCH_SIZE = 1;

private TemporaryFolder temporaryFolder = new TemporaryFolder();
private EmbeddedElasticSearch embeddedElasticSearch= new EmbeddedElasticSearch(temporaryFolder);
Expand Down Expand Up @@ -168,8 +170,8 @@ private void initializeMailboxManager() throws Exception {
);
MailboxSessionMapperFactory<InMemoryId> mapperFactory = new InMemoryMailboxSessionMapperFactory();
elasticSearchListeningMessageSearchIndex = new ElasticSearchListeningMessageSearchIndex<>(mapperFactory,
new ElasticSearchIndexer(clientProvider, new DeleteByQueryPerformer(clientProvider, Executors.newSingleThreadExecutor())),
new ElasticSearchSearcher<>(clientProvider, new QueryConverter(new CriterionConverter())),
new ElasticSearchIndexer(clientProvider, new DeleteByQueryPerformer(clientProvider, Executors.newSingleThreadExecutor(), BATCH_SIZE)),
new ElasticSearchSearcher<>(clientProvider, new QueryConverter(new CriterionConverter()), SEARCH_SIZE),
new MessageToElasticSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris")));
storeMailboxManager = new StoreMailboxManager<>(
mapperFactory,
Expand Down

0 comments on commit b99d0c7

Please sign in to comment.