Skip to content

Commit

Permalink
JAMES-3687 Removes the mime message blobs
Browse files Browse the repository at this point in the history
- upon acknowledged dequeue
- upon successful filtering after a remove
  • Loading branch information
jeantil authored and chibenwa committed Jan 7, 2022
1 parent 80df025 commit 81223ef
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ package org.apache.james.queue.pulsar
import akka.actor.{Actor, ActorLogging, Props}
import com.sksamuel.pulsar4s.akka.streams.CommittableMessage
import com.sksamuel.pulsar4s.{ConsumerMessage, SequenceId}
import org.apache.james.blob.api.BlobId

import scala.math.Ordered.orderingToOrdered

private[pulsar] class FilterStage extends Actor with ActorLogging {
private[pulsar] class FilterStage(implicit val blobIdFactory:BlobId.Factory) extends Actor with ActorLogging {
private var filters = Set.empty[Filter]
private val name = self.path.name

Expand All @@ -41,10 +42,10 @@ private[pulsar] class FilterStage extends Actor with ActorLogging {
log.debug(s"filtering mail with active filters : $filters , sequence: ${sequenceId} metadata : $metadata")
if (shouldBeFiltered(metadata, sequenceId)) {
log.debug(s"message filtered : sequence: ${sequenceId} metadata : $metadata")
sender() ! (None, cm)
sender() ! (None, Some(metadata.partsId), cm)
} else {
log.debug(s"message not filtered : sequence: ${sequenceId} metadata : $metadata")
sender() ! (Some(metadata), cm)
sender() ! (Some(metadata), None, cm)
}
removeExpiredFilters(sequenceId)

Expand Down Expand Up @@ -78,5 +79,5 @@ private[pulsar] class FilterStage extends Actor with ActorLogging {
}

private[pulsar] object FilterStage {
def props = Props(new FilterStage())
def props(implicit blobIdFactory:BlobId.Factory) = Props(new FilterStage())
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.james.queue.pulsar

import org.apache.james.blob.api.BlobId
import org.apache.james.blob.mail.MimeMessagePartsId
import org.apache.james.queue.pulsar.EnqueueId.EnqueueId
import org.apache.mailet.Mail
Expand Down Expand Up @@ -80,4 +81,12 @@ private[pulsar] case class MailMetadata(enqueueId: String,
remoteHost: String,
perRecipientHeaders: Map[String, Iterable[Header]],
headerBlobId: String,
bodyBlobId: String)
bodyBlobId: String){

def partsId(implicit blobIdFactory: BlobId.Factory): MimeMessagePartsId =
MimeMessagePartsId.builder()
.headerBlobId(blobIdFactory.from(headerBlobId))
.bodyBlobId(blobIdFactory.from(bodyBlobId))
.build()

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,16 @@ class PulsarMailQueue(

private val enqueueBufferSize = 10
private val requeueBufferSize = 10
private val awaitTimeout = 10.seconds

gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + name, () => getSize)
private val dequeueMetrics = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString)
private val enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString)

private implicit val implicitSystem: ActorSystem = system
private implicit val ec: ExecutionContextExecutor = system.dispatcher
private implicit val client = PulsarClient(config.brokerUri)
private implicit val implicitBlobIdFactory: BlobId.Factory = blobIdFactory
private implicit val client: PulsarAsyncClient = PulsarClient(config.brokerUri)
private val admin = {
val builder = PulsarAdmin.builder()
builder.serviceHttpUrl(config.adminUri).build()
Expand Down Expand Up @@ -221,20 +223,18 @@ class PulsarMailQueue(
.map(message =>
(Json.fromJson[MailMetadata](Json.parse(message.message.value)).get,
message)
).ask[(Option[MailMetadata], CommittableMessage[String])](filterStage)
).ask[(Option[MailMetadata], Option[MimeMessagePartsId], CommittableMessage[String])](filterStage)
.flatMapConcat {
case (None, committableMessage) =>
case (None, Some(partsId), committableMessage) =>
committableMessage.ack()
Source.empty
case (Some(metadata), committableMessage) =>
val partsId = MimeMessagePartsId.builder()
.headerBlobId(blobIdFactory.from(metadata.headerBlobId))
.bodyBlobId(blobIdFactory.from(metadata.bodyBlobId))
.build()
deleteMimeMessage(partsId)
.flatMapConcat(_ => Source.empty)
case (Some(metadata), _, committableMessage) =>
val partsId = metadata.partsId
Source
.fromPublisher(readMimeMessage(partsId))
.map(message => (readMail(metadata, message), committableMessage))
}.map { case (mail, message) => new PulsarMailQueueItem(mail, message) }
.map(message => (readMail(metadata, message), partsId, committableMessage))
}.map { case (mail, partsId, message) => new PulsarMailQueueItem(mail, partsId, message) }
.map(mailQueueItemDecoratorFactory.decorate(_, name))

.alsoTo(counter)
Expand All @@ -251,15 +251,16 @@ class PulsarMailQueue(
.toMat(Sink.asPublisher[MailQueue.MailQueueItem](true).withAttributes(Attributes.inputBuffer(initial = 1, max = 1)))(Keep.both)
}

class PulsarMailQueueItem(mail: Mail, message: CommittableMessage[String]) extends MailQueueItem {
class PulsarMailQueueItem(mail: Mail, partsId: MimeMessagePartsId, message: CommittableMessage[String]) extends MailQueueItem {
override val getMail: Mail = mail

override def done(success: Boolean): Unit = {
if (success) {
dequeueMetrics.increment()
message.ack(cumulative = false)
Await.ready(message.ack(cumulative = false), awaitTimeout)
deleteMimeMessage(partsId).run()
} else {
message.nack()
Await.ready(message.nack(), awaitTimeout)
}
}
}
Expand Down Expand Up @@ -333,7 +334,7 @@ class PulsarMailQueue(
private def syncEnqueue(mail: Mail, delay: Duration): Unit = {
metricFactory.decorateSupplierWithTimerMetric(
ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString,
() => Await.result(internalEnqueue(mail, delay), 10.second)
() => Await.result(internalEnqueue(mail, delay), awaitTimeout)
)
}

Expand Down Expand Up @@ -453,7 +454,7 @@ class PulsarMailQueue(
)

if (reader.hasMessageAvailable)
reader.next(10.seconds)
reader.next(awaitTimeout)
else
None
}
Expand Down Expand Up @@ -584,4 +585,16 @@ class PulsarMailQueue(
throw new MailQueue.MailQueueException("Error while reading blob", e)
}

private def deleteMimeMessage(partsId: MimeMessagePartsId): Source[Void, NotUsed] = {
def doDelete() =
try {
mimeMessageStore.delete(partsId)
} catch {
case e: MessagingException =>
throw new MailQueue.MailQueueException("Error while deleting blob", e)
}

Source.fromPublisher(doDelete())
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
private val system: ActorSystem = ActorSystem("pulsar-mailqueue")

@PreDestroy
def stop(): Unit = system.terminate()
def stop(): Unit = {
queues.getAndUpdate(map => {
map.values.foreach(_.close())
map.empty
})
system.terminate()
}

override def getQueue(name: MailQueueName, count: MailQueueFactory.PrefetchCount): Optional[PulsarMailQueue] = {
Try(admin.topics().getInternalInfo(s"persistent://${config.namespace.asString}/James-${name.asString()}")).toOption.map(_ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@

import static org.apache.james.queue.api.Mails.defaultMail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.List;
import java.util.Objects;
import java.util.stream.IntStream;

import javax.mail.MessagingException;
Expand All @@ -48,8 +49,8 @@
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.ManageableMailQueueContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.queue.pulsar.PulsarMailQueue;
import org.apache.james.server.blob.deduplication.PassThroughBlobStore;
import org.apache.mailet.Mail;
import org.apache.mailet.base.MailAddressFixture;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -79,13 +80,14 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
private MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem;
private PulsarConfiguration config;
private ActorSystem system;
private MemoryBlobStoreDAO memoryBlobStore;

@BeforeEach
void setUp(DockerPulsarExtension.DockerPulsar pulsar, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
this.metricTestSystem = metricTestSystem;
blobIdFactory = new HashBlobId.Factory();

MemoryBlobStoreDAO memoryBlobStore = new MemoryBlobStoreDAO();
memoryBlobStore = new MemoryBlobStoreDAO();
PassThroughBlobStore blobStore = new PassThroughBlobStore(memoryBlobStore, BucketName.DEFAULT, blobIdFactory);
MimeMessageStore.Factory mimeMessageStoreFactory = new MimeMessageStore.Factory(blobStore);
mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
Expand Down Expand Up @@ -183,6 +185,55 @@ void ensureThatDeletionApplyOnBrowsingBothInstancesWithTwoInstancesOfMailQueue(D
.containsExactly("namez");
}

@Test
void queueShouldRemoveMailFromStoreOnAcknowledgedDequeue() throws Exception {
String expectedName = "name";
enQueue(defaultMail()
.name(expectedName)
.build());

MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
mailQueueItem.done(true);

assertThat(mailQueueItem.getMail().getName())
.isEqualTo(expectedName);

Awaitility.await().untilAsserted(() -> assertThatStoreIsEmpty());
}

@Test
void removeShouldRemoveMailFromStoreWhenFilteredOut() throws Exception {
enQueue(defaultMail()
.name("name1")
.build());
enQueue(defaultMail()
.name("name2")
.build());

getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2");

awaitRemove();

assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.containsExactly("name1");

MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
mailQueueItem.done(true);
Awaitility.await().untilAsserted(() -> assertThatStoreIsEmpty());
}

private void assertThatStoreIsEmpty() {
var blobIds = Flux.from(memoryBlobStore.listBlobs(BucketName.DEFAULT))
.map(Objects::toString)
.collectList()
.defaultIfEmpty(List.of())
.block();
assertThat(blobIds).isEmpty();
}

@Disabled("this guarantee is too strong for Pulsar implementation and doesn't match any domain requirement")
@Override
public void flushShouldPreserveBrowseOrder() {
Expand Down

0 comments on commit 81223ef

Please sign in to comment.