Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

=str #16787 java cookbook #16850

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions akka-actor/src/main/scala/akka/util/ByteString.scala
Expand Up @@ -7,6 +7,7 @@ package akka.util
import java.nio.{ ByteBuffer, ByteOrder }
import java.lang.{ Iterable ⇒ JIterable }

import scala.annotation.varargs
import scala.collection.IndexedSeqOptimized
import scala.collection.mutable.{ Builder, WrappedArray }
import scala.collection.immutable
Expand Down Expand Up @@ -52,6 +53,14 @@ object ByteString {
*/
def fromArray(array: Array[Byte]): ByteString = apply(array)

/**
* JAVA API
* Creates a new ByteString by copying an int array by converting from integral numbers to bytes.
*/
@varargs
def fromInts(array: Int*): ByteString =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should not be changed here, but in release-2.3 branch

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, right - thanks!

apply(array:_*)(scala.math.Numeric.IntIsIntegral)

/**
* Creates a new ByteString by copying length bytes starting at offset from
* an Array.
Expand Down
407 changes: 407 additions & 0 deletions akka-docs-dev/rst/java/stream-cookbook.rst

Large diffs are not rendered by default.

Expand Up @@ -44,16 +44,18 @@ class RecipeDigest extends RecipeSpec {
val digest: Source[ByteString] = data.transform(() => digestCalculator("SHA-256"))
//#calculating-digest

val string = ByteString(
0x24, 0x8d, 0x6a, 0x61,
0xd2, 0x06, 0x38, 0xb8,
0xe5, 0xc0, 0x26, 0x93,
0x0c, 0x3e, 0x60, 0x39,
0xa3, 0x3c, 0xe4, 0x59,
0x64, 0xff, 0x21, 0x67,
0xf6, 0xec, 0xed, 0xd4,
0x19, 0xdb, 0x06, 0xc1)
Await.result(digest.runWith(Sink.head), 3.seconds) should be(
ByteString(
0x24, 0x8d, 0x6a, 0x61,
0xd2, 0x06, 0x38, 0xb8,
0xe5, 0xc0, 0x26, 0x93,
0x0c, 0x3e, 0x60, 0x39,
0xa3, 0x3c, 0xe4, 0x59,
0x64, 0xff, 0x21, 0x67,
0xf6, 0xec, 0xed, 0xd4,
0x19, 0xdb, 0x06, 0xc1))
string)


}

Expand Down
Expand Up @@ -85,7 +85,6 @@ class RecipeGlobalRateLimit extends RecipeSpec {
val limiterTriggerFuture = limiter ? Limiter.WantToPass
limiterTriggerFuture.map((_) => element)
}

}
//#global-limiter-flow

Expand Down
@@ -1,7 +1,6 @@
package docs.stream.cookbook

import akka.stream.scaladsl._
import akka.testkit.TestProbe

import scala.concurrent.Await
import scala.concurrent.duration._
Expand All @@ -11,14 +10,14 @@ class RecipeWorkerPool extends RecipeSpec {
"Recipe for a pool of workers" must {

"work" in {
val myJobs = Source(List("1", "2", "3", "4", "5"))
val data = Source(List("1", "2", "3", "4", "5"))
type Result = String

val worker = Flow[String].map(_ + " done")

//#worker-pool
def balancer[In, Out](worker: Flow[In, Out], workerCount: Int): Flow[In, Out] = {
import FlowGraphImplicits._
import akka.stream.scaladsl.FlowGraphImplicits._

Flow[In, Out]() { implicit graphBuilder =>
val jobsIn = UndefinedSource[In]
Expand All @@ -40,7 +39,7 @@ class RecipeWorkerPool extends RecipeSpec {
}
}

val processedJobs: Source[Result] = myJobs.via(balancer(worker, 3))
val processedJobs: Source[Result] = data.via(balancer(worker, 3))
//#worker-pool

Await.result(processedJobs.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
Expand Down
1 change: 1 addition & 0 deletions akka-samples/akka-docs-java-lambda/build.sbt
Expand Up @@ -16,5 +16,6 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % publishedAkkaVersion,
"com.typesafe.akka" %% "akka-testkit" % publishedAkkaVersion % "test",
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0-SNAPSHOT",
"com.typesafe.akka" %% "akka-stream-testkit-experimental" % "1.0-SNAPSHOT" % "test->test" classifier "tests",
"junit" % "junit" % "4.11" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test")
7 changes: 7 additions & 0 deletions akka-samples/akka-docs-java-lambda/pom.xml
Expand Up @@ -30,6 +30,13 @@
<artifactId>akka-stream-experimental_2.10</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-testkit-experimental_2.10</artifactId>
<version>1.0-SNAPSHOT</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Expand Up @@ -121,16 +121,8 @@ void deliverBuf() {
@Test
public void demonstrateActorPublisherUsage() {
new JavaTestKit(system) {
class MockSystem {
class Println {
public <T> void println(T s) {
getTestActor().tell(s, ActorRef.noSender());
}
}

public final Println out = new Println();
}
private final MockSystem System = new MockSystem();
final SilenceSystemOut.System System = SilenceSystemOut.get(getTestActor());

{
//#actor-publisher-usage
Expand Down
Expand Up @@ -44,16 +44,6 @@

public class IntegrationDocTest {

static class MockSystem {
class Println {
@SuppressWarnings("UnusedParameters")
<T> void println(T s) { /* ignore */ }
}

