Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add maxOpenRequests param in DynamoConfig #1545

Merged
merged 3 commits into from Mar 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions dynamodb/src/main/resources/reference.conf
Expand Up @@ -14,6 +14,10 @@ akka.stream.alpakka.dynamodb {
# Max number of in flight requests from the AwsClient - must be a power of 2
parallelism = 32

# Optional max number of outstanding requests to the underlying host connection pool.
# If unspecified, will be the same as the parallelism value. Must be a power of 2
#max-open-requests =

# Optional Credentials. Used to define static credentials rather than use the DefaultAWSCredentialsProviderChain
#credentials {
# access-key-id = "dummy-access-key"
Expand Down
Expand Up @@ -7,13 +7,16 @@ package akka.stream.alpakka.dynamodb
import akka.actor.ActorSystem
import com.amazonaws.auth._
import com.typesafe.config.Config
import java.util.Optional
import scala.compat.java8.OptionConverters._

final class DynamoSettings private (
val region: String,
val host: String,
val port: Int,
val tls: Boolean,
val parallelism: Int,
val maxOpenRequests: Option[Int],
val credentialsProvider: com.amazonaws.auth.AWSCredentialsProvider
) extends AwsClientSettings {

Expand All @@ -27,27 +30,36 @@ final class DynamoSettings private (
def withTls(value: Boolean): DynamoSettings =
if (value == tls) this else copy(tls = value)
def withParallelism(value: Int): DynamoSettings = copy(parallelism = value)
def withMaxOpenRequests(value: Option[Int]): DynamoSettings = copy(maxOpenRequests = value)
def withCredentialsProvider(value: com.amazonaws.auth.AWSCredentialsProvider): DynamoSettings =
copy(credentialsProvider = value)

/** Java Api */
def withMaxOpenRequests(value: Optional[Int]): DynamoSettings = copy(maxOpenRequests = value.asScala)

/** Java Api */
def getMaxOpenRequests(): Optional[Int] = maxOpenRequests.asJava

private def copy(
region: String = region,
host: String = host,
port: Int = port,
tls: Boolean = tls,
parallelism: Int = parallelism,
maxOpenRequests: Option[Int] = maxOpenRequests,
credentialsProvider: com.amazonaws.auth.AWSCredentialsProvider = credentialsProvider
): DynamoSettings = new DynamoSettings(
region = region,
host = host,
port = port,
tls = tls,
parallelism = parallelism,
maxOpenRequests = maxOpenRequests,
credentialsProvider = credentialsProvider
)

override def toString =
s"""DynamoSettings(region=$region,host=$host,port=$port,parallelism=$parallelism,credentialsProvider=$credentialsProvider)"""
s"""DynamoSettings(region=$region,host=$host,port=$port,parallelism=$parallelism,maxOpenRequests=$maxOpenRequests,credentialsProvider=$credentialsProvider)"""
}

object DynamoSettings {
Expand All @@ -70,6 +82,10 @@ object DynamoSettings {
val port = c.getInt("port")
val tls = c.getBoolean("tls")
val parallelism = c.getInt("parallelism")
val maxOpenRequests = if (c.hasPath("max-open-requests")) {
Option(c.getInt("max-open-requests"))
} else None

val awsCredentialsProvider = {
if (c.hasPath("credentials.access-key-id") &&
c.hasPath("credentials.secret-key-id")) {
Expand All @@ -84,6 +100,7 @@ object DynamoSettings {
port,
tls,
parallelism,
maxOpenRequests,
awsCredentialsProvider
)
}
Expand All @@ -109,6 +126,7 @@ object DynamoSettings {
port = 443,
tls = true,
parallelism = 4,
maxOpenRequests = None,
new DefaultAWSCredentialsProviderChain()
)

Expand Down
Expand Up @@ -36,7 +36,7 @@ private[dynamodb] class DynamoClientImpl(
override protected val connection: AwsConnect = {
val poolSettings = ConnectionPoolSettings(system)
.withMaxConnections(settings.parallelism)
.withMaxOpenRequests(settings.parallelism)
.withMaxOpenRequests(settings.maxOpenRequests.getOrElse(settings.parallelism))
if (settings.tls)
Http().cachedHostConnectionPoolHttps[AwsRequestMetadata](settings.host, settings = poolSettings)
else
Expand Down
4 changes: 2 additions & 2 deletions dynamodb/src/test/java/docs/javadsl/ExampleTest.java
Expand Up @@ -100,7 +100,7 @@ public void allowMultipleRequests() throws Exception {
tableArnSource.runWith(Sink.seq(), materializer);
try {
List<String> strings = streamCompletion.toCompletableFuture().get(5, TimeUnit.SECONDS);
fail("expeced missing schema");
fail("expected missing schema");
} catch (ExecutionException expected) {
// expected
}
Expand All @@ -116,7 +116,7 @@ public void paginated() throws Exception {
scanPages.runWith(Sink.seq(), materializer);
try {
List<ScanResult> strings = streamCompletion.toCompletableFuture().get(1, TimeUnit.SECONDS);
fail("expeced missing schema");
fail("expected missing schema");
} catch (ExecutionException expected) {
// expected
}
Expand Down
Expand Up @@ -16,6 +16,7 @@ import org.scalatest.{Matchers, WordSpecLike}

import scala.concurrent.Await
import scala.concurrent.duration._
import java.util.Optional

class DynamoSettingsSpec extends WordSpecLike with Matchers {

Expand All @@ -34,6 +35,7 @@ class DynamoSettingsSpec extends WordSpecLike with Matchers {
settings.host should be("host")
settings.port should be(443)
settings.parallelism should be(4)
settings.maxOpenRequests should be(None)
settings.credentialsProvider shouldBe a[AWSStaticCredentialsProvider]
}

Expand All @@ -45,10 +47,34 @@ class DynamoSettingsSpec extends WordSpecLike with Matchers {
settings.host should be("localhost")
settings.port should be(8001)
settings.parallelism should be(4)
settings.maxOpenRequests should be(None)
settings.credentialsProvider shouldBe a[DefaultAWSCredentialsProviderChain]
Await.result(system.terminate(), 1.second)
}

"allow configuring optional maxOpenRequests" in {
val config = ConfigFactory.parseString("""
|region = "eu-west-1"
|host = "localhost"
|port = 443
|tls = true
|parallelism = 32
|max-open-requests = 64
""".stripMargin)

val settings = DynamoSettings(config)

// Test Scala API
settings.maxOpenRequests shouldBe Some(64)
settings.withMaxOpenRequests(None).maxOpenRequests shouldBe None
settings.withMaxOpenRequests(Some(32)).maxOpenRequests shouldBe Some(32)

// Test Java API
settings.getMaxOpenRequests shouldBe Optional.of(64)
settings.withMaxOpenRequests(Optional.empty[Int]).getMaxOpenRequests shouldBe Optional.empty[Int]
settings.withMaxOpenRequests(Optional.of(32)).getMaxOpenRequests shouldBe Optional.of(32)
}

"use the DefaultAWSCredentialsProviderChain if the config defines an incomplete akka.stream.alpakka.dynamodb.credentials" in {
val config = ConfigFactory.parseString("""
|region = "eu-west-1"
Expand Down