Skip to content

Commit

Permalink
Merge pull request #1545 from williamho/max-open-requests
Browse files Browse the repository at this point in the history
Add maxOpenRequests param in DynamoConfig
  • Loading branch information
2m committed Mar 5, 2019
2 parents 0ce92b1 + 87f891f commit c18bfb0
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 4 deletions.
4 changes: 4 additions & 0 deletions dynamodb/src/main/resources/reference.conf
Expand Up @@ -14,6 +14,10 @@ akka.stream.alpakka.dynamodb {
# Max number of in flight requests from the AwsClient - must be a power of 2
parallelism = 32

# Optional max number of outstanding requests to the underlying host connection pool.
# If unspecified, will be the same as the parallelism value. Must be a power of 2
#max-open-requests =

# Optional Credentials. Used to define static credentials rather than use the DefaultAWSCredentialsProviderChain
#credentials {
# access-key-id = "dummy-access-key"
Expand Down
Expand Up @@ -7,13 +7,16 @@ package akka.stream.alpakka.dynamodb
import akka.actor.ActorSystem
import com.amazonaws.auth._
import com.typesafe.config.Config
import java.util.Optional
import scala.compat.java8.OptionConverters._

final class DynamoSettings private (
val region: String,
val host: String,
val port: Int,
val tls: Boolean,
val parallelism: Int,
val maxOpenRequests: Option[Int],
val credentialsProvider: com.amazonaws.auth.AWSCredentialsProvider
) extends AwsClientSettings {

Expand All @@ -27,27 +30,36 @@ final class DynamoSettings private (
def withTls(value: Boolean): DynamoSettings =
if (value == tls) this else copy(tls = value)
def withParallelism(value: Int): DynamoSettings = copy(parallelism = value)
def withMaxOpenRequests(value: Option[Int]): DynamoSettings = copy(maxOpenRequests = value)
def withCredentialsProvider(value: com.amazonaws.auth.AWSCredentialsProvider): DynamoSettings =
copy(credentialsProvider = value)

/** Java Api */
def withMaxOpenRequests(value: Optional[Int]): DynamoSettings = copy(maxOpenRequests = value.asScala)

/** Java Api */
def getMaxOpenRequests(): Optional[Int] = maxOpenRequests.asJava

private def copy(
region: String = region,
host: String = host,
port: Int = port,
tls: Boolean = tls,
parallelism: Int = parallelism,
maxOpenRequests: Option[Int] = maxOpenRequests,
credentialsProvider: com.amazonaws.auth.AWSCredentialsProvider = credentialsProvider
): DynamoSettings = new DynamoSettings(
region = region,
host = host,
port = port,
tls = tls,
parallelism = parallelism,
maxOpenRequests = maxOpenRequests,
credentialsProvider = credentialsProvider
)

override def toString =
s"""DynamoSettings(region=$region,host=$host,port=$port,parallelism=$parallelism,credentialsProvider=$credentialsProvider)"""
s"""DynamoSettings(region=$region,host=$host,port=$port,parallelism=$parallelism,maxOpenRequests=$maxOpenRequests,credentialsProvider=$credentialsProvider)"""
}

object DynamoSettings {
Expand All @@ -70,6 +82,10 @@ object DynamoSettings {
val port = c.getInt("port")
val tls = c.getBoolean("tls")
val parallelism = c.getInt("parallelism")
val maxOpenRequests = if (c.hasPath("max-open-requests")) {
Option(c.getInt("max-open-requests"))
} else None

val awsCredentialsProvider = {
if (c.hasPath("credentials.access-key-id") &&
c.hasPath("credentials.secret-key-id")) {
Expand All @@ -84,6 +100,7 @@ object DynamoSettings {
port,
tls,
parallelism,
maxOpenRequests,
awsCredentialsProvider
)
}
Expand All @@ -109,6 +126,7 @@ object DynamoSettings {
port = 443,
tls = true,
parallelism = 4,
maxOpenRequests = None,
new DefaultAWSCredentialsProviderChain()
)

Expand Down
Expand Up @@ -36,7 +36,7 @@ private[dynamodb] class DynamoClientImpl(
override protected val connection: AwsConnect = {
val poolSettings = ConnectionPoolSettings(system)
.withMaxConnections(settings.parallelism)
.withMaxOpenRequests(settings.parallelism)
.withMaxOpenRequests(settings.maxOpenRequests.getOrElse(settings.parallelism))
if (settings.tls)
Http().cachedHostConnectionPoolHttps[AwsRequestMetadata](settings.host, settings = poolSettings)
else
Expand Down
4 changes: 2 additions & 2 deletions dynamodb/src/test/java/docs/javadsl/ExampleTest.java
Expand Up @@ -100,7 +100,7 @@ public void allowMultipleRequests() throws Exception {
tableArnSource.runWith(Sink.seq(), materializer);
try {
List<String> strings = streamCompletion.toCompletableFuture().get(5, TimeUnit.SECONDS);
fail("expeced missing schema");
fail("expected missing schema");
} catch (ExecutionException expected) {
// expected
}
Expand All @@ -116,7 +116,7 @@ public void paginated() throws Exception {
scanPages.runWith(Sink.seq(), materializer);
try {
List<ScanResult> strings = streamCompletion.toCompletableFuture().get(1, TimeUnit.SECONDS);
fail("expeced missing schema");
fail("expected missing schema");
} catch (ExecutionException expected) {
// expected
}
Expand Down
Expand Up @@ -16,6 +16,7 @@ import org.scalatest.{Matchers, WordSpecLike}

import scala.concurrent.Await
import scala.concurrent.duration._
import java.util.Optional

class DynamoSettingsSpec extends WordSpecLike with Matchers {

Expand All @@ -34,6 +35,7 @@ class DynamoSettingsSpec extends WordSpecLike with Matchers {
settings.host should be("host")
settings.port should be(443)
settings.parallelism should be(4)
settings.maxOpenRequests should be(None)
settings.credentialsProvider shouldBe a[AWSStaticCredentialsProvider]
}

Expand All @@ -45,10 +47,34 @@ class DynamoSettingsSpec extends WordSpecLike with Matchers {
settings.host should be("localhost")
settings.port should be(8001)
settings.parallelism should be(4)
settings.maxOpenRequests should be(None)
settings.credentialsProvider shouldBe a[DefaultAWSCredentialsProviderChain]
Await.result(system.terminate(), 1.second)
}

"allow configuring optional maxOpenRequests" in {
val config = ConfigFactory.parseString("""
|region = "eu-west-1"
|host = "localhost"
|port = 443
|tls = true
|parallelism = 32
|max-open-requests = 64
""".stripMargin)

val settings = DynamoSettings(config)

// Test Scala API
settings.maxOpenRequests shouldBe Some(64)
settings.withMaxOpenRequests(None).maxOpenRequests shouldBe None
settings.withMaxOpenRequests(Some(32)).maxOpenRequests shouldBe Some(32)

// Test Java API
settings.getMaxOpenRequests shouldBe Optional.of(64)
settings.withMaxOpenRequests(Optional.empty[Int]).getMaxOpenRequests shouldBe Optional.empty[Int]
settings.withMaxOpenRequests(Optional.of(32)).getMaxOpenRequests shouldBe Optional.of(32)
}

"use the DefaultAWSCredentialsProviderChain if the config defines an incomplete akka.stream.alpakka.dynamodb.credentials" in {
val config = ConfigFactory.parseString("""
|region = "eu-west-1"
Expand Down

0 comments on commit c18bfb0

Please sign in to comment.