Skip to content

Commit

Permalink
JAMES-2550 Mailqueue with reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa committed Dec 6, 2018
1 parent e96b5cb commit f121dd8
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 169 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Expand Up @@ -665,6 +665,13 @@

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>apache-james-backends-cassandra</artifactId>
Expand Down
9 changes: 9 additions & 0 deletions server/queue/queue-rabbitmq/pom.xml
Expand Up @@ -170,6 +170,15 @@
<artifactId>feign-slf4j</artifactId>
<version>${feign.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
Expand Down
Expand Up @@ -103,7 +103,7 @@ private ThrowingConsumer<Boolean> ack(long deliveryTag, Mail mail) {
if (success) {
dequeueMetric.increment();
rabbitClient.ack(deliveryTag);
mailQueueView.delete(DeleteCondition.withName(mail.getName())).join();
mailQueueView.delete(DeleteCondition.withName(mail.getName()));
} else {
rabbitClient.nack(deliveryTag);
}
Expand Down
Expand Up @@ -93,12 +93,12 @@ public long flush() {

@Override
public long clear() {
return mailQueueView.delete(DeleteCondition.all()).join();
return mailQueueView.delete(DeleteCondition.all());
}

@Override
public long remove(Type type, String value) {
return mailQueueView.delete(DeleteCondition.from(type, value)).join();
return mailQueueView.delete(DeleteCondition.from(type, value));
}

@Override
Expand Down
Expand Up @@ -36,7 +36,7 @@ interface Factory {

CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem);

CompletableFuture<Long> delete(DeleteCondition deleteCondition);
long delete(DeleteCondition deleteCondition);

CompletableFuture<Boolean> isPresent(Mail mail);

Expand Down
Expand Up @@ -31,8 +31,6 @@

import java.time.Instant;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import javax.inject.Inject;

Expand All @@ -43,6 +41,7 @@
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.annotations.VisibleForTesting;
import reactor.core.publisher.Mono;

public class BrowseStartDAO {

Expand Down Expand Up @@ -79,28 +78,29 @@ private PreparedStatement prepareInsertOne(Session session) {
.value(QUEUE_NAME, bindMarker(QUEUE_NAME)));
}

CompletableFuture<Optional<Instant>> findBrowseStart(MailQueueName queueName) {
Mono<Instant> findBrowseStart(MailQueueName queueName) {
return selectOne(queueName)
.thenApply(optional -> optional.map(this::getBrowseStart));
.map(this::getBrowseStart);
}

CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
return executor.executeVoid(updateOne.bind()
Mono<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
return Mono.fromCompletionStage(executor.executeVoid(updateOne.bind()
.setTimestamp(BROWSE_START, Date.from(sliceStart))
.setString(QUEUE_NAME, mailQueueName.asString()));
.setString(QUEUE_NAME, mailQueueName.asString())));
}

CompletableFuture<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
return executor.executeVoid(insertOne.bind()
Mono<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
return Mono.fromCompletionStage(executor.executeVoid(insertOne.bind()
.setTimestamp(BROWSE_START, Date.from(sliceStart))
.setString(QUEUE_NAME, mailQueueName.asString()));
.setString(QUEUE_NAME, mailQueueName.asString())));
}

@VisibleForTesting
CompletableFuture<Optional<Row>> selectOne(MailQueueName queueName) {
return executor.executeSingleRow(
Mono<Row> selectOne(MailQueueName queueName) {
return Mono.fromCompletionStage(executor.executeSingleRow(
selectOne.bind()
.setString(QUEUE_NAME, queueName.asString()));
.setString(QUEUE_NAME, queueName.asString())))
.flatMap(Mono::justOrEmpty);
}

private Instant getBrowseStart(Row row) {
Expand Down
Expand Up @@ -21,16 +21,12 @@

import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice.allSlicesTill;

import java.time.Clock;
import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.List;

import javax.inject.Inject;
import javax.mail.MessagingException;
Expand All @@ -44,12 +40,14 @@
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
import org.apache.james.util.FluentFutureStream;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class CassandraMailQueueBrowser {

Expand Down Expand Up @@ -101,23 +99,24 @@ public ManageableMailQueue.MailQueueItemView next() {
this.clock = clock;
}

CompletableFuture<Stream<ManageableMailQueue.MailQueueItemView>> browse(MailQueueName queueName) {
Flux<ManageableMailQueue.MailQueueItemView> browse(MailQueueName queueName) {
return browseReferences(queueName)
.map(this::toMailFuture, FluentFutureStream::unboxFuture)
.map(ManageableMailQueue.MailQueueItemView::new)
.completableFuture();
.flatMapSequential(this::toMailFuture)
.map(ManageableMailQueue.MailQueueItemView::new);
}

FluentFutureStream<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
return FluentFutureStream.of(browseStartDao.findBrowseStart(queueName)
.thenApply(this::allSlicesStartingAt))
.map(slice -> browseSlice(queueName, slice), FluentFutureStream::unboxFluentFuture);
Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
return browseStartDao.findBrowseStart(queueName)
.flatMapMany(this::allSlicesStartingAt)
.flatMapSequential(slice -> browseSlice(queueName, slice))
.flatMapSequential(Flux::fromIterable)
.subscribeOn(Schedulers.parallel());
}

private CompletableFuture<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
return mimeMessageStore.read(enqueuedItem.getPartsId())
.thenApply(mimeMessage -> toMail(enqueuedItem, mimeMessage));
return Mono.fromCompletionStage(mimeMessageStore.read(enqueuedItem.getPartsId()))
.map(mimeMessage -> toMail(enqueuedItem, mimeMessage));
}

private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) {
Expand All @@ -132,31 +131,25 @@ private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) {
return mail;
}

private FluentFutureStream<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) {
return FluentFutureStream.of(
private Mono<List<EnqueuedItemWithSlicingContext>> browseSlice(MailQueueName queueName, Slice slice) {
return
allBucketIds()
.map(bucketId ->
browseBucket(queueName, slice, bucketId).completableFuture()),
FluentFutureStream::unboxStream)
.sorted(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
.flatMap(bucketId -> browseBucket(queueName, slice, bucketId))
.collectSortedList(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
}

private FluentFutureStream<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
return FluentFutureStream.of(
enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId))
.thenFilter(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey()));
private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)
.filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey()));
}

