Merge pull request #16498 from drewhk/wip-15239-stream-cookbook-drewhk
Stream cookbook
drewhk committed Dec 19, 2014
2 parents 06c9cc3 + 3a8b662 commit 45cd3c0
Showing 19 changed files with 1,645 additions and 0 deletions.
@@ -0,0 +1,94 @@
package docs.stream.cookbook

import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

class RecipeByteStrings extends RecipeSpec {

"Recipes for bytestring streams" must {

"have a working chunker" in {
val rawBytes = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
val ChunkLimit = 2

//#bytestring-chunker
import akka.stream.stage._

class Chunker(val chunkSize: Int) extends PushPullStage[ByteString, ByteString] {
private var buffer = ByteString.empty

override def onPush(elem: ByteString, ctx: Context[ByteString]): Directive = {
buffer ++= elem
emitChunkOrPull(ctx)
}

override def onPull(ctx: Context[ByteString]): Directive = emitChunkOrPull(ctx)

private def emitChunkOrPull(ctx: Context[ByteString]): Directive = {
if (buffer.isEmpty) ctx.pull()
else {
val (emit, nextBuffer) = buffer.splitAt(chunkSize)
buffer = nextBuffer
ctx.push(emit)
}
}

}

val chunksStream = rawBytes.transform(() => new Chunker(ChunkLimit))
//#bytestring-chunker

val chunksFuture = chunksStream.grouped(10).runWith(Sink.head)

val chunks = Await.result(chunksFuture, 3.seconds)

chunks.forall(_.size <= 2) should be(true)
chunks.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
}

"have a working bytes limiter" in {
val SizeLimit = 9

//#bytes-limiter
import akka.stream.stage._
class ByteLimiter(val maximumBytes: Long) extends PushStage[ByteString, ByteString] {
private var count = 0

override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = {
count += chunk.size
if (count > maximumBytes) ctx.fail(new IllegalStateException("Too many bytes"))
else ctx.push(chunk)
}
}

val limiter = Flow[ByteString].transform(() => new ByteLimiter(SizeLimit))
//#bytes-limiter

val bytes1 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
val bytes2 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9, 10)))

Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))

an[IllegalStateException] must be thrownBy {
Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
}
}

"demonstrate compacting" in {

val data = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))

//#compacting-bytestrings
val compacted: Source[ByteString] = data.map(_.compact)
//#compacting-bytestrings

Await.result(compacted.grouped(10).runWith(Sink.head), 3.seconds).forall(_.isCompact) should be(true)
}

}

}
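
Each of the recipes above is just a ByteString transformation step, so they compose directly. The sketch below is not part of the commit (the 512-byte limit and the `limitedChunks` name are made up, and in the tests the two stage classes live in separate test bodies); it only illustrates how the limiter, the chunker and compaction could be chained on one source:

    val limitedChunks: Source[ByteString] =
      rawBytes
        .transform(() => new ByteLimiter(512))    // fail the stream once more than 512 bytes have passed
        .transform(() => new Chunker(ChunkLimit)) // re-chunk into pieces of at most ChunkLimit bytes
        .map(_.compact)                           // hand out compacted ByteStrings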
@@ -0,0 +1,91 @@
package docs.stream.cookbook

import akka.stream.{ MaterializerSettings, FlowMaterializer }
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

class RecipeCollectingMetrics extends RecipeSpec {
import HoldOps._
implicit val m2 = FlowMaterializer(MaterializerSettings(system).withInputBuffer(1, 1))

"Recipe for periodically collecting metrics" must {

"work" in {
// type Tick = Unit
//
// val loadPub = PublisherProbe[Int]()
// val tickPub = PublisherProbe[Tick]()
// val reportTicks = Source(tickPub)
// val loadUpdates = Source(loadPub)
// val futureSink = Sink.head[immutable.Seq[String]]
// val sink = Flow[String].grouped(10).to(futureSink)
//
// //#periodic-metrics-collection
// val currentLoad = loadUpdates.transform(() => new HoldWithWait)
//
// val graph = FlowGraph { implicit builder =>
// import FlowGraphImplicits._
// val collector = ZipWith[Int, Tick, String](
// (load: Int, tick: Tick) => s"current load is $load")
//
// currentLoad ~> collector.left
// reportTicks ~> collector.right
//
// collector.out ~> sink
// }
// //#periodic-metrics-collection
//
// val reports = graph.run().get(futureSink)
// val manualLoad = new StreamTestKit.AutoPublisher(loadPub)
// val manualTick = new StreamTestKit.AutoPublisher(tickPub)
//
// // Prefetch elimination
// manualTick.sendNext(())
//
// manualLoad.sendNext(53)
// manualLoad.sendNext(61)
// manualTick.sendNext(())
//
// manualLoad.sendNext(44)
// manualLoad.sendNext(54)
// manualLoad.sendNext(78)
// Thread.sleep(500)
//
// manualTick.sendNext(())
//
// manualTick.sendComplete()
//
// Await.result(reports, 3.seconds) should be(List("current load is 53", "current load is 61", "current load is 78"))

// Periodically collect values of metrics expressed as stream of updates
// ---------------------------------------------------------------------
//
// **Situation:** Given performance counters expressed as a stream of updates, we want to gather periodic reports of them.
// We do not want to backpressure the counter updates; instead we always take the latest value. Whenever no new counter
// value has arrived, we want to repeat the last one.
//
// This recipe uses the :class:`HoldWithWait` recipe introduced previously. We use this element to gather updates from
// the counter stream and store the latest value, and to repeat that value if no update is received between
// metrics collection rounds.
//
// To finish the recipe, we simply use :class:`ZipWith` to trigger reading the latest value from the ``currentLoad``
// stream whenever a new ``Tick`` arrives on the stream of ticks, ``reportTicks``.
//
// .. includecode:: code/docs/stream/cookbook/RecipeCollectingMetrics.scala#periodic-metrics-collection
//
// .. warning::
// In order for this recipe to work, the buffer size for the :class:`ZipWith` must be set to 1. The reason for this is
// explained in the "Buffering" section of the documentation.

// FIXME: This recipe only works with a buffer size of 0, which will only be available once graph fusing is implemented
pending
}

}

}
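
The commented-out recipe above relies on a `HoldWithWait` element imported from `HoldOps`, which is defined in another file of this commit. As a rough sketch only (assuming the `DetachedStage`/`DetachedContext` API of this akka-stream milestone; directive names such as `holdDownstream` and `pushAndPull` may not match the actual `HoldOps` code exactly), such a stage keeps the latest upstream value and replays it on every downstream pull:

    class HoldWithWait[T] extends DetachedStage[T, T] {
      private var currentValue: T = _
      private var waitingFirstValue = true

      override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
        currentValue = elem
        waitingFirstValue = false
        // if downstream is already waiting, emit immediately and keep consuming upstream
        if (ctx.isHoldingDownstream) ctx.pushAndPull(currentValue)
        else ctx.pull()
      }

      override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
        if (waitingFirstValue) ctx.holdDownstream() // no value seen yet, make downstream wait
        else ctx.push(currentValue)                 // repeat the latest value
      }
    }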
@@ -0,0 +1,62 @@
package docs.stream.cookbook

import java.security.MessageDigest

import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

class RecipeDigest extends RecipeSpec {

"Recipe for calculating digest" must {

"work" in {

val data = Source(List(
ByteString("abcdbcdecdef"),
ByteString("defgefghfghighijhijkijkljklmklmnlmnomnopnopq")))

//#calculating-digest
import akka.stream.stage._
def digestCalculator(algorithm: String) = new PushPullStage[ByteString, ByteString] {
val digest = MessageDigest.getInstance(algorithm)

override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = {
digest.update(chunk.toArray)
ctx.pull()
}

override def onPull(ctx: Context[ByteString]): Directive = {
if (ctx.isFinishing) ctx.pushAndFinish(ByteString(digest.digest()))
else ctx.pull()
}

override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
// If the stream is finished, we need to emit the last element in the onPull block.
// It is not allowed to directly emit elements from a termination block
// (onUpstreamFinish or onUpstreamFailure)
ctx.absorbTermination()
}
}

val digest: Source[ByteString] = data.transform(() => digestCalculator("SHA-256"))
//#calculating-digest

Await.result(digest.runWith(Sink.head), 3.seconds) should be(
ByteString(
0x24, 0x8d, 0x6a, 0x61,
0xd2, 0x06, 0x38, 0xb8,
0xe5, 0xc0, 0x26, 0x93,
0x0c, 0x3e, 0x60, 0x39,
0xa3, 0x3c, 0xe4, 0x59,
0x64, 0xff, 0x21, 0x67,
0xf6, 0xec, 0xed, 0xd4,
0x19, 0xdb, 0x06, 0xc1))

}

}

}
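
Because `digestCalculator` is parameterized by the algorithm name, the same stage works for any algorithm `MessageDigest` supports, and the resulting single-element stream can be post-processed like any other source. A small sketch, not part of the commit (`md5` and `digestHex` are made-up names, and the hex rendering is just one possible formatting):

    // digest of the same data with a different algorithm
    val md5: Source[ByteString] = data.transform(() => digestCalculator("MD5"))

    // render the SHA-256 digest as a lowercase hex string, e.g. for logging
    val digestHex: Source[String] = digest.map(bytes => bytes.map(b => f"$b%02x").mkString)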
@@ -0,0 +1,58 @@
package docs.stream.cookbook

import akka.stream.OverflowStrategy
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit.SubscriberProbe

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

class RecipeDroppyBroadcast extends RecipeSpec {

"Recipe for a droppy broadcast" must {
"work" in {
val myElements = Source(immutable.Iterable.tabulate(100)(_ + 1))

val sub1 = SubscriberProbe[Int]()
val sub2 = SubscriberProbe[Int]()
val mySink1 = Sink(sub1)
val mySink2 = Sink(sub2)
val futureSink = Sink.head[Seq[Int]]
val mySink3 = Flow[Int].grouped(200).to(futureSink)

//#droppy-bcast
// Makes a sink drop the oldest buffered elements if it is too slow to keep up
def droppySink[T](sink: Sink[T], bufferSize: Int): Sink[T] = {
Flow[T].buffer(bufferSize, OverflowStrategy.dropHead).to(sink)
}

import FlowGraphImplicits._
val graph = FlowGraph { implicit builder =>
val bcast = Broadcast[Int]

myElements ~> bcast

bcast ~> droppySink(mySink1, 10)
bcast ~> droppySink(mySink2, 10)
bcast ~> droppySink(mySink3, 10)
}
//#droppy-bcast

Await.result(graph.run().get(futureSink), 3.seconds).sum should be(5050)

sub1.expectSubscription().request(10)
sub2.expectSubscription().request(10)

for (i <- 91 to 100) {
sub1.expectNext(i)
sub2.expectNext(i)
}

sub1.expectComplete()
sub2.expectComplete()

}
}

}
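
Since `droppySink` just wraps an ordinary `Sink`, it can also be used without the graph DSL. A minimal sketch reusing the names from the test above (illustrative only, not part of the commit):

    // attach a single droppy sink directly to the source; the oldest of the
    // 10 buffered elements is dropped whenever mySink1 cannot keep up
    myElements.to(droppySink(mySink1, 10)).run()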
@@ -0,0 +1,28 @@
package docs.stream.cookbook

import akka.stream.scaladsl.{ Sink, Source }

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

class RecipeFlattenSeq extends RecipeSpec {

"Recipe for flatteing a stream of seqs" must {

"work" in {

val someDataSource = Source(List(List("1"), List("2"), List("3", "4", "5"), List("6", "7")))

//#flattening-seqs
val myData: Source[List[Message]] = someDataSource
val flattened: Source[Message] = myData.mapConcat(identity)
//#flattening-seqs

Await.result(flattened.grouped(8).runWith(Sink.head), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7"))

}

}

}
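
Note that `mapConcat` expects each element to already be an immutable sequence, so element types such as arrays need an explicit conversion first. A hypothetical sketch (the `arrayData` source and the `flattenedArrays` name are made up for the example):

    val arrayData: Source[Array[Message]] = Source(List(Array("1", "2"), Array("3")))
    val flattenedArrays: Source[Message] = arrayData.mapConcat(_.toList) // Array -> List before flattening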
