// SenderComplete.scala
package com.softwaremill.reactive.complete
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import com.softwaremill.reactive._
import scala.concurrent.duration._
/**
 * Streams the lines of `data/2008.csv` (resolved against the `user.dir` system property) over
 * a TCP connection to `localhost:9181`.
 *
 * Every line is sent as a newline-terminated [[akka.util.ByteString]]. The outgoing byte
 * stream is also broadcast to a side branch which logs how many bytes were emitted per
 * window (at most 10000 elements or 1 second, whichever comes first). Completion of the
 * connection's response stream is logged as well.
 *
 * NOTE(review): the actor system is never shut down in this object, so the JVM presumably
 * keeps running after the stream completes -- confirm whether that is intended.
 */
object SenderComplete extends App with Logging {
  implicit val system: ActorSystem = ActorSystem()

  val serverConnection = Tcp().outgoingConnection("localhost", 9181)

  val projectDir = System.getProperty("user.dir")

  /**
   * Opens `path` and returns its lines as an iterator that closes the underlying file as soon
   * as the last line has been consumed, instead of leaking the file handle.
   *
   * The file is only closed on exhaustion; if the consumer abandons the iterator early
   * (e.g. the stream is cancelled or fails) the handle stays open until the JVM exits.
   */
  private def closingLines(path: String): Iterator[String] = {
    val file = scala.io.Source.fromFile(path)
    val underlying = file.getLines()

    new Iterator[String] {
      // Local mutable flag: once closed, the underlying reader must not be touched again.
      private var closed = false

      override def hasNext: Boolean =
        !closed && {
          val more = underlying.hasNext
          if (!more) {
            closed = true
            file.close()
          }
          more
        }

      override def next(): String =
        if (hasNext) underlying.next() else Iterator.empty.next()
    }
  }

  // A factory rather than a single iterator: the file is opened each time it is invoked.
  val getLines: () => Iterator[String] =
    () => closingLines(s"$projectDir/data/2008.csv")

  // getLines() strips the line terminator, so it is re-added before sending.
  val linesSource = Source(getLines).map { line => ByteString(line + "\n") }

  val logCompleteSink = Sink.onComplete(r => logger.info(s"Completed with: $r"))

  val graph = FlowGraph.closed() { implicit b =>
    import FlowGraph.Implicits._

    val broadcast = b.add(Broadcast[ByteString](2))

    // Side branch: total the byte count of each time/size window and log it.
    val logWindowFlow = Flow[ByteString]
      .groupedWithin(10000, 1.second)
      .map(group => group.map(_.size).sum)
      .map(groupSize => logger.info(s"Sent $groupSize bytes"))

    // Main branch: file lines -> server; the server's response stream only drives the
    // completion log.
    linesSource ~> broadcast ~> serverConnection ~> logCompleteSink
                   broadcast ~> logWindowFlow    ~> Sink.ignore
  }

  implicit val mat: ActorFlowMaterializer = ActorFlowMaterializer()

  graph.run()
}