diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java index fee0e15c9af..c9a136d07c9 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java @@ -50,6 +50,7 @@ import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox; import org.apache.james.util.CompletableFutureUtil; +import org.apache.james.util.FluentFutureStream; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; @@ -141,22 +142,29 @@ public CompletableFuture delete(CassandraId mailboxId) { } public CompletableFuture> retrieveMailbox(CassandraId mailboxId) { - return mailbox(mailboxId, - executor.executeSingleRow(readStatement.bind() - .setUUID(ID, mailboxId.asUuid()))); + CompletableFuture aclCompletableFuture = + new CassandraACLMapper(mailboxId, session, executor, cassandraConfiguration) + .getACL(); + + CompletableFuture> simpleMailboxFuture = executor.executeSingleRow(readStatement.bind() + .setUUID(ID, mailboxId.asUuid())) + .thenApply(rowOptional -> rowOptional.map(this::mailboxFromRow)) + .thenApply(mailbox -> addMailboxId(mailboxId, mailbox)); + + return CompletableFutureUtil.combine( + aclCompletableFuture, + simpleMailboxFuture, + this::addAcl); + } + + private Optional addMailboxId(CassandraId cassandraId, Optional mailboxOptional) { + mailboxOptional.ifPresent(mailbox -> mailbox.setMailboxId(cassandraId)); + return mailboxOptional; } - private CompletableFuture> mailbox(CassandraId cassandraId, CompletableFuture> rowFuture) { - CompletableFuture aclCompletableFuture = new CassandraACLMapper(cassandraId, session, executor, cassandraConfiguration).getACL(); - return rowFuture.thenApply(rowOptional -> rowOptional.map(this::mailboxFromRow)) - .thenApply(mailboxOptional -> { - mailboxOptional.ifPresent(mailbox -> mailbox.setMailboxId(cassandraId)); - return mailboxOptional; - }) - .thenCompose(mailboxOptional -> aclCompletableFuture.thenApply(acl -> { - mailboxOptional.ifPresent(mailbox -> mailbox.setACL(acl)); - return mailboxOptional; - })); + private Optional addAcl(MailboxACL acl, Optional mailboxOptional) { + mailboxOptional.ifPresent(mailbox -> mailbox.setACL(acl)); + return mailboxOptional; } private SimpleMailbox mailboxFromRow(Row row) { @@ -169,10 +177,11 @@ private SimpleMailbox mailboxFromRow(Row row) { } public CompletableFuture> retrieveAllMailboxes() { - return executor.execute(listStatement.bind()) - .thenApply(cassandraUtils::convertToStream) - .thenApply(stream -> stream.map(this::toMailboxWithId)) - .thenCompose(stream -> CompletableFutureUtil.allOf(stream.map(this::toMailboxWithAclFuture))); + return FluentFutureStream.of(executor.execute(listStatement.bind()) + .thenApply(cassandraUtils::convertToStream)) + .map(this::toMailboxWithId) + .thenComposeOnAll(this::toMailboxWithAclFuture) + .completableFuture(); } private SimpleMailbox toMailboxWithId(Row row) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java index ef0362af97b..0a2a3f0c3b8 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java @@ -43,7 +43,7 @@ import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox; import org.apache.james.util.CompletableFutureUtil; -import org.apache.james.util.OptionalConverter; +import org.apache.james.util.FluentFutureStream; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.InvalidQueryException; @@ -111,15 +111,14 @@ public Mailbox findMailboxById(MailboxId id) throws MailboxException { @Override public List findMailboxWithPathLike(MailboxPath path) throws MailboxException { Pattern regex = Pattern.compile(constructEscapedRegexForMailboxNameMatching(path)); - return mailboxPathDAO.listUserMailboxes(path.getNamespace(), path.getUser()) - .thenApply(stream -> stream.filter(idAndPath -> regex.matcher(idAndPath.getMailboxPath().getName()).matches())) - .thenApply(stream -> stream.map(CassandraMailboxPathDAO.CassandraIdAndPath::getCassandraId)) - .thenApply(stream -> stream.map(mailboxDAO::retrieveMailbox)) - .thenCompose(CompletableFutureUtil::allOf) - .thenApply(stream -> stream - .flatMap(OptionalConverter::toStream) - .collect(Guavate.toImmutableList())) - .join(); + + return FluentFutureStream.of(mailboxPathDAO.listUserMailboxes(path.getNamespace(), path.getUser())) + .filter(idAndPath -> regex.matcher(idAndPath.getMailboxPath().getName()).matches()) + .map(CassandraMailboxPathDAO.CassandraIdAndPath::getCassandraId) + .thenFlatComposeOnOptional(mailboxDAO::retrieveMailbox) + .completableFuture() + .join() + .collect(Guavate.toImmutableList()); } @Override diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java index 6b5c3128fad..67ee0550bd0 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java @@ -21,6 +21,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Supplier; @@ -33,6 +34,11 @@ public static CompletableFuture> allOfArray(CompletableFuture.. return allOf(Stream.of(futures)); } + public static CompletableFuture combine(CompletableFuture t, CompletableFuture u, BiFunction combiner) { + return t.thenCompose(valueT -> + u.thenApply(valueU -> combiner.apply(valueT, valueU))); + } + public static CompletableFuture> allOf(Stream> futureStream) { return futureStream .map((CompletableFuture future) -> future.thenApply(Stream::of)) diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java index 810f264a429..fa40616c26d 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.BinaryOperator; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Stream; public class FluentFutureStream { @@ -33,6 +34,16 @@ public static FluentFutureStream of(CompletableFuture> completa return new FluentFutureStream<>(completableFuture); } + public static FluentFutureStream ofNestedStreams(Stream>> completableFuture) { + return of(completableFuture) + .flatMap(Function.identity()); + } + + public static FluentFutureStream ofOptionals(Stream>> completableFuture) { + return of(completableFuture) + .flatMapOptional(Function.identity()); + } + public static FluentFutureStream of(Stream> completableFutureStream) { return new FluentFutureStream<>(CompletableFutureUtil.allOf(completableFutureStream)); } @@ -69,11 +80,28 @@ public FluentFutureStream thenComposeOnAll(Function FluentFutureStream thenFlatCompose(Function>> function) { + return FluentFutureStream.of( + CompletableFutureUtil.thenComposeOnAll(completableFuture(), function)) + .flatMap(Function.identity()); + } + + public FluentFutureStream thenFlatComposeOnOptional(Function>> function) { + return FluentFutureStream.of( + CompletableFutureUtil.thenComposeOnAll(completableFuture(), function)) + .flatMapOptional(Function.identity()); + } + public FluentFutureStream flatMap(Function> function) { return FluentFutureStream.of(completableFuture().thenApply(stream -> stream.flatMap(function))); } + public FluentFutureStream flatMapOptional(Function> function) { + return map(function) + .flatMap(OptionalConverter::toStream); + } + public FluentFutureStream thenCompose(Function, CompletableFuture>> function) { return FluentFutureStream.of(completableFuture().thenCompose(function)); } @@ -82,6 +110,11 @@ public CompletableFuture> completableFuture() { return this.completableFuture; } + public FluentFutureStream filter(Predicate predicate) { + return FluentFutureStream.of(completableFuture + .thenApply(stream -> stream.filter(predicate))); + } + public Stream join() { return completableFuture().join(); } diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java index 3a7e5711a35..b4ea41d901a 100644 --- a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java +++ b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java @@ -51,6 +51,20 @@ public void tearDown() { executorService.shutdownNow(); } + @Test + public void combineShouldReturnCombinationOfBothSuppliedFutures() { + int value1 = 18; + int value2 = 12; + + assertThat(CompletableFutureUtil.combine( + CompletableFuture.completedFuture(value1), + CompletableFuture.completedFuture(value2), + (a, b) -> 2 * a + b) + .join()) + .isEqualTo(2 * value1 + value2); + + } + @Test public void allOfShouldUnboxEmptyStream() { assertThat(