Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNS: Akka 2.6; proper Http client pool shutdown in tests #2522

Merged
merged 1 commit into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/src/main/paradox/sns.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ The table below shows direct dependencies of this module and the second tab show

## Setup

This connector requires an implicit @javadoc[SnsAsyncClient](software.amazon.awssdk.services.sns.SnsAsyncClient) instance to communicate with AWS SQS.
This connector requires an @scala[implicit] @javadoc[SnsAsyncClient](software.amazon.awssdk.services.sns.SnsAsyncClient) instance to communicate with AWS SQS.

It is your code's responsibility to call `close` to free any resources held by the client. In this example it will be called when the actor system is terminated.

Expand All @@ -43,7 +43,7 @@ Java

The example above uses @extref:[Akka HTTP](akka-http:) as the default HTTP client implementation. For more details about the HTTP client, configuring request retrying and best practices for credentials, see @ref[AWS client configuration](aws-shared-configuration.md) for more details.

We will also need an @apidoc[akka.actor.ActorSystem] and an @apidoc[akka.stream.Materializer].
We will also need an @apidoc[akka.actor.ActorSystem].

Scala
: @@snip [snip](/sns/src/test/scala/akka/stream/alpakka/sns/IntegrationTestContext.scala) { #init-system }
Expand Down
29 changes: 14 additions & 15 deletions sns/src/test/java/docs/javadsl/SnsPublisherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.http.javadsl.Http;
import akka.stream.alpakka.sns.javadsl.SnsPublisher;
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
import akka.stream.javadsl.Sink;
Expand All @@ -17,7 +16,6 @@
// #init-client
import java.net.URI;
import com.github.matsluni.akkahttpspi.AkkaHttpClient;
import org.junit.Rule;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
Expand All @@ -26,6 +24,7 @@

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
import software.amazon.awssdk.services.sns.model.PublishRequest;
Expand All @@ -41,7 +40,6 @@ public class SnsPublisherTest {
@Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4();

static ActorSystem system;
static Materializer materializer;
static SnsAsyncClient snsClient;
static String topicArn;

Expand All @@ -50,7 +48,6 @@ public class SnsPublisherTest {
@BeforeClass
public static void setUpBeforeClass() throws ExecutionException, InterruptedException {
system = ActorSystem.create("SnsPublisherTest");
materializer = ActorMaterializer.create(system);
snsClient = createSnsClient();
topicArn =
snsClient
Expand All @@ -60,8 +57,12 @@ public static void setUpBeforeClass() throws ExecutionException, InterruptedExce
}

@AfterClass
public static void tearDownAfterClass() {
TestKit.shutdownActorSystem(system);
public static void tearDownAfterClass() throws Exception {
Http.get(system)
.shutdownAllConnectionPools()
.thenRun(() -> TestKit.shutdownActorSystem(system))
.toCompletableFuture()
.get(2, TimeUnit.SECONDS);
}

static SnsAsyncClient createSnsClient() {
Expand Down Expand Up @@ -93,16 +94,14 @@ static SnsAsyncClient createSnsClient() {
void documentation() {
// #init-system
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
// #init-system
}

@Test
public void sinkShouldPublishString() throws Exception {
CompletionStage<Done> completion =
// #use-sink
Source.single("message")
.runWith(SnsPublisher.createSink(topicArn, snsClient), materializer);
Source.single("message").runWith(SnsPublisher.createSink(topicArn, snsClient), system);

// #use-sink
assertThat(completion.toCompletableFuture().get(2, TimeUnit.SECONDS), is(Done.getInstance()));
Expand All @@ -113,7 +112,7 @@ public void sinkShouldPublishRequest() throws Exception {
CompletionStage<Done> completion =
// #use-sink
Source.single(PublishRequest.builder().message("message").build())
.runWith(SnsPublisher.createPublishSink(topicArn, snsClient), materializer);
.runWith(SnsPublisher.createPublishSink(topicArn, snsClient), system);

// #use-sink
assertThat(completion.toCompletableFuture().get(2, TimeUnit.SECONDS), is(Done.getInstance()));
Expand All @@ -124,7 +123,7 @@ public void sinkShouldPublishRequestWithDynamicTopic() throws Exception {
CompletionStage<Done> completion =
// #use-sink
Source.single(PublishRequest.builder().message("message").topicArn(topicArn).build())
.runWith(SnsPublisher.createPublishSink(snsClient), materializer);
.runWith(SnsPublisher.createPublishSink(snsClient), system);
// #use-sink
assertThat(completion.toCompletableFuture().get(2, TimeUnit.SECONDS), is(Done.getInstance()));
}
Expand All @@ -135,7 +134,7 @@ public void flowShouldPublishString() throws Exception {
// #use-flow
Source.single("message")
.via(SnsPublisher.createFlow(topicArn, snsClient))
.runWith(Sink.foreach(res -> System.out.println(res.messageId())), materializer);
.runWith(Sink.foreach(res -> System.out.println(res.messageId())), system);

// #use-flow
assertThat(completion.toCompletableFuture().get(2, TimeUnit.SECONDS), is(Done.getInstance()));
Expand All @@ -147,7 +146,7 @@ public void flowShouldPublishRequest() throws Exception {
// #use-flow
Source.single(PublishRequest.builder().message("message").build())
.via(SnsPublisher.createPublishFlow(topicArn, snsClient))
.runWith(Sink.foreach(res -> System.out.println(res.messageId())), materializer);
.runWith(Sink.foreach(res -> System.out.println(res.messageId())), system);

// #use-flow
assertThat(completion.toCompletableFuture().get(2, TimeUnit.SECONDS), is(Done.getInstance()));
Expand All @@ -159,7 +158,7 @@ public void flowShouldPublishRequestWithDynamicTopic() throws Exception {
// #use-flow
Source.single(PublishRequest.builder().message("message").topicArn(topicArn).build())
.via(SnsPublisher.createPublishFlow(snsClient))
.runWith(Sink.foreach(res -> System.out.println(res.messageId())), materializer);
.runWith(Sink.foreach(res -> System.out.println(res.messageId())), system);

// #use-flow
assertThat(completion.toCompletableFuture().get(2, TimeUnit.SECONDS), is(Done.getInstance()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.stream.alpakka.sns

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import org.mockito.Mockito.reset
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
Expand All @@ -17,7 +16,6 @@ import scala.concurrent.duration._
trait DefaultTestContext extends BeforeAndAfterAll with BeforeAndAfterEach with MockitoSugar { this: Suite =>

implicit protected val system: ActorSystem = ActorSystem()
implicit protected val mat: Materializer = ActorMaterializer()
implicit protected val snsClient: SnsAsyncClient = mock[SnsAsyncClient]

override protected def beforeEach(): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@ package akka.stream.alpakka.sns
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.testkit.TestKit
import akka.http.scaladsl.Http
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Suite}
import software.amazon.awssdk.services.sns.SnsAsyncClient
import software.amazon.awssdk.services.sns.model.CreateTopicRequest

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

trait IntegrationTestContext extends BeforeAndAfterAll with ScalaFutures {
this: Suite =>

//#init-system
implicit val system: ActorSystem = ActorSystem()
implicit val mat: Materializer = ActorMaterializer()
//#init-system

def snsEndpoint: String = s"http://localhost:4100"
Expand All @@ -42,7 +41,12 @@ trait IntegrationTestContext extends BeforeAndAfterAll with ScalaFutures {
topicArn = createTopic()
}

override protected def afterAll(): Unit = TestKit.shutdownActorSystem(system)
override protected def afterAll(): Unit = {
Http()
.shutdownAllConnectionPools()
.flatMap(_ => system.terminate())(ExecutionContext.global)
.futureValue
}

def createAsyncClient(endEndpoint: String): SnsAsyncClient = {
//#init-client
Expand Down