Skip to content

Commit

Permalink
JAMES-2398 Modify injections and configuration to allow several index…
Browse files Browse the repository at this point in the history
…es, with different mappings, pointed by different aliases.
  • Loading branch information
chibenwa authored and mbaechler committed May 21, 2018
1 parent 6013f91 commit 3ae482a
Show file tree
Hide file tree
Showing 16 changed files with 281 additions and 265 deletions.
Expand Up @@ -21,9 +21,6 @@


import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;


import javax.inject.Inject;
import javax.inject.Named;

import org.apache.james.backends.es.search.ScrollIterable; import org.apache.james.backends.es.search.ScrollIterable;
import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand All @@ -46,13 +43,13 @@ public class DeleteByQueryPerformer {
private final AliasName aliasName; private final AliasName aliasName;
private final TypeName typeName; private final TypeName typeName;


@Inject public DeleteByQueryPerformer(Client client, ExecutorService executor,
public DeleteByQueryPerformer(Client client, @Named("AsyncExecutor") ExecutorService executor, @Named(ElasticSearchConstants.WRITE_ALIAS) AliasName aliasName, TypeName typeName) { AliasName aliasName, TypeName typeName) {
this(client, executor, DEFAULT_BATCH_SIZE, aliasName, typeName); this(client, executor, DEFAULT_BATCH_SIZE, aliasName, typeName);
} }


@VisibleForTesting @VisibleForTesting
public DeleteByQueryPerformer(Client client, @Named("AsyncExecutor") ExecutorService executor, int batchSize, AliasName aliasName, TypeName typeName) { public DeleteByQueryPerformer(Client client, ExecutorService executor, int batchSize, AliasName aliasName, TypeName typeName) {
this.client = client; this.client = client;
this.executor = executor; this.executor = executor;
this.batchSize = batchSize; this.batchSize = batchSize;
Expand Down

This file was deleted.

Expand Up @@ -20,6 +20,7 @@


import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutorService;


import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand All @@ -31,23 +32,33 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;


public class Indexer { public class ElasticSearchIndexer {
private static int DEBUG_MAX_LENGTH_CONTENT = 1000; private static int DEBUG_MAX_LENGTH_CONTENT = 1000;
private static final int DEFAULT_BATCH_SIZE = 100;


private static final Logger LOGGER = LoggerFactory.getLogger(Indexer.class); private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class);


private final Client client; private final Client client;
private final DeleteByQueryPerformer deleteByQueryPerformer; private final DeleteByQueryPerformer deleteByQueryPerformer;
private final AliasName aliasName; private final AliasName aliasName;
private final TypeName typeName; private final TypeName typeName;


public Indexer(Client client, DeleteByQueryPerformer deleteByQueryPerformer, public ElasticSearchIndexer(Client client, ExecutorService executor,
AliasName aliasName, AliasName aliasName,
TypeName typeName) { TypeName typeName) {
this(client, executor, aliasName, typeName, DEFAULT_BATCH_SIZE);
}

@VisibleForTesting
public ElasticSearchIndexer(Client client, ExecutorService executor,
AliasName aliasName,
TypeName typeName,
int batchSize) {
this.client = client; this.client = client;
this.deleteByQueryPerformer = deleteByQueryPerformer; this.deleteByQueryPerformer = new DeleteByQueryPerformer(client, executor, batchSize, aliasName, typeName);
this.aliasName = aliasName; this.aliasName = aliasName;
this.typeName = typeName; this.typeName = typeName;
} }
Expand Down

This file was deleted.

Expand Up @@ -27,7 +27,6 @@
import org.apache.james.backends.es.utils.TestingClientProvider; import org.apache.james.backends.es.utils.TestingClientProvider;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.junit.Before; import org.junit.Before;
Expand All @@ -38,7 +37,7 @@


import com.google.common.collect.Lists; import com.google.common.collect.Lists;


public class IndexerTest { public class ElasticSearchIndexerTest {


private static final int MINIMUM_BATCH_SIZE = 1; private static final int MINIMUM_BATCH_SIZE = 1;
private static final IndexName INDEX_NAME = new IndexName("index_name"); private static final IndexName INDEX_NAME = new IndexName("index_name");
Expand All @@ -51,7 +50,7 @@ public class IndexerTest {
public RuleChain ruleChain = RuleChain.outerRule(temporaryFolder).around(embeddedElasticSearch); public RuleChain ruleChain = RuleChain.outerRule(temporaryFolder).around(embeddedElasticSearch);


private Node node; private Node node;
private Indexer testee; private ElasticSearchIndexer testee;


@Before @Before
public void setup() { public void setup() {
Expand All @@ -61,17 +60,7 @@ public void setup() {
.useIndex(INDEX_NAME) .useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME) .addAlias(ALIAS_NAME)
.createIndexAndAliases(clientProvider.get()); .createIndexAndAliases(clientProvider.get());
DeleteByQueryPerformer deleteByQueryPerformer = new DeleteByQueryPerformer(clientProvider.get(), testee = new ElasticSearchIndexer(clientProvider.get(), Executors.newSingleThreadExecutor(), ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
Executors.newSingleThreadExecutor(),
MINIMUM_BATCH_SIZE,
ALIAS_NAME,
TYPE_NAME) {
@Override
public void perform(QueryBuilder queryBuilder) {
doDeleteByQuery(queryBuilder);
}
};
testee = new Indexer(clientProvider.get(), deleteByQueryPerformer, ALIAS_NAME, TYPE_NAME);
} }


@Test @Test
Expand Down
Expand Up @@ -24,6 +24,11 @@
import org.apache.james.backends.es.TypeName; import org.apache.james.backends.es.TypeName;


public interface MailboxElasticSearchConstants { public interface MailboxElasticSearchConstants {

interface InjectionNames {
String MAILBOX = "mailbox";
}

AliasName DEFAULT_MAILBOX_WRITE_ALIAS = new AliasName("mailboxWriteAlias"); AliasName DEFAULT_MAILBOX_WRITE_ALIAS = new AliasName("mailboxWriteAlias");
AliasName DEFAULT_MAILBOX_READ_ALIAS = new AliasName("mailboxReadAlias"); AliasName DEFAULT_MAILBOX_READ_ALIAS = new AliasName("mailboxReadAlias");
IndexName DEFAULT_MAILBOX_INDEX = new IndexName("mailbox_v1"); IndexName DEFAULT_MAILBOX_INDEX = new IndexName("mailbox_v1");
Expand Down

This file was deleted.

Expand Up @@ -30,14 +30,13 @@
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named; import javax.inject.Named;


import org.apache.james.backends.es.ElasticSearchConstants; import org.apache.james.backends.es.ElasticSearchIndexer;
import org.apache.james.backends.es.Indexer;
import org.apache.james.backends.es.IndexerSupplier;
import org.apache.james.backends.es.UpdatedRepresentation; import org.apache.james.backends.es.UpdatedRepresentation;
import org.apache.james.mailbox.MailboxManager.MessageCapabilities; import org.apache.james.mailbox.MailboxManager.MessageCapabilities;
import org.apache.james.mailbox.MailboxManager.SearchCapabilities; import org.apache.james.mailbox.MailboxManager.SearchCapabilities;
import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.elasticsearch.MailboxElasticSearchConstants;
import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants; import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants;
import org.apache.james.mailbox.elasticsearch.json.MessageToElasticSearchJson; import org.apache.james.mailbox.elasticsearch.json.MessageToElasticSearchJson;
import org.apache.james.mailbox.elasticsearch.search.ElasticSearchSearcher; import org.apache.james.mailbox.elasticsearch.search.ElasticSearchSearcher;
Expand All @@ -63,16 +62,16 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchListeningMessageSearchIndex.class); private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchListeningMessageSearchIndex.class);
private static final String ID_SEPARATOR = ":"; private static final String ID_SEPARATOR = ":";


private final Indexer indexer; private final ElasticSearchIndexer elasticSearchIndexer;
private final ElasticSearchSearcher searcher; private final ElasticSearchSearcher searcher;
private final MessageToElasticSearchJson messageToElasticSearchJson; private final MessageToElasticSearchJson messageToElasticSearchJson;


@Inject @Inject
public ElasticSearchListeningMessageSearchIndex(MessageMapperFactory factory, public ElasticSearchListeningMessageSearchIndex(MessageMapperFactory factory,
@Named(ElasticSearchConstants.MAILBOX_INDEX) IndexerSupplier indexer, @Named(MailboxElasticSearchConstants.InjectionNames.MAILBOX) ElasticSearchIndexer indexer,
ElasticSearchSearcher searcher, MessageToElasticSearchJson messageToElasticSearchJson) { ElasticSearchSearcher searcher, MessageToElasticSearchJson messageToElasticSearchJson) {
super(factory); super(factory);
this.indexer = indexer.getIndexer(); this.elasticSearchIndexer = indexer;
this.messageToElasticSearchJson = messageToElasticSearchJson; this.messageToElasticSearchJson = messageToElasticSearchJson;
this.searcher = searcher; this.searcher = searcher;
} }
Expand Down Expand Up @@ -129,7 +128,7 @@ public void add(MailboxSession session, Mailbox mailbox, MailboxMessage message)
mailbox.getMailboxId(), mailbox.getMailboxId(),
session.getUser().getUserName(), session.getUser().getUserName(),
message.getUid()); message.getUid());
indexer.indexMessage(indexIdFor(mailbox, message.getUid()), messageToElasticSearchJson.convertToJson(message, ImmutableList.of(session.getUser()))); elasticSearchIndexer.indexMessage(indexIdFor(mailbox, message.getUid()), messageToElasticSearchJson.convertToJson(message, ImmutableList.of(session.getUser())));
} catch (Exception e) { } catch (Exception e) {
try { try {
LOGGER.warn("Indexing mailbox {}-{} of user {} on message {} without attachments ", LOGGER.warn("Indexing mailbox {}-{} of user {} on message {} without attachments ",
Expand All @@ -138,7 +137,7 @@ public void add(MailboxSession session, Mailbox mailbox, MailboxMessage message)
session.getUser().getUserName(), session.getUser().getUserName(),
message.getUid(), message.getUid(),
e); e);
indexer.indexMessage(indexIdFor(mailbox, message.getUid()), messageToElasticSearchJson.convertToJsonWithoutAttachment(message, ImmutableList.of(session.getUser()))); elasticSearchIndexer.indexMessage(indexIdFor(mailbox, message.getUid()), messageToElasticSearchJson.convertToJsonWithoutAttachment(message, ImmutableList.of(session.getUser())));
} catch (JsonProcessingException e1) { } catch (JsonProcessingException e1) {
LOGGER.error("Error when indexing mailbox {}-{} of user {} on message {} without its attachment", LOGGER.error("Error when indexing mailbox {}-{} of user {} on message {} without its attachment",
mailbox.getName(), mailbox.getName(),
Expand All @@ -153,7 +152,7 @@ public void add(MailboxSession session, Mailbox mailbox, MailboxMessage message)
@Override @Override
public void delete(MailboxSession session, Mailbox mailbox, List<MessageUid> expungedUids) throws MailboxException { public void delete(MailboxSession session, Mailbox mailbox, List<MessageUid> expungedUids) throws MailboxException {
try { try {
indexer.deleteMessages(expungedUids.stream() elasticSearchIndexer.deleteMessages(expungedUids.stream()
.map(uid -> indexIdFor(mailbox, uid)) .map(uid -> indexIdFor(mailbox, uid))
.collect(Collectors.toList())); .collect(Collectors.toList()));
} catch (Exception e) { } catch (Exception e) {
Expand All @@ -166,7 +165,7 @@ public void delete(MailboxSession session, Mailbox mailbox, List<MessageUid> exp
@Override @Override
public void deleteAll(MailboxSession session, Mailbox mailbox) throws MailboxException { public void deleteAll(MailboxSession session, Mailbox mailbox) throws MailboxException {
try { try {
indexer.deleteAllMatchingQuery( elasticSearchIndexer.deleteAllMatchingQuery(
termQuery( termQuery(
JsonMessageConstants.MAILBOX_ID, JsonMessageConstants.MAILBOX_ID,
mailbox.getMailboxId().serialize())); mailbox.getMailboxId().serialize()));
Expand All @@ -178,7 +177,7 @@ public void deleteAll(MailboxSession session, Mailbox mailbox) throws MailboxExc
@Override @Override
public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws MailboxException { public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws MailboxException {
try { try {
indexer.updateMessages(updatedFlagsList.stream() elasticSearchIndexer.updateMessages(updatedFlagsList.stream()
.map(updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailbox, updatedFlags)) .map(updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailbox, updatedFlags))
.collect(Collectors.toList())); .collect(Collectors.toList()));
} catch (Exception e) { } catch (Exception e) {
Expand Down
Expand Up @@ -23,11 +23,7 @@
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;


import javax.inject.Inject;
import javax.inject.Named;

import org.apache.james.backends.es.AliasName; import org.apache.james.backends.es.AliasName;
import org.apache.james.backends.es.ElasticSearchConstants;
import org.apache.james.backends.es.TypeName; import org.apache.james.backends.es.TypeName;
import org.apache.james.backends.es.search.ScrollIterable; import org.apache.james.backends.es.search.ScrollIterable;
import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.MessageUid;
Expand All @@ -51,9 +47,10 @@


public class ElasticSearchSearcher { public class ElasticSearchSearcher {


public static final int DEFAULT_SEARCH_SIZE = 100;

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class); private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class);
private static final TimeValue TIMEOUT = new TimeValue(60000); private static final TimeValue TIMEOUT = new TimeValue(60000);
private static final int DEFAULT_SIZE = 100;


private final Client client; private final Client client;
private final QueryConverter queryConverter; private final QueryConverter queryConverter;
Expand All @@ -63,13 +60,6 @@ public class ElasticSearchSearcher {
private final AliasName aliasName; private final AliasName aliasName;
private final TypeName typeName; private final TypeName typeName;


@Inject
public ElasticSearchSearcher(Client client, QueryConverter queryConverter,
MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory,
@Named(ElasticSearchConstants.READ_ALIAS) AliasName aliasName, TypeName typeName) {
this(client, queryConverter, DEFAULT_SIZE, mailboxIdFactory, messageIdFactory, aliasName, typeName);
}

public ElasticSearchSearcher(Client client, QueryConverter queryConverter, int size, public ElasticSearchSearcher(Client client, QueryConverter queryConverter, int size,
MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory, MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory,
AliasName aliasName, TypeName typeName) { AliasName aliasName, TypeName typeName) {
Expand Down
Expand Up @@ -25,7 +25,7 @@
import java.time.ZoneId; import java.time.ZoneId;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;


import org.apache.james.backends.es.DeleteByQueryPerformer; import org.apache.james.backends.es.ElasticSearchIndexer;
import org.apache.james.backends.es.EmbeddedElasticSearch; import org.apache.james.backends.es.EmbeddedElasticSearch;
import org.apache.james.backends.es.IndexCreationFactory; import org.apache.james.backends.es.IndexCreationFactory;
import org.apache.james.backends.es.NodeMappingFactory; import org.apache.james.backends.es.NodeMappingFactory;
Expand Down Expand Up @@ -109,14 +109,11 @@ protected void initializeMailboxManager() throws Exception {


ElasticSearchListeningMessageSearchIndex elasticSearchListeningMessageSearchIndex = new ElasticSearchListeningMessageSearchIndex( ElasticSearchListeningMessageSearchIndex elasticSearchListeningMessageSearchIndex = new ElasticSearchListeningMessageSearchIndex(
storeMailboxManager.getMapperFactory(), storeMailboxManager.getMapperFactory(),
new MailboxIndexerSupplier(client, new ElasticSearchIndexer(client,
new DeleteByQueryPerformer(client, Executors.newSingleThreadExecutor(),
Executors.newSingleThreadExecutor(),
BATCH_SIZE,
MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
MailboxElasticSearchConstants.MESSAGE_TYPE),
MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS, MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
MailboxElasticSearchConstants.MESSAGE_TYPE), MailboxElasticSearchConstants.MESSAGE_TYPE,
BATCH_SIZE),
new ElasticSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE, new ElasticSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE,
new InMemoryId.Factory(), storeMailboxManager.getMessageIdFactory(), new InMemoryId.Factory(), storeMailboxManager.getMessageIdFactory(),
MailboxElasticSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, MailboxElasticSearchConstants.DEFAULT_MAILBOX_READ_ALIAS,
Expand Down

0 comments on commit 3ae482a

Please sign in to comment.