Skip to content

Commit

Permalink
MAILBOX-304 Makes collecting FluentFutureStream easier
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa committed Sep 6, 2017
1 parent fd1c775 commit 6ad3c7e
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 8 deletions.
Expand Up @@ -28,13 +28,15 @@
import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.SIZE;
import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.TABLE_NAME;
import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.TYPE;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import javax.inject.Inject;

import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
Expand All @@ -45,15 +47,16 @@
import org.apache.james.mailbox.store.mail.AttachmentMapper;
import org.apache.james.util.FluentFutureStream;
import org.apache.james.util.OptionalUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.github.fge.lambdas.Throwing;
import com.github.fge.lambdas.ThrownByLambdaException;
import com.github.steveash.guavate.Guavate;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;

public class CassandraAttachmentMapper implements AttachmentMapper {

Expand Down Expand Up @@ -99,7 +102,7 @@ public List<Attachment> getAttachments(Collection<AttachmentId> attachmentIds) {
return getAttachmentsAsFuture(attachmentIds).join();
}

public CompletableFuture<List<Attachment>> getAttachmentsAsFuture(Collection<AttachmentId> attachmentIds) {
public CompletableFuture<ImmutableList<Attachment>> getAttachmentsAsFuture(Collection<AttachmentId> attachmentIds) {
Preconditions.checkArgument(attachmentIds != null);

Stream<CompletableFuture<Optional<Attachment>>> attachments = attachmentIds
Expand All @@ -110,9 +113,7 @@ public CompletableFuture<List<Attachment>> getAttachmentsAsFuture(Collection<Att
return FluentFutureStream
.of(attachments)
.flatMap(OptionalUtils::toStream)
.completableFuture()
.thenApply(stream ->
stream.collect(Guavate.toImmutableList()));
.collect(Guavate.toImmutableList());
}

private CompletableFuture<Optional<Attachment>> getAttachmentAsFuture(AttachmentId attachmentId) {
Expand Down
Expand Up @@ -97,8 +97,7 @@ private Stream<SimpleMailboxMessage> findAsStream(List<MessageId> messageIds, Fe
return FluentFutureStream.ofNestedStreams(
messageIds.stream()
.map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())))
.completableFuture()
.thenApply(stream -> stream.collect(Guavate.toImmutableList()))
.collect(Guavate.toImmutableList())
.thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
.thenApply(stream -> stream
.filter(CassandraMessageDAO.MessageResult::isFound)
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class FluentFutureStream<T> {
Expand Down Expand Up @@ -173,6 +174,14 @@ public CompletableFuture<Stream<T>> completableFuture() {
return this.completableFuture;
}

/**
* Returns the future of the underlying collected stream.
*/
public <C> CompletableFuture<C> collect(Collector<T, ?, C> collector) {
return this.completableFuture
.thenApply(stream -> stream.collect(collector));
}

/**
* Join and returns the underlying stream.
*/
Expand Down
Expand Up @@ -233,4 +233,26 @@ public void thenPerformOnAllShouldGenerateASynchronousSideEffectForAllElementsOf
assertThat(sideEffects).containsOnly(1, 2, 3);
}

@Test
public void collectShouldReturnTheCollectionOfData() {
assertThat(
FluentFutureStream.of(
Stream.of(
CompletableFuture.completedFuture(1),
CompletableFuture.completedFuture(2),
CompletableFuture.completedFuture(3)))
.collect(Guavate.toImmutableList())
.join())
.containsExactly(1, 2, 3);
}

@Test
public void collectShouldReturnEmptyWhenSteamIsEmpty() {
assertThat(
FluentFutureStream.ofFutures()
.collect(Guavate.toImmutableList())
.join())
.isEmpty();
}

}

0 comments on commit 6ad3c7e

Please sign in to comment.