This is an akka-stream API for ZMQ. It currently supports only receiving data, via unidirectional ZMQ sockets of the following types:
- ZMQ.PULL
- ZMQ.SUB
Supported Scala versions:
- Scala 2.11.11+
- Scala 2.12.3+
For supported ZMQ versions, see the jeromq documentation.
Add the following setting to your build.sbt:
libraryDependencies += "io.github.2gis" %% "reactive-zmq" % "0.5.0"
Create a ZMQ context and a Source:
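import org.zeromq.ZMQ
import scala.concurrent.duration._ // provides the `1.second` syntax
// ZMQSource itself comes from the reactive-zmq library
val context = ZMQ.context(1)
val source = ZMQSource(context,
  mode = ZMQ.PULL,
  timeout = 1.second,
  addresses = List("tcp://127.0.0.1:12345")
)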
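To try the Source locally, something has to send messages to that address. The following is a minimal sketch using plain jeromq, not part of reactive-zmq: a PUSH socket binds the endpoint and a plain PULL socket stands in for the Source. It assumes that the Source connects to the listed addresses rather than binding them; the object name `PushPullDemo` and the helper `roundTrip` are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import org.zeromq.ZMQ

object PushPullDemo {
  // Sends one message over a PUSH socket and reads it back over a PULL socket.
  def roundTrip(address: String, payload: String): String = {
    val context = ZMQ.context(1)
    val push = context.socket(ZMQ.PUSH)
    val pull = context.socket(ZMQ.PULL)
    try {
      push.bind(address)    // the sender owns the endpoint
      pull.connect(address) // stand-in for the Source (assumed to connect)
      push.send(payload.getBytes(StandardCharsets.UTF_8), 0)
      // recv(0) blocks until the message arrives
      new String(pull.recv(0), StandardCharsets.UTF_8)
    } finally {
      push.close()
      pull.close()
      context.close()
    }
  }

  def main(args: Array[String]): Unit =
    println(roundTrip("tcp://127.0.0.1:12345", "hello"))
}
```

In a real setup only the PUSH half would live in the sending process, and the Source would take the place of the PULL socket.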
Now you can use the source in your graphs:
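import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.util.ByteString
implicit val as = ActorSystem()
implicit val m = ActorMaterializer()
source
  .map { x: ByteString => println(x); x }
  .to(Sink.ignore)
  .run()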
Full example is available here
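Since the Source emits each message as a ByteString, a common first stage is decoding it. The sketch below assumes the payloads are UTF-8 text and uses an in-memory `Source` of ByteStrings as a stand-in for the ZMQ source, so it runs without a socket; the object name `DecodeDemo` and the helper `decodeAll` are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object DecodeDemo {
  // Decodes every message as UTF-8 and collects the results in order.
  def decodeAll(messages: List[ByteString]): Seq[String] = {
    implicit val as: ActorSystem = ActorSystem("decode-demo")
    implicit val m: ActorMaterializer = ActorMaterializer()
    try {
      val decoded = Source(messages) // stand-in for the ZMQ source
        .map(_.utf8String)
        .runWith(Sink.seq)
      Await.result(decoded, 5.seconds)
    } finally {
      as.terminate()
    }
  }
}
```

With the real source, the same `.map(_.utf8String)` stage would replace the `println` stage in the graph above.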
To stop the Source, use the materialized Control object:
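import akka.stream.scaladsl.Keep
// Keep.both yields the Source's Control and the Sink's completion Future
val (control, finish) = source
  .map { x: ByteString => println(x); x }
  .toMat(Sink.ignore)(Keep.both)
  .run()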
The Control object exposes a gracefulStop method that closes the underlying ZMQ socket and completes the Source:
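import scala.concurrent.Future
val stopFuture: Future[Unit] = control.gracefulStop()
implicit val ec = as.dispatcher
// Once the socket is closed and the stream has finished, release the remaining resources
Future.sequence(Seq(stopFuture, finish)).onComplete { _ =>
  as.terminate()
  context.close()
}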
To use a SNAPSHOT version, add the following settings to your build.sbt:
resolvers += "Sonatype OSS Snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots"
libraryDependencies += "io.github.2gis" %% "reactive-zmq" % "0.6.0-SNAPSHOT"