Skip to content

Commit

Permalink
JAMES-2082 Adding more capability to Stream future handling
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa authored and aduprat committed Jul 10, 2017
1 parent cae6beb commit c00651a
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 28 deletions.
Expand Up @@ -50,6 +50,7 @@
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
import org.apache.james.util.CompletableFutureUtil;
import org.apache.james.util.FluentFutureStream;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
Expand Down Expand Up @@ -141,22 +142,29 @@ public CompletableFuture<Void> delete(CassandraId mailboxId) {
}

public CompletableFuture<Optional<SimpleMailbox>> retrieveMailbox(CassandraId mailboxId) {
return mailbox(mailboxId,
executor.executeSingleRow(readStatement.bind()
.setUUID(ID, mailboxId.asUuid())));
CompletableFuture<MailboxACL> aclCompletableFuture =
new CassandraACLMapper(mailboxId, session, executor, cassandraConfiguration)
.getACL();

CompletableFuture<Optional<SimpleMailbox>> simpleMailboxFuture = executor.executeSingleRow(readStatement.bind()
.setUUID(ID, mailboxId.asUuid()))
.thenApply(rowOptional -> rowOptional.map(this::mailboxFromRow))
.thenApply(mailbox -> addMailboxId(mailboxId, mailbox));

return CompletableFutureUtil.combine(
aclCompletableFuture,
simpleMailboxFuture,
this::addAcl);
}

private Optional<SimpleMailbox> addMailboxId(CassandraId cassandraId, Optional<SimpleMailbox> mailboxOptional) {
mailboxOptional.ifPresent(mailbox -> mailbox.setMailboxId(cassandraId));
return mailboxOptional;
}