private Stream<Slice> allSlicesStartingAt(Optional<Instant> maybeBrowseStart) {
return maybeBrowseStart
.map(Slice::of)
.map(startSlice -> allSlicesTill(startSlice, clock.instant(), configuration.getSliceWindow()))
.orElse(Stream.empty());
private Flux<Slice> allSlicesStartingAt(Instant browseStart) {
return Flux.fromStream(Slice.of(browseStart).allSlicesTill(clock.instant(), configuration.getSliceWindow()));
}

private Stream<BucketId> allBucketIds() {
return IntStream
private Flux<BucketId> allBucketIds() {
return Flux
.range(0, configuration.getBucketCount())
.mapToObj(BucketId::of);
.map(BucketId::of);
}
}
Expand Up @@ -20,10 +20,7 @@
package org.apache.james.queue.rabbitmq.view.cassandra;

import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;

import javax.inject.Inject;

Expand All @@ -32,6 +29,8 @@
import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
import org.apache.mailet.Mail;

import reactor.core.publisher.Mono;

public class CassandraMailQueueMailDelete {

private final DeletedMailsDAO deletedMailsDao;
Expand All @@ -53,42 +52,40 @@ public class CassandraMailQueueMailDelete {
this.random = random;
}

CompletableFuture<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) {
Mono<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) {
return considerDeleted(MailKey.fromMail(mail), mailQueueName);
}

CompletableFuture<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) {
Mono<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) {
return deletedMailsDao
.markAsDeleted(mailQueueName, mailKey)
.thenRunAsync(() -> maybeUpdateBrowseStart(mailQueueName));
.doOnTerminate(() -> maybeUpdateBrowseStart(mailQueueName));
}

CompletableFuture<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) {
Mono<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) {
return deletedMailsDao.isDeleted(mailQueueName, MailKey.fromMail(mail));
}

CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName) {
return findNewBrowseStart(mailQueueName)
.thenCompose(newBrowseStart -> updateNewBrowseStart(mailQueueName, newBrowseStart));
void updateBrowseStart(MailQueueName mailQueueName) {
Mono<Instant> newBrowseStart = findNewBrowseStart(mailQueueName);
updateNewBrowseStart(mailQueueName, newBrowseStart);
}

private void maybeUpdateBrowseStart(MailQueueName mailQueueName) {
if (shouldUpdateBrowseStart()) {
updateBrowseStart(mailQueueName).join();
updateBrowseStart(mailQueueName);
}
}

private CompletableFuture<Optional<Instant>> findNewBrowseStart(MailQueueName mailQueueName) {
private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) {
return cassandraMailQueueBrowser.browseReferences(mailQueueName)
.map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart())
.completableFuture()
.thenApply(Stream::findFirst);
.next();
}

private CompletableFuture<Void> updateNewBrowseStart(MailQueueName mailQueueName, Optional<Instant> maybeNewBrowseStart) {
private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, Mono<Instant> maybeNewBrowseStart) {
return maybeNewBrowseStart
.map(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant))
.orElse(CompletableFuture.completedFuture(null));
.flatMap(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant));
}

private boolean shouldUpdateBrowseStart() {
Expand Down
Expand Up @@ -21,7 +21,6 @@

import java.time.Clock;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;

import javax.inject.Inject;

Expand All @@ -32,6 +31,8 @@
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
import org.apache.mailet.Mail;

import reactor.core.publisher.Mono;

public class CassandraMailQueueMailStore {

private final EnqueuedMailsDAO enqueuedMailsDao;
Expand All @@ -50,13 +51,13 @@ public class CassandraMailQueueMailStore {
this.clock = clock;
}

CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) {
Mono<Void> storeMail(EnqueuedItem enqueuedItem) {
EnqueuedItemWithSlicingContext enqueuedItemAndSlicing = addSliceContext(enqueuedItem);

return enqueuedMailsDao.insert(enqueuedItemAndSlicing);
}

CompletableFuture<Void> initializeBrowseStart(MailQueueName mailQueueName) {
Mono<Void> initializeBrowseStart(MailQueueName mailQueueName) {
return browseStartDao
.insertInitialBrowseStart(mailQueueName, currentSliceStartInstant());
}
Expand Down

0 comments on commit f121dd8

Please sign in to comment.