-
Notifications
You must be signed in to change notification settings - Fork 1
/
Kafka4mApp.scala
115 lines (99 loc) · 3.54 KB
/
Kafka4mApp.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package kafka4m
import java.nio.file.Path
import args4c.ConfigApp
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import kafka4m.io.{Base64Writer, FileSource}
import kafka4m.partitions.TimeBucket
import kafka4m.util.{Metrics, Schedulers}
import monix.execution.{Cancelable, CancelableFuture, Scheduler}
import monix.reactive.{Consumer, Observable}
/**
  * An ETL entry point to read data into or out of kafka.
  *
  * Invoked as `Kafka4m [read | write] (args...)`: 'read' streams records from
  * kafka onto the local disk, 'write' pushes local file data into kafka.
  */
object Kafka4mApp extends ConfigApp with StrictLogging {
  override type Result = Cancelable

  // A small hack: the first argument to 'main' selects an action, which we stash
  // here so the inherited 'run(config)' callback (invoked via runMain) can see it.
  // None until 'mainDelegate' has parsed the arguments.
  private var action: Option[Action] = None

  override def main(args: Array[String]) = {
    if (mainDelegate(args).isEmpty) {
      println(s"Usage: Kafka4m [read | write] (args...)")
      sys.exit(1)
    }
  }

  /** Parses the first argument as the action to perform and hands the remaining
    * arguments to the inherited 'runMain'.
    *
    * @param args the full command-line arguments; the head must be "read" or "write"
    * @return the running task, or None if the action was missing or unrecognised
    */
  def mainDelegate(args: Array[String]): Option[Cancelable] = {
    args.headOption match {
      case Some("read") =>
        action = Some(Read)
        runMain(args.tail)
      case Some("write") =>
        action = Some(Write)
        runMain(args.tail)
      case _ => None
    }
  }

  /** Renders a startup banner summarising the effective 'kafka4m' configuration.
    *
    * @param action the action name, used purely for logging
    * @param config the root configuration (must contain a 'kafka4m' section)
    * @return a human-readable, indented summary of the kafka4m config
    */
  def startupLog(action: String, config: Config): String = {
    val kafka4mConf = config
      .getConfig("kafka4m")
      .summary()
      .linesIterator
      .map { line =>
        s"\tkafka4m.${line}"
      }
      .mkString("\n")
    s"Running $action with: \n${kafka4mConf}\n\n"
  }

  /** The two things this app can do, each producing a cancelable running task. */
  private sealed trait Action {
    def run(config: Config): Cancelable
  }

  /** Streams records out of kafka and writes them to local-disk buckets. */
  private case object Read extends Action {
    override def run(config: Config): CancelableFuture[Unit] = {
      val s     = Schedulers.io()
      val paths = readFromKafka(config)(s)
      // log each (bucket, directory) pair as it completes; the foreach's future
      // is the handle callers can cancel
      paths.foreach {
        case (bucket, dir) =>
          logger.info(s"Wrote $bucket to $dir")
      }(s)
    }
  }

  /** Streams local file data into kafka, with periodic throughput reporting. */
  private case object Write extends Action {
    override def run(config: Config): Cancelable = {
      val s               = Schedulers.io()
      val (reportC, fut)  = writeToKafka(config)(s)
      // cancelling the returned handle stops both the metrics reporter and the write
      Cancelable.collection(reportC, fut)
    }
  }

  /** Read data from kafka to a local disk
    * @param config the kafka4m root configuration
    * @param scheduler the scheduler on which the kafka read runs
    * @return an observable of the buckets and paths written
    */
  def readFromKafka(config: Config)(implicit scheduler: Scheduler): Observable[(TimeBucket, Path)] = {
    Base64Writer(config).partition(kafka4m.read(config))
  }

  /** Callback wired into [[Metrics]] to log write throughput. */
  private def reportThroughput(perSecond: Int, total: Long): Unit = {
    logger.info(s"Wrote $perSecond / second, $total total")
  }

  /** write data into kafka using the 'kafka4m.etl.intoKafka' config entry
    * @param config the root configuration
    * @param scheduler the scheduler on which the write and the metrics reporter run
    * @return the metrics-reporter cancelable paired with the future count of records written
    */
  def writeToKafka(config: Config)(implicit scheduler: Scheduler): (Cancelable, CancelableFuture[Long]) = {
    val data: Observable[(String, Array[Byte])] = FileSource(config)
    val metrics                                 = new Metrics(reportThroughput)
    val future                                  = writeToKafka(config, data.doOnNext(_ => metrics.incThroughput))(scheduler)
    val reporter                                = metrics.start(scheduler)
    (reporter, future)
  }

  /** Writes the given key/bytes stream into kafka.
    *
    * @param config    the root configuration used to build the kafka writer
    * @param data      the (key, value) records to write
    * @param scheduler the scheduler on which the stream is consumed
    * @return a future completing with the number of records written
    */
  def writeToKafka(config: Config, data: Observable[(String, Array[Byte])])(implicit scheduler: Scheduler): CancelableFuture[Long] = {
    val writer: Consumer[(String, Array[Byte]), Long] = kafka4m.writeKeyAndBytes(config)
    data.consumeWith(writer).runToFuture
  }

  override def run(config: Config): Cancelable = {
    // 'action' is only populated by 'mainDelegate'; fail fast (IllegalArgumentException,
    // as before) if this inherited hook is invoked any other way
    require(action.nonEmpty, "Hack: This should really only be run via the 'main' entry point.")
    val chosen = action.get
    logger.info(startupLog(chosen.toString, config))
    chosen.run(config)
  }
}