Skip to content

Commit

Permalink
MAILBOX-351 Remove mailboxPath information from reIndexing tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa committed Nov 29, 2018
1 parent c88e353 commit 3957215
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 72 deletions.
Expand Up @@ -30,7 +30,6 @@
import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory; import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.task.Task; import org.apache.james.task.Task;


/** /**
Expand All @@ -57,16 +56,18 @@ public ReIndexerImpl(ReIndexerPerformer reIndexerPerformer, MailboxManager mailb


@Override @Override
public Task reIndex(MailboxPath path) throws MailboxException { public Task reIndex(MailboxPath path) throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession("ReIndexingImap"); MailboxSession mailboxSession = mailboxManager.createSystemSession(path.getUser());
Mailbox mailbox = mapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path);
return new SingleMailboxReindexingTask(reIndexerPerformer, mailbox); MailboxId mailboxId = mailboxManager.getMailbox(path, mailboxSession).getId();

return new SingleMailboxReindexingTask(reIndexerPerformer, mailboxId);
} }


@Override @Override
public Task reIndex(MailboxId mailboxId) throws MailboxException { public Task reIndex(MailboxId mailboxId) throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession("ReIndexingImap"); validateIdExists(mailboxId);
Mailbox mailbox = mapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
return new SingleMailboxReindexingTask(reIndexerPerformer, mailbox); return new SingleMailboxReindexingTask(reIndexerPerformer, mailboxId);
} }


@Override @Override
Expand All @@ -81,15 +82,22 @@ public Task reIndex(User user) {


@Override @Override
public Task reIndex(MailboxPath path, MessageUid uid) throws MailboxException { public Task reIndex(MailboxPath path, MessageUid uid) throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession("ReIndexingImap"); MailboxSession mailboxSession = mailboxManager.createSystemSession(path.getUser());
Mailbox mailbox = mapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path);
return new SingleMessageReindexingTask(reIndexerPerformer, mailbox, uid); MailboxId mailboxId = mailboxManager.getMailbox(path, mailboxSession).getId();

return new SingleMessageReindexingTask(reIndexerPerformer, mailboxId, uid);
} }


@Override @Override
public Task reIndex(MailboxId mailboxId, MessageUid uid) throws MailboxException { public Task reIndex(MailboxId mailboxId, MessageUid uid) throws MailboxException {
validateIdExists(mailboxId);

return new SingleMessageReindexingTask(reIndexerPerformer, mailboxId, uid);
}

private void validateIdExists(MailboxId mailboxId) throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession("ReIndexingImap"); MailboxSession mailboxSession = mailboxManager.createSystemSession("ReIndexingImap");
Mailbox mailbox = mapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId); mapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
return new SingleMessageReindexingTask(reIndexerPerformer, mailbox, uid);
} }
} }
Expand Up @@ -65,8 +65,23 @@ public ReIndexerPerformer(MailboxManager mailboxManager,
this.mailboxSessionMapperFactory = mailboxSessionMapperFactory; this.mailboxSessionMapperFactory = mailboxSessionMapperFactory;
} }


Task.Result reIndex(Mailbox mailbox, ReprocessingContext reprocessingContext) throws MailboxException { Task.Result reIndex(MailboxId mailboxId, ReprocessingContext reprocessingContext) throws MailboxException {
return reIndexSingleMailbox(mailbox.getMailboxId(), reprocessingContext); LOGGER.info("Intend to reindex mailbox with mailboxId {}", mailboxId.serialize());
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXING);
Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
messageSearchIndex.deleteAll(mailboxSession, mailbox);
try {
return Iterators.toStream(
mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
.findInMailbox(mailbox, MessageRange.all(), MessageMapper.FetchType.Metadata, NO_LIMIT))
.map(MailboxMessage::getUid)
.map(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid))
.peek(reprocessingContext::updateAccordingToReprocessingResult)
.reduce(Task::combine)
.orElse(Task.Result.COMPLETED);
} finally {
LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailboxId.serialize());
}
} }


Task.Result reIndex(ReprocessingContext reprocessingContext) throws MailboxException { Task.Result reIndex(ReprocessingContext reprocessingContext) throws MailboxException {
Expand Down Expand Up @@ -98,17 +113,18 @@ Task.Result reIndex(User user, ReprocessingContext reprocessingContext) throws M
} }
} }


Task.Result handleMessageReIndexing(Mailbox mailbox, MessageUid uid) throws MailboxException { Task.Result handleMessageReIndexing(MailboxId mailboxId, MessageUid uid) throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession(mailbox.getUser()); MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXING);


Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
return handleMessageReIndexing(mailboxSession, mailbox, uid); return handleMessageReIndexing(mailboxSession, mailbox, uid);
} }


private Task.Result reIndex(Stream<MailboxId> mailboxIds, ReprocessingContext reprocessingContext) { private Task.Result reIndex(Stream<MailboxId> mailboxIds, ReprocessingContext reprocessingContext) {
return mailboxIds return mailboxIds
.map(mailboxId -> { .map(mailboxId -> {
try { try {
return reIndexSingleMailbox(mailboxId, reprocessingContext); return reIndex(mailboxId, reprocessingContext);
} catch (Throwable e) { } catch (Throwable e) {
LOGGER.error("Error while proceeding to full reindexing on mailbox with mailboxId {}", mailboxId.serialize(), e); LOGGER.error("Error while proceeding to full reindexing on mailbox with mailboxId {}", mailboxId.serialize(), e);
return Task.Result.PARTIAL; return Task.Result.PARTIAL;
Expand All @@ -118,25 +134,6 @@ private Task.Result reIndex(Stream<MailboxId> mailboxIds, ReprocessingContext re
.orElse(Task.Result.COMPLETED); .orElse(Task.Result.COMPLETED);
} }


private Task.Result reIndexSingleMailbox(MailboxId mailboxId, ReprocessingContext reprocessingContext) throws MailboxException {
LOGGER.info("Intend to reindex mailbox with mailboxId {}", mailboxId.serialize());
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXING);
Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
messageSearchIndex.deleteAll(mailboxSession, mailbox);
try {
return Iterators.toStream(
mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
.findInMailbox(mailbox, MessageRange.all(), MessageMapper.FetchType.Metadata, NO_LIMIT))
.map(MailboxMessage::getUid)
.map(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid))
.peek(reprocessingContext::updateAccordingToReprocessingResult)
.reduce(Task::combine)
.orElse(Task.Result.COMPLETED);
} finally {
LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailboxId.serialize());
}
}

private Task.Result handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid) { private Task.Result handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid) {
try { try {
Optional.of(uid) Optional.of(uid)
Expand Down
Expand Up @@ -24,7 +24,7 @@
import javax.inject.Inject; import javax.inject.Inject;


import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.task.Task; import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskExecutionDetails;


Expand All @@ -33,20 +33,17 @@ public class SingleMailboxReindexingTask implements Task {
public static final String MAILBOX_RE_INDEXING = "mailboxReIndexing"; public static final String MAILBOX_RE_INDEXING = "mailboxReIndexing";


public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
private final Mailbox mailbox; private final MailboxId mailboxId;
private final ReprocessingContext reprocessingContext; private final ReprocessingContext reprocessingContext;


AdditionalInformation(Mailbox mailbox, ReprocessingContext reprocessingContext) { AdditionalInformation(MailboxId mailboxId, ReprocessingContext reprocessingContext) {
this.mailbox = mailbox; this.mailboxId = mailboxId;
this.reprocessingContext = reprocessingContext; this.reprocessingContext = reprocessingContext;
} }


public String getMailboxPath() {
return mailbox.generateAssociatedPath().asString();
}


public String getMailboxId() { public String getMailboxId() {
return mailbox.getMailboxId().serialize(); return mailboxId.serialize();
} }


public int getSuccessfullyReprocessMailCount() { public int getSuccessfullyReprocessMailCount() {
Expand All @@ -59,22 +56,22 @@ public int getFailedReprocessedMailCount() {
} }


private final ReIndexerPerformer reIndexerPerformer; private final ReIndexerPerformer reIndexerPerformer;
private final Mailbox mailbox; private final MailboxId mailboxId;
private final AdditionalInformation additionalInformation; private final AdditionalInformation additionalInformation;
private final ReprocessingContext reprocessingContext; private final ReprocessingContext reprocessingContext;


@Inject @Inject
public SingleMailboxReindexingTask(ReIndexerPerformer reIndexerPerformer, Mailbox mailbox) { public SingleMailboxReindexingTask(ReIndexerPerformer reIndexerPerformer, MailboxId mailboxId) {
this.reIndexerPerformer = reIndexerPerformer; this.reIndexerPerformer = reIndexerPerformer;
this.mailbox = mailbox; this.mailboxId = mailboxId;
this.reprocessingContext = new ReprocessingContext(); this.reprocessingContext = new ReprocessingContext();
this.additionalInformation = new AdditionalInformation(mailbox, reprocessingContext); this.additionalInformation = new AdditionalInformation(mailboxId, reprocessingContext);
} }


@Override @Override
public Result run() { public Result run() {
try { try {
return reIndexerPerformer.reIndex(mailbox, reprocessingContext); return reIndexerPerformer.reIndex(mailboxId, reprocessingContext);
} catch (MailboxException e) { } catch (MailboxException e) {
return Result.PARTIAL; return Result.PARTIAL;
} }
Expand Down
Expand Up @@ -25,7 +25,7 @@


import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.task.Task; import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskExecutionDetails;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -37,20 +37,16 @@ public class SingleMessageReindexingTask implements Task {
public static final String MESSAGE_RE_INDEXING = "messageReIndexing"; public static final String MESSAGE_RE_INDEXING = "messageReIndexing";


public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
private final Mailbox mailbox; private final MailboxId mailboxId;
private final MessageUid uid; private final MessageUid uid;


AdditionalInformation(Mailbox mailbox, MessageUid uid) { AdditionalInformation(MailboxId mailboxId, MessageUid uid) {
this.mailbox = mailbox; this.mailboxId = mailboxId;
this.uid = uid; this.uid = uid;
} }


public String getMailboxPath() {
return mailbox.generateAssociatedPath().asString();
}

public String getMailboxId() { public String getMailboxId() {
return mailbox.getMailboxId().serialize(); return mailboxId.serialize();
} }


public long getUid() { public long getUid() {
Expand All @@ -59,24 +55,24 @@ public long getUid() {
} }


private final ReIndexerPerformer reIndexerPerformer; private final ReIndexerPerformer reIndexerPerformer;
private final Mailbox mailbox; private final MailboxId mailboxId;
private final MessageUid uid; private final MessageUid uid;
private final AdditionalInformation additionalInformation; private final AdditionalInformation additionalInformation;


@Inject @Inject
public SingleMessageReindexingTask(ReIndexerPerformer reIndexerPerformer, Mailbox mailbox, MessageUid uid) { public SingleMessageReindexingTask(ReIndexerPerformer reIndexerPerformer, MailboxId mailboxId, MessageUid uid) {
this.reIndexerPerformer = reIndexerPerformer; this.reIndexerPerformer = reIndexerPerformer;
this.mailbox = mailbox; this.mailboxId = mailboxId;
this.uid = uid; this.uid = uid;
this.additionalInformation = new AdditionalInformation(mailbox, uid); this.additionalInformation = new AdditionalInformation(mailboxId, uid);
} }


@Override @Override
public Result run() { public Result run() {
try { try {
return reIndexerPerformer.handleMessageReIndexing(mailbox, uid); return reIndexerPerformer.handleMessageReIndexing(mailboxId, uid);
} catch (MailboxException e) { } catch (MailboxException e) {
LOGGER.warn("Error encounteres while reindexing {} {} : {}", mailbox.getMailboxId(), mailbox.generateAssociatedPath(), uid, e); LOGGER.warn("Error encounteres while reindexing {} : {}",mailboxId, uid, e);
return Result.PARTIAL; return Result.PARTIAL;
} }
} }
Expand Down
Expand Up @@ -65,8 +65,7 @@ void setUp(CassandraCluster cassandra) {
MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory(); MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory();
messageSearchIndex = mock(ListeningMessageSearchIndex.class); messageSearchIndex = mock(ListeningMessageSearchIndex.class);
reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory), reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory),
mailboxManager, mailboxManager, mailboxSessionMapperFactory);
mailboxSessionMapperFactory);
} }


@Test @Test
Expand Down
Expand Up @@ -63,8 +63,7 @@ void setUp() throws MailboxException {
MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory(); MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory();
messageSearchIndex = mock(ListeningMessageSearchIndex.class); messageSearchIndex = mock(ListeningMessageSearchIndex.class);
reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory), reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory),
mailboxManager, mailboxManager, mailboxSessionMapperFactory);
mailboxSessionMapperFactory);
} }


@Test @Test
Expand Down
Expand Up @@ -443,7 +443,6 @@ void mailboxReprocessingShouldNotFailWhenNoMail() throws Exception {
.body("status", is("completed")) .body("status", is("completed"))
.body("taskId", is(notNullValue())) .body("taskId", is(notNullValue()))
.body("type", is(SingleMailboxReindexingTask.MAILBOX_RE_INDEXING)) .body("type", is(SingleMailboxReindexingTask.MAILBOX_RE_INDEXING))
.body("additionalInformation.mailboxPath", is("#private:benwa@apache.org:INBOX"))
.body("additionalInformation.mailboxId", is(mailboxId.serialize())) .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
.body("additionalInformation.successfullyReprocessMailCount", is(0)) .body("additionalInformation.successfullyReprocessMailCount", is(0))
.body("additionalInformation.failedReprocessedMailCount", is(0)) .body("additionalInformation.failedReprocessedMailCount", is(0))
Expand Down Expand Up @@ -474,7 +473,6 @@ void mailboxReprocessingShouldReturnTaskDetailsWhenMail() throws Exception {
.body("status", is("completed")) .body("status", is("completed"))
.body("taskId", is(notNullValue())) .body("taskId", is(notNullValue()))
.body("type", is(SingleMailboxReindexingTask.MAILBOX_RE_INDEXING)) .body("type", is(SingleMailboxReindexingTask.MAILBOX_RE_INDEXING))
.body("additionalInformation.mailboxPath", is("#private:benwa@apache.org:INBOX"))
.body("additionalInformation.mailboxId", is(mailboxId.serialize())) .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
.body("additionalInformation.successfullyReprocessMailCount", is(1)) .body("additionalInformation.successfullyReprocessMailCount", is(1))
.body("additionalInformation.failedReprocessedMailCount", is(0)) .body("additionalInformation.failedReprocessedMailCount", is(0))
Expand Down Expand Up @@ -610,7 +608,6 @@ void messageReprocessingShouldNotFailWhenUidNotFound() throws Exception {
.body("status", is("completed")) .body("status", is("completed"))
.body("taskId", is(notNullValue())) .body("taskId", is(notNullValue()))
.body("type", is(SingleMessageReindexingTask.MESSAGE_RE_INDEXING)) .body("type", is(SingleMessageReindexingTask.MESSAGE_RE_INDEXING))
.body("additionalInformation.mailboxPath", is("#private:benwa@apache.org:INBOX"))
.body("additionalInformation.mailboxId", is(mailboxId.serialize())) .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
.body("additionalInformation.uid", is(1)) .body("additionalInformation.uid", is(1))
.body("startedDate", is(notNullValue())) .body("startedDate", is(notNullValue()))
Expand Down Expand Up @@ -641,7 +638,6 @@ void messageReprocessingShouldReturnTaskDetailsWhenMail() throws Exception {
.body("status", is("completed")) .body("status", is("completed"))
.body("taskId", is(notNullValue())) .body("taskId", is(notNullValue()))
.body("type", is(SingleMessageReindexingTask.MESSAGE_RE_INDEXING)) .body("type", is(SingleMessageReindexingTask.MESSAGE_RE_INDEXING))
.body("additionalInformation.mailboxPath", is("#private:benwa@apache.org:INBOX"))
.body("additionalInformation.mailboxId", is(mailboxId.serialize())) .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
.body("additionalInformation.uid", is((int) composedMessageId.getUid().asLong())) .body("additionalInformation.uid", is((int) composedMessageId.getUid().asLong()))
.body("startedDate", is(notNullValue())) .body("startedDate", is(notNullValue()))
Expand Down

0 comments on commit 3957215

Please sign in to comment.