Skip to content

Commit

Permalink
MAILBOX-266 ElasticSearch client should be instanciated once
Browse files Browse the repository at this point in the history
Note : this commit includes a complete rework of how James MPT SMPT tests works.
 - Rely on the guice project (which highlight the initialization problem and that it brakes SMPT)
 - Better manage resources

This was needed because ElasticSearch was never truly configured before. Which you can do with a client provider (you end up never instanciating the client)
but not if you rely on a real client.
  • Loading branch information
chibenwa committed Apr 6, 2016
1 parent 7f02ab7 commit 6628e4f
Show file tree
Hide file tree
Showing 24 changed files with 242 additions and 290 deletions.
Expand Up @@ -40,18 +40,18 @@ public class DeleteByQueryPerformer {
public static final int DEFAULT_BATCH_SIZE = 100; public static final int DEFAULT_BATCH_SIZE = 100;
public static final TimeValue TIMEOUT = new TimeValue(60000); public static final TimeValue TIMEOUT = new TimeValue(60000);


private final ClientProvider clientProvider; private final Client client;
private final ExecutorService executor; private final ExecutorService executor;
private final int batchSize; private final int batchSize;


@Inject @Inject
public DeleteByQueryPerformer(ClientProvider clientProvider, @Named("AsyncExecutor") ExecutorService executor) { public DeleteByQueryPerformer(Client client, @Named("AsyncExecutor") ExecutorService executor) {
this(clientProvider, executor, DEFAULT_BATCH_SIZE); this(client, executor, DEFAULT_BATCH_SIZE);
} }


@VisibleForTesting @VisibleForTesting
DeleteByQueryPerformer(ClientProvider clientProvider, @Named("AsyncExecutor") ExecutorService executor, int batchSize) { DeleteByQueryPerformer(Client client, @Named("AsyncExecutor") ExecutorService executor, int batchSize) {
this.clientProvider = clientProvider; this.client = client;
this.executor = executor; this.executor = executor;
this.batchSize = batchSize; this.batchSize = batchSize;
} }
Expand All @@ -62,17 +62,15 @@ public Void perform(QueryBuilder queryBuilder) {
} }


protected void doDeleteByQuery(QueryBuilder queryBuilder) { protected void doDeleteByQuery(QueryBuilder queryBuilder) {
try (Client client = clientProvider.get()) { new ScrollIterable(client,
new ScrollIterable(client, client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX) .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
.setTypes(ElasticSearchIndexer.MESSAGE_TYPE) .setScroll(TIMEOUT)
.setScroll(TIMEOUT) .setNoFields()
.setNoFields() .setQuery(queryBuilder)
.setQuery(queryBuilder) .setSize(batchSize))
.setSize(batchSize)) .stream()
.stream() .forEach(searchResponse -> deleteRetrievedIds(client, searchResponse));
.forEach(searchResponse -> deleteRetrievedIds(client, searchResponse));
}
} }


private ListenableActionFuture<BulkResponse> deleteRetrievedIds(Client client, SearchResponse searchResponse) { private ListenableActionFuture<BulkResponse> deleteRetrievedIds(Client client, SearchResponse searchResponse) {
Expand Down
Expand Up @@ -56,40 +56,34 @@ public String getUpdatedDocumentPart() {
public static final String MAILBOX_INDEX = "mailbox"; public static final String MAILBOX_INDEX = "mailbox";
public static final String MESSAGE_TYPE = "message"; public static final String MESSAGE_TYPE = "message";


private final ClientProvider clientProvider; private final Client client;
private final DeleteByQueryPerformer deleteByQueryPerformer; private final DeleteByQueryPerformer deleteByQueryPerformer;


@Inject @Inject
public ElasticSearchIndexer(ClientProvider clientProvider, DeleteByQueryPerformer deleteByQueryPerformer) { public ElasticSearchIndexer(Client client, DeleteByQueryPerformer deleteByQueryPerformer) {
this.clientProvider = clientProvider; this.client = client;
this.deleteByQueryPerformer = deleteByQueryPerformer; this.deleteByQueryPerformer = deleteByQueryPerformer;
} }


public IndexResponse indexMessage(String id, String content) { public IndexResponse indexMessage(String id, String content) {
checkArgument(content); checkArgument(content);
try (Client client = clientProvider.get()) { return client.prepareIndex(MAILBOX_INDEX, MESSAGE_TYPE, id)
return client.prepareIndex(MAILBOX_INDEX, MESSAGE_TYPE, id) .setSource(content)
.setSource(content) .get();
.get();
}
} }


public BulkResponse updateMessages(List<UpdatedRepresentation> updatedDocumentParts) { public BulkResponse updateMessages(List<UpdatedRepresentation> updatedDocumentParts) {
Preconditions.checkNotNull(updatedDocumentParts); Preconditions.checkNotNull(updatedDocumentParts);
try (Client client = clientProvider.get()) { BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); updatedDocumentParts.forEach(updatedDocumentPart -> bulkRequestBuilder.add(client.prepareUpdate(MAILBOX_INDEX, MESSAGE_TYPE, updatedDocumentPart.getId())
updatedDocumentParts.forEach(updatedDocumentPart -> bulkRequestBuilder.add(client.prepareUpdate(MAILBOX_INDEX, MESSAGE_TYPE, updatedDocumentPart.getId()) .setDoc(updatedDocumentPart.getUpdatedDocumentPart())));
.setDoc(updatedDocumentPart.getUpdatedDocumentPart()))); return bulkRequestBuilder.get();
return bulkRequestBuilder.get();
}
} }

public BulkResponse deleteMessages(List<String> ids) { public BulkResponse deleteMessages(List<String> ids) {
try (Client client = clientProvider.get()) { BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); ids.forEach(id -> bulkRequestBuilder.add(client.prepareDelete(MAILBOX_INDEX, MESSAGE_TYPE, id)));
ids.forEach(id -> bulkRequestBuilder.add(client.prepareDelete(MAILBOX_INDEX, MESSAGE_TYPE, id))); return bulkRequestBuilder.get();
return bulkRequestBuilder.get();
}
} }


public Void deleteAllMatchingQuery(QueryBuilder queryBuilder) { public Void deleteAllMatchingQuery(QueryBuilder queryBuilder) {
Expand Down
Expand Up @@ -36,33 +36,31 @@ public class IndexCreationFactory {
private static final int DEFAULT_NB_REPLICA = 0; private static final int DEFAULT_NB_REPLICA = 0;
public static final String CASE_INSENSITIVE = "case_insensitive"; public static final String CASE_INSENSITIVE = "case_insensitive";


public static ClientProvider createIndex(ClientProvider clientProvider, int nbShards, int nbReplica) { public static Client createIndex(Client client, int nbShards, int nbReplica) {
try { try {
return createIndex(clientProvider, generateSetting(nbShards, nbReplica)); return createIndex(client, generateSetting(nbShards, nbReplica));
} catch (IOException e) { } catch (IOException e) {
LOGGER.error("Error while creating index : ", e); LOGGER.error("Error while creating index : ", e);
return clientProvider; return client;
} }
} }


public static ClientProvider createIndex(ClientProvider clientProvider) { public static Client createIndex(Client client) {
return createIndex(clientProvider, DEFAULT_NB_SHARDS, DEFAULT_NB_REPLICA); return createIndex(client, DEFAULT_NB_SHARDS, DEFAULT_NB_REPLICA);
} }


private static ClientProvider createIndex(ClientProvider clientProvider, XContentBuilder settings) { private static Client createIndex(Client client, XContentBuilder settings) {
try { try {
try (Client client = clientProvider.get()) {
client.admin() client.admin()
.indices() .indices()
.prepareCreate(ElasticSearchIndexer.MAILBOX_INDEX) .prepareCreate(ElasticSearchIndexer.MAILBOX_INDEX)
.setSettings(settings) .setSettings(settings)
.execute() .execute()
.actionGet(); .actionGet();
}
} catch (IndexAlreadyExistsException exception) { } catch (IndexAlreadyExistsException exception) {
LOGGER.info("Index [" + ElasticSearchIndexer.MAILBOX_INDEX + "] already exist"); LOGGER.info("Index [" + ElasticSearchIndexer.MAILBOX_INDEX + "] already exist");
} }
return clientProvider; return client;
} }


private static XContentBuilder generateSetting(int nbShards, int nbReplica) throws IOException { private static XContentBuilder generateSetting(int nbShards, int nbReplica) throws IOException {
Expand Down
Expand Up @@ -45,21 +45,19 @@ public class NodeMappingFactory {
public static final String RAW = "raw"; public static final String RAW = "raw";
public static final String ANALYZER = "analyzer"; public static final String ANALYZER = "analyzer";


public static ClientProvider applyMapping(ClientProvider clientProvider) { public static Client applyMapping(Client client) {
return applyMapping(clientProvider, getMappingContent()); return applyMapping(client, getMappingContent());
} }


public static ClientProvider applyMapping(ClientProvider clientProvider, XContentBuilder mappingsSources) { public static Client applyMapping(Client client, XContentBuilder mappingsSources) {
try (Client client = clientProvider.get()) { client.admin()
client.admin() .indices()
.indices() .preparePutMapping(ElasticSearchIndexer.MAILBOX_INDEX)
.preparePutMapping(ElasticSearchIndexer.MAILBOX_INDEX) .setType(ElasticSearchIndexer.MESSAGE_TYPE)
.setType(ElasticSearchIndexer.MESSAGE_TYPE) .setSource(mappingsSources)
.setSource(mappingsSources) .execute()
.execute() .actionGet();
.actionGet(); return client;
}
return clientProvider;
} }


private static XContentBuilder getMappingContent() { private static XContentBuilder getMappingContent() {
Expand Down
Expand Up @@ -26,7 +26,6 @@


import javax.inject.Inject; import javax.inject.Inject;


import org.apache.james.mailbox.elasticsearch.ClientProvider;
import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer; import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer;
import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants; import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants;
import org.apache.james.mailbox.elasticsearch.query.QueryConverter; import org.apache.james.mailbox.elasticsearch.query.QueryConverter;
Expand All @@ -49,27 +48,25 @@ public class ElasticSearchSearcher<Id extends MailboxId> {
private static final TimeValue TIMEOUT = new TimeValue(60000); private static final TimeValue TIMEOUT = new TimeValue(60000);
public static final int DEFAULT_SIZE = 100; public static final int DEFAULT_SIZE = 100;


private final ClientProvider clientProvider; private final Client client;
private final QueryConverter queryConverter; private final QueryConverter queryConverter;
private final int size; private final int size;


@Inject @Inject
public ElasticSearchSearcher(ClientProvider clientProvider, QueryConverter queryConverter) { public ElasticSearchSearcher(Client client, QueryConverter queryConverter) {
this(clientProvider, queryConverter, DEFAULT_SIZE); this(client, queryConverter, DEFAULT_SIZE);
} }


public ElasticSearchSearcher(ClientProvider clientProvider, QueryConverter queryConverter, int size) { public ElasticSearchSearcher(Client client, QueryConverter queryConverter, int size) {
this.clientProvider = clientProvider; this.client = client;
this.queryConverter = queryConverter; this.queryConverter = queryConverter;
this.size = size; this.size = size;
} }


public Iterator<Long> search(Mailbox<Id> mailbox, SearchQuery searchQuery) throws MailboxException { public Iterator<Long> search(Mailbox<Id> mailbox, SearchQuery searchQuery) throws MailboxException {
try (Client client = clientProvider.get()) { return new ScrollIterable(client, getSearchRequestBuilder(client, mailbox, searchQuery)).stream()
return new ScrollIterable(client, getSearchRequestBuilder(client, mailbox, searchQuery)).stream() .flatMap(this::transformResponseToUidStream)
.flatMap(this::transformResponseToUidStream) .iterator();
.iterator();
}
} }


private SearchRequestBuilder getSearchRequestBuilder(Client client, Mailbox<Id> mailbox, SearchQuery searchQuery) { private SearchRequestBuilder getSearchRequestBuilder(Client client, Mailbox<Id> mailbox, SearchQuery searchQuery) {
Expand Down
Expand Up @@ -56,14 +56,14 @@ public class ElasticSearchIndexerTest {
public void setup() throws IOException { public void setup() throws IOException {
node = embeddedElasticSearch.getNode(); node = embeddedElasticSearch.getNode();
TestingClientProvider clientProvider = new TestingClientProvider(node); TestingClientProvider clientProvider = new TestingClientProvider(node);
DeleteByQueryPerformer deleteByQueryPerformer = new DeleteByQueryPerformer(clientProvider, Executors.newSingleThreadExecutor(), MINIMUM_BATCH_SIZE) { DeleteByQueryPerformer deleteByQueryPerformer = new DeleteByQueryPerformer(clientProvider.get(), Executors.newSingleThreadExecutor(), MINIMUM_BATCH_SIZE) {
@Override @Override
public Void perform(QueryBuilder queryBuilder) { public Void perform(QueryBuilder queryBuilder) {
doDeleteByQuery(queryBuilder); doDeleteByQuery(queryBuilder);
return null; return null;
} }
}; };
testee = new ElasticSearchIndexer(clientProvider, deleteByQueryPerformer); testee = new ElasticSearchIndexer(clientProvider.get(), deleteByQueryPerformer);
} }


@Test @Test
Expand Down
Expand Up @@ -49,6 +49,7 @@
import org.apache.james.mailbox.store.StoreMailboxManager; import org.apache.james.mailbox.store.StoreMailboxManager;
import org.apache.james.mailbox.store.StoreMessageManager; import org.apache.james.mailbox.store.StoreMessageManager;
import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.elasticsearch.client.Client;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
Expand Down Expand Up @@ -165,13 +166,13 @@ public void setUp() throws Exception {
} }


private void initializeMailboxManager() throws Exception { private void initializeMailboxManager() throws Exception {
ClientProvider clientProvider = NodeMappingFactory.applyMapping( Client client = NodeMappingFactory.applyMapping(
IndexCreationFactory.createIndex(new TestingClientProvider(embeddedElasticSearch.getNode())) IndexCreationFactory.createIndex(new TestingClientProvider(embeddedElasticSearch.getNode()).get())
); );
MailboxSessionMapperFactory<InMemoryId> mapperFactory = new InMemoryMailboxSessionMapperFactory(); MailboxSessionMapperFactory<InMemoryId> mapperFactory = new InMemoryMailboxSessionMapperFactory();
elasticSearchListeningMessageSearchIndex = new ElasticSearchListeningMessageSearchIndex<>(mapperFactory, elasticSearchListeningMessageSearchIndex = new ElasticSearchListeningMessageSearchIndex<>(mapperFactory,
new ElasticSearchIndexer(clientProvider, new DeleteByQueryPerformer(clientProvider, Executors.newSingleThreadExecutor(), BATCH_SIZE)), new ElasticSearchIndexer(client, new DeleteByQueryPerformer(client, Executors.newSingleThreadExecutor(), BATCH_SIZE)),
new ElasticSearchSearcher<>(clientProvider, new QueryConverter(new CriterionConverter()), SEARCH_SIZE), new ElasticSearchSearcher<>(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE),
new MessageToElasticSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"))); new MessageToElasticSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris")));
storeMailboxManager = new InMemoryMailboxManager( storeMailboxManager = new InMemoryMailboxManager(
mapperFactory, mapperFactory,
Expand Down
Expand Up @@ -62,9 +62,9 @@ public class ScrollIterableTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
clientProvider = new TestingClientProvider(embeddedElasticSearch.getNode()); clientProvider = new TestingClientProvider(embeddedElasticSearch.getNode());
IndexCreationFactory.createIndex(clientProvider); IndexCreationFactory.createIndex(clientProvider.get());
embeddedElasticSearch.awaitForElasticSearch(); embeddedElasticSearch.awaitForElasticSearch();
NodeMappingFactory.applyMapping(clientProvider, getMappingsSources()); NodeMappingFactory.applyMapping(clientProvider.get(), getMappingsSources());
} }


private XContentBuilder getMappingsSources() throws IOException { private XContentBuilder getMappingsSources() throws IOException {
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.apache.james.mailbox.acl.MailboxACLResolver; import org.apache.james.mailbox.acl.MailboxACLResolver;
import org.apache.james.mailbox.acl.SimpleGroupMembershipResolver; import org.apache.james.mailbox.acl.SimpleGroupMembershipResolver;
import org.apache.james.mailbox.acl.UnionMailboxACLResolver; import org.apache.james.mailbox.acl.UnionMailboxACLResolver;
import org.apache.james.mailbox.elasticsearch.ClientProvider;
import org.apache.james.mailbox.elasticsearch.DeleteByQueryPerformer; import org.apache.james.mailbox.elasticsearch.DeleteByQueryPerformer;
import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer; import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer;
import org.apache.james.mailbox.elasticsearch.EmbeddedElasticSearch; import org.apache.james.mailbox.elasticsearch.EmbeddedElasticSearch;
Expand All @@ -58,6 +57,7 @@
import org.apache.james.mpt.api.ImapFeatures.Feature; import org.apache.james.mpt.api.ImapFeatures.Feature;
import org.apache.james.mpt.host.JamesImapHostSystem; import org.apache.james.mpt.host.JamesImapHostSystem;
import org.apache.james.mpt.imapmailbox.MailboxCreationDelegate; import org.apache.james.mpt.imapmailbox.MailboxCreationDelegate;
import org.elasticsearch.client.Client;


import com.google.common.base.Throwables; import com.google.common.base.Throwables;


Expand Down Expand Up @@ -95,17 +95,17 @@ protected void resetData() throws Exception {
} }


private void initFields() { private void initFields() {
ClientProvider clientProvider = NodeMappingFactory.applyMapping( Client client = NodeMappingFactory.applyMapping(
IndexCreationFactory.createIndex(new TestingClientProvider(embeddedElasticSearch.getNode())) IndexCreationFactory.createIndex(new TestingClientProvider(embeddedElasticSearch.getNode()).get())
); );


userManager = new MockAuthenticator(); userManager = new MockAuthenticator();
InMemoryMailboxSessionMapperFactory factory = new InMemoryMailboxSessionMapperFactory(); InMemoryMailboxSessionMapperFactory factory = new InMemoryMailboxSessionMapperFactory();


ElasticSearchListeningMessageSearchIndex<InMemoryId> searchIndex = new ElasticSearchListeningMessageSearchIndex<>( ElasticSearchListeningMessageSearchIndex<InMemoryId> searchIndex = new ElasticSearchListeningMessageSearchIndex<>(
factory, factory,
new ElasticSearchIndexer(clientProvider, new DeleteByQueryPerformer(clientProvider, Executors.newSingleThreadExecutor())), new ElasticSearchIndexer(client, new DeleteByQueryPerformer(client, Executors.newSingleThreadExecutor())),
new ElasticSearchSearcher<>(clientProvider, new QueryConverter(new CriterionConverter())), new ElasticSearchSearcher<>(client, new QueryConverter(new CriterionConverter())),
new MessageToElasticSearchJson(new DefaultTextExtractor())); new MessageToElasticSearchJson(new DefaultTextExtractor()));


MailboxACLResolver aclResolver = new UnionMailboxACLResolver(); MailboxACLResolver aclResolver = new UnionMailboxACLResolver();
Expand Down

0 comments on commit 6628e4f

Please sign in to comment.