Skip to content

Commit

Permalink
=str #16787 java cookbook
Browse files Browse the repository at this point in the history
  • Loading branch information
ktoso committed Feb 11, 2015
1 parent 8ce8f1d commit 4549429
Show file tree
Hide file tree
Showing 30 changed files with 2,355 additions and 60 deletions.
9 changes: 9 additions & 0 deletions akka-actor/src/main/scala/akka/util/ByteString.scala
Expand Up @@ -7,6 +7,7 @@ package akka.util
import java.nio.{ ByteBuffer, ByteOrder }
import java.lang.{ Iterable JIterable }

import scala.annotation.varargs
import scala.collection.IndexedSeqOptimized
import scala.collection.mutable.{ Builder, WrappedArray }
import scala.collection.immutable
Expand Down Expand Up @@ -52,6 +53,14 @@ object ByteString {
*/
def fromArray(array: Array[Byte]): ByteString = apply(array)

/**
* JAVA API
* Creates a new ByteString by copying an int array by converting from integral numbers to bytes.
*/
@varargs
def fromInts(array: Int*): ByteString =
apply(array:_*)(scala.math.Numeric.IntIsIntegral)

/**
* Creates a new ByteString by copying length bytes starting at offset from
* an Array.
Expand Down
409 changes: 409 additions & 0 deletions akka-docs-dev/rst/java/stream-cookbook.rst

Large diffs are not rendered by default.

Expand Up @@ -44,16 +44,18 @@ class RecipeDigest extends RecipeSpec {
val digest: Source[ByteString] = data.transform(() => digestCalculator("SHA-256"))
//#calculating-digest

val string = ByteString(
0x24, 0x8d, 0x6a, 0x61,
0xd2, 0x06, 0x38, 0xb8,
0xe5, 0xc0, 0x26, 0x93,
0x0c, 0x3e, 0x60, 0x39,
0xa3, 0x3c, 0xe4, 0x59,
0x64, 0xff, 0x21, 0x67,
0xf6, 0xec, 0xed, 0xd4,
0x19, 0xdb, 0x06, 0xc1)
Await.result(digest.runWith(Sink.head), 3.seconds) should be(
ByteString(
0x24, 0x8d, 0x6a, 0x61,
0xd2, 0x06, 0x38, 0xb8,
0xe5, 0xc0, 0x26, 0x93,
0x0c, 0x3e, 0x60, 0x39,
0xa3, 0x3c, 0xe4, 0x59,
0x64, 0xff, 0x21, 0x67,
0xf6, 0xec, 0xed, 0xd4,
0x19, 0xdb, 0x06, 0xc1))
string)


}

Expand Down
Expand Up @@ -85,7 +85,6 @@ class RecipeGlobalRateLimit extends RecipeSpec {
val limiterTriggerFuture = limiter ? Limiter.WantToPass
limiterTriggerFuture.map((_) => element)
}

}
//#global-limiter-flow

Expand Down
@@ -1,7 +1,6 @@
package docs.stream.cookbook

import akka.stream.scaladsl._
import akka.testkit.TestProbe

import scala.concurrent.Await
import scala.concurrent.duration._
Expand All @@ -11,14 +10,14 @@ class RecipeWorkerPool extends RecipeSpec {
"Recipe for a pool of workers" must {

"work" in {
val myJobs = Source(List("1", "2", "3", "4", "5"))
val data = Source(List("1", "2", "3", "4", "5"))
type Result = String

val worker = Flow[String].map(_ + " done")

//#worker-pool
def balancer[In, Out](worker: Flow[In, Out], workerCount: Int): Flow[In, Out] = {
import FlowGraphImplicits._
import akka.stream.scaladsl.FlowGraphImplicits._

Flow[In, Out]() { implicit graphBuilder =>
val jobsIn = UndefinedSource[In]
Expand All @@ -40,7 +39,7 @@ class RecipeWorkerPool extends RecipeSpec {
}
}

val processedJobs: Source[Result] = myJobs.via(balancer(worker, 3))
val processedJobs: Source[Result] = data.via(balancer(worker, 3))
//#worker-pool

Await.result(processedJobs.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
Expand Down
1 change: 1 addition & 0 deletions akka-samples/akka-docs-java-lambda/build.sbt
Expand Up @@ -16,5 +16,6 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % publishedAkkaVersion,
"com.typesafe.akka" %% "akka-testkit" % publishedAkkaVersion % "test",
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0-SNAPSHOT",
"com.typesafe.akka" %% "akka-stream-testkit-experimental" % "1.0-SNAPSHOT" % "test->test" classifier "tests",
"junit" % "junit" % "4.11" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test")
7 changes: 7 additions & 0 deletions akka-samples/akka-docs-java-lambda/pom.xml
Expand Up @@ -30,6 +30,13 @@
<artifactId>akka-stream-experimental_2.10</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-testkit-experimental_2.10</artifactId>
<version>1.0-SNAPSHOT</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Expand Up @@ -121,16 +121,8 @@ void deliverBuf() {
@Test
public void demonstrateActorPublisherUsage() {
new JavaTestKit(system) {
class MockSystem {
class Println {
public <T> void println(T s) {
getTestActor().tell(s, ActorRef.noSender());
}
}

public final Println out = new Println();
}
private final MockSystem System = new MockSystem();
final SilenceSystemOut.System System = SilenceSystemOut.get(getTestActor());

{
//#actor-publisher-usage
Expand Down
Expand Up @@ -44,16 +44,6 @@

public class IntegrationDocTest {

static class MockSystem {
class Println {
@SuppressWarnings("UnusedParameters")
<T> void println(T s) { /* ignore */ }
}

