-
Notifications
You must be signed in to change notification settings - Fork 3
/
DataPublisher.scala
51 lines (38 loc) · 1.18 KB
/
DataPublisher.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package pl.zuchos.example.actors
import akka.actor.Actor
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import pl.zuchos.example.actors.DataPublisher.Publish
import scala.collection.mutable
import scala.util.{Failure, Success}
class DataPublisher(val bufferSize: Int) extends ActorPublisher[Data] {
if (bufferSize <= 0) throw new IllegalArgumentException("Buffer should be positive number...")
var queue: mutable.Queue[Data] = mutable.Queue()
override def receive: Actor.Receive = {
case Publish(s) =>
cacheIfPossible(s)
case Request(cnt) =>
publishIfNeeded()
case Cancel => context.stop(self)
case _ =>
}
private def cacheIfPossible(s: Data) {
if (queue.length == bufferSize) {
sender() ! Failure(new BufferOverflow)
} else {
queue.enqueue(s)
sender() ! Success()
publishIfNeeded()
}
}
def publishIfNeeded() = {
while (queue.nonEmpty && isActive && totalDemand > 0) {
onNext(queue.dequeue())
}
}
}
class BufferOverflow extends Exception
object DataPublisher {
case class Publish(data: Data)
}
case class Data(sender: String, body: String)