Skip to content

Commit

Permalink
FIFO: Merge + support outputting fifo related attributes in receive r…
Browse files Browse the repository at this point in the history
…equests
  • Loading branch information
simong committed Jun 12, 2018
1 parent 53abd29 commit eef10a7
Show file tree
Hide file tree
Showing 17 changed files with 507 additions and 336 deletions.
19 changes: 9 additions & 10 deletions core/src/main/scala/org/elasticmq/QueueData.scala
Expand Up @@ -3,15 +3,14 @@ package org.elasticmq
import org.joda.time.{Duration, DateTime}

case class QueueData(name: String,
defaultVisibilityTimeout: MillisVisibilityTimeout,
delay: Duration,
receiveMessageWait: Duration,
created: DateTime,
lastModified: DateTime,
deadLettersQueue: Option[DeadLettersQueueData] = None,
maxReceiveCount: Option[Int] = None,
isFifo: Boolean = false,
hasContentBasedDeduplication: Boolean = false
)
defaultVisibilityTimeout: MillisVisibilityTimeout,
delay: Duration,
receiveMessageWait: Duration,
created: DateTime,
lastModified: DateTime,
deadLettersQueue: Option[DeadLettersQueueData] = None,
maxReceiveCount: Option[Int] = None,
isFifo: Boolean = false,
hasContentBasedDeduplication: Boolean = false)

case class DeadLettersQueueData(name: String, maxReceiveCount: Int)
82 changes: 48 additions & 34 deletions core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala
Expand Up @@ -5,20 +5,32 @@ import java.util.UUID
import scala.collection.mutable

import org.elasticmq.util.NowProvider
import org.elasticmq.{DeliveryReceipt, MessageAttribute, MessageData, MessageId, MessageStatistics, MillisNextDelivery, NeverReceived, NewMessageData, OnDateTimeReceived, QueueData, Received}
import org.elasticmq.{
DeliveryReceipt,
MessageAttribute,
MessageData,
MessageId,
MessageStatistics,
MillisNextDelivery,
NeverReceived,
NewMessageData,
OnDateTimeReceived,
QueueData,
Received
}
import org.joda.time.DateTime

case class InternalMessage(id: String,
deliveryReceipts: mutable.Buffer[String],
var nextDelivery: Long,
content: String,
messageAttributes: Map[String, MessageAttribute],
created: DateTime,
var firstReceive: Received,
var receiveCount: Int,
isFifo: Boolean,
messageGroupId: Option[String],
messageDeduplicationId: Option[String])
deliveryReceipts: mutable.Buffer[String],
var nextDelivery: Long,
content: String,
messageAttributes: Map[String, MessageAttribute],
created: DateTime,
var firstReceive: Received,
var receiveCount: Int,
isFifo: Boolean,
messageGroupId: Option[String],
messageDeduplicationId: Option[String])
extends Comparable[InternalMessage] {

// Priority queues have biggest elements first
Expand All @@ -34,10 +46,10 @@ case class InternalMessage(id: String,
}

/**
* Keep track of delivering this message to a client
*
* @param nextDeliveryMillis When this message should become available for its next delivery
*/
* Keep track of delivering this message to a client
*
* @param nextDeliveryMillis When this message should become available for its next delivery
*/
def trackDelivery(nextDeliveryMillis: MillisNextDelivery)(implicit nowProvider: NowProvider): Unit = {
deliveryReceipts += DeliveryReceipt.generate(MessageId(id)).receipt
nextDelivery = nextDeliveryMillis.millis
Expand All @@ -48,24 +60,26 @@ case class InternalMessage(id: String,
}
}

def toMessageData = MessageData(
MessageId(id),
deliveryReceipts.lastOption.map(DeliveryReceipt(_)),
content,
messageAttributes,
MillisNextDelivery(nextDelivery),
created,
MessageStatistics(firstReceive, receiveCount),
messageGroupId,
messageDeduplicationId)
def toMessageData =
MessageData(
MessageId(id),
deliveryReceipts.lastOption.map(DeliveryReceipt(_)),
content,
messageAttributes,
MillisNextDelivery(nextDelivery),
created,
MessageStatistics(firstReceive, receiveCount),
messageGroupId,
messageDeduplicationId
)

def toNewMessageData = NewMessageData(
Some(MessageId(id)),
content,
messageAttributes,
MillisNextDelivery(nextDelivery),
messageGroupId,
messageDeduplicationId)
def toNewMessageData =
NewMessageData(Some(MessageId(id)),
content,
messageAttributes,
MillisNextDelivery(nextDelivery),
messageGroupId,
messageDeduplicationId)

def deliverable(deliveryTime: Long): Boolean = nextDelivery <= deliveryTime
}
Expand All @@ -85,9 +99,9 @@ object InternalMessage {
0,
queueData.isFifo,
newMessageData.messageGroupId,
newMessageData.messageDeduplicationId)
newMessageData.messageDeduplicationId
)
}