public final Println out = new Println();
}
private static final MockSystem System = new MockSystem();

static ActorSystem system;

@BeforeClass
Expand Down Expand Up @@ -491,17 +481,7 @@ public void illustrateOrderingAndParallelismOfMapAsync() throws Exception {
final TestProbe probe = new TestProbe(system);
final EmailServer emailServer = new EmailServer(probe.ref());

class MockSystem {
class Println {
public <T> void println(T s) {
if (s.toString().startsWith("after:"))
probe.ref().tell(s, ActorRef.noSender());
}
}

public final Println out = new Println();
}
private final MockSystem System = new MockSystem();
final SilenceSystemOut.System System = SilenceSystemOut.get((String m) -> m.startsWith("after"), probe.ref());

{
//#sometimes-slow-mapAsync
Expand Down Expand Up @@ -537,17 +517,7 @@ public void illustrateOrderingAndParallelismOfMapAsyncUnordered() throws Excepti
final TestProbe probe = new TestProbe(system);
final EmailServer emailServer = new EmailServer(probe.ref());

class MockSystem {
class Println {
public <T> void println(T s) {
if (s.toString().startsWith("after:"))
probe.ref().tell(s, ActorRef.noSender());
}
}

public final Println out = new Println();
}
private final MockSystem System = new MockSystem();
final SilenceSystemOut.System System = SilenceSystemOut.get((String m) -> m.startsWith("after"), probe.ref());

{
//#sometimes-slow-mapAsyncUnordered
Expand Down
@@ -0,0 +1,56 @@
package docs.stream;

import akka.actor.ActorRef;

import java.util.function.Predicate;

/**
* Acts as if `System.out.println()` yet swallows all messages.
* Useful for putting printlines in examples yet without poluting the build with them.
*/
public class SilenceSystemOut {

private SilenceSystemOut() {}

public static System get() {
return new System(new System.Println() {
@Override
public void println(String s) {
// ignore
}
});
}

public static System get(ActorRef probe) {
return new System(new System.Println() {
@Override
public void println(String s) {
probe.tell(s, ActorRef.noSender());
}
});
}

public static System get(Predicate<String> filter, ActorRef probe) {
return new System(new System.Println() {
@Override
public void println(String s) {
if (filter.test(s)) probe.tell(s, ActorRef.noSender());
}
});
}

public static class System {
public final Println out;

public System(Println out) {
this.out = out;
}

public static abstract class Println {
public abstract void println(String s);
public void println(Object s) { println(s.toString()); }
}

}

}

0 comments on commit 4549429

Please sign in to comment.