Skip to content

Commit

Permalink
akka-zeromq: Add support for the rest of the socket configurations
Browse files Browse the repository at this point in the history
While at it, add support for accessing values of 'ZMQ_FD' and
'ZMQ_RCVMORE' socket options. Note that accessing 'ZMQ_HWM' is not
currently supported as the implementation of the JZMQ like API is
missing an accessor method. This will be fixed in the next version of
the ZeroMQ Scala Binding. Also, remove some of the trailing whitespaces.

Signed-off-by: Karim Osman <karim@iki.fi>
  • Loading branch information
kro committed Dec 5, 2011
1 parent d160ee4 commit 7fa972e
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 6 deletions.
88 changes: 82 additions & 6 deletions akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,100 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters, dispatcher
case Send(frames) =>
sendFrames(frames)
pollAndReceiveFrames()
case ZMQMessage(frames) =>
case ZMQMessage(frames) =>
sendFrames(frames)
pollAndReceiveFrames()
case Connect(endpoint) =>
socket.connect(endpoint)
notifyListener(Connecting)
case Bind(endpoint) =>
case Bind(endpoint) =>
socket.bind(endpoint)
case Subscribe(topic) =>
case Subscribe(topic) =>
socket.subscribe(topic.toArray)
case Unsubscribe(topic) =>
case Unsubscribe(topic) =>
socket.unsubscribe(topic.toArray)
case ReceiveTimeout =>
pollAndReceiveFrames()
case Linger(value) =>
socket.setLinger(value)
case Linger =>
self.reply(socket.getLinger)
case ReconnectIVL =>
self.reply(socket.getReconnectIVL)
case ReconnectIVL(value) =>
socket.setReconnectIVL(value)
case Backlog =>
self.reply(socket.getBacklog)
case Backlog(value) =>
socket.setBacklog(value)
case ReconnectIVLMax =>
self.reply(socket.getReconnectIVLMax)
case ReconnectIVLMax(value) =>
socket.setReconnectIVLMax(value)
case MaxMsgSize =>
self.reply(socket.getMaxMsgSize)
case MaxMsgSize(value) =>
socket.setMaxMsgSize(value)
case SndHWM =>
self.reply(socket.getSndHWM)
case SndHWM(value) =>
socket.setSndHWM(value)
case RcvHWM =>
self.reply(socket.getRcvHWM)
case RcvHWM(value) =>
socket.setRcvHWM(value)
/* case HWM =>
self.reply(socket.getHWM) */
case HWM(value) =>
socket.setHWM(value)
case Swap =>
self.reply(socket.getSwap)
case Swap(value) =>
socket.setSwap(value)
case Affinity =>
self.reply(socket.getAffinity)
case Affinity(value) =>
socket.setAffinity(value)
case Identity =>
self.reply(socket.getIdentity)
case Identity(value) =>
socket.setIdentity(value)
case Rate =>
self.reply(socket.getRate)
case Rate(value) =>
socket.setRate(value)
case RecoveryInterval =>
self.reply(socket.getRecoveryInterval)
case RecoveryInterval(value) =>
socket.setRecoveryInterval(value)
case MulticastLoop =>
self.reply(socket.hasMulticastLoop)
case MulticastLoop(value) =>
socket.setMulticastLoop(value)
case MulticastHops =>
self.reply(socket.getMulticastHops)
case MulticastHops(value) =>
socket.setMulticastHops(value)
case ReceiveTimeOut =>
self.reply(socket.getReceiveTimeOut)
case ReceiveTimeOut(value) =>
socket.setReceiveTimeOut(value)
case SendTimeOut =>
self.reply(socket.getSendTimeOut)
case SendTimeOut(value) =>
socket.setSendTimeOut(value)
case SendBufferSize =>
self.reply(socket.getSendBufferSize)
case SendBufferSize(value) =>
socket.setSendBufferSize(value)
case ReceiveBufferSize =>
self.reply(socket.getReceiveBufferSize)
case ReceiveBufferSize(value) =>
socket.setReceiveBufferSize(value)
case ReceiveMore =>
self.reply(socket.hasReceiveMore)
case FileDescriptor =>
self.reply(socket.getFD)
}
override def preStart {
poller.register(socket, Poller.POLLIN)
Expand All @@ -49,7 +125,7 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters, dispatcher
def sendBytes(bytes: Seq[Byte], flags: Int) {
socket.send(bytes.toArray, flags)
}
val iter = frames.iterator
val iter = frames.iterator
while (iter.hasNext) {
val payload = iter.next.payload
val flags = if (iter.hasNext) JZMQ.SNDMORE else 0
Expand Down Expand Up @@ -89,7 +165,7 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters, dispatcher
if (listener.isShutdown)
self.stop
else
listener ! message
listener ! message
}
}
}
57 changes: 57 additions & 0 deletions akka-zeromq/src/main/scala/akka/zeromq/Requests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,60 @@ object ZMQMessage {

case class Linger(value: Long) extends SocketOption
object Linger extends SocketOptionQuery

case class ReconnectIVL(value: Long) extends SocketOption
object ReconnectIVL extends SocketOptionQuery

case class Backlog(value: Long) extends SocketOption
object Backlog extends SocketOptionQuery

case class ReconnectIVLMax(value: Long) extends SocketOption
object ReconnectIVLMax extends SocketOptionQuery

case class MaxMsgSize(value: Long) extends SocketOption
object MaxMsgSize extends SocketOptionQuery

case class SndHWM(value: Long) extends SocketOption
object SndHWM extends SocketOptionQuery

case class RcvHWM(value: Long) extends SocketOption
object RcvHWM extends SocketOptionQuery

case class HWM(value: Long) extends SocketOption
/* object HWM extends SocketOptionQuery */

case class Swap(value: Long) extends SocketOption
object Swap extends SocketOptionQuery

case class Affinity(value: Long) extends SocketOption
object Affinity extends SocketOptionQuery

case class Identity(value: Array[Byte]) extends SocketOption
object Identity extends SocketOptionQuery

case class Rate(value: Long) extends SocketOption
object Rate extends SocketOptionQuery

case class RecoveryInterval(value: Long) extends SocketOption
object RecoveryInterval extends SocketOptionQuery

case class MulticastLoop(value: Boolean) extends SocketOption
object MulticastLoop extends SocketOptionQuery

case class MulticastHops(value: Long) extends SocketOption
object MulticastHops extends SocketOptionQuery

case class ReceiveTimeOut(value: Long) extends SocketOption
object ReceiveTimeOut extends SocketOptionQuery

case class SendTimeOut(value: Long) extends SocketOption
object SendTimeOut extends SocketOptionQuery

case class SendBufferSize(value: Long) extends SocketOption
object SendBufferSize extends SocketOptionQuery

case class ReceiveBufferSize(value: Long) extends SocketOption
object ReceiveBufferSize extends SocketOptionQuery

object ReceiveMore extends SocketOptionQuery
object FileDescriptor extends SocketOptionQuery

4 comments on commit 7fa972e

@casualjim
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't requiring messages for the config opening the door to get unintentional errors.

For example the HWM must be set before connecting or binding the socket. when i then have code like this:

val socket = newSocket(config)
socket ! HWM(100L)
socket ! Bind("tcp://dkkdk:3394")

There are no guarantees that the message HWM will actually be processed before the Bind message unless you put the throughput of the dispatcher to 1.

I also have a question regarding the polling. Because there is a pollReceiveTimeout set the zeromq poller will block the actor for that amount of time and then iterate once to block again.
Having that poller in the same actor instead of in a future or a child actor doesn't that lead to messages you send to the socket actor are only processed once per poll-cycle (and with the amount of messages being processed per cycle not being larger than the throughput setting of the dispatcher) ?

@rkuhn
Copy link
Contributor

@rkuhn rkuhn commented on 7fa972e Dec 5, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The “no guarantees” sentence is not correct: Akka does not guarantee delivery of any one message, but in practice it cannot fail within the same JVM (if you are sensible enough to configure the JVM to exit upon OutOfMemoryError).

BUT: messages are never re-ordered, and this does not depend on the throughput setting!

@viktorklang
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What? order of messages are retained on a per-sender basis

@kro
Copy link
Author

@kro kro commented on 7fa972e Dec 5, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, HWM will be processed before Bind in your code.

Please sign in to comment.