/
InputStreamPublisher.scala
88 lines (67 loc) · 2.42 KB
/
InputStreamPublisher.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl.io
import java.io.InputStream
import akka.Done
import akka.actor.{ ActorLogging, DeadLetterSuppression, Deploy, Props }
import akka.annotation.InternalApi
import akka.stream.actor.ActorPublisherMessage
import akka.stream.IOResult
import akka.util.ByteString
import scala.concurrent.Promise
import scala.util.{ Failure, Success }
/** INTERNAL API */
@InternalApi private[akka] object InputStreamPublisher {

  /** Self-message the publisher sends to itself to trigger another read while demand remains. */
  private final case object Continue extends DeadLetterSuppression

  /**
   * Creates locally deployed `Props` for an [[InputStreamPublisher]].
   *
   * @param is                stream the publisher reads from
   * @param completionPromise promise handed to the publisher, which completes it with an [[IOResult]]
   * @param chunkSize         size of the read buffer in bytes; must be greater than zero
   * @throws IllegalArgumentException if `chunkSize` is not positive
   */
  def props(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int): Props = {
    require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)")

    val publisherProps = Props(classOf[InputStreamPublisher], is, completionPromise, chunkSize)
    publisherProps.withDeploy(Deploy.local)
  }
}
/**
 * INTERNAL API
 *
 * Actor publisher that reads from `is` in chunks of at most `chunkSize` bytes and emits each
 * chunk as a `ByteString` while there is downstream demand.
 *
 * When the actor stops, the stream is closed and `completionPromise` is completed with an
 * [[IOResult]] carrying the total number of bytes read. Its status is `Success(Done)` unless
 * reading from or closing the stream failed, in which case it holds the first such failure.
 */
@InternalApi private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int)
  extends akka.stream.actor.ActorPublisher[ByteString]
  with ActorLogging {

  // TODO possibly de-duplicate with FilePublisher?

  import InputStreamPublisher._

  // read buffer, allocated once and reused for every `is.read` call
  val arr = new Array[Byte](chunkSize)
  // running total of bytes read so far, reported through the IOResult
  var readBytesTotal = 0L

  def receive: Receive = {
    case ActorPublisherMessage.Request(_) ⇒ readAndSignal()
    case Continue                         ⇒ readAndSignal()
    case ActorPublisherMessage.Cancel     ⇒ context.stop(self)
  }

  /**
   * Performs one read-and-emit step and, if demand remains and the publisher is still active,
   * sends `Continue` to self so the next read goes through the mailbox instead of looping here.
   */
  def readAndSignal(): Unit =
    if (isActive) {
      readAndEmit()
      if (totalDemand > 0 && isActive) self ! Continue
    }

  /**
   * Reads a single chunk (blocking) and emits it. Completes the stream on end-of-input (`-1`)
   * and fails it when an exception is thrown.
   */
  def readAndEmit(): Unit = if (totalDemand > 0) try {
    // blocking read
    val readBytes = is.read(arr)

    readBytes match {
      case -1 ⇒
        // had nothing to read into this chunk
        log.debug("No more bytes available to read (got `-1` from `read`)")
        onCompleteThenStop()

      case _ ⇒
        readBytesTotal += readBytes

        // emit immediately, as this is the only chance to do it before we might block again
        onNext(ByteString.fromArray(arr, 0, readBytes))
    }
  } catch {
    case ex: Exception ⇒
      // Record the failure in the materialized IOResult before stopping; otherwise postStop
      // would report Success(Done) for a stream that actually failed.
      completionPromise.trySuccess(IOResult(readBytesTotal, Failure(ex)))
      onErrorThenStop(ex)
  }

  /**
   * Closes the stream and completes `completionPromise` if nothing has completed it yet.
   * Only `trySuccess` is used, so an earlier completion (e.g. a read failure) is kept.
   */
  override def postStop(): Unit = {
    super.postStop()

    try {
      if (is ne null) is.close()
    } catch {
      case ex: Exception ⇒
        completionPromise.trySuccess(IOResult(readBytesTotal, Failure(ex)))
    }

    completionPromise.trySuccess(IOResult(readBytesTotal, Success(Done)))
  }
}