public final Println out = new Println();
}
private static final MockSystem System = new MockSystem();

static ActorSystem system;

@BeforeClass
Expand Down Expand Up @@ -491,17 +481,7 @@ public void illustrateOrderingAndParallelismOfMapAsync() throws Exception {
final TestProbe probe = new TestProbe(system);
final EmailServer emailServer = new EmailServer(probe.ref());

class MockSystem {
class Println {
public <T> void println(T s) {
if (s.toString().startsWith("after:"))
probe.ref().tell(s, ActorRef.noSender());
}
}

public final Println out = new Println();
}
private final MockSystem System = new MockSystem();
final SilenceSystemOut.System System = SilenceSystemOut.get((String m) -> m.startsWith("after"), probe.ref());

{
//#sometimes-slow-mapAsync
Expand Down Expand Up @@ -537,17 +517,7 @@ public void illustrateOrderingAndParallelismOfMapAsyncUnordered() throws Excepti
final TestProbe probe = new TestProbe(system);
final EmailServer emailServer = new EmailServer(probe.ref());

class MockSystem {
class Println {
public <T> void println(T s) {
if (s.toString().startsWith("after:"))
probe.ref().tell(s, ActorRef.noSender());
}
}

public final Println out = new Println();
}
private final MockSystem System = new MockSystem();
final SilenceSystemOut.System System = SilenceSystemOut.get((String m) -> m.startsWith("after"), probe.ref());

{
//#sometimes-slow-mapAsyncUnordered
Expand Down
@@ -0,0 +1,56 @@
package docs.stream;

import akka.actor.ActorRef;

import java.util.function.Predicate;

/**
* Acts as if `System.out.println()` yet swallows all messages.
* Useful for putting printlines in examples yet without poluting the build with them.
*/
public class SilenceSystemOut {

private SilenceSystemOut() {}

public static System get() {
return new System(new System.Println() {
@Override
public void println(String s) {
// ignore
}
});
}

public static System get(ActorRef probe) {
return new System(new System.Println() {
@Override
public void println(String s) {
probe.tell(s, ActorRef.noSender());
}
});
}

public static System get(Predicate<String> filter, ActorRef probe) {
return new System(new System.Println() {
@Override
public void println(String s) {
if (filter.test(s)) probe.tell(s, ActorRef.noSender());
}
});
}

public static class System {
public final Println out;

public System(Println out) {
this.out = out;
}

public static abstract class Println {
public abstract void println(String s);
public void println(Object s) { println(s.toString()); }
}

}

}