private def generateId() = MessageId(UUID.randomUUID().toString)
}

173 changes: 89 additions & 84 deletions core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala
Expand Up @@ -6,66 +6,68 @@ import scala.collection.mutable
sealed trait MessageQueue {

/**
* Add a message onto the queue. Note that this doesn't do any deduplication, that should've happened in an earlier
* step.
*
* @param message The message to add onto the queue
*/
* Add a message onto the queue. Note that this doesn't do any deduplication, that should've happened in an earlier
* step.
*
* @param message The message to add onto the queue
*/
def +=(message: InternalMessage): Unit

/**
* Get the messages indexed by their unique id
*
* @return The messages indexed by their id
*/
* Get the messages indexed by their unique id
*
* @return The messages indexed by their id
*/
def byId: Map[String, InternalMessage]

/**
* Drop all messages on the queue
*/
* Drop all messages on the queue
*/
def clear(): Unit

/**
* Remove the message with the given id
*
* @param messageId The id of the message to remove
*/
* Remove the message with the given id
*
* @param messageId The id of the message to remove
*/
def remove(messageId: String): Unit

/**
* Return a message queue where all the messages on the queue do not match the given predicate function
*
* @param p The predicate function to filter the message by. Any message that does not match the predicate will be
* retained on the new queue
* @return The new message queue
*/
* Return a message queue where all the messages on the queue do not match the given predicate function
*
* @param p The predicate function to filter the message by. Any message that does not match the predicate will be
* retained on the new queue
* @return The new message queue
*/
def filterNot(p: InternalMessage => Boolean): MessageQueue

/**
* Dequeues `count` messages from the queue
*
* @param count The number of messages to dequeue from the queue
* @param deliveryTime The timestamp from which messages should be available (usually, this is the current millis
* since epoch. It is useful to pass in a special value during the tests however.)
* @return The dequeued messages, if any
*/
* Dequeues `count` messages from the queue
*
* @param count The number of messages to dequeue from the queue
* @param deliveryTime The timestamp from which messages should be available (usually, this is the current millis
* since epoch. It is useful to pass in a special value during the tests however.)
* @return The dequeued messages, if any
*/
def dequeue(count: Int, deliveryTime: Long): List[InternalMessage]

/**
* Get the next available message on the given queue
*
* @param priorityQueue The queue for which to get the next available message. It's assumed the messages on this
* queue all belong to the same message group.
* @param deliveryTime The timestamp from which messages should be available
* @param accBatch An accumulator holding the messages that have already been retrieved.
* @param accMessage An accumulator holding the messages that have been dequeued from the priority queue and
* cannot be delivered. These messages should be put back on the queue before returning
* to the caller
* @return
*/
* Get the next available message on the given queue
*
* @param priorityQueue The queue for which to get the next available message. It's assumed the messages on this
* queue all belong to the same message group.
* @param deliveryTime The timestamp from which messages should be available
* @param accBatch An accumulator holding the messages that have already been retrieved.
* @param accMessage An accumulator holding the messages that have been dequeued from the priority queue and
* cannot be delivered. These messages should be put back on the queue before returning
* to the caller
* @return
*/
@tailrec
protected final def nextVisibleMessage(priorityQueue: mutable.PriorityQueue[InternalMessage], deliveryTime: Long,
accBatch: List[InternalMessage], accMessage: Seq[InternalMessage] = Seq.empty): Option[InternalMessage] = {
protected final def nextVisibleMessage(priorityQueue: mutable.PriorityQueue[InternalMessage],
deliveryTime: Long,
accBatch: List[InternalMessage],
accMessage: Seq[InternalMessage] = Seq.empty): Option[InternalMessage] = {
if (priorityQueue.nonEmpty) {
val msg = priorityQueue.dequeue()

Expand Down Expand Up @@ -100,15 +102,16 @@ sealed trait MessageQueue {

object MessageQueue {

def apply(isFifo: Boolean): MessageQueue = if (isFifo) {
new FifoMessageQueue
} else {
new SimpleMessageQueue
}
def apply(isFifo: Boolean): MessageQueue =
if (isFifo) {
new FifoMessageQueue
} else {
new SimpleMessageQueue
}

/**
* A "simple" straightforward message queue. The queue represents the common SQS behaviour
*/
* A "simple" straightforward message queue. The queue represents the common SQS behaviour
*/
class SimpleMessageQueue extends MessageQueue {
protected val messagesById: mutable.HashMap[String, InternalMessage] = mutable.HashMap.empty
protected val messageQueue: mutable.PriorityQueue[InternalMessage] = mutable.PriorityQueue.empty
Expand Down Expand Up @@ -146,15 +149,15 @@ object MessageQueue {
} else {
nextVisibleMessage(messageQueue, deliveryTime, acc) match {
case Some(msg) => dequeue0(count - 1, deliveryTime, acc :+ msg)
case None => acc
case None => acc
}
}
}
}

/**
* A FIFO queue that mimics SQS' FIFO queue implementation
*/
* A FIFO queue that mimics SQS' FIFO queue implementation
*/
class FifoMessageQueue extends SimpleMessageQueue {
private val messagesbyMessageGroupId = mutable.HashMap.empty[String, mutable.PriorityQueue[InternalMessage]]

Expand Down Expand Up @@ -199,18 +202,19 @@ object MessageQueue {
acc
} else {
dequeueFromFifo(acc, deliveryTime) match {
case Some(msg) => dequeue0(count -1, deliveryTime, acc :+ msg)
case None => acc
case Some(msg) => dequeue0(count - 1, deliveryTime, acc :+ msg)
case None => acc
}
}
}

/**
* Dequeue a message from the fifo queue. Try to dequeue a message from the same message group as the previous
* message before trying other message groups.
*/
private def dequeueFromFifo(accBatch: List[InternalMessage], deliveryTime: Long,
triedMessageGroups: Set[String] = Set.empty): Option[InternalMessage] = {
* Dequeue a message from the fifo queue. Try to dequeue a message from the same message group as the previous
* message before trying other message groups.
*/
private def dequeueFromFifo(accBatch: List[InternalMessage],
deliveryTime: Long,
triedMessageGroups: Set[String] = Set.empty): Option[InternalMessage] = {
val messageGroupIdHint = accBatch.lastOption.map(getMessageGroupIdUnsafe).filterNot(triedMessageGroups.contains)
messageGroupIdHint.orElse(randomMessageGroup(triedMessageGroups)).flatMap { messageGroupId =>
dequeueFromMessageGroup(messageGroupId, deliveryTime, accBatch)
Expand All @@ -219,10 +223,11 @@ object MessageQueue {
}

/**
* Try to dequeue a message from the given message group
*/
private def dequeueFromMessageGroup(messageGroupId: String, deliveryTime: Long,
accBatch: List[InternalMessage]): Option[InternalMessage] = {
* Try to dequeue a message from the given message group
*/
private def dequeueFromMessageGroup(messageGroupId: String,
deliveryTime: Long,
accBatch: List[InternalMessage]): Option[InternalMessage] = {
messagesbyMessageGroupId.get(messageGroupId) match {
case Some(priorityQueue) if priorityQueue.nonEmpty =>
val msg = nextVisibleMessage(priorityQueue, deliveryTime, accBatch)
Expand All @@ -237,38 +242,38 @@ object MessageQueue {
}

/**
* Return a message group id that has at least 1 message active on the queue and that is not part of the given set
* of `triedMessageGroupIds`
*
* @param triedMessageGroupIds The ids of message groups to ignore
* @return The id of a random message group that is not part of `triedMessageGroupIds`
*/
* Return a message group id that has at least 1 message active on the queue and that is not part of the given set
* of `triedMessageGroupIds`
*
* @param triedMessageGroupIds The ids of message groups to ignore
* @return The id of a random message group that is not part of `triedMessageGroupIds`
*/
private def randomMessageGroup(triedMessageGroupIds: Set[String]): Option[String] = {
val remainingMessageGroupIds = messagesbyMessageGroupId.keySet -- triedMessageGroupIds
remainingMessageGroupIds.headOption
}

/**
* Get the message group id from a given message. If the message has no message group id, an
* [[IllegalStateException]] will be thrown.
*
* @param msg The message to get the message group id for
* @return The message group id
* @throws IllegalStateException if the message has no message group id
*/
* Get the message group id from a given message. If the message has no message group id, an
* [[IllegalStateException]] will be thrown.
*
* @param msg The message to get the message group id for
* @return The message group id
* @throws IllegalStateException if the message has no message group id
*/
private def getMessageGroupIdUnsafe(msg: InternalMessage): String =
getMessageGroupIdUnsafe(msg.messageGroupId)

/**
* Get the message group id from an optional string. If the given optional string is empty, an
* [[IllegalStateException]] will be thrown
*
* @param messageGroupId The optional string
* @return The message group id
* @throws IllegalStateException if the optional string holds no message group id
*/
* Get the message group id from an optional string. If the given optional string is empty, an
* [[IllegalStateException]] will be thrown
*
* @param messageGroupId The optional string
* @return The message group id
* @throws IllegalStateException if the optional string holds no message group id
*/
private def getMessageGroupIdUnsafe(messageGroupId: Option[String]) =
messageGroupId.getOrElse(throw new IllegalStateException(
"Messages on a FIFO queue are required to have a message group id"))
messageGroupId.getOrElse(
throw new IllegalStateException("Messages on a FIFO queue are required to have a message group id"))
}
}

0 comments on commit eef10a7

Please sign in to comment.