Skip to content

Commit

Permalink
MAILBOX-351 Write a PreviousFailuresReIndexationTask
Browse files Browse the repository at this point in the history
Taking the list of previous indexing failures, this task will attempt to
reIndex all of the aforementioned messages.
  • Loading branch information
chibenwa committed May 30, 2019
1 parent 39db516 commit 5f6ab60
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 4 deletions.
@@ -0,0 +1,76 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.mailbox.tools.indexer;

import java.util.Optional;

import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;

public class PreviousFailuresReIndexationTask implements Task {
public static final String PREVIOUS_FAILURES_INDEXING = "ReIndexPreviousFailures";

public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
private final ReprocessingContext reprocessingContext;

AdditionalInformation(ReprocessingContext reprocessingContext) {
this.reprocessingContext = reprocessingContext;
}

public int getSuccessfullyReprocessMailCount() {
return reprocessingContext.successfullyReprocessedMailCount();
}

public int getFailedReprocessedMailCount() {
return reprocessingContext.failedReprocessingMailCount();
}

public ReIndexingExecutionFailures failures() {
return reprocessingContext.failures();
}
}

private final ReIndexerPerformer reIndexerPerformer;
private final AdditionalInformation additionalInformation;
private final ReprocessingContext reprocessingContext;
private final ReIndexingExecutionFailures previousFailures;

public PreviousFailuresReIndexationTask(ReIndexerPerformer reIndexerPerformer, ReIndexingExecutionFailures previousFailures) {
this.reIndexerPerformer = reIndexerPerformer;
this.previousFailures = previousFailures;
this.reprocessingContext = new ReprocessingContext();
this.additionalInformation = new AdditionalInformation(reprocessingContext);
}

@Override
public Result run() {
return reIndexerPerformer.reIndex(reprocessingContext, previousFailures);
}

@Override
public String type() {
return PREVIOUS_FAILURES_INDEXING;
}

@Override
public Optional<TaskExecutionDetails.AdditionalInformation> details() {
return Optional.of(additionalInformation);
}
}
Expand Up @@ -83,6 +83,26 @@ Task.Result reIndex(MailboxId mailboxId, ReprocessingContext reprocessingContext
}
}

Task.Result reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures) {
return previousReIndexingFailures.failures()
.stream()
.map(previousFailure -> reIndex(reprocessingContext, previousFailure))
.reduce(Task::combine)
.orElse(Task.Result.COMPLETED);
}

private Task.Result reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) {
MailboxId mailboxId = previousReIndexingFailure.getMailboxId();
MessageUid uid = previousReIndexingFailure.getUid();
try {
return handleMessageReIndexing(mailboxId, uid, reprocessingContext);
} catch (MailboxException e) {
LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e);
reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid);
return Task.Result.PARTIAL;
}
}

Task.Result reIndex(ReprocessingContext reprocessingContext) throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXING);
LOGGER.info("Starting a full reindex");
Expand Down
Expand Up @@ -25,8 +25,10 @@
import org.apache.james.mailbox.model.MailboxId;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;

public class ReIndexingExecutionFailures {
Expand All @@ -40,13 +42,22 @@ public ReIndexingFailure(MailboxId mailboxId, MessageUid uid) {
}

@JsonIgnore
public String getMailboxId() {
public String getSerializedMailboxId() {
return mailboxId.serialize();
}

public long getUid() {
public MailboxId getMailboxId() {
return mailboxId;
}

@JsonProperty("uid")
public long getSerializedUid() {
return uid.asLong();
}

public MessageUid getUid() {
return uid;
}
}

private final List<ReIndexingFailure> failures;
Expand All @@ -56,8 +67,13 @@ public ReIndexingExecutionFailures(List<ReIndexingFailure> failures) {
}

@JsonValue
public Multimap<String, ReIndexingFailure> failures() {
public Multimap<String, ReIndexingFailure> serializedFailures() {
return failures.stream()
.collect(Guavate.toImmutableListMultimap(ReIndexingFailure::getMailboxId));
.collect(Guavate.toImmutableListMultimap(ReIndexingFailure::getSerializedMailboxId));
}

@JsonIgnore
public List<ReIndexingFailure> failures() {
return ImmutableList.copyOf(failures);
}
}

0 comments on commit 5f6ab60

Please sign in to comment.