private CompletableFuture<Optional<SimpleMailbox>> mailbox(CassandraId cassandraId, CompletableFuture<Optional<Row>> rowFuture) {
CompletableFuture<MailboxACL> aclCompletableFuture = new CassandraACLMapper(cassandraId, session, executor, cassandraConfiguration).getACL();
return rowFuture.thenApply(rowOptional -> rowOptional.map(this::mailboxFromRow))
.thenApply(mailboxOptional -> {
mailboxOptional.ifPresent(mailbox -> mailbox.setMailboxId(cassandraId));
return mailboxOptional;
})
.thenCompose(mailboxOptional -> aclCompletableFuture.thenApply(acl -> {
mailboxOptional.ifPresent(mailbox -> mailbox.setACL(acl));
return mailboxOptional;
}));
private Optional<SimpleMailbox> addAcl(MailboxACL acl, Optional<SimpleMailbox> mailboxOptional) {
mailboxOptional.ifPresent(mailbox -> mailbox.setACL(acl));
return mailboxOptional;
}

private SimpleMailbox mailboxFromRow(Row row) {
Expand All @@ -169,10 +177,11 @@ private SimpleMailbox mailboxFromRow(Row row) {
}

public CompletableFuture<Stream<SimpleMailbox>> retrieveAllMailboxes() {
return executor.execute(listStatement.bind())
.thenApply(cassandraUtils::convertToStream)
.thenApply(stream -> stream.map(this::toMailboxWithId))
.thenCompose(stream -> CompletableFutureUtil.allOf(stream.map(this::toMailboxWithAclFuture)));
return FluentFutureStream.of(executor.execute(listStatement.bind())
.thenApply(cassandraUtils::convertToStream))
.map(this::toMailboxWithId)
.thenComposeOnAll(this::toMailboxWithAclFuture)
.completableFuture();
}

private SimpleMailbox toMailboxWithId(Row row) {
Expand Down
Expand Up @@ -43,7 +43,7 @@
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
import org.apache.james.util.CompletableFutureUtil;
import org.apache.james.util.OptionalConverter;
import org.apache.james.util.FluentFutureStream;

import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.InvalidQueryException;
Expand Down Expand Up @@ -111,15 +111,14 @@ public Mailbox findMailboxById(MailboxId id) throws MailboxException {
@Override
public List<Mailbox> findMailboxWithPathLike(MailboxPath path) throws MailboxException {
Pattern regex = Pattern.compile(constructEscapedRegexForMailboxNameMatching(path));
return mailboxPathDAO.listUserMailboxes(path.getNamespace(), path.getUser())
.thenApply(stream -> stream.filter(idAndPath -> regex.matcher(idAndPath.getMailboxPath().getName()).matches()))
.thenApply(stream -> stream.map(CassandraMailboxPathDAO.CassandraIdAndPath::getCassandraId))
.thenApply(stream -> stream.map(mailboxDAO::retrieveMailbox))
.thenCompose(CompletableFutureUtil::allOf)
.thenApply(stream -> stream
.flatMap(OptionalConverter::toStream)
.collect(Guavate.<Mailbox>toImmutableList()))
.join();

return FluentFutureStream.of(mailboxPathDAO.listUserMailboxes(path.getNamespace(), path.getUser()))
.filter(idAndPath -> regex.matcher(idAndPath.getMailboxPath().getName()).matches())
.map(CassandraMailboxPathDAO.CassandraIdAndPath::getCassandraId)
.thenFlatComposeOnOptional(mailboxDAO::retrieveMailbox)
.completableFuture()
.join()
.collect(Guavate.toImmutableList());
}

@Override
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -33,6 +34,11 @@ public static <T> CompletableFuture<Stream<T>> allOfArray(CompletableFuture<T>..
return allOf(Stream.of(futures));
}

public static <T, U, V> CompletableFuture<V> combine(CompletableFuture<T> t, CompletableFuture<U> u, BiFunction<T,U,V> combiner) {
return t.thenCompose(valueT ->
u.thenApply(valueU -> combiner.apply(valueT, valueU)));
}

public static <T> CompletableFuture<Stream<T>> allOf(Stream<CompletableFuture<T>> futureStream) {
return futureStream
.map((CompletableFuture<T> future) -> future.thenApply(Stream::of))
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class FluentFutureStream<T> {
Expand All @@ -33,6 +34,16 @@ public static <T> FluentFutureStream<T> of(CompletableFuture<Stream<T>> completa
return new FluentFutureStream<>(completableFuture);
}

public static <T> FluentFutureStream<T> ofNestedStreams(Stream<CompletableFuture<Stream<T>>> completableFuture) {
return of(completableFuture)
.flatMap(Function.identity());
}

public static <T> FluentFutureStream<T> ofOptionals(Stream<CompletableFuture<Optional<T>>> completableFuture) {
return of(completableFuture)
.flatMapOptional(Function.identity());
}

public static <T> FluentFutureStream<T> of(Stream<CompletableFuture<T>> completableFutureStream) {
return new FluentFutureStream<>(CompletableFutureUtil.allOf(completableFutureStream));
}
Expand Down Expand Up @@ -69,11 +80,28 @@ public <U> FluentFutureStream<U> thenComposeOnAll(Function<T, CompletableFuture<
CompletableFutureUtil.thenComposeOnAll(completableFuture(), function));
}

public <U> FluentFutureStream<U> thenFlatCompose(Function<T, CompletableFuture<Stream<U>>> function) {
return FluentFutureStream.of(
CompletableFutureUtil.thenComposeOnAll(completableFuture(), function))
.flatMap(Function.identity());
}

public <U> FluentFutureStream<U> thenFlatComposeOnOptional(Function<T, CompletableFuture<Optional<U>>> function) {
return FluentFutureStream.of(
CompletableFutureUtil.thenComposeOnAll(completableFuture(), function))
.flatMapOptional(Function.identity());
}

public <U> FluentFutureStream<U> flatMap(Function<T, Stream<U>> function) {
return FluentFutureStream.of(completableFuture().thenApply(stream ->
stream.flatMap(function)));
}

public <U> FluentFutureStream<U> flatMapOptional(Function<T, Optional<U>> function) {
return map(function)
.flatMap(OptionalConverter::toStream);
}

public <U> FluentFutureStream<U> thenCompose(Function<Stream<T>, CompletableFuture<Stream<U>>> function) {
return FluentFutureStream.of(completableFuture().thenCompose(function));
}
Expand All @@ -82,6 +110,11 @@ public CompletableFuture<Stream<T>> completableFuture() {
return this.completableFuture;
}

public FluentFutureStream<T> filter(Predicate<T> predicate) {
return FluentFutureStream.of(completableFuture
.thenApply(stream -> stream.filter(predicate)));
}

public Stream<T> join() {
return completableFuture().join();
}
Expand Down
Expand Up @@ -51,6 +51,20 @@ public void tearDown() {
executorService.shutdownNow();
}

@Test
public void combineShouldReturnCombinationOfBothSuppliedFutures() {
int value1 = 18;
int value2 = 12;

assertThat(CompletableFutureUtil.combine(
CompletableFuture.completedFuture(value1),
CompletableFuture.completedFuture(value2),
(a, b) -> 2 * a + b)
.join())
.isEqualTo(2 * value1 + value2);

}

@Test
public void allOfShouldUnboxEmptyStream() {
assertThat(
Expand Down

0 comments on commit c00651a

Please sign in to comment.