/
RabbitMq.scala
116 lines (93 loc) · 3.38 KB
/
RabbitMq.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package com.softwaremill.mqperf.mq
import java.util.concurrent.ConcurrentLinkedQueue
import com.rabbitmq.client._
import com.softwaremill.mqperf.config.TestConfig
import scala.annotation.tailrec
/** [[Mq]] implementation backed by RabbitMQ.
  *
  * Connects to the first host listed in `testConfig.brokerHosts` using the `guest`/`guest`
  * credentials. All senders and receivers share this single connection, but each one gets
  * its own channel (see [[newChannel]]).
  *
  * @param testConfig test configuration; `brokerHosts` supplies the broker address and
  *                   `mqConfig` supplies the `qos` and `multiple_ack` receiver settings
  */
class RabbitMq(testConfig: TestConfig) extends Mq {

  private val QueueName = "quorum.mqperf"

  // Single charset used for both encoding (send) and decoding (receive), so that payloads
  // round-trip regardless of the JVM's platform-default charset.
  private val MessageCharset = java.nio.charset.StandardCharsets.UTF_8

  private val cf = new ConnectionFactory()
  cf.setHost(testConfig.brokerHosts.head)
  cf.setUsername("guest")
  cf.setPassword("guest")

  private val conn = cf.newConnection()

  /** Message ids are RabbitMQ delivery tags. */
  override type MsgId = Long

  /** Opens a new channel on the shared connection.
    *
    * The queue is declared passively, i.e. it is only checked for existence, never created.
    *
    * @return a channel on which the test queue has been verified
    */
  def newChannel(): Channel = {
    val channel = conn.createChannel()
    channel.queueDeclarePassive(QueueName)
    channel
  }

  /** Closes the shared connection. */
  override def close(): Unit = conn.close()

  /** Creates a sender which publishes persistent messages through its own channel with
    * publisher confirms enabled.
    */
  override def createSender(): MqSender =
    new MqSender {
      private val channel = newChannel()
      channel.confirmSelect() // publisher acks

      /** Publishes every message to the test queue via the default exchange, then blocks
        * until the broker has confirmed the whole batch.
        */
      override def send(msgs: List[String]): Unit = {
        msgs.foreach { msg =>
          channel.basicPublish(
            "",
            QueueName,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            msg.getBytes(MessageCharset)
          )
        }
        // NOTE(review): waitForConfirms returns false when any message was nack-ed; the result
        // is discarded here, exactly as before - confirm that ignoring nacks is intended.
        channel.waitForConfirms()
      }
    }

  /** Creates a receiver which consumes from the test queue through its own channel, using
    * manual acknowledgements.
    */
  override def createReceiver(): MqReceiver =
    new MqReceiver {
      private val channel = newChannel()
      channel.basicQos(testConfig.mqConfig.getInt("qos"), false) // fair dispatch - up to `qos` unack can be received

      // unbounded queue - but we'll only get up to `qos` entries
      private val queue = new ConcurrentLinkedQueue[(MsgId, String)]()

      // Read before the consumer is started, so that all receiver state is initialised by the
      // time deliveries can arrive.
      private val multipleAck = testConfig.mqConfig.getBoolean("multiple_ack")

      // Buffers every delivery as a (delivery tag, decoded body) pair for `receive` to pick up.
      private val consumer = new DefaultConsumer(channel) {
        override def handleDelivery(
            consumerTag: String,
            envelope: Envelope,
            properties: AMQP.BasicProperties,
            body: Array[Byte]
        ): Unit = {
          queue.add((envelope.getDeliveryTag, new String(body, MessageCharset)))
        }
      }

      channel.basicConsume(QueueName, false, consumer) // autoAck = false: acks are sent via `ack`

      /** Takes up to `maxMsgCount` buffered messages.
        *
        * Waits only for the first message (up to 10 retries, 100 ms apart); once at least one
        * message has been taken, the call returns as soon as the buffer is empty.
        *
        * @param maxMsgCount upper bound on the number of messages returned
        * @return the received `(id, body)` pairs, most recently taken first (each message is
        *         prepended to the result); empty if nothing arrived while waiting
        */
      override def receive(maxMsgCount: Int): List[(MsgId, String)] = {

        // Takes one buffered message; when `waitForMsgs` is set, an empty buffer is re-polled
        // up to 10 times with a 100 ms pause between attempts.
        def nextMessageFromQueue(waitForMsgs: Boolean): Option[(MsgId, String)] = {
          @tailrec
          def doPoll(waitIterations: Int): Option[(MsgId, String)] =
            Option(queue.poll()) match { // poll() yields null when the buffer is empty
              case None if waitIterations > 0 =>
                Thread.sleep(100L)
                doPoll(waitIterations - 1)
              case polled => polled
            }

          doPoll(if (waitForMsgs) 10 else 0)
        }

        @tailrec
        def doReceive(acc: List[(MsgId, String)], count: Int): List[(MsgId, String)] =
          if (count == 0) {
            acc
          } else {
            nextMessageFromQueue(waitForMsgs = acc.isEmpty) match {
              case None    => acc
              case Some(m) => doReceive(m :: acc, count - 1)
            }
          }

        doReceive(Nil, maxMsgCount)
      }

      /** Acknowledges the given deliveries.
        *
        * With `multiple_ack` enabled, a single cumulative ack is sent for the highest id;
        * otherwise each id is acknowledged individually. An empty list sends nothing.
        *
        * @param ids delivery tags previously returned by `receive`
        */
      override def ack(ids: List[MsgId]): Unit =
        if (multipleAck) {
          if (ids.nonEmpty) {
            // as an optimization, we acknowledge multiple messages at once (http://www.rabbitmq.com/confirms.html)
            // This works as delivery tags are channel scoped (and we use one channel per receiver), and we know
            // that all messages in a batch are acknowledged at once.
            channel.basicAck(ids.max, true)
          }
        } else {
          ids.foreach { id =>
            channel.basicAck(id, false)
          }
        }
    }
}