Skip to content

Commit

Permalink
fixed comments, added missing lineparser, removed manual routing
Browse files Browse the repository at this point in the history
  • Loading branch information
Endre Sándor Varga committed Dec 17, 2014
1 parent 8bf52df commit 5dee3a9
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package docs.stream.cookbook

import akka.stream.scaladsl.{ Flow, HeadSink, Source }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.util.ByteString

import scala.concurrent.Await
Expand All @@ -27,8 +27,8 @@ class RecipeByteStrings extends RecipeSpec {
private def emitChunkOrPull(ctx: Context[ByteString]): Directive = {
if (buffer.isEmpty) ctx.pull()
else {
val emit = buffer.take(chunkSize)
buffer = buffer.drop(chunkSize)
val (emit, nextBuffer) = buffer.splitAt(chunkSize)
buffer = nextBuffer
ctx.push(emit)
}
}
Expand All @@ -37,7 +37,7 @@ class RecipeByteStrings extends RecipeSpec {
//#bytestring-chunker

val chunksFuture = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
.transform(() => new Chunker(2)).grouped(10).runWith(HeadSink())
.transform(() => new Chunker(2)).grouped(10).runWith(Sink.head)

val chunks = Await.result(chunksFuture, 3.seconds)

Expand Down Expand Up @@ -65,11 +65,11 @@ class RecipeByteStrings extends RecipeSpec {

val limiter = Flow[ByteString].transform(() => new ByteLimiter(9))

Await.result(bytes1.via(limiter).grouped(10).runWith(HeadSink()), 3.seconds)
Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))

an[IllegalStateException] must be thrownBy {
Await.result(bytes2.via(limiter).grouped(10).runWith(HeadSink()), 3.seconds)
Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
}
}

