Skip to content

Commit

Permalink
Refactor GraphDslTest and TcpTest (#30160)
Browse files Browse the repository at this point in the history
  • Loading branch information
Captain1653 committed May 3, 2021
1 parent 4f5e013 commit b3edd0e
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@
import akka.stream.*;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import scala.collection.Seq;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class GraphDslTest extends StreamTest {
Expand Down Expand Up @@ -78,29 +78,33 @@ public void demonstrateBuildSimpleGraph() throws Exception {
@Test
@SuppressWarnings("unused")
public void demonstrateConnectErrors() {
try {
// #simple-graph
final RunnableGraph<NotUsed> g =
RunnableGraph.<NotUsed>fromGraph(
GraphDSL.create(
(b) -> {
final SourceShape<Integer> source1 =
b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
final SourceShape<Integer> source2 =
b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
final FanInShape2<Integer, Integer, Pair<Integer, Integer>> zip =
b.add(Zip.create());
b.from(source1).toInlet(zip.in0());
b.from(source2).toInlet(zip.in1());
return ClosedShape.getInstance();
}));
// unconnected zip.out (!) => "The inlets [] and outlets [] must correspond to the inlets []
// and outlets [ZipWith2.out]"
// #simple-graph
org.junit.Assert.fail("expected IllegalArgumentException");
} catch (IllegalStateException e) {
assertTrue(e != null && e.getMessage() != null && e.getMessage().contains("ZipWith2.out"));
}
IllegalStateException exception =
Assert.assertThrows(
"expected IllegalStateException",
IllegalStateException.class,
() -> {
// #simple-graph
final RunnableGraph<NotUsed> g =
RunnableGraph.fromGraph(
GraphDSL.create(
(b) -> {
final SourceShape<Integer> source1 =
b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
final SourceShape<Integer> source2 =
b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
final FanInShape2<Integer, Integer, Pair<Integer, Integer>> zip =
b.add(Zip.create());
b.from(source1).toInlet(zip.in0());
b.from(source2).toInlet(zip.in1());
return ClosedShape.getInstance();
}));
// unconnected zip.out (!) => "The inlets [] and outlets [] must correspond to the
// inlets []
// and outlets [ZipWith2.out]"
// #simple-graph
});
assertNotNull(exception.getMessage());
assertTrue(exception.getMessage().contains("ZipWith2.out"));
}

@Test
Expand All @@ -112,7 +116,7 @@ public void demonstrateReusingFlowInGraph() throws Exception {
Flow.of(Integer.class).map(elem -> elem * 2);

final RunnableGraph<Pair<CompletionStage<Integer>, CompletionStage<Integer>>> g =
RunnableGraph.<Pair<CompletionStage<Integer>, CompletionStage<Integer>>>fromGraph(
RunnableGraph.fromGraph(
GraphDSL.create(
topHeadSink, // import this sink into the graph
bottomHeadSink, // and this as well
Expand All @@ -137,7 +141,7 @@ public void demonstrateReusingFlowInGraph() throws Exception {
public void demonstrateMatValue() throws Exception {
// #graph-dsl-matvalue
final Sink<Integer, CompletionStage<Integer>> foldSink =
Sink.<Integer, Integer>fold(
Sink.fold(
0,
(a, b) -> {
return a + b;
Expand Down Expand Up @@ -222,10 +226,9 @@ public void beAbleToConstructClosedGraphFromList() throws Exception {
public void canUseMapMaterializedValueOnGraphs() {
Graph<SourceShape<Object>, NotUsed> srcGraph = Source.empty();
Graph<SourceShape<Object>, Pair<NotUsed, NotUsed>> mappedMatValueSrcGraph =
Graph.mapMaterializedValue(
srcGraph, notUsed -> new Pair<NotUsed, NotUsed>(notUsed, notUsed));
Graph.mapMaterializedValue(srcGraph, notUsed -> new Pair<>(notUsed, notUsed));
Sink<Object, CompletionStage<Done>> snk = Sink.ignore();
Pair<NotUsed, NotUsed> pair = Source.fromGraph(mappedMatValueSrcGraph).to(snk).run(system);
assertEquals(pair, new Pair<NotUsed, NotUsed>(NotUsed.getInstance(), NotUsed.getInstance()));
assertEquals(pair, new Pair<>(NotUsed.getInstance(), NotUsed.getInstance()));
}
}
73 changes: 37 additions & 36 deletions akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import akka.testkit.javadsl.TestKit;
import akka.util.ByteString;
import static akka.util.ByteString.emptyByteString;

import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

Expand Down Expand Up @@ -69,7 +71,7 @@ public void apply(IncomingConnection conn) {
}
});

final List<ByteString> testInput = new ArrayList<ByteString>();
final List<ByteString> testInput = new ArrayList<>();

{
for (char c = 'a'; c <= 'z'; c++) {
Expand Down Expand Up @@ -127,49 +129,48 @@ public void mustReportServerBindFailure() throws Exception {
.occurrences(1)
.intercept(
() -> {
try {
binding
.to(echoHandler)
.run(system)
.toCompletableFuture()
.get(5, TimeUnit.SECONDS);
assertTrue("Expected BindFailedException, but nothing was reported", false);
} catch (ExecutionException e) {
if (e.getCause() instanceof BindFailedException) {
} // all good
else throw new AssertionError("failed", e);
// expected
b.unbind();
} catch (Exception e) {
throw new AssertionError("failed", e);
}
ExecutionException executionException =
Assert.assertThrows(
"CompletableFuture.get() should throw ExecutionException",
ExecutionException.class,
() ->
binding
.to(echoHandler)
.run(system)
.toCompletableFuture()
.get(5, TimeUnit.SECONDS));
assertTrue(
"The cause of ExecutionException should be instanceof BindFailedException",
executionException.getCause() instanceof BindFailedException);
b.unbind();
return null;
});
}
};
}

@Test
public void mustReportClientConnectFailure() throws Throwable {
public void mustReportClientConnectFailure() {
final InetSocketAddress serverAddress = SocketUtil.notBoundServerAddress();
try {
try {
Source.from(testInput)
.viaMat(
Tcp.get(system)
.outgoingConnection(serverAddress.getHostString(), serverAddress.getPort()),
Keep.right())
.to(Sink.<ByteString>ignore())
.run(system)
.toCompletableFuture()
.get(5, TimeUnit.SECONDS);
assertTrue("Expected StreamTcpException, but nothing was reported", false);
} catch (ExecutionException e) {
throw e.getCause();
}
} catch (StreamTcpException e) {
// expected
}
ExecutionException executionException =
Assert.assertThrows(
"CompletableFuture.get() should throw ExecutionException",
ExecutionException.class,
() ->
Source.from(testInput)
.viaMat(
Tcp.get(system)
.outgoingConnection(
serverAddress.getHostString(), serverAddress.getPort()),
Keep.right())
.to(Sink.ignore())
.run(system)
.toCompletableFuture()
.get(5, TimeUnit.SECONDS));
assertEquals(
"The cause of ExecutionException should be StreamTcpException",
StreamTcpException.class,
executionException.getCause().getClass());
}

// compile only sample
Expand Down

0 comments on commit b3edd0e

Please sign in to comment.