Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/feature_request.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ labels: enhancement, needs-triage
<!--- Any alternative solutions or features you've considered -->

## Additional Context
<!--- How has the lack of this feaure affected you? What are you trying to accomplish? -->
<!--- How has the lack of this feature affected you? What are you trying to accomplish? -->
<!--- Providing context helps us come up with a solution that is most useful in the real world -->


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import aws.sdk.kotlin.runtime.http.middleware.UserAgent
import aws.smithy.kotlin.runtime.client.ExecutionContext
import aws.smithy.kotlin.runtime.http.*
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig
import aws.smithy.kotlin.runtime.http.operation.*
import aws.smithy.kotlin.runtime.http.response.HttpResponse
import aws.smithy.kotlin.runtime.io.Closeable
Expand Down Expand Up @@ -57,8 +56,16 @@ public class ImdsClient private constructor(builder: Builder) : Closeable {
private val tokenTtl: Duration = builder.tokenTTL
private val clock: Clock = builder.clock
private val platformProvider: PlatformProvider = builder.platformProvider
private val httpClient: SdkHttpClient

init {
val engine = builder.engine ?: CrtHttpEngine {
connectTimeout = Duration.seconds(1)
socketReadTimeout = Duration.seconds(1)
}

httpClient = sdkHttpClient(engine)

// validate the override at construction time
if (endpointConfiguration is EndpointConfiguration.Custom) {
val url = endpointConfiguration.endpoint.toUrl()
Expand All @@ -70,9 +77,6 @@ public class ImdsClient private constructor(builder: Builder) : Closeable {
}
}

// TODO connect/socket timeouts
private val httpClient = sdkHttpClient(builder.engine ?: CrtHttpEngine(HttpClientEngineConfig()))

// cached middleware instances
private val middleware: List<Feature> = listOf(
ServiceEndpointResolver.create {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ import aws.smithy.kotlin.runtime.http.request.url
import aws.smithy.kotlin.runtime.http.response.HttpResponse
import aws.smithy.kotlin.runtime.httptest.TestConnection
import aws.smithy.kotlin.runtime.httptest.buildTestConnection
import aws.smithy.kotlin.runtime.time.Instant
import aws.smithy.kotlin.runtime.time.ManualClock
import aws.smithy.kotlin.runtime.time.epochMilliseconds
import io.kotest.matchers.string.shouldContain
import kotlinx.coroutines.withTimeout
import kotlinx.serialization.json.*
import kotlin.test.*
import kotlin.time.Duration
Expand Down Expand Up @@ -203,11 +206,23 @@ class ImdsClientTest {
fail("not implemented yet")
}

@Ignore
@Test
fun testHttpConnectTimeouts() {
// Need a 1 sec connect timeout + other timeouts in imds spec
fail("not implemented yet")
fun testHttpConnectTimeouts(): Unit = runSuspendTest {
// end-to-end real client times out after 1-second
val client = ImdsClient {
// will never resolve
endpointConfiguration = EndpointConfiguration.Custom("http://240.0.0.0".toEndpoint())
}

val start = Instant.now()
assertFails {
withTimeout(3000) {
client.get("/latest/metadata")
}
}.message.shouldContain("timed out")
val elapsed = Instant.now().epochMilliseconds - start.epochMilliseconds
assertTrue(elapsed >= 1000)
assertTrue(elapsed < 2000)
}

data class ImdsConfigTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,74 @@ package aws.sdk.kotlin.runtime.http.engine.crt

import aws.sdk.kotlin.crt.http.*
import aws.sdk.kotlin.crt.io.*
import aws.sdk.kotlin.runtime.ClientException
import aws.sdk.kotlin.runtime.crt.SdkDefaultIO
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig
import aws.smithy.kotlin.runtime.http.engine.callContext
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.http.response.HttpCall
import aws.smithy.kotlin.runtime.logging.Logger
import aws.smithy.kotlin.runtime.time.Instant
import kotlinx.coroutines.job
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.time.ExperimentalTime

internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024

/**
* [HttpClientEngine] based on the AWS Common Runtime HTTP client
*/
public class CrtHttpEngine(public val config: HttpClientEngineConfig) : HttpClientEngineBase("crt") {
// FIXME - use the default TLS context when profile cred provider branch is merged
private val tlsCtx = TlsContext(TlsContextOptions.defaultClient())
@OptIn(ExperimentalTime::class)
public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientEngineBase("crt") {
public constructor() : this(CrtHttpEngineConfig.Default)

public companion object {
public operator fun invoke(block: CrtHttpEngineConfig.Builder.() -> Unit): CrtHttpEngine = CrtHttpEngine(CrtHttpEngineConfig.Builder().apply(block).build())
}
private val logger = Logger.getLogger<CrtHttpEngine>()

private val customTlsContext: TlsContext? = if (config.alpn.isNotEmpty() && config.tlsContext == null) {
val options = TlsContextOptionsBuilder().apply {
verifyPeer = true
alpn = config.alpn.joinToString(separator = ";") { it.protocolId }
}.build()
TlsContext(options)
} else {
null
}

init {
logger.warn { "CrtHttpEngine does not support HttpClientEngineConfig.socketReadTimeout(${config.socketReadTimeout}); ignoring" }
logger.warn { "CrtHttpEngine does not support HttpClientEngineConfig.socketWriteTimeout(${config.socketWriteTimeout}); ignoring" }
}

@OptIn(ExperimentalTime::class)
private val options = HttpClientConnectionManagerOptionsBuilder().apply {
clientBootstrap = SdkDefaultIO.ClientBootstrap
tlsContext = tlsCtx
clientBootstrap = config.clientBootstrap ?: SdkDefaultIO.ClientBootstrap
tlsContext = customTlsContext ?: config.tlsContext ?: SdkDefaultIO.TlsContext
manualWindowManagement = true
socketOptions = SocketOptions()
initialWindowSize = DEFAULT_WINDOW_SIZE_BYTES
// TODO - max connections/timeouts/etc
socketOptions = SocketOptions(
connectTimeoutMs = config.connectTimeout.inWholeMilliseconds.toInt()
)
initialWindowSize = config.initialWindowSizeBytes
maxConnections = config.maxConnections.toInt()
maxConnectionIdleMs = config.connectionIdleTimeout.inWholeMilliseconds
}

// connection managers are per host
private val connManagers = mutableMapOf<String, HttpClientConnectionManager>()
private val mutex = Mutex()

@OptIn(ExperimentalTime::class)
override suspend fun roundTrip(request: HttpRequest): HttpCall {
val callContext = callContext()
val manager = getManagerForUri(request.uri)
val conn = manager.acquireConnection()
val conn = withTimeoutOrNull(config.connectionAcquireTimeout) {
manager.acquireConnection()
} ?: throw ClientException("timed out waiting for an HTTP connection to be acquired from the pool")

try {
val reqTime = Instant.now()
Expand Down Expand Up @@ -78,7 +108,7 @@ public class CrtHttpEngine(public val config: HttpClientEngineConfig) : HttpClie
// close all resources
// SAFETY: shutdown is only invoked once AND only after all requests have completed and no more are coming
connManagers.forEach { entry -> entry.value.close() }
tlsCtx.close()
customTlsContext?.close()
}

private suspend fun getManagerForUri(uri: Uri): HttpClientConnectionManager = mutex.withLock {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package aws.sdk.kotlin.runtime.http.engine.crt

import aws.sdk.kotlin.crt.io.ClientBootstrap
import aws.sdk.kotlin.crt.io.TlsContext
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig
import aws.smithy.kotlin.runtime.util.InternalApi

@InternalApi
public class CrtHttpEngineConfig private constructor(builder: Builder) : HttpClientEngineConfig(builder) {
public companion object {
/**
* The default engine config. Most clients should use this.
*/
public val Default: CrtHttpEngineConfig = CrtHttpEngineConfig(Builder())
}

/**
* The amount of data that can be buffered before reading from the socket will cease. Reading will
* resume as data is consumed.
*/
public val initialWindowSizeBytes: Int = builder.initialWindowSizeBytes

/**
* The [ClientBootstrap] to use for the engine. By default it is a shared instance.
*/
public var clientBootstrap: ClientBootstrap? = builder.clientBootstrap

/**
* The TLS context to use. By default it is a shared instance.
*/
public var tlsContext: TlsContext? = builder.tlsContext

public class Builder : HttpClientEngineConfig.Builder() {
/**
* Set the amount of data that can be buffered before reading from the socket will cease. Reading will
* resume as data is consumed.
*/
public var initialWindowSizeBytes: Int = DEFAULT_WINDOW_SIZE_BYTES

/**
* Set the [ClientBootstrap] to use for the engine. By default it is a shared instance.
*/
public var clientBootstrap: ClientBootstrap? = null

/**
* Set the TLS context to use. By default it is a shared instance.
*/
public var tlsContext: TlsContext? = null

internal fun build(): CrtHttpEngineConfig = CrtHttpEngineConfig(this)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package aws.sdk.kotlin.runtime.http.engine.crt
import aws.sdk.kotlin.runtime.testing.runSuspendTest
import aws.smithy.kotlin.runtime.http.HttpMethod
import aws.smithy.kotlin.runtime.http.Protocol
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig
import aws.smithy.kotlin.runtime.http.readAll
import aws.smithy.kotlin.runtime.http.request.HttpRequestBuilder
import aws.smithy.kotlin.runtime.http.request.url
Expand Down Expand Up @@ -43,7 +42,7 @@ class AsyncStressTest : TestWithLocalServer() {
@Test
fun testConcurrentRequests() = runSuspendTest {
// https://github.com/awslabs/aws-sdk-kotlin/issues/170
val client = sdkHttpClient(CrtHttpEngine(HttpClientEngineConfig()))
val client = sdkHttpClient(CrtHttpEngine())
val request = HttpRequestBuilder().apply {
url {
scheme = Protocol.HTTP
Expand Down Expand Up @@ -78,7 +77,7 @@ class AsyncStressTest : TestWithLocalServer() {
// appropriately and allows requests to proceed (a stream that isn't consumed will be in a stuck state
// if the window is full and never incremented again, this can lead to all connections being consumed
// and the engine to no longer make further requests)
val client = sdkHttpClient(CrtHttpEngine(HttpClientEngineConfig()))
val client = sdkHttpClient(CrtHttpEngine())
val request = HttpRequestBuilder().apply {
url {
scheme = Protocol.HTTP
Expand Down