Skip to content

Commit

Permalink
[NO_REVIEW] linagora#1011 JAMES-3737 Reactive IMAP CREATE
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa committed May 25, 2022
1 parent 6fed452 commit be27828
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ enum SearchCapabilities {
*/
Optional<MailboxId> createMailbox(MailboxPath mailboxPath, MailboxSession mailboxSession) throws MailboxException;

default Publisher<MailboxId> createMailboxReactive(MailboxPath mailboxPath, MailboxSession mailboxSession) {
return Mono.fromCallable(() -> createMailbox(mailboxPath, mailboxSession))
.handle((optional, sink) -> optional.ifPresent(sink::next));
}

/**
* Delete the mailbox with the name
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@
import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED;

import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;

import javax.inject.Inject;

Expand All @@ -52,6 +50,7 @@
import org.apache.james.mailbox.exception.InsufficientRightsException;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.exception.MailboxExistsException;
import org.apache.james.mailbox.exception.MailboxNameException;
import org.apache.james.mailbox.exception.MailboxNotFoundException;
import org.apache.james.mailbox.exception.SubscriptionException;
import org.apache.james.mailbox.exception.UnsupportedRightException;
Expand Down Expand Up @@ -98,7 +97,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -334,94 +332,98 @@ private boolean userHasLookupRightsOn(Mailbox mailbox, MailboxSession session) {

@Override
public Optional<MailboxId> createMailbox(MailboxPath mailboxPath, MailboxSession mailboxSession) throws MailboxException {
return MailboxReactorUtils.blockOptional(createMailboxReactive(mailboxPath, mailboxSession));
}

public Mono<MailboxId> createMailboxReactive(MailboxPath mailboxPath, MailboxSession mailboxSession) {
LOGGER.debug("createMailbox {}", mailboxPath);

assertMailboxPathBelongToUser(mailboxSession, mailboxPath);
return assertMailboxPathBelongToUserReactive(mailboxSession, mailboxPath)
.then(doCreateMailboxReactive(mailboxPath, mailboxSession));
}

private Mono<MailboxId> doCreateMailboxReactive(MailboxPath mailboxPath, MailboxSession mailboxSession) {
if (mailboxPath.getName().isEmpty()) {
LOGGER.warn("Ignoring mailbox with empty name");
return Mono.empty();
} else {
MailboxPath sanitizedMailboxPath = mailboxPath.sanitize(mailboxSession.getPathDelimiter());
sanitizedMailboxPath.assertAcceptable(mailboxSession.getPathDelimiter());

if (block(mailboxExists(sanitizedMailboxPath, mailboxSession))) {
throw new MailboxExistsException(sanitizedMailboxPath.asString());
}

List<MailboxId> mailboxIds = createMailboxesForPath(mailboxSession, sanitizedMailboxPath);

if (!mailboxIds.isEmpty()) {
return Optional.ofNullable(Iterables.getLast(mailboxIds));
try {
MailboxPath sanitizedMailboxPath = mailboxPath.sanitize(mailboxSession.getPathDelimiter());
sanitizedMailboxPath.assertAcceptable(mailboxSession.getPathDelimiter());

return mailboxExists(sanitizedMailboxPath, mailboxSession)
.flatMap(exists -> {
if (exists) {
return Mono.error(new MailboxExistsException(sanitizedMailboxPath.asString()));
} else {
return createMailboxesForPath(mailboxSession, sanitizedMailboxPath).takeLast(1).next();
}
});
} catch (MailboxNameException e) {
return Mono.error(e);
}
}
return Optional.empty();
}

private List<MailboxId> createMailboxesForPath(MailboxSession mailboxSession, MailboxPath sanitizedMailboxPath) {
private Flux<MailboxId> createMailboxesForPath(MailboxSession mailboxSession, MailboxPath sanitizedMailboxPath) {
// Create parents first
// If any creation fails then the mailbox will not be created
// TODO: transaction
List<MailboxPath> intermediatePaths = sanitizedMailboxPath.getHierarchyLevels(getDelimiter());
boolean isRootPath = intermediatePaths.size() == 1;

return intermediatePaths
.stream()
.flatMap(Throwing.<MailboxPath, Stream<MailboxId>>function(mailboxPath -> manageMailboxCreation(mailboxSession, isRootPath, mailboxPath)).sneakyThrow())
.collect(ImmutableList.toImmutableList());
return Flux.fromIterable(intermediatePaths)
.concatMap(path -> manageMailboxCreation(mailboxSession, isRootPath, path));
}

private Stream<MailboxId> manageMailboxCreation(MailboxSession mailboxSession, boolean isRootPath, MailboxPath mailboxPath) throws MailboxException {
private Mono<MailboxId> manageMailboxCreation(MailboxSession mailboxSession, boolean isRootPath, MailboxPath mailboxPath) {
if (mailboxPath.isInbox()) {
if (block(hasInbox(mailboxSession))) {
return duplicatedINBOXCreation(isRootPath, mailboxPath);
}

return performConcurrentMailboxCreation(mailboxSession, MailboxPath.inbox(mailboxSession)).stream();
return Mono.from(hasInbox(mailboxSession))
.flatMap(hasInbox -> {
if (hasInbox) {
return duplicatedINBOXCreation(isRootPath, mailboxPath);
}
return performConcurrentMailboxCreation(mailboxSession, MailboxPath.inbox(mailboxSession));
});
}

return performConcurrentMailboxCreation(mailboxSession, mailboxPath).stream();
return performConcurrentMailboxCreation(mailboxSession, mailboxPath);
}


private Stream<MailboxId> duplicatedINBOXCreation(boolean isRootPath, MailboxPath mailbox) throws InboxAlreadyCreated {
private Mono<MailboxId> duplicatedINBOXCreation(boolean isRootPath, MailboxPath mailbox) {
if (isRootPath) {
throw new InboxAlreadyCreated(mailbox.getName());
return Mono.error(new InboxAlreadyCreated(mailbox.getName()));
}

return Stream.empty();
return Mono.empty();
}

private List<MailboxId> performConcurrentMailboxCreation(MailboxSession mailboxSession, MailboxPath mailboxPath) throws MailboxException {
List<MailboxId> mailboxIds = new ArrayList<>();
private Mono<MailboxId> performConcurrentMailboxCreation(MailboxSession mailboxSession, MailboxPath mailboxPath) {
MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
locker.executeWithLock(mailboxPath, () ->
block(mapper.executeReactive(mapper.create(mailboxPath, UidValidity.generate())
.doOnNext(mailbox -> mailboxIds.add(mailbox.getMailboxId()))
.flatMap(mailbox ->
// notify listeners
eventBus.dispatch(EventFactory.mailboxAdded()
.randomEventId()
.mailboxSession(mailboxSession)
.mailbox(mailbox)
.build(),
new MailboxIdRegistrationKey(mailbox.getMailboxId()))))
.onErrorResume(e -> {
if (e instanceof MailboxExistsException) {
LOGGER.info("{} mailbox was created concurrently", mailboxPath.asString());
} else if (e instanceof MailboxException) {
return Mono.error(e);
}
return Mono.empty();
})), MailboxPathLocker.LockType.Write);

return mailboxIds;
return Mono.from(locker.executeReactiveWithLockReactive(mailboxPath,
mapper.executeReactive(mapper.create(mailboxPath, UidValidity.generate())
.flatMap(mailbox ->
// notify listeners
eventBus.dispatch(EventFactory.mailboxAdded()
.randomEventId()
.mailboxSession(mailboxSession)
.mailbox(mailbox)
.build(),
new MailboxIdRegistrationKey(mailbox.getMailboxId()))
.thenReturn(mailbox.getMailboxId()))
.onErrorResume(MailboxExistsException.class, e -> {
LOGGER.info("{} mailbox was created concurrently", mailboxPath.asString());
return Mono.empty();
})), MailboxPathLocker.LockType.Write));
}

private void assertMailboxPathBelongToUser(MailboxSession mailboxSession, MailboxPath mailboxPath) throws MailboxException {
private Mono<Void> assertMailboxPathBelongToUserReactive(MailboxSession mailboxSession, MailboxPath mailboxPath) {
if (!mailboxPath.belongsTo(mailboxSession)) {
throw new InsufficientRightsException("mailboxPath '" + mailboxPath.asString() + "'"
+ " does not belong to user '" + mailboxSession.getUser().asString() + "'");
return Mono.error(new InsufficientRightsException("mailboxPath '" + mailboxPath.asString() + "'"
+ " does not belong to user '" + mailboxSession.getUser().asString() + "'"));
}
return Mono.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.james.imap.processor;

import static org.apache.james.util.ReactorUtils.logOnError;

import org.apache.james.imap.api.display.HumanReadableText;
import org.apache.james.imap.api.message.response.StatusResponseFactory;
import org.apache.james.imap.api.process.ImapSession;
Expand All @@ -28,12 +30,13 @@
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.exception.MailboxExistsException;
import org.apache.james.mailbox.exception.TooLongMailboxNameException;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.util.MDCBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Mono;

public class CreateProcessor extends AbstractMailboxProcessor<CreateRequest> {
private static final Logger LOGGER = LoggerFactory.getLogger(CreateProcessor.class);

Expand All @@ -43,23 +46,29 @@ public CreateProcessor(MailboxManager mailboxManager, StatusResponseFactory fact
}

@Override
protected void processRequest(CreateRequest request, ImapSession session, Responder responder) {
final MailboxPath mailboxPath = PathConverter.forSession(session).buildFullPath(request.getMailboxName());
try {
final MailboxManager mailboxManager = getMailboxManager();
mailboxManager.createMailbox(mailboxPath, session.getMailboxSession());
unsolicitedResponses(session, responder, false).block();
okComplete(request, responder);
} catch (MailboxExistsException e) {
LOGGER.debug("Create failed for mailbox {} as it already exists", mailboxPath, e);
no(request, responder, HumanReadableText.MAILBOX_EXISTS);
} catch (TooLongMailboxNameException e) {
LOGGER.debug("The mailbox name length is over limit: {}", mailboxPath.getName(), e);
taggedBad(request, responder, HumanReadableText.FAILURE_MAILBOX_NAME);
} catch (MailboxException e) {
LOGGER.error("Create failed for mailbox {}", mailboxPath, e);
no(request, responder, HumanReadableText.GENERIC_FAILURE_DURING_PROCESSING);
}
protected Mono<Void> processRequestReactive(CreateRequest request, ImapSession session, Responder responder) {
MailboxManager mailboxManager = getMailboxManager();

return Mono.fromCallable(() -> PathConverter.forSession(session).buildFullPath(request.getMailboxName()))
.flatMap(mailboxPath -> Mono.from(mailboxManager.createMailboxReactive(mailboxPath, session.getMailboxSession()))
.then(unsolicitedResponses(session, responder, false))
.then(Mono.fromRunnable(() -> okComplete(request, responder)))
.doOnEach(logOnError(MailboxExistsException.class, e -> LOGGER.debug("Create failed for mailbox {} as it already exists", mailboxPath, e)))
.onErrorResume(MailboxExistsException.class, e -> {
no(request, responder, HumanReadableText.MAILBOX_EXISTS);
return Mono.empty();
})
.doOnEach(logOnError(TooLongMailboxNameException.class, e -> LOGGER.debug("The mailbox name length is over limit: {}", mailboxPath.getName(), e)))
.onErrorResume(TooLongMailboxNameException.class, e -> {
taggedBad(request, responder, HumanReadableText.FAILURE_MAILBOX_NAME);
return Mono.empty();
})
.doOnEach(logOnError(MailboxException.class, e -> LOGGER.error("Create failed for mailbox {}", mailboxPath, e)))
.onErrorResume(TooLongMailboxNameException.class, e -> {
no(request, responder, HumanReadableText.GENERIC_FAILURE_DURING_PROCESSING);
return Mono.empty();
})
.then());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private Mono<Result> check(Username username) {
private Mono<MessageManager> retrieveInbox(Username username, MailboxSession session) {
MailboxPath mailboxPath = MailboxPath.inbox(username);
return Mono.from(mailboxManager.getMailboxReactive(mailboxPath, session))
.onErrorResume(MailboxNotFoundException.class, e -> Mono.fromCallable(() -> mailboxManager.createMailbox(mailboxPath, session))
.onErrorResume(MailboxNotFoundException.class, e -> Mono.from(mailboxManager.createMailboxReactive(mailboxPath, session))
.then(Mono.from(mailboxManager.getMailboxReactive(mailboxPath, session))));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.fge.lambdas.Throwing;
import com.google.common.base.Strings;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class MailboxAppenderImpl implements MailboxAppender {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxAppenderImpl.class);
Expand Down Expand Up @@ -119,8 +117,7 @@ public long size() throws MailboxException {
private Mono<MessageManager> createMailboxIfNotExist(MailboxSession session, MailboxPath path) {
return Mono.from(mailboxManager.getMailboxReactive(path, session))
.onErrorResume(MailboxNotFoundException.class, e ->
Mono.fromRunnable(Throwing.runnable(() -> mailboxManager.createMailbox(path, session)).sneakyThrow())
.subscribeOn(Schedulers.elastic())
Mono.from(mailboxManager.createMailboxReactive(path, session))
.then(Mono.from(mailboxManager.getMailboxReactive(path, session)))
.onErrorResume(MailboxExistsException.class, e2 -> {
LOGGER.info("Mailbox {} have been created concurrently", path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.james.util.FunctionalUtils.negate;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;

import java.util.Optional;
import java.util.function.Function;

import javax.inject.Inject;
Expand All @@ -30,9 +29,7 @@
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.SubscriptionManager;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.exception.MailboxExistsException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.metrics.api.MetricFactory;
import org.slf4j.Logger;
Expand All @@ -42,7 +39,6 @@

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class DefaultMailboxesProvisioner {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMailboxesProvisioner.class);
Expand All @@ -69,8 +65,7 @@ private Mono<Void> createDefaultMailboxes(MailboxSession session) {
return Flux.fromIterable(DefaultMailboxes.DEFAULT_MAILBOXES)
.map(toMailboxPath(session))
.filterWhen(mailboxPath -> mailboxDoesntExist(mailboxPath, session), DEFAULT_CONCURRENCY)
.concatMap(mailboxPath -> Mono.fromRunnable(() -> createMailbox(mailboxPath, session))
.subscribeOn(Schedulers.elastic()))
.concatMap(mailboxPath -> createMailbox(mailboxPath, session))
.then();
}

Expand All @@ -83,17 +78,9 @@ private Function<String, MailboxPath> toMailboxPath(MailboxSession session) {
return mailbox -> MailboxPath.forUser(session.getUser(), mailbox);
}

private void createMailbox(MailboxPath mailboxPath, MailboxSession session) {
try {
Optional<MailboxId> mailboxId = mailboxManager.createMailbox(mailboxPath, session);
if (mailboxId.isPresent()) {
subscriptionManager.subscribe(session, mailboxPath.getName());
}
LOGGER.info("Provisioning {}. {} created.", mailboxPath, mailboxId);
} catch (MailboxExistsException e) {
LOGGER.info("Mailbox {} have been created concurrently", mailboxPath);
} catch (MailboxException e) {
throw new RuntimeException(e);
}
private Mono<Void> createMailbox(MailboxPath mailboxPath, MailboxSession session) {
return Mono.from(mailboxManager.createMailboxReactive(mailboxPath, session))
.flatMap(id -> Mono.from(subscriptionManager.subscribeReactive(mailboxPath.getName(), session)))
.onErrorContinue(MailboxExistsException.class, (e, o) -> LOGGER.info("Mailbox {} have been created concurrently", mailboxPath));
}
}

0 comments on commit be27828

Please sign in to comment.