-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
FileOutputStage.scala
107 lines (93 loc) · 3.04 KB
/
FileOutputStage.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/*
* Copyright (C) 2019-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl.io
import java.nio.channels.FileChannel
import java.nio.file.{ OpenOption, Path }
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
import scala.util.Success
import scala.util.control.NonFatal
import akka.annotation.InternalApi
import akka.stream.{
AbruptStageTerminationException,
Attributes,
IOOperationIncompleteException,
IOResult,
Inlet,
SinkShape
}
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler }
import akka.util.ByteString
import akka.util.ccompat.JavaConverters._
/**
 * INTERNAL API
 *
 * Sink stage that writes each incoming `ByteString` to the file at `path`.
 *
 * The file is opened with `openOptions` when the stage starts; if `startPosition` is greater
 * than zero the channel is positioned there before the first element is requested.
 *
 * The materialized `Future[IOResult]` is
 *  - completed with the number of bytes written once upstream finishes and the file is closed,
 *  - failed with an `IOOperationIncompleteException` (carrying the bytes written so far) when
 *    opening, positioning or writing fails, or when upstream fails,
 *  - failed with an `AbruptStageTerminationException` when the stage is stopped before any of
 *    the above completed it.
 *
 * @param path          file to write to
 * @param startPosition channel position to start writing at; values `<= 0` leave the position untouched
 * @param openOptions   options handed to `FileChannel.open`
 */
@InternalApi
private[akka] final class FileOutputStage(path: Path, startPosition: Long, openOptions: immutable.Set[OpenOption])
    extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[IOResult]] {

  val in: Inlet[ByteString] = Inlet("FileSink")

  override def shape: SinkShape[ByteString] = SinkShape(in)

  override def initialAttributes: Attributes = DefaultAttributes.fileSink

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
    val mat = Promise[IOResult]()

    val logic = new GraphStageLogic(shape) with InHandler {
      // Stays null until preStart managed to open the file; closeFile checks for that.
      private var chan: FileChannel = _
      private var bytesWritten: Long = 0

      /** Opens the file, applies the start position and requests the first element. */
      override def preStart(): Unit = {
        try {
          chan = FileChannel.open(path, openOptions.asJava)
          if (startPosition > 0) {
            chan.position(startPosition)
          }
          pull(in)
        } catch {
          case NonFatal(t) =>
            closeFile(Some(new IOOperationIncompleteException(bytesWritten, t)))
            failStage(t)
        }
      }

      /** Writes the pushed element completely, then requests the next one. */
      override def onPush(): Unit = {
        val next = grab(in)
        try {
          val buffer = next.asByteBuffer
          // A single write call may consume only part of the buffer; keep writing until the
          // whole element is on the channel so no bytes are silently dropped. Counting after
          // every call keeps bytesWritten accurate even if a later call throws.
          while (buffer.hasRemaining) {
            bytesWritten += chan.write(buffer)
          }
          pull(in)
        } catch {
          case NonFatal(t) =>
            closeFile(Some(new IOOperationIncompleteException(bytesWritten, t)))
            failStage(t)
        }
      }

      /** Closes the file and fails both the materialized future and the stage. */
      override def onUpstreamFailure(t: Throwable): Unit = {
        closeFile(Some(new IOOperationIncompleteException(bytesWritten, t)))
        failStage(t)
      }

      /** Closes the file, completes the materialized future and the stage. */
      override def onUpstreamFinish(): Unit = {
        closeFile(None)
        completeStage()
      }

      /** Last-resort cleanup when the stage stops without having completed the future. */
      override def postStop(): Unit = {
        if (!mat.isCompleted) {
          // closeFile fails the promise with the given exception on every non-fatal path,
          // so no additional tryFailure is needed here.
          closeFile(Some(new AbruptStageTerminationException(this)))
        }
      }

      /**
       * Closes the channel (if it was opened) and completes the materialized promise.
       *
       * @param failed the failure to report, or `None` to report success with the byte count
       */
      private def closeFile(failed: Option[Throwable]): Unit = {
        try {
          if (chan ne null) chan.close()
          failed match {
            case Some(t) => mat.tryFailure(t)
            case None    => mat.tryComplete(Success(IOResult(bytesWritten)))
          }
        } catch {
          case NonFatal(closeError) =>
            val reported = failed match {
              case Some(primary) =>
                // Keep the primary cause but do not lose the close failure.
                primary.addSuppressed(closeError)
                primary
              case None => closeError
            }
            mat.tryFailure(reported)
        }
      }

      setHandler(in, this)
    }

    (logic, mat.future)
  }
}