Skip to content

Commit

Permalink
MAILBOX-266 Introduce a scroll Iterable
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa committed Apr 6, 2016
1 parent a032429 commit ebeafcf
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 10 deletions.
Expand Up @@ -43,12 +43,16 @@ public class NodeMappingFactory {
public static final String NESTED = "nested";

public static ClientProvider applyMapping(ClientProvider clientProvider) {
return applyMapping(clientProvider, getMappingContent());
}

public static ClientProvider applyMapping(ClientProvider clientProvider, XContentBuilder mappingsSources) {
try (Client client = clientProvider.get()) {
client.admin()
.indices()
.preparePutMapping(ElasticSearchIndexer.MAILBOX_INDEX)
.setType(ElasticSearchIndexer.MESSAGE_TYPE)
.setSource(getMappingContent())
.setSource(mappingsSources)
.execute()
.actionGet();
}
Expand Down
Expand Up @@ -21,8 +21,11 @@

import java.util.Iterator;
import java.util.Optional;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.inject.Inject;

import org.apache.james.mailbox.elasticsearch.ClientProvider;
import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer;
import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants;
Expand All @@ -40,8 +43,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;

public class ElasticSearchSearcher<Id extends MailboxId> {

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class);
Expand All @@ -57,9 +58,9 @@ public ElasticSearchSearcher(ClientProvider clientProvider, QueryConverter query

public Iterator<Long> search(Mailbox<Id> mailbox, SearchQuery searchQuery) throws MailboxException {
try (Client client = clientProvider.get()) {
return transformResponseToUidIterator(getSearchRequestBuilder(client, mailbox, searchQuery)
.get()
);
return new ScrollIterable(client, getSearchRequestBuilder(client, mailbox, searchQuery)).stream()
.flatMap(this::transformResponseToUidStream)
.iterator();
}
}

Expand All @@ -76,13 +77,11 @@ private SearchRequestBuilder getSearchRequestBuilder(Client client, Mailbox<Id>
(partialResult1, partialResult2) -> partialResult1);
}

private Iterator<Long> transformResponseToUidIterator(SearchResponse searchResponse) {
private Stream<Long> transformResponseToUidStream(SearchResponse searchResponse) {
return StreamSupport.stream(searchResponse.getHits().spliterator(), false)
.map(this::extractUidFromHit)
.filter(Optional::isPresent)
.map(Optional::get)
.iterator();

.map(Optional::get);
}

private Optional<Long> extractUidFromHit(SearchHit hit) {
Expand Down
@@ -0,0 +1,81 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.elasticsearch.search;

import java.util.Iterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;

public class ScrollIterable implements Iterable<SearchResponse> {

private static final TimeValue TIMEOUT = new TimeValue(60000);
private final Client client;
private final SearchRequestBuilder searchRequestBuilder;

public ScrollIterable(Client client, SearchRequestBuilder searchRequestBuilder) {
this.client = client;
this.searchRequestBuilder = searchRequestBuilder;
}

@Override
public Iterator<SearchResponse> iterator() {
return new ScrollIterator(client, searchRequestBuilder);
}

public Stream<SearchResponse> stream() {
return StreamSupport.stream(spliterator(), false);
}

public static class ScrollIterator implements Iterator<SearchResponse> {

private final Client client;
private ListenableActionFuture<SearchResponse> searchResponseFuture;

public ScrollIterator(Client client, SearchRequestBuilder searchRequestBuilder) {
this.client = client;
this.searchResponseFuture = searchRequestBuilder.execute();
}

@Override
public boolean hasNext() {
return !allSearchResponsesConsumed(searchResponseFuture.actionGet());
}

@Override
public SearchResponse next() {
SearchResponse result = searchResponseFuture.actionGet();
searchResponseFuture = client.prepareSearchScroll(result.getScrollId())
.setScroll(TIMEOUT)
.execute();
return result;
}

private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
return searchResponse.getHits().getHits().length == 0;
}
}

}
@@ -0,0 +1,173 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.elasticsearch.search;

import static org.assertj.core.api.Assertions.assertThat;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.james.mailbox.elasticsearch.ClientProvider;
import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer;
import org.apache.james.mailbox.elasticsearch.EmbeddedElasticSearch;
import org.apache.james.mailbox.elasticsearch.IndexCreationFactory;
import org.apache.james.mailbox.elasticsearch.NodeMappingFactory;
import org.apache.james.mailbox.elasticsearch.utils.TestingClientProvider;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.SearchHit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;

public class ScrollIterableTest {

public static final TimeValue TIMEOUT = new TimeValue(6000);
public static final int SIZE = 2;
public static final String MESSAGE = "message";

private TemporaryFolder temporaryFolder = new TemporaryFolder();
private EmbeddedElasticSearch embeddedElasticSearch= new EmbeddedElasticSearch(temporaryFolder);

@Rule
public RuleChain ruleChain = RuleChain.outerRule(temporaryFolder).around(embeddedElasticSearch);

private ClientProvider clientProvider;

@Before
public void setUp() throws Exception {
clientProvider = new TestingClientProvider(embeddedElasticSearch.getNode());
IndexCreationFactory.createIndex(clientProvider);
embeddedElasticSearch.awaitForElasticSearch();
NodeMappingFactory.applyMapping(clientProvider, getMappingsSources());
}

private XContentBuilder getMappingsSources() throws IOException {
return jsonBuilder()
.startObject()
.startObject(ElasticSearchIndexer.MESSAGE_TYPE)
.startObject(NodeMappingFactory.PROPERTIES)
.startObject(MESSAGE)
.field(NodeMappingFactory.TYPE, NodeMappingFactory.STRING)
.endObject()
.endObject()
.endObject()
.endObject();
}

@Test
public void scrollIterableShouldWorkWhenEmpty() {
try (Client client = clientProvider.get()) {
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
.setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
.setScroll(TIMEOUT)
.setQuery(matchAllQuery())
.setSize(SIZE);
assertThat(new ScrollIterable(client, searchRequestBuilder)).isEmpty();
}
}

@Test
public void scrollIterableShouldWorkWhenOneElement() {
try (Client client = clientProvider.get()) {
String id = "1";
client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id)
.setSource(MESSAGE, "Sample message")
.execute();

embeddedElasticSearch.awaitForElasticSearch();

SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
.setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
.setScroll(TIMEOUT)
.setQuery(matchAllQuery())
.setSize(SIZE);
assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder))).containsOnly(id);
}
}

@Test
public void scrollIterableShouldWorkWhenSizeElement() {
try (Client client = clientProvider.get()) {
String id1 = "1";
client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id1)
.setSource(MESSAGE, "Sample message")
.execute();

String id2 = "2";
client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id2)
.setSource(MESSAGE, "Sample message")
.execute();

embeddedElasticSearch.awaitForElasticSearch();

SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
.setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
.setScroll(TIMEOUT)
.setQuery(matchAllQuery())
.setSize(SIZE);
assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder))).containsOnly(id1, id2);
}
}

@Test
public void scrollIterableShouldWorkWhenMoreThanSizeElement() {
try (Client client = clientProvider.get()) {
String id1 = "1";
client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id1)
.setSource(MESSAGE, "Sample message")
.execute();

String id2 = "2";
client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id2)
.setSource(MESSAGE, "Sample message")
.execute();

String id3 = "3";
client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id3)
.setSource(MESSAGE, "Sample message")
.execute();

embeddedElasticSearch.awaitForElasticSearch();

SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
.setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
.setScroll(TIMEOUT)
.setQuery(matchAllQuery())
.setSize(SIZE);
assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder))).containsOnly(id1, id2, id3);
}
}

private List<String> convertToIdList(ScrollIterable scrollIterable) {
return scrollIterable.stream()
.flatMap(searchResponse -> Arrays.asList(searchResponse.getHits().getHits()).stream())
.map(SearchHit::getId)
.collect(Collectors.toList());
}
}

0 comments on commit ebeafcf

Please sign in to comment.