Skip to content

Commit

Permalink
MAILBOX-351 ReIndexerPerformer should internally rely on MailboxId
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa committed Nov 29, 2018
1 parent 13989f9 commit 655d35b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 186 deletions.
Expand Up @@ -19,9 +19,8 @@

package org.apache.mailbox.tools.indexer;

import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

import javax.inject.Inject;

Expand All @@ -30,6 +29,7 @@
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxMetaData;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.MessageRange;
Expand All @@ -40,20 +40,18 @@
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
import org.apache.james.task.Task;
import org.apache.james.util.OptionalUtils;
import org.apache.james.util.streams.Iterators;
import org.apache.mailbox.tools.indexer.registrations.GlobalRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;

public class ReIndexerPerformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class);

private static final int NO_LIMIT = 0;
private static final int SINGLE_MESSAGE = 1;
private static final String RE_INDEXING = "re-indexing";

private final MailboxManager mailboxManager;
private final ListeningMessageSearchIndex messageSearchIndex;
Expand All @@ -70,16 +68,19 @@ public ReIndexerPerformer(MailboxManager mailboxManager,

Task.Result reIndex(MailboxPath path, ReprocessingContext reprocessingContext) throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession(path.getUser());
return reIndex(path, mailboxSession, reprocessingContext);
Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path);
return reIndexSingleMailbox(mailbox.getMailboxId(), reprocessingContext);
}

Task.Result reIndex(ReprocessingContext reprocessingContext) throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession("re-indexing");
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXING);
LOGGER.info("Starting a full reindex");
List<MailboxPath> mailboxPaths = mailboxManager.list(mailboxSession);
Stream<MailboxId> mailboxIds = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
.stream()
.map(Mailbox::getMailboxId);

try {
return reIndex(mailboxPaths, mailboxSession, reprocessingContext);
return reIndex(mailboxIds, reprocessingContext);
} finally {
LOGGER.info("Full reindex finished");
}
Expand All @@ -88,14 +89,13 @@ Task.Result reIndex(ReprocessingContext reprocessingContext) throws MailboxExcep
Task.Result reIndex(User user, ReprocessingContext reprocessingContext) throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession(user.asString());
LOGGER.info("Starting a reindex for user {}", user.asString());
List<MailboxPath> mailboxPaths = mailboxManager.search(MailboxQuery.privateMailboxesBuilder(mailboxSession)
.build(), mailboxSession)

Stream<MailboxId> mailboxIds = mailboxManager.search(MailboxQuery.privateMailboxesBuilder(mailboxSession).build(), mailboxSession)
.stream()
.map(MailboxMetaData::getPath)
.collect(ImmutableList.toImmutableList());
.map(MailboxMetaData::getId);

try {
return reIndex(mailboxPaths, mailboxSession, reprocessingContext);
return reIndex(mailboxIds, reprocessingContext);
} finally {
LOGGER.info("User {} reindex finished", user.asString());
}
Expand All @@ -109,31 +109,24 @@ Task.Result handleMessageReIndexing(MailboxPath path, MessageUid uid) throws Mai
return handleMessageReIndexing(mailboxSession, mailbox, uid);
}

private Task.Result reIndex(List<MailboxPath> mailboxPaths, MailboxSession mailboxSession, ReprocessingContext reprocessingContext) throws MailboxException {
return wrapInGlobalRegistration(mailboxSession,
globalRegistration -> handleMultiMailboxesReindexingIterations(mailboxPaths, globalRegistration, reprocessingContext));
}

private Task.Result handleMultiMailboxesReindexingIterations(List<MailboxPath> mailboxPaths, GlobalRegistration globalRegistration,
ReprocessingContext reprocessingContext) {
return mailboxPaths.stream()
.map(globalRegistration::getPathToIndex)
.flatMap(OptionalUtils::toStream)
.map(path -> {
private Task.Result reIndex(Stream<MailboxId> mailboxIds, ReprocessingContext reprocessingContext) {
return mailboxIds
.map(mailboxId -> {
try {
return reIndex(path, reprocessingContext);
return reIndexSingleMailbox(mailboxId, reprocessingContext);
} catch (Throwable e) {
LOGGER.error("Error while proceeding to full reindexing on {}", path.asString(), e);
LOGGER.error("Error while proceeding to full reindexing on mailbox with mailboxId {}", mailboxId.serialize(), e);
return Task.Result.PARTIAL;
}
})
.reduce(Task::combine)
.orElse(Task.Result.COMPLETED);
}

private Task.Result reIndex(MailboxPath path, MailboxSession mailboxSession, ReprocessingContext reprocessingContext) throws MailboxException {
LOGGER.info("Intend to reindex {}", path.asString());
Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path);
private Task.Result reIndexSingleMailbox(MailboxId mailboxId, ReprocessingContext reprocessingContext) throws MailboxException {
LOGGER.info("Intend to reindex mailbox with mailboxId {}", mailboxId.serialize());
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXING);
Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
messageSearchIndex.deleteAll(mailboxSession, mailbox);
try {
return Iterators.toStream(
Expand All @@ -145,7 +138,7 @@ private Task.Result reIndex(MailboxPath path, MailboxSession mailboxSession, Rep
.reduce(Task::combine)
.orElse(Task.Result.COMPLETED);
} finally {
LOGGER.info("Finish to reindex {}", path.asString());
LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailboxId.serialize());
}
}

Expand All @@ -166,14 +159,4 @@ private Optional<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession,
.findInMailbox(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE))
.findFirst();
}

private <T> T wrapInGlobalRegistration(MailboxSession session, Function<GlobalRegistration, T> function) throws MailboxException {
GlobalRegistration globalRegistration = new GlobalRegistration();
mailboxManager.addGlobalListener(globalRegistration, session);
try {
return function.apply(globalRegistration);
} finally {
mailboxManager.removeGlobalListener(globalRegistration, session);
}
}
}

This file was deleted.

This file was deleted.

0 comments on commit 655d35b

Please sign in to comment.