-
Notifications
You must be signed in to change notification settings - Fork 1
/
TextAppenderObserver.scala
146 lines (127 loc) · 4.32 KB
/
TextAppenderObserver.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package kafka4m.io
import java.io.{BufferedWriter, FileOutputStream, OutputStreamWriter}
import java.nio.file.{Files, Path}
import java.time.ZonedDateTime
import java.util.Base64
import cats.Show
import com.typesafe.scalalogging.StrictLogging
import kafka4m.partitions._
import kafka4m.{Bytes, Key}
import monix.execution.Ack
import monix.reactive.{Notification, Observable, Observer}
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.concurrent.Future
object TextAppenderObserver {
  // Splits an appended line into "<key>:<base64 payload>".
  // The first group is non-greedy, so the key is everything up to the FIRST ':'.
  private val KeyValueR = """(.*?):(.*)""".r

  /**
    * The 'show' for byte arrays is written to files as:
    * {{{
    *   <key>:<base 64>
    * }}}
    *
    * This extractor will parse a line as taken from an appended file
    */
  object Base64Line {
    /** Splits `line` on the first ':' and base64-decodes the remainder.
      *
      * Returns None when the line has no ':' separator.
      * NOTE(review): an invalid base64 payload makes `decode` throw rather than
      * return None — presumably callers only feed lines written via [[ShowRecord]]; confirm.
      */
    def unapply(line: String): Option[(String, Array[Byte])] = {
      line match {
        case KeyValueR(key, value) => Option(key -> Base64.getDecoder.decode(value))
        case _ => None
      }
    }
  }

  /** Renders a kafka record as
    * "<partition>_<offset>_<timestamp>_<timestampType>:<base64 of value>" —
    * the single-line format parsed back by [[Base64Line]].
    */
  implicit object ShowRecord extends Show[ConsumerRecord[Key, Bytes]] {
    override def show(record: ConsumerRecord[Key, Bytes]): String = {
      val dataString = Base64.getEncoder.encodeToString(record.value)
      s"${record.partition}_${record.offset}_${record.timestamp}_${record.timestampType}:${dataString}"
    }
  }

  /** Routes a stream of [[AppendEvent]]s into per-[[TimeBucket]] text files under `dir`,
    * emitting a (bucket, file) pair each time a bucket is flushed (its appender closed).
    *
    * @param dir          the directory in which the bucket files are created
    * @param flushEvery   line-count flush threshold handed to each [[TextAppenderObserver]]
    * @param appendEvents the append/flush events to consume
    * @return the completed (bucket, path) pairs; the stream completes when a
    *         [[ForceFlushBuckets]] event is seen (which materializes an OnComplete)
    */
  def fromEvents[A: HasTimestamp: Show](dir: Path, flushEvery: Int, appendEvents: Observable[AppendEvent[A]]): Observable[(TimeBucket, Path)] = {
    appendEvents
      .scan(GroupState[A](dir, flushEvery, Map.empty) -> Seq.empty[Notification[(TimeBucket, Path)]]) {
        // thread the state through; the previously-emitted notifications are discarded
        case ((st8, _), next) => st8.update(next)
      }
      .flatMap {
        case (_, notifications) => Observable.fromIterable(notifications)
      }
      // turn the OnNext/OnComplete notifications produced by GroupState.update
      // back into real stream events
      .dematerialize
  }

  /** Accumulator used by [[fromEvents]]: tracks one open [[TextAppenderObserver]] per time bucket.
    *
    * @param dir        directory in which new bucket files are created
    * @param flushEvery line-count flush threshold passed to each new appender
    * @param byBucket   the currently-open appenders, keyed by their bucket
    */
  private case class GroupState[A: HasTimestamp: Show](dir: Path, flushEvery: Int, byBucket: Map[TimeBucket, TextAppenderObserver]) {
    // "state unchanged, nothing to emit"
    private val NoOp = (this, Nil)

    /** Applies one event, returning the next state plus any notifications to emit downstream.
      * Appending to files is a side effect performed here, interleaved with the state update.
      */
    def update(event: AppendEvent[A]): (GroupState[A], Seq[Notification[(TimeBucket, Path)]]) = {
      event match {
        case AppendData(bucket, data) =>
          val text = Show[A].show(data)
          byBucket.get(bucket) match {
            case Some(appender) =>
              // existing bucket: append (side effect) and keep the same state
              appender.appendLine(text)
              NoOp
            case None =>
              // first record for this bucket: open a new file named from the bucket and timestamp
              val file = dir.resolve(bucket.asFileName(HasTimestamp[A].timestamp(data)))
              val appender = new TextAppenderObserver(file, flushEvery)
              appender.appendLine(text)
              copy(byBucket = byBucket.updated(bucket, appender)) -> Nil
          }
        case ForceFlushBuckets() =>
          // close every open appender, emit each completed file, then complete the stream
          val onNexts = byBucket.map {
            case (bucket, appender) =>
              appender.close()
              Notification.OnNext(bucket -> appender.file)
          }
          copy(byBucket = Map.empty) -> (onNexts.toSeq :+ Notification.OnComplete)
        case FlushBucket(bucket) =>
          byBucket.get(bucket) match {
            case Some(appender) =>
              // close just this bucket's file and emit it; unknown buckets are a no-op
              appender.close()
              copy(byBucket = byBucket - bucket) -> Seq(Notification.OnNext(bucket -> appender.file))
            case None =>
              NoOp
          }
      }
    }
  }
}
/** An [[Observer]] which appends each received line of text to `file`, flushing
  * the underlying writer every `flushEvery` lines (0 means flush on every line).
  *
  * The file is created if absent and truncated on construction
  * (`FileOutputStream(..., false)`).
  *
  * @param file       the file to which lines are appended
  * @param flushEvery how many lines may be buffered before an explicit flush
  */
class TextAppenderObserver(val file: Path, flushEvery: Int = 10) extends Observer[String] with AutoCloseable with StrictLogging {
  require(flushEvery >= 0)
  if (!Files.exists(file)) {
    Files.createFile(file)
  }
  // NOTE(review): uses the platform default charset. Base64 payloads are ASCII,
  // but non-ASCII keys would be charset-dependent — confirm before relying on it.
  private val writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file.toFile, false)))
  // countdown until the next flush; reset to flushEvery by flush()
  private var flushCount = flushEvery
  // total lines appended so far
  private var written = 0
  private var closed = false

  /** Appends `line`, preceding it with a newline for every line after the first
    * (so the file never ends with a trailing newline), flushing when the
    * countdown reaches zero.
    */
  def appendLine(line: String): Unit = {
    if (written > 0) {
      writer.newLine()
    }
    writer.write(line)
    written = written + 1
    flushCount = flushCount - 1
    if (flushCount <= 0) {
      flush()
    }
  }

  /** Flushes the writer and resets the line countdown. */
  def flush(): Unit = {
    flushCount = flushEvery
    writer.flush()
  }

  override def onNext(elem: String): Future[Ack] = {
    appendLine(elem)
    Ack.Continue
  }

  override def onError(ex: Throwable): Unit = {
    logger.error(s"error: $ex", ex)
    close()
  }

  override def onComplete(): Unit = {
    logger.info("onComplete")
    close()
  }

  /** True once [[close]] has completed. */
  def isClosed(): Boolean = closed

  /** The number of lines appended so far. */
  def size(): Int = written

  /** Flushes and closes the underlying writer.
    *
    * Fix: guarded on `closed` so a second invocation (e.g. onError firing after
    * onComplete, or an external close() after the observer completes) is a no-op.
    * Previously the unconditional `writer.flush()` on an already-closed
    * BufferedWriter threw IOException("Stream closed").
    */
  override def close(): Unit = {
    if (!closed) {
      logger.info(s"closing ${file}")
      writer.flush()
      writer.close()
      closed = true
    }
  }
}