Expand All @@ -81,7 +81,7 @@ class RecipeByteStrings extends RecipeSpec {
val compacted: Source[ByteString] = data.map(_.compact)
//#compacting-bytestrings

Await.result(compacted.grouped(10).runWith(HeadSink()), 3.seconds).forall(_.isCompact) should be(true)
Await.result(compacted.grouped(10).runWith(Sink.head), 3.seconds).forall(_.isCompact) should be(true)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class RecipeCollectingMetrics extends RecipeSpec {
val tickPub = PublisherProbe[Tick]()
val reportTicks = Source(tickPub)
val loadUpdates = Source(loadPub)
val futureSink = HeadSink[immutable.Seq[String]]()
val futureSink = Sink.head[immutable.Seq[String]]
val sink = Flow[String].grouped(10).to(futureSink)

//#periodic-metrics-collection
Expand Down Expand Up @@ -54,6 +54,8 @@ class RecipeCollectingMetrics extends RecipeSpec {
manualLoad.sendNext(44)
manualLoad.sendNext(54)
manualLoad.sendNext(78)
Thread.sleep(500)

manualTick.sendNext(())

manualTick.sendComplete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package docs.stream.cookbook

import java.security.MessageDigest

import akka.stream.scaladsl.{ HeadSink, Source }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString

import scala.concurrent.Await
Expand Down Expand Up @@ -43,7 +43,7 @@ class RecipeDigest extends RecipeSpec {
val digest: Source[ByteString] = data.transform(() => digestCalculator("SHA-256"))
//#calculating-digest

Await.result(digest.runWith(HeadSink()), 3.seconds) should be(
Await.result(digest.runWith(Sink.head), 3.seconds) should be(
ByteString(
0x24, 0x8d, 0x6a, 0x61,
0xd2, 0x06, 0x38, 0xb8,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RecipeDroppyBroadcast extends RecipeSpec {
val sub2 = SubscriberProbe[Int]()
val mySink1 = Sink(sub1)
val mySink2 = Sink(sub2)
val futureSink = HeadSink[Seq[Int]]()
val futureSink = Sink.head[Seq[Int]]
val mySink3 = Flow[Int].grouped(200).to(futureSink)

//#droppy-bcast
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package docs.stream.cookbook

import akka.stream.scaladsl.{ HeadSink, Source }
import akka.stream.scaladsl.{ Sink, Source }

import scala.collection.immutable
import scala.concurrent.Await
Expand All @@ -17,7 +17,7 @@ class RecipeFlattenSeq extends RecipeSpec {
val flattened: Source[Message] = myData.mapConcat(identity)
//#flattening-seqs

Await.result(flattened.grouped(8).runWith(HeadSink()), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7"))
Await.result(flattened.grouped(8).runWith(Sink.head), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7"))

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package docs.stream.cookbook

import akka.event.Logging
import akka.stream.scaladsl.{ BlackholeSink, Source, Flow }
import akka.stream.scaladsl.{ Sink, Source, Flow }
import akka.testkit.{ EventFilter, TestProbe }

class RecipeLoggingElements extends RecipeSpec {
Expand All @@ -18,7 +18,7 @@ class RecipeLoggingElements extends RecipeSpec {
val loggedSource = mySource.map { elem => println(elem); elem }
//#println-debug

loggedSource.runWith(BlackholeSink)
loggedSource.runWith(Sink.ignore)
printProbe.expectMsgAllOf("1", "2", "3")
}

Expand Down Expand Up @@ -50,7 +50,7 @@ class RecipeLoggingElements extends RecipeSpec {
//#loggingadapter

EventFilter.debug(start = "Element flowing").intercept {
loggedSource.runWith(BlackholeSink)
loggedSource.runWith(Sink.ignore)
}

}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class RecipeManualTrigger extends RecipeSpec {
val zip = Zip[Message, Trigger]
elements ~> zip.left
triggerSource ~> zip.right
zip.out ~> Flow[(Message, Trigger)].map(_._1) ~> sink
zip.out ~> Flow[(Message, Trigger)].map { case (msg, trigger) => msg } ~> sink
}
//#manually-triggered-stream

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package docs.stream.cookbook

import akka.stream.scaladsl.{ HeadSink, Source }
import akka.stream.scaladsl.{ Sink, Source }

import scala.collection.immutable
import scala.concurrent.Await
Expand Down Expand Up @@ -35,8 +35,8 @@ class RecipeMultiGroupBy extends RecipeSpec {
//#multi-groupby

val result = multiGroups.map {
case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(HeadSink())
}.mapAsync(identity).grouped(10).runWith(HeadSink())
case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head)
}.mapAsync(identity).grouped(10).runWith(Sink.head)

Await.result(result, 3.seconds).toSet should be(Set(
"1[1: a, 1: b, all: c, all: d, 1: e]",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,81 @@
package docs.stream.cookbook

import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString

import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._

class RecipeParseLines extends RecipeSpec {

"Recipe for parsing line from bytes" must {

"work" in {
val rawData = Source.empty[ByteString]
val rawData = Source(List(
ByteString("Hello World"),
ByteString("\r"),
ByteString("!\r"),
ByteString("\nHello Akka!\r\nHello Streams!"),
ByteString("\r\n")))
import akka.stream.stage._

//#parse-lines
def parseLines(separator: String, maximumLineBytes: Int) = new StatefulStage[ByteString, String] {
var buffer = ByteString.empty
var parsed = List.empty[String]
private val separatorBytes = ByteString(separator)
private val firstSeparatorByte = separatorBytes.head
private var buffer = ByteString.empty
private var nextPossibleMatch = 0

def initial = new State {
override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
buffer ++= chunk
if (buffer.size > maximumLineBytes)
ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes which is more than $maximumLineBytes" +
" without seeing a line terminator"))
else emit(doParse(), ctx)
else emit(doParse(Vector.empty).iterator, ctx)
}

private def doParse(): Iterator[String] = {
???
@tailrec
private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
if (possibleMatchPos == -1) {
// No matching character, we need to accumulate more bytes into the buffer
nextPossibleMatch = buffer.size
parsedLinesSoFar
} else {
if (possibleMatchPos + separatorBytes.size > buffer.size) {
// We have found a possible match (we found the first character of the terminator sequence)
// but we don't have yet enough bytes. We remember the position to retry from next time.
nextPossibleMatch = possibleMatchPos
parsedLinesSoFar
} else {
if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size) == separatorBytes) {
// Found a match
val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
nextPossibleMatch -= possibleMatchPos + separatorBytes.size
doParse(parsedLinesSoFar :+ parsedLine)
} else {
nextPossibleMatch += 1
doParse(parsedLinesSoFar)
}
}
}

}
}

}

pending
val linesStream = rawData.transform(() => parseLines("\r\n", 100))

//#parse-lines

Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List(
"Hello World\r!",
"Hello Akka!",
"Hello Streams!"))
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class RecipeReduceByKey extends RecipeSpec {
val counts: Source[(String, Int)] = countedWords.mapAsync(identity)
//#word-count

Await.result(counts.grouped(10).runWith(HeadSink()), 3.seconds).toSet should be(Set(
Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
("hello", 2),
("world", 1),
("and", 1),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package docs.stream.cookbook

import akka.stream.scaladsl.{ HeadSink, Source }
import akka.stream.scaladsl.{ Sink, Source }

import scala.collection.immutable
import scala.concurrent.{ Await, Future }
Expand All @@ -15,7 +15,7 @@ class RecipeToStrict extends RecipeSpec {
val MaxAllowedSeqSize = 100

//#draining-to-seq
val strict: Future[immutable.Seq[Message]] = myData.grouped(MaxAllowedSeqSize).runWith(HeadSink())
val strict: Future[immutable.Seq[Message]] = myData.grouped(MaxAllowedSeqSize).runWith(Sink.head)
//#draining-to-seq

Await.result(strict, 3.seconds) should be(List("1", "2", "3"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class RecipeWorkerPool extends RecipeSpec {
val processedJobs: Source[Result] = myJobs.via(balancer(worker, 3))
//#worker-pool

Await.result(processedJobs.grouped(10).runWith(HeadSink()), 3.seconds).toSet should be(Set(
Await.result(processedJobs.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
"1 done", "2 done", "3 done", "4 done", "5 done"))

}
Expand Down

0 comments on commit 5dee3a9

Please sign in to comment.