diff --git a/aws-runtime/aws-config/build.gradle.kts b/aws-runtime/aws-config/build.gradle.kts index c9612e1457f..30322c187a6 100644 --- a/aws-runtime/aws-config/build.gradle.kts +++ b/aws-runtime/aws-config/build.gradle.kts @@ -24,7 +24,7 @@ kotlin { implementation("aws.smithy.kotlin:logging:$smithyKotlinVersion") implementation("aws.smithy.kotlin:http:$smithyKotlinVersion") implementation("aws.smithy.kotlin:utils:$smithyKotlinVersion") - implementation(project(":aws-runtime:http-client-engine-crt")) + implementation("aws.smithy.kotlin:http-client-engine-crt:$smithyKotlinVersion") implementation(project(":aws-runtime:aws-http")) // parsing common JSON credentials responses diff --git a/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/auth/credentials/DefaultChainCredentialsProvider.kt b/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/auth/credentials/DefaultChainCredentialsProvider.kt index 705382b7416..5b55424ce46 100644 --- a/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/auth/credentials/DefaultChainCredentialsProvider.kt +++ b/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/auth/credentials/DefaultChainCredentialsProvider.kt @@ -7,8 +7,8 @@ package aws.sdk.kotlin.runtime.auth.credentials import aws.sdk.kotlin.runtime.config.AwsSdkSetting import aws.sdk.kotlin.runtime.config.imds.ImdsClient -import aws.sdk.kotlin.runtime.http.engine.crt.CrtHttpEngine import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine +import aws.smithy.kotlin.runtime.http.engine.crt.CrtHttpEngine import aws.smithy.kotlin.runtime.io.Closeable import aws.smithy.kotlin.runtime.util.Platform import aws.smithy.kotlin.runtime.util.PlatformProvider diff --git a/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/auth/credentials/EcsCredentialsProvider.kt b/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/auth/credentials/EcsCredentialsProvider.kt index 4e0aa66ce8a..fa241a5ef95 100644 --- a/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/auth/credentials/EcsCredentialsProvider.kt +++ b/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/auth/credentials/EcsCredentialsProvider.kt @@ -8,11 +8,11 @@ package aws.sdk.kotlin.runtime.auth.credentials import aws.sdk.kotlin.runtime.config.AwsSdkSetting import aws.sdk.kotlin.runtime.config.AwsSdkSetting.AwsContainerCredentialsRelativeUri import aws.sdk.kotlin.runtime.config.resolve -import aws.sdk.kotlin.runtime.http.engine.crt.CrtHttpEngine import aws.smithy.kotlin.runtime.ServiceException import aws.smithy.kotlin.runtime.client.ExecutionContext import aws.smithy.kotlin.runtime.http.* import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine +import aws.smithy.kotlin.runtime.http.engine.crt.CrtHttpEngine import aws.smithy.kotlin.runtime.http.middleware.ResolveEndpoint import aws.smithy.kotlin.runtime.http.middleware.Retry import aws.smithy.kotlin.runtime.http.operation.* diff --git a/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/config/imds/ImdsClient.kt b/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/config/imds/ImdsClient.kt index 6611c3bc7d3..029666b7c43 100644 --- a/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/config/imds/ImdsClient.kt +++ b/aws-runtime/aws-config/common/src/aws/sdk/kotlin/runtime/config/imds/ImdsClient.kt @@ -8,13 +8,13 @@ package aws.sdk.kotlin.runtime.config.imds import aws.sdk.kotlin.runtime.AwsServiceException import aws.sdk.kotlin.runtime.http.ApiMetadata import aws.sdk.kotlin.runtime.http.AwsUserAgentMetadata -import aws.sdk.kotlin.runtime.http.engine.crt.CrtHttpEngine import aws.sdk.kotlin.runtime.http.middleware.UserAgent import aws.smithy.kotlin.runtime.client.ExecutionContext import aws.smithy.kotlin.runtime.client.SdkClientOption import aws.smithy.kotlin.runtime.client.SdkLogMode import aws.smithy.kotlin.runtime.http.* import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine +import aws.smithy.kotlin.runtime.http.engine.crt.CrtHttpEngine import aws.smithy.kotlin.runtime.http.middleware.ResolveEndpoint import aws.smithy.kotlin.runtime.http.middleware.Retry import aws.smithy.kotlin.runtime.http.operation.* diff --git a/aws-runtime/aws-signing/build.gradle.kts b/aws-runtime/aws-signing/build.gradle.kts index 676dc942ae6..c97120d0c1c 100644 --- a/aws-runtime/aws-signing/build.gradle.kts +++ b/aws-runtime/aws-signing/build.gradle.kts @@ -18,6 +18,7 @@ kotlin { commonMain { dependencies { val crtKotlinVersion: String by project + api(project(":aws-runtime:aws-core")) // signing config uses CredentialsProvider/Credentials api(project(":aws-runtime:aws-types")) // presigner config exposes endpoint resolver @@ -25,7 +26,7 @@ kotlin { // sign() API takes HttpRequest api("aws.smithy.kotlin:http:$smithyKotlinVersion") - implementation(project(":aws-runtime:crt-util")) + implementation("aws.smithy.kotlin:crt-util:$smithyKotlinVersion") implementation("aws.sdk.kotlin.crt:aws-crt-kotlin:$crtKotlinVersion") implementation("aws.smithy.kotlin:logging:$smithyKotlinVersion") } diff --git a/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/AwsSigV4SigningMiddleware.kt b/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/AwsSigV4SigningMiddleware.kt index 20e954ef6a0..87008d3b6f4 100644 --- a/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/AwsSigV4SigningMiddleware.kt +++ b/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/AwsSigV4SigningMiddleware.kt @@ -9,10 +9,10 @@ import aws.sdk.kotlin.crt.auth.signing.AwsSignedBodyValue import aws.sdk.kotlin.crt.auth.signing.AwsSigner import aws.sdk.kotlin.runtime.InternalSdkApi import aws.sdk.kotlin.runtime.auth.credentials.CredentialsProvider -import aws.sdk.kotlin.runtime.crt.toSignableCrtRequest -import aws.sdk.kotlin.runtime.crt.update import aws.sdk.kotlin.runtime.execution.AuthAttributes import aws.smithy.kotlin.runtime.client.ExecutionContext +import aws.smithy.kotlin.runtime.crt.toSignableCrtRequest +import aws.smithy.kotlin.runtime.crt.update import aws.smithy.kotlin.runtime.http.* import aws.smithy.kotlin.runtime.http.operation.* import aws.smithy.kotlin.runtime.util.get diff --git a/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/AwsSigning.kt b/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/AwsSigning.kt index cd508a92a63..c6bea920d7a 100644 --- a/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/AwsSigning.kt +++ b/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/AwsSigning.kt @@ -7,8 +7,8 @@ package aws.sdk.kotlin.runtime.auth.signing import aws.sdk.kotlin.crt.auth.signing.AwsSigner import aws.sdk.kotlin.runtime.InternalSdkApi -import aws.sdk.kotlin.runtime.crt.toSignableCrtRequest -import aws.sdk.kotlin.runtime.crt.update +import aws.smithy.kotlin.runtime.crt.toSignableCrtRequest +import aws.smithy.kotlin.runtime.crt.update import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.http.request.toBuilder diff --git a/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/Presigner.kt b/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/Presigner.kt index bf1c8239325..bad7a39b61d 100644 --- a/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/Presigner.kt +++ b/aws-runtime/aws-signing/common/src/aws/sdk/kotlin/runtime/auth/signing/Presigner.kt @@ -12,11 +12,11 @@ import aws.sdk.kotlin.crt.auth.signing.AwsSigner import aws.sdk.kotlin.crt.auth.signing.AwsSigningConfig import aws.sdk.kotlin.runtime.InternalSdkApi import aws.sdk.kotlin.runtime.auth.credentials.CredentialsProvider -import aws.sdk.kotlin.runtime.crt.path -import aws.sdk.kotlin.runtime.crt.queryParameters -import aws.sdk.kotlin.runtime.crt.toCrtHeaders -import aws.sdk.kotlin.runtime.crt.toSdkHeaders import aws.sdk.kotlin.runtime.endpoint.AwsEndpointResolver +import aws.smithy.kotlin.runtime.crt.path +import aws.smithy.kotlin.runtime.crt.queryParameters +import aws.smithy.kotlin.runtime.crt.toCrtHeaders +import aws.smithy.kotlin.runtime.crt.toSdkHeaders import aws.smithy.kotlin.runtime.http.Headers import aws.smithy.kotlin.runtime.http.HttpBody import aws.smithy.kotlin.runtime.http.HttpMethod diff --git a/aws-runtime/build.gradle.kts b/aws-runtime/build.gradle.kts index 93da032de5b..bfd90cad864 100644 --- a/aws-runtime/build.gradle.kts +++ b/aws-runtime/build.gradle.kts @@ -86,12 +86,6 @@ subprojects { } } -// FIXME - resolves build deadlock with aws-core when using composite builds -val topLevelModule = "crt-util" -subprojects.filter { it.name != topLevelModule }.forEach { proj -> - proj.tasks.findByName("generatePomFileForJvmPublication")?.dependsOn(":aws-runtime:$topLevelModule:generatePomFileForJvmPublication") -} - task("rootAllTest"){ destinationDir = File(project.buildDir, "reports/tests/rootAllTest") val rootAllTest = this diff --git a/aws-runtime/crt-util/build.gradle.kts b/aws-runtime/crt-util/build.gradle.kts deleted file mode 100644 index e36428eee00..00000000000 --- a/aws-runtime/crt-util/build.gradle.kts +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -buildscript { - val atomicFuVersion: String by project - repositories { - mavenCentral() - } - - dependencies { - classpath("org.jetbrains.kotlinx:atomicfu-gradle-plugin:$atomicFuVersion") - } -} - -apply(plugin = "kotlinx-atomicfu") - -description = "Utilities for working with AWS CRT Kotlin" -extra["displayName"] = "AWS :: SDK :: Kotlin :: CRT :: Util" -extra["moduleName"] = "aws.sdk.kotlin.runtime.crt" - -val atomicFuVersion: String by project -val coroutinesVersion: String by project -val crtKotlinVersion: String by project -val smithyKotlinVersion: String by project - -kotlin { - sourceSets { - commonMain { - dependencies { - api(project(":aws-runtime:aws-core")) - api("aws.sdk.kotlin.crt:aws-crt-kotlin:$crtKotlinVersion") - api("aws.smithy.kotlin:http:$smithyKotlinVersion") - implementation("org.jetbrains.kotlinx:atomicfu:$atomicFuVersion") - } - } - commonTest { - dependencies { - implementation(project(":aws-runtime:testing")) - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutinesVersion") - } - } - - all { - languageSettings.optIn("aws.smithy.kotlin.runtime.util.InternalApi") - languageSettings.optIn("aws.sdk.kotlin.runtime.InternalSdkApi") - } - } -} diff --git a/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/Http.kt b/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/Http.kt deleted file mode 100644 index a535c050969..00000000000 --- a/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/Http.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.crt - -import aws.sdk.kotlin.crt.http.HttpRequestBodyStream -import aws.sdk.kotlin.runtime.InternalSdkApi -import aws.smithy.kotlin.runtime.http.* -import aws.smithy.kotlin.runtime.http.request.HttpRequest -import aws.smithy.kotlin.runtime.http.request.HttpRequestBuilder -import aws.smithy.kotlin.runtime.http.util.splitAsQueryParameters -import kotlin.coroutines.coroutineContext -import aws.sdk.kotlin.crt.http.Headers as HeadersCrt -import aws.sdk.kotlin.crt.http.HttpRequest as HttpRequestCrt - -/** - * Convert an [HttpRequestBuilder] into a CRT HttpRequest for the purpose of signing. - */ -@InternalSdkApi -public suspend fun HttpRequestBuilder.toSignableCrtRequest(unsignedPayload: Boolean = false): HttpRequestCrt { - // Streams that implement HttpBody.Streaming and are not replayable are not signable without consuming the stream - // and would need to go through chunked signing or unsigned payload - // see: https://github.com/awslabs/smithy-kotlin/issues/296 - - val bodyStream = if (!unsignedPayload) { - signableBodyStream(body) - } else { - null - } - - return HttpRequestCrt(method.name, url.encodedPath, HttpHeadersCrt(headers), bodyStream) -} - -private suspend fun signableBodyStream(body: HttpBody): HttpRequestBodyStream? = when (body) { - is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes()) - is HttpBody.Streaming -> if (body.isReplayable) { - // FIXME: this is not particularly efficient since we have to launch a coroutine to fill it. - // see https://github.com/awslabs/smithy-kotlin/issues/436 - ReadChannelBodyStream(body.readFrom(), coroutineContext) - } else { - // can only consume the stream once - null - } - else -> null -} - -/** - * Convert an [HttpRequest] into a CRT HttpRequest for the purposes of signing - */ -@InternalSdkApi -public suspend fun HttpRequest.toSignableCrtRequest(): HttpRequestCrt = - HttpRequestCrt( - method = method.name, - encodedPath = url.encodedPath, - headers = headers.toCrtHeaders(), - body = signableBodyStream(body) - ) - -// proxy the smithy-client-rt version of Headers to CRT (which is based on our client-rt version in the first place) -private class HttpHeadersCrt(val headers: HeadersBuilder) : HeadersCrt { - override fun contains(name: String): Boolean = headers.contains(name) - override fun entries(): Set>> = headers.entries() - override fun getAll(name: String): List? = headers.getAll(name) - override fun isEmpty(): Boolean = headers.isEmpty() - override fun names(): Set = headers.names() -} - -/** - * Update a request builder from a CRT HTTP request (primary use is updating a request builder after signing) - */ -@InternalSdkApi -public fun HttpRequestBuilder.update(crtRequest: HttpRequestCrt) { - crtRequest.headers.entries().forEach { entry -> - headers.appendMissing(entry.key, entry.value) - } - - if (crtRequest.encodedPath.isNotBlank()) { - crtRequest.queryParameters()?.let { - it.forEach { key, values -> - // the crt request has a url encoded path which means - // simply appending missing could result in both the raw and percent-encoded - // value being present. Instead just append new keys added by signing - if (!url.parameters.contains(key)) { - url.parameters.appendAll(key, values) - } - } - } - } -} - -/** - * Get just the query parameters (if any) - * @return the query parameters from the path or null if there weren't any - */ -@InternalSdkApi -public fun HttpRequestCrt.queryParameters(): QueryParameters? { - val idx = encodedPath.indexOf("?") - if (idx < 0 || idx + 1 > encodedPath.length) return null - - val fragmentIdx = encodedPath.indexOf("#", startIndex = idx) - val rawQueryString = if (fragmentIdx > 0) encodedPath.substring(idx + 1, fragmentIdx) else encodedPath.substring(idx + 1) - return rawQueryString.splitAsQueryParameters() -} - -/** - * Get just the encoded path sans any query or fragment - * @return the URI path segment from the encoded path - */ -@InternalSdkApi -public fun HttpRequestCrt.path(): String { - val idx = encodedPath.indexOf("?") - return if (idx > 0) encodedPath.substring(0, idx) else encodedPath -} - -// Convert CRT header type to SDK header type -@InternalSdkApi -public fun aws.sdk.kotlin.crt.http.Headers.toSdkHeaders(): Headers { - val headersBuilder = HeadersBuilder() - - forEach { key, values -> - headersBuilder.appendAll(key, values) - } - - return headersBuilder.build() -} - -// Convert SDK header type to CRT header type -@InternalSdkApi -public fun Headers.toCrtHeaders(): aws.sdk.kotlin.crt.http.Headers { - val headersBuilder = aws.sdk.kotlin.crt.http.HeadersBuilder() - - forEach { key, values -> - headersBuilder.appendAll(key, values) - } - - return headersBuilder.build() -} diff --git a/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStream.kt b/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStream.kt deleted file mode 100644 index 80e5c378f13..00000000000 --- a/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStream.kt +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.crt - -import aws.sdk.kotlin.crt.http.HttpRequestBodyStream -import aws.sdk.kotlin.crt.io.MutableBuffer -import aws.sdk.kotlin.runtime.InternalSdkApi -import aws.smithy.kotlin.runtime.io.SdkByteBuffer -import aws.smithy.kotlin.runtime.io.SdkByteReadChannel -import aws.smithy.kotlin.runtime.io.readAvailable -import kotlinx.atomicfu.atomic -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlin.coroutines.CoroutineContext - -/** - * write as much of [outgoing] to [dest] as possible - */ -internal expect fun transferRequestBody(outgoing: SdkByteBuffer, dest: MutableBuffer) - -/** - * Implement's [HttpRequestBodyStream] which proxies an SDK request body channel [SdkByteReadChannel] - */ -@InternalSdkApi -public class ReadChannelBodyStream( - // the request body channel - private val bodyChan: SdkByteReadChannel, - private val callContext: CoroutineContext -) : HttpRequestBodyStream, CoroutineScope { - - private val producerJob = Job(callContext.job) - override val coroutineContext: CoroutineContext = callContext + producerJob - - private val currBuffer = atomic(null) - private val bufferChan = Channel(Channel.UNLIMITED) - - init { - producerJob.invokeOnCompletion { cause -> - bodyChan.cancel(cause) - } - } - - // lie - CRT tries to control this via normal seek operations (e.g. when they calculate a hash for signing - // they consume the aws_input_stream and then seek to the beginning). Instead we either support creating - // a new read channel or we don't. At this level we don't care, consumers of this type need to understand - // and handle these concerns. - override fun resetPosition(): Boolean = true - - override fun sendRequestBody(buffer: MutableBuffer): Boolean { - return doSendRequestBody(buffer).also { if (it) producerJob.complete() } - } - - @OptIn(ExperimentalCoroutinesApi::class) - private fun doSendRequestBody(buffer: MutableBuffer): Boolean { - // ensure the request context hasn't been cancelled - callContext.ensureActive() - var outgoing = currBuffer.getAndSet(null) ?: bufferChan.tryReceive().getOrNull() - - if (bodyChan.availableForRead > 0 && outgoing == null) { - // NOTE: It is critical that the coroutine launched doesn't actually suspend because it will never - // get a chance to resume. The CRT will consume the dispatcher thread until the data has been read - // completely. We could launch one of the coroutines into a different dispatcher but this won't work - // on platforms (e.g. JS) that don't have multiple threads. Essentially the CRT will starve - // the dispatcher and not allow other coroutines to make progress. - // see: https://github.com/awslabs/aws-sdk-kotlin/issues/282 - // - // TODO - we could get rid of this extra copy + coroutine if readAvailable() had a non-suspend version - // see: https://youtrack.jetbrains.com/issue/KTOR-2772 - // - // To get around this, if there is data to read we launch a coroutine UNDISPATCHED so that it runs - // immediately in the current thread. The coroutine will fill the buffer but won't suspend because - // we know data is available. - launch(start = CoroutineStart.UNDISPATCHED) { - val sdkBuffer = SdkByteBuffer(bodyChan.availableForRead.toULong()) - bodyChan.readAvailable(sdkBuffer) - bufferChan.send(sdkBuffer) - }.invokeOnCompletion { cause -> - if (cause != null) { - producerJob.completeExceptionally(cause) - bufferChan.close(cause) - } - } - } - - if (bodyChan.availableForRead == 0 && bodyChan.isClosedForRead) { - bufferChan.close() - } - - if (outgoing == null) { - if (bufferChan.isClosedForReceive) { - return true - } - - outgoing = bufferChan.tryReceive().getOrNull() ?: return false - } - - transferRequestBody(outgoing, buffer) - - if (outgoing.readRemaining > 0u) { - currBuffer.value = outgoing - } - - return bufferChan.isClosedForReceive && currBuffer.value == null - } -} diff --git a/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/SdkDefaultIO.kt b/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/SdkDefaultIO.kt deleted file mode 100644 index 5f9f510c4d4..00000000000 --- a/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/SdkDefaultIO.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.crt - -import aws.sdk.kotlin.crt.io.* -import aws.sdk.kotlin.runtime.InternalSdkApi - -// FIXME - this should default to number of processors -private const val DEFAULT_EVENT_LOOP_THREAD_COUNT: Int = 1 - -/** - * Default (CRT) IO used by the SDK when not configured manually/directly - */ -@InternalSdkApi -public object SdkDefaultIO { - /** - * The default event loop group to run IO on - */ - public val EventLoop: EventLoopGroup by lazy { - // TODO - can we register shutdown in appropriate runtimes (e.g. jvm: addShutdown, native: atexit(), etc) when/if these lazy block(s) run? - EventLoopGroup(DEFAULT_EVENT_LOOP_THREAD_COUNT) - } - - /** - * The default host resolver to resolve DNS queries with - */ - public val HostResolver: HostResolver by lazy { - HostResolver(EventLoop) - } - - /** - * The default client bootstrap - */ - public val ClientBootstrap: ClientBootstrap by lazy { - ClientBootstrap(EventLoop, HostResolver) - } - - /** - * The default TLS context - */ - public val TlsContext: TlsContext by lazy { - TlsContext(TlsContextOptions.defaultClient()) - } -} diff --git a/aws-runtime/crt-util/common/test/aws/sdk/kotlin/runtime/crt/HttpTest.kt b/aws-runtime/crt-util/common/test/aws/sdk/kotlin/runtime/crt/HttpTest.kt deleted file mode 100644 index 5bafcbdfa3b..00000000000 --- a/aws-runtime/crt-util/common/test/aws/sdk/kotlin/runtime/crt/HttpTest.kt +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.crt - -import aws.smithy.kotlin.runtime.http.HttpMethod -import aws.smithy.kotlin.runtime.http.Protocol -import aws.smithy.kotlin.runtime.http.encodedPath -import aws.smithy.kotlin.runtime.http.parameters -import aws.smithy.kotlin.runtime.http.request.HttpRequestBuilder -import aws.smithy.kotlin.runtime.http.request.headers -import aws.smithy.kotlin.runtime.http.request.url -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertTrue -import aws.sdk.kotlin.crt.http.Headers as HeadersCrt -import aws.sdk.kotlin.crt.http.HttpRequest as HttpRequestCrt - -class HttpTest { - @Test - fun testRequestBuilderUpdate() { - // test updating HttpRequestBuilder from a (signed) crt request - - val builder = HttpRequestBuilder().apply { - method = HttpMethod.POST - url { - scheme = Protocol.HTTPS - host = "test.com" - port = 3000 - path = "/foo/bar/baz" - parameters { - append("foo", "bar") - } - } - - headers { - append("k1", "v1") - append("k2", "v3") - } - } - - // build a slightly modified crt request (e.g. after signing new headers or query params will be present) - val crtHeaders = HeadersCrt.build { - append("k1", "v1") - append("k1", "v2") - append("k2", "v3") - append("k3", "v4") - } - val crtRequest = HttpRequestCrt("POST", "/foo/bar/baz?foo=bar&baz=quux", crtHeaders, null) - - builder.update(crtRequest) - - // crt request doesn't have all the same elements (e.g. host/scheme) since some of them live off - // HttpConnectionManager for instance - // ensure we don't overwrite the originals - assertEquals("test.com", builder.url.host) - assertEquals(Protocol.HTTPS, builder.url.scheme) - - // see that the crt headers are populated in the builder - crtHeaders.entries().forEach { entry -> - entry.value.forEach { value -> - assertTrue(builder.headers.contains(entry.key, value), "expected header pair: ${entry.key}: $value") - } - } - - assertEquals("/foo/bar/baz", builder.url.path) - - assertTrue(builder.url.parameters.contains("foo", "bar")) - assertTrue(builder.url.parameters.contains("baz", "quux")) - } - - @Test - fun testRequestBuilderUpdateNoQuery() { - val builder = HttpRequestBuilder().apply { - method = HttpMethod.POST - url { - scheme = Protocol.HTTPS - host = "test.com" - path = "/foo" - } - } - - // build a slightly modified crt request (e.g. after signing new headers or query params will be present) - val crtHeaders = HeadersCrt.build { append("k1", "v1") } - val crtRequest = HttpRequestCrt("POST", "/foo", crtHeaders, null) - - builder.update(crtRequest) - - // crt request doesn't have all the same elements (e.g. host/scheme) since some of them live off - // HttpConnectionManager for instance - // ensure we don't overwrite the originals - assertEquals("test.com", builder.url.host) - assertEquals(Protocol.HTTPS, builder.url.scheme) - - assertEquals("/foo", builder.url.path) - } - - @Test - fun testEncodedPath() { - // test updating HttpRequestBuilder from a (signed) crt request with a percent-encoded path - - val builder = HttpRequestBuilder().apply { - method = HttpMethod.POST - url { - scheme = Protocol.HTTPS - host = "test.com" - port = 3000 - path = "/foo/bar/baz" - parameters { - append("foo", "/") - } - } - } - - // build a slightly modified crt request (e.g. after signing new headers or query params will be present) - val crtHeaders = HeadersCrt.build { } - val crtRequest = HttpRequestCrt("POST", builder.url.encodedPath, crtHeaders, null) - - builder.update(crtRequest) - - assertEquals("/foo/bar/baz", builder.url.path) - - val values = builder.url.parameters.getAll("foo")!! - assertEquals(1, values.size) - assertEquals("/", values.first()) - } -} diff --git a/aws-runtime/crt-util/common/test/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStreamTest.kt b/aws-runtime/crt-util/common/test/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStreamTest.kt deleted file mode 100644 index 609afb425d2..00000000000 --- a/aws-runtime/crt-util/common/test/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStreamTest.kt +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.crt - -import aws.sdk.kotlin.crt.io.MutableBuffer -import aws.smithy.kotlin.runtime.io.SdkByteChannel -import aws.smithy.kotlin.runtime.io.SdkByteReadChannel -import aws.smithy.kotlin.runtime.io.writeUtf8 -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.runTest -import kotlinx.coroutines.yield -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFailsWith -import kotlin.test.assertFalse -import kotlin.test.assertTrue - -@OptIn(ExperimentalCoroutinesApi::class) -class ReadChannelBodyStreamTest { - private fun mutableBuffer(capacity: Int): Pair { - val dest = ByteArray(capacity) - return MutableBuffer.of(dest) to dest - } - - @Test - fun testClose() = runTest { - val chan = SdkByteChannel() - val (sendBuffer, _) = mutableBuffer(16) - - val stream = ReadChannelBodyStream(chan, coroutineContext) - // let the proxy get started - yield() - chan.close() - - // proxy should resume and signal close - yield() - - assertTrue(stream.sendRequestBody(sendBuffer)) - } - - @Test - fun testCancellation() = runTest { - val chan = SdkByteChannel() - val job = Job() - val stream = ReadChannelBodyStream(chan, coroutineContext + job) - - job.cancel() - - val (sendBuffer, _) = mutableBuffer(16) - assertFailsWith { - stream.sendRequestBody(sendBuffer) - } - } - - @Test - fun testReadFully() = runTest { - val data = byteArrayOf(1, 2, 3, 4, 5) - val chan = SdkByteReadChannel(data) - val stream = ReadChannelBodyStream(chan, coroutineContext) - yield() - - val (sendBuffer, sent) = mutableBuffer(16) - val streamDone = stream.sendRequestBody(sendBuffer) - assertTrue(streamDone) - assertTrue { - sent.sliceArray(data.indices).contentEquals(data) - } - } - - @Test - fun testPartialRead() = runTest { - val chan = SdkByteReadChannel("123456".encodeToByteArray()) - val stream = ReadChannelBodyStream(chan, coroutineContext) - yield() - - val (sendBuffer1, sent1) = mutableBuffer(3) - var streamDone = stream.sendRequestBody(sendBuffer1) - assertFalse(streamDone) - assertEquals("123", sent1.decodeToString()) - assertEquals(0, sendBuffer1.writeRemaining) - - val (sendBuffer2, sent2) = mutableBuffer(3) - streamDone = stream.sendRequestBody(sendBuffer2) - assertTrue(streamDone) - assertEquals("456", sent2.decodeToString()) - } - - @Test - fun testLargeTransfer() = runTest { - val chan = SdkByteChannel() - - val data = "foobar" - val n = 10_000 - launch { - val result = runCatching { - repeat(n) { - chan.writeUtf8(data) - } - } - - chan.close(result.exceptionOrNull()) - } - - val stream = ReadChannelBodyStream(chan, coroutineContext) - yield() - - var totalBytesRead = 0 - val sendSize = 16 - do { - val (sendBuffer, _) = mutableBuffer(sendSize) - val streamDone = stream.sendRequestBody(sendBuffer) - totalBytesRead += sendSize - sendBuffer.writeRemaining - yield() - } while (!streamDone) - - val expected = data.length * n - assertEquals(expected, totalBytesRead) - } -} diff --git a/aws-runtime/crt-util/jvm/src/aws/sdk/kotlin/runtime/crt/RequestUtilsJVM.kt b/aws-runtime/crt-util/jvm/src/aws/sdk/kotlin/runtime/crt/RequestUtilsJVM.kt deleted file mode 100644 index 1c43d9c605a..00000000000 --- a/aws-runtime/crt-util/jvm/src/aws/sdk/kotlin/runtime/crt/RequestUtilsJVM.kt +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.crt - -import aws.sdk.kotlin.crt.io.MutableBuffer -import aws.smithy.kotlin.runtime.io.SdkByteBuffer -import aws.smithy.kotlin.runtime.io.readAvailable - -internal actual fun transferRequestBody(outgoing: SdkByteBuffer, dest: MutableBuffer) { - outgoing.readAvailable(dest.buffer) -} diff --git a/aws-runtime/http-client-engine-crt/build.gradle.kts b/aws-runtime/http-client-engine-crt/build.gradle.kts deleted file mode 100644 index b1d4cb98453..00000000000 --- a/aws-runtime/http-client-engine-crt/build.gradle.kts +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ -buildscript { - val atomicFuVersion: String by project - repositories { - mavenCentral() - } - - dependencies { - classpath("org.jetbrains.kotlinx:atomicfu-gradle-plugin:$atomicFuVersion") - } -} - -apply(plugin = "kotlinx-atomicfu") - -description = "HTTP client engine backed by CRT" -extra["displayName"] = "AWS :: SDK :: Kotlin :: HTTP" -extra["moduleName"] = "aws.sdk.kotlin.runtime.http.engine.crt" - -val smithyKotlinVersion: String by project -val coroutinesVersion: String by project -val atomicFuVersion: String by project - -kotlin { - sourceSets { - commonMain { - dependencies { - api(project(":aws-runtime:aws-core")) - api("aws.smithy.kotlin:http:$smithyKotlinVersion") - implementation("aws.smithy.kotlin:logging:$smithyKotlinVersion") - implementation(project(":aws-runtime:crt-util")) - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion") - - implementation("org.jetbrains.kotlinx:atomicfu:$atomicFuVersion") - } - } - - commonTest { - dependencies { - implementation(project(":aws-runtime:testing")) - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutinesVersion") - implementation("aws.smithy.kotlin:http-test:$smithyKotlinVersion") - } - } - - all { - languageSettings.optIn("aws.smithy.kotlin.runtime.util.InternalApi") - languageSettings.optIn("aws.sdk.kotlin.runtime.InternalSdkApi") - } - } -} diff --git a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/AbstractBufferedReadChannel.kt b/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/AbstractBufferedReadChannel.kt deleted file mode 100644 index 27b58bd6833..00000000000 --- a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/AbstractBufferedReadChannel.kt +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.sdk.kotlin.crt.io.Buffer -import aws.smithy.kotlin.runtime.io.SdkByteBuffer -import aws.smithy.kotlin.runtime.io.bytes -import kotlinx.atomicfu.AtomicRef -import kotlinx.atomicfu.atomic -import kotlinx.atomicfu.update -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.ClosedReceiveChannelException -import kotlinx.coroutines.sync.Mutex -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException - -internal data class ClosedSentinel(val cause: Throwable?) - -/** - * Abstract base class that platform implementations should inherit from - */ -@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) -internal abstract class AbstractBufferedReadChannel( - // function invoked every time n bytes are read - private val onBytesRead: (n: Int) -> Unit -) : BufferedReadChannel { - - // NOTE: the channel is configured as unlimited but will always be constrained by the window size such - // that there are only ever WINDOW_SIZE _bytes_ in-flight at any given time - private val segments = Channel(Channel.UNLIMITED) - - private val currSegment: AtomicRef = atomic(null) - - private val readOp: AtomicRef?> = atomic(null) - private val readOpMutex = Mutex(locked = false) - - private val _closed: AtomicRef = atomic(null) - protected val closed: ClosedSentinel? - get() = _closed.value - - private val _availableForRead = atomic(0) - - override val isClosedForWrite: Boolean - get() = segments.isClosedForSend - - override val isClosedForRead: Boolean - get() = availableForRead <= 0 && closed != null && segments.isClosedForReceive - - override val availableForRead: Int - get() = _availableForRead.value - - /** - * Suspend reading until at least one byte is available to read or the channel is closed. If there is already at - * least one byte available, this function will return without suspension. - */ - protected suspend fun readSuspend(): Boolean { - // can fulfill immediately without suspension - if (availableForRead > 0) return true - - readOpMutex.lock() - - closed?.let { closed -> - // if already closed - rethrow - closed.cause?.let { rethrowClosed(it) } - - // no more data is coming - readOpMutex.unlock() - return availableForRead > 0 - } - - if (availableForRead > 0) { - readOpMutex.unlock() - return true - } - - return suspendCancellableCoroutine { cont -> - val success = readOp.compareAndSet(null, cont) - readOpMutex.unlock() - check(success) { "Read operation already in progress" } - } - } - - /** - * Decrease the amount of bytes available for reading and notify the callback - */ - @Suppress("NOTHING_TO_INLINE") - private inline fun markBytesConsumed(size: Int) { - // NOTE: +/- operators ARE atomic - _availableForRead -= size - onBytesRead(size) - } - - override suspend fun readRemaining(limit: Int): ByteArray { - val buffer = SdkByteBuffer(minOf(availableForRead, limit).toULong()) - - val consumed = readAsMuchAsPossible(buffer, limit) - - return if (consumed >= limit) { - buffer.bytes() - } else { - readRemainingSuspend(buffer, limit - consumed) - } - } - - protected fun readAsMuchAsPossible(dest: SdkByteBuffer, limit: Int): Int { - var consumed = 0 - var remaining = limit - - while (availableForRead > 0 && remaining > 0) { - val segment = currSegment.getAndSet(null) ?: segments.tryReceive().getOrNull() ?: break - - val rc = segment.copyTo(dest, remaining) - consumed += rc - remaining = limit - consumed - - markBytesConsumed(rc) - - if (segment.readRemaining > 0u) { - currSegment.update { segment } - } - } - - return consumed - } - - private suspend fun readRemainingSuspend(buffer: SdkByteBuffer, limit: Int): ByteArray { - check(currSegment.value == null) { "current segment should be drained already" } - - var consumed = 0 - - for (segment in segments) { - val remaining = limit - consumed - val rc = segment.copyTo(buffer, remaining) - consumed += rc - - markBytesConsumed(rc) - - if (remaining <= 0) { - if (segment.readRemaining > 0u) { - currSegment.update { segment } - } - break - } - } - - return buffer.bytes() - } - - private fun readAsMuchAsPossible(dest: ByteArray, offset: Int, length: Int): Int { - var consumed = 0 - var currOffset = offset - var remaining = length - - while (availableForRead > 0 && remaining > 0) { - val segment = currSegment.getAndSet(null) ?: segments.tryReceive().getOrNull() ?: break - - val rc = segment.copyTo(dest, currOffset, remaining) - consumed += rc - currOffset += rc - remaining = length - consumed - - markBytesConsumed(rc) - - if (segment.readRemaining > 0u) { - currSegment.update { segment } - } - } - - return consumed - } - - override suspend fun readFully(sink: ByteArray, offset: Int, length: Int) { - val rc = readAsMuchAsPossible(sink, offset, length) - if (rc < length) { - readFullySuspend(sink, offset + rc, length - rc) - } - } - - private suspend fun readFullySuspend(dest: ByteArray, offset: Int, length: Int) { - var consumed = 0 - var currOffset = offset - var remaining = length - - do { - if (!readSuspend()) { - throw ClosedReceiveChannelException("Unexpeced EOF: expected $remaining more bytes") - } - - val rc = readAsMuchAsPossible(dest, currOffset, remaining) - consumed += rc - currOffset += rc - remaining = length - consumed - } while (remaining > 0) - } - - override suspend fun readAvailable(sink: ByteArray, offset: Int, length: Int): Int { - val consumed = readAsMuchAsPossible(sink, offset, length) - return when { - consumed == 0 && closed != null -> -1 - consumed > 0 || length == 0 -> consumed - else -> readAvailableSuspend(sink, offset, length) - } - } - - private suspend fun readAvailableSuspend(dest: ByteArray, offset: Int, length: Int): Int { - if (!readSuspend()) { - return -1 - } - return readAvailable(dest, offset, length) - } - - override fun write(data: Buffer) { - // TODO - we could pool these allocations - val bytesIn = ByteArray(data.len) - val wc = data.copyTo(bytesIn) - check(wc == bytesIn.size) { "short read: copied $wc; expected: ${bytesIn.size} " } - - // TODO - only emit full segments or partial when closed? - - val segment = newReadableSegment(bytesIn) - val result = segments.trySend(segment) - check(result.isSuccess) { "failed to queue segment" } - - // advertise bytes available - _availableForRead.getAndAdd(bytesIn.size) - - readOpMutex.withSpinLock { - readOp.getAndSet(null)?.resume(true) - } - } - - private inline fun Mutex.withSpinLock(block: () -> T): T { - while (!tryLock()) { - // spin - } - return try { - block() - } finally { - unlock() - } - } - - override suspend fun awaitContent() { - readSuspend() - } - - override fun cancel(cause: Throwable?): Boolean { - val success = _closed.compareAndSet(null, ClosedSentinel(cause)) - if (!success) return false - - segments.close(cause) - - readOp.getAndSet(null)?.let { cont -> - if (cause != null) { - cont.resumeWithException(cause) - } else { - cont.resume(availableForRead > 0) - } - } - - return true - } - - override fun close() { - cancel(null) - } - - private fun rethrowClosed(cause: Throwable): Nothing { - throw cause - } -} diff --git a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannel.kt b/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannel.kt deleted file mode 100644 index cc303fb66c2..00000000000 --- a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannel.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.sdk.kotlin.crt.io.Buffer -import aws.smithy.kotlin.runtime.io.SdkByteReadChannel - -/** - * Create a new [BufferedReadChannel] that invokes [onBytesRead] as data is consumed - */ -internal expect fun bufferedReadChannel(onBytesRead: (n: Int) -> Unit): BufferedReadChannel - -/** - * A buffered [SdkByteReadChannel] that can always satisfy writing without blocking / suspension - */ -internal interface BufferedReadChannel : SdkByteReadChannel { - /** - * Write the data from the buffer to the channel IMMEDIATELY without blocking or suspension - */ - fun write(data: Buffer) -} diff --git a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt b/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt deleted file mode 100644 index 6f01ff011d7..00000000000 --- a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.sdk.kotlin.crt.http.* -import aws.sdk.kotlin.crt.io.* -import aws.sdk.kotlin.runtime.ClientException -import aws.sdk.kotlin.runtime.crt.SdkDefaultIO -import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine -import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase -import aws.smithy.kotlin.runtime.http.engine.callContext -import aws.smithy.kotlin.runtime.http.request.HttpRequest -import aws.smithy.kotlin.runtime.http.response.HttpCall -import aws.smithy.kotlin.runtime.logging.Logger -import aws.smithy.kotlin.runtime.time.Instant -import kotlinx.coroutines.job -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.withTimeoutOrNull - -internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024 - -/** - * [HttpClientEngine] based on the AWS Common Runtime HTTP client - */ -public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientEngineBase("crt") { - public constructor() : this(CrtHttpEngineConfig.Default) - - public companion object { - public operator fun invoke(block: CrtHttpEngineConfig.Builder.() -> Unit): CrtHttpEngine = CrtHttpEngine(CrtHttpEngineConfig.Builder().apply(block).build()) - } - private val logger = Logger.getLogger() - - private val customTlsContext: TlsContext? = if (config.alpn.isNotEmpty() && config.tlsContext == null) { - val options = TlsContextOptionsBuilder().apply { - verifyPeer = true - alpn = config.alpn.joinToString(separator = ";") { it.protocolId } - }.build() - TlsContext(options) - } else { - null - } - - init { - if (config.socketReadTimeout != CrtHttpEngineConfig.Default.socketReadTimeout) { - logger.warn { "CrtHttpEngine does not support socketReadTimeout(${config.socketReadTimeout}); ignoring" } - } - if (config.socketWriteTimeout != CrtHttpEngineConfig.Default.socketWriteTimeout) { - logger.warn { "CrtHttpEngine does not support socketWriteTimeout(${config.socketWriteTimeout}); ignoring" } - } - } - - private val options = HttpClientConnectionManagerOptionsBuilder().apply { - clientBootstrap = config.clientBootstrap ?: SdkDefaultIO.ClientBootstrap - tlsContext = customTlsContext ?: config.tlsContext ?: SdkDefaultIO.TlsContext - manualWindowManagement = true - socketOptions = SocketOptions( - connectTimeoutMs = config.connectTimeout.inWholeMilliseconds.toInt() - ) - initialWindowSize = config.initialWindowSizeBytes - maxConnections = config.maxConnections.toInt() - maxConnectionIdleMs = config.connectionIdleTimeout.inWholeMilliseconds - } - - // connection managers are per host - private val connManagers = mutableMapOf() - private val mutex = Mutex() - - override suspend fun roundTrip(request: HttpRequest): HttpCall { - val callContext = callContext() - val manager = getManagerForUri(request.uri) - - // LIFETIME: connection will be released back to the pool/manager when - // the response completes OR on exception (both handled by the completion handler registered on the stream - // handler) - val conn = withTimeoutOrNull(config.connectionAcquireTimeout) { - manager.acquireConnection() - } ?: throw ClientException("timed out waiting for an HTTP connection to be acquired from the pool") - - val respHandler = SdkStreamResponseHandler(conn) - callContext.job.invokeOnCompletion { - logger.trace { "completing handler; cause=$it" } - // ensures the stream is driven to completion regardless of what the downstream consumer does - respHandler.complete() - } - - val reqTime = Instant.now() - val engineRequest = request.toCrtRequest(callContext) - - val stream = conn.makeRequest(engineRequest, respHandler) - stream.activate() - - val resp = respHandler.waitForResponse() - - return HttpCall(request, resp, reqTime, Instant.now(), callContext) - } - - override fun shutdown() { - // close all resources - // SAFETY: shutdown is only invoked once AND only after all requests have completed and no more are coming - connManagers.forEach { entry -> entry.value.close() } - customTlsContext?.close() - } - - private suspend fun getManagerForUri(uri: Uri): HttpClientConnectionManager = mutex.withLock { - connManagers.getOrPut(uri.authority) { - HttpClientConnectionManager(options.apply { this.uri = uri }.build()) - } - } -} diff --git a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/CrtHttpEngineConfig.kt b/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/CrtHttpEngineConfig.kt deleted file mode 100644 index 1d47c913949..00000000000 --- a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/CrtHttpEngineConfig.kt +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.sdk.kotlin.crt.io.ClientBootstrap -import aws.sdk.kotlin.crt.io.TlsContext -import aws.sdk.kotlin.runtime.InternalSdkApi -import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig - -@InternalSdkApi -public class CrtHttpEngineConfig private constructor(builder: Builder) : HttpClientEngineConfig(builder) { - public companion object { - /** - * The default engine config. Most clients should use this. - */ - public val Default: CrtHttpEngineConfig = CrtHttpEngineConfig(Builder()) - - public operator fun invoke(block: Builder.() -> Unit): CrtHttpEngineConfig = - Builder().apply(block).build() - } - - /** - * The amount of data that can be buffered before reading from the socket will cease. Reading will - * resume as data is consumed. - */ - public val initialWindowSizeBytes: Int = builder.initialWindowSizeBytes - - /** - * The [ClientBootstrap] to use for the engine. By default it is a shared instance. - */ - public var clientBootstrap: ClientBootstrap? = builder.clientBootstrap - - /** - * The TLS context to use. By default it is a shared instance. - */ - public var tlsContext: TlsContext? = builder.tlsContext - - public class Builder : HttpClientEngineConfig.Builder() { - /** - * Set the amount of data that can be buffered before reading from the socket will cease. Reading will - * resume as data is consumed. - */ - public var initialWindowSizeBytes: Int = DEFAULT_WINDOW_SIZE_BYTES - - /** - * Set the [ClientBootstrap] to use for the engine. By default it is a shared instance. - */ - public var clientBootstrap: ClientBootstrap? = null - - /** - * Set the TLS context to use. By default it is a shared instance. - */ - public var tlsContext: TlsContext? = null - - internal fun build(): CrtHttpEngineConfig = CrtHttpEngineConfig(this) - } -} diff --git a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/RequestUtil.kt b/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/RequestUtil.kt deleted file mode 100644 index 060a7f7aba6..00000000000 --- a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/RequestUtil.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.sdk.kotlin.crt.http.HeadersBuilder -import aws.sdk.kotlin.crt.http.HttpRequestBodyStream -import aws.sdk.kotlin.crt.io.Protocol -import aws.sdk.kotlin.crt.io.Uri -import aws.sdk.kotlin.crt.io.UserInfo -import aws.sdk.kotlin.runtime.crt.ReadChannelBodyStream -import aws.smithy.kotlin.runtime.http.HttpBody -import aws.smithy.kotlin.runtime.http.request.HttpRequest -import kotlin.coroutines.CoroutineContext - -private const val CONTENT_LENGTH_HEADER: String = "Content-Length" - -internal val HttpRequest.uri: Uri - get() { - val sdkUrl = this.url - return Uri.build { - scheme = Protocol.createOrDefault(sdkUrl.scheme.protocolName) - host = sdkUrl.host - port = sdkUrl.port - userInfo = sdkUrl.userInfo?.let { UserInfo(it.username, it.password) } - // the rest is part of each individual request, manager only needs the host info - } - } - -internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.kotlin.crt.http.HttpRequest { - val body = this.body - val bodyStream = when (body) { - is HttpBody.Streaming -> ReadChannelBodyStream(body.readFrom(), callContext) - is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes()) - else -> null - } - - val crtHeaders = HeadersBuilder() - with(crtHeaders) { - headers.forEach { key, values -> appendAll(key, values) } - } - - val bodyLen = body.contentLength - val contentLength = when { - bodyLen != null -> if (bodyLen > 0) bodyLen.toString() else null - else -> headers[CONTENT_LENGTH_HEADER] - } - contentLength?.let { crtHeaders.append(CONTENT_LENGTH_HEADER, it) } - - return aws.sdk.kotlin.crt.http.HttpRequest(method.name, url.encodedPath, crtHeaders.build(), bodyStream) -} diff --git a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt b/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt deleted file mode 100644 index 2497bec4a42..00000000000 --- a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.sdk.kotlin.crt.CRT -import aws.sdk.kotlin.crt.http.* -import aws.sdk.kotlin.crt.io.Buffer -import aws.sdk.kotlin.runtime.ClientException -import aws.smithy.kotlin.runtime.http.* -import aws.smithy.kotlin.runtime.http.HeadersBuilder -import aws.smithy.kotlin.runtime.http.response.HttpResponse -import aws.smithy.kotlin.runtime.io.SdkByteReadChannel -import aws.smithy.kotlin.runtime.logging.Logger -import kotlinx.atomicfu.locks.reentrantLock -import kotlinx.atomicfu.locks.withLock -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.Channel - -/** - * Implements the CRT stream response interface which proxies the response from the CRT to the SDK - * @param conn The HTTP connection used to make the request. Will be closed when the response handler completes - */ -@OptIn(ExperimentalCoroutinesApi::class) -internal class SdkStreamResponseHandler( - private val conn: HttpClientConnection -) : HttpStreamResponseHandler { - // TODO - need to cancel the stream when the body is closed from the caller side early. - // There is no great way to do that currently without either (1) closing the connection or (2) throwing an - // exception from a callback such that AWS_OP_ERROR is returned. Wait for HttpStream to have explicit cancellation - - private val logger = Logger.getLogger() - private val responseReady = Channel(1) - private val headers = HeadersBuilder() - - private var sdkBody: BufferedReadChannel? = null - - private val lock = reentrantLock() // protects crtStream and cancelled state - private var crtStream: HttpStream? = null - // if the (coroutine) job is completed before the stream's onResponseComplete callback is - // invoked (for any reason) we consider the stream "cancelled" - private var cancelled = false - - private val Int.isMainHeadersBlock: Boolean - get() = when (this) { - HttpHeaderBlock.MAIN.blockType -> true - else -> false - } - - private var streamCompleted = false - - /** - * Called by the response read channel as data is consumed - * @param size the number of bytes consumed - */ - private fun onDataConsumed(size: Int) { - lock.withLock { - crtStream?.incrementWindow(size) - } - } - - override fun onResponseHeaders( - stream: HttpStream, - responseStatusCode: Int, - blockType: Int, - nextHeaders: List? - ) { - if (!blockType.isMainHeadersBlock) return - - nextHeaders?.forEach { - headers.append(it.name, it.value) - } - } - - private fun createHttpResponseBody(contentLength: Long): HttpBody { - sdkBody = bufferedReadChannel(::onDataConsumed) - return object : HttpBody.Streaming() { - override val contentLength: Long = contentLength - override fun readFrom(): SdkByteReadChannel { - return sdkBody!! - } - } - } - - // signal response ready and engine can proceed (all that is required is headers, body is consumed asynchronously) - private fun signalResponse(stream: HttpStream) { - // already signalled - if (responseReady.isClosedForSend) return - - val transferEncoding = headers["Transfer-Encoding"]?.lowercase() - val chunked = transferEncoding == "chunked" - val contentLength = headers["Content-Length"]?.toLong() ?: 0 - val status = HttpStatusCode.fromValue(stream.responseStatusCode) - - val hasBody = (contentLength > 0 || chunked) && - (status !in listOf(HttpStatusCode.NotModified, HttpStatusCode.NoContent)) && - !status.isInformational() - - val body = when (hasBody) { - false -> HttpBody.Empty - true -> createHttpResponseBody(contentLength) - } - - val resp = HttpResponse( - status, - headers.build(), - body - ) - - val result = responseReady.trySend(resp) - check(result.isSuccess) { "signalling response failed, result was: ${result.exceptionOrNull()}" } - responseReady.close() - } - - override fun onResponseHeadersDone(stream: HttpStream, blockType: Int) { - if (!blockType.isMainHeadersBlock) return - signalResponse(stream) - } - - override fun onResponseBody(stream: HttpStream, bodyBytesIn: Buffer): Int { - val isCancelled = lock.withLock { - crtStream = stream - cancelled - } - - // short circuit, stop buffering data and discard remaining incoming bytes - if (isCancelled) return bodyBytesIn.len - - // we should have created a response channel if we expected a body - val sdkRespChan = checkNotNull(sdkBody) { "unexpected response body" } - sdkRespChan.write(bodyBytesIn) - - // explicit window management is done in BufferedReadChannel which calls `onDataConsumed` - // as data is read from the channel - return 0 - } - - override fun onResponseComplete(stream: HttpStream, errorCode: Int) { - // stream is only valid until the end of this callback, ensure any further data being read downstream - // doesn't call incrementWindow on a resource that has been free'd - lock.withLock { - crtStream = null - streamCompleted = true - } - - // close the body channel - if (errorCode != 0) { - val errorDescription = CRT.errorString(errorCode) - val ex = ClientException("CrtHttpEngine::response failed: ec=$errorCode; description=$errorDescription") - responseReady.close(ex) - sdkBody?.cancel(ex) - } else { - // closing the channel to indicate no more data will be sent - sdkBody?.close() - // ensure a response was signalled (will close the channel on it's own if it wasn't already sent) - signalResponse(stream) - } - } - - internal suspend fun waitForResponse(): HttpResponse { - return responseReady.receive() - } - - /** - * Invoked only after the consumer is finished with the response and it is safe to cleanup resources - */ - internal fun complete() { - // We have no way of cancelling the stream, we have to drive it to exhaustion OR close the connection. - // At this point we know it's safe to release resources so if the stream hasn't completed yet - // we forcefully shutdown the connection. This can happen when the stream's window is full and it's waiting - // on the window to be incremented to proceed (i.e. the user didn't consume the stream for whatever reason - // and more data is pending arrival). It can also happen if the coroutine for this request is cancelled - // before onResponseComplete fires. - lock.withLock { - val forceClose = !streamCompleted - - if (forceClose) { - logger.debug { "stream did not complete before job, forcing connection shutdown! handler=$this; conn=$conn; stream=$crtStream" } - conn.shutdown() - cancelled = true - } - - conn.close() - } - } -} diff --git a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/Segment.kt b/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/Segment.kt deleted file mode 100644 index 9134d173d3b..00000000000 --- a/aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/Segment.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.smithy.kotlin.runtime.io.SdkByteBuffer -import aws.smithy.kotlin.runtime.io.readFully - -internal typealias Segment = SdkByteBuffer - -/** - * Create a segment from the given src [ByteArray] and mark the entire contents readable - */ -internal fun newReadableSegment(src: ByteArray): Segment = Segment.of(src).apply { advance(src.size.toULong()) } - -internal fun Segment.copyTo(dest: SdkByteBuffer, limit: Int = Int.MAX_VALUE): Int { - check(readRemaining > 0u) { "nothing left to read from segment" } - val wc = minOf(readRemaining, limit.toULong()) - readFully(dest, wc) - return wc.toInt() -} - -internal fun Segment.copyTo(dest: ByteArray, offset: Int = 0, length: Int = dest.size - offset): Int { - check(readRemaining > 0u) { "nothing left to read from segment" } - val wc = minOf(length.toULong(), readRemaining).toInt() - readFully(dest, offset, wc) - return wc -} diff --git a/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelTest.kt b/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelTest.kt deleted file mode 100644 index d26c1331374..00000000000 --- a/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelTest.kt +++ /dev/null @@ -1,548 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.sdk.kotlin.crt.io.byteArrayBuffer -import aws.smithy.kotlin.runtime.io.readByte -import aws.smithy.kotlin.runtime.testing.ManualDispatchTestBase -import aws.smithy.kotlin.runtime.util.Sha256 -import aws.smithy.kotlin.runtime.util.encodeToHex -import aws.smithy.kotlin.runtime.util.sha256 -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.ClosedReceiveChannelException -import kotlinx.coroutines.test.runTest -import kotlin.random.Random -import kotlin.test.* -import kotlin.test.AfterTest -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFails -import kotlin.test.assertFailsWith -import kotlin.test.assertFalse -import kotlin.test.assertTrue - -internal fun BufferedReadChannel.write(bytes: ByteArray) { - write(byteArrayBuffer(bytes)) -} - -internal fun BufferedReadChannel.write(str: String) { - write(str.encodeToByteArray()) -} - -// test suite adapted from: https://github.com/ktorio/ktor/blob/main/ktor-io/common/test/io/ktor/utils/io/ByteBufferChannelScenarioTest.kt -@OptIn(ExperimentalCoroutinesApi::class) -class BufferedReadChannelTest : ManualDispatchTestBase() { - private var totalRead: Int = 0 - private val ch by lazy { - bufferedReadChannel { size -> - totalRead += size - } - } - - private fun BufferedReadChannel.write(bytes: ByteArray) { - write(byteArrayBuffer(bytes)) - } - - private fun BufferedReadChannel.write(str: String) { - write(str.encodeToByteArray()) - } - - class TestException : RuntimeException("test exception") - - @AfterTest - fun finish() { - ch.cancel(CancellationException("Test finished")) - } - - @Test - fun testReadBeforeAvailable() = runTest { - // test readAvailable() suspends when no data is available - expect(1) - - val data = "1234" - - launch { - expect(3) - val buf = ByteArray(16) - - // should suspend - val rc = ch.readAvailable(buf) - expect(5) - assertEquals(data.length, rc) - } - - expect(2) - yield() - - expect(4) - - // read continuation should be queued to resume - ch.write(data) - yield() - - finish(6) - } - - @Test - fun testReadBeforeAvailableFully() = runTest { - // test readFully() suspends when no data is available - expect(1) - - val data = "1234" - - launch { - expect(3) - val buf = ByteArray(16) - - // should suspend - ch.readFully(buf, length = 4) - expect(5) - } - - expect(2) - yield() - - expect(4) - - // read continuation should be queued to resume - ch.write(data) - yield() - - finish(6) - } - - @Test - fun testReadAfterAvailable() = runTest { - // test readAvailable() does NOT suspend when data is available - expect(1) - ch.write("1234") - launch { - expect(3) - - val buf = ByteArray(16) - // should NOT suspend - val rc = ch.readAvailable(buf) - - expect(4) - assertEquals(4, rc) - - expect(5) - } - - expect(2) - yield() - finish(6) - } - - @Test - fun testReadAfterAvailableFully() = runTest { - // test readFully() does NOT suspend when data is available to satisfy the request - expect(1) - - ch.write("1234") - - launch { - expect(3) - - val buf = ByteArray(16) - // should NOT suspend - ch.readFully(buf, length = 4) - - expect(4) - } - - expect(2) - yield() - - finish(5) - } - - @Test - fun testReadFullySuspends() = runTest { - // test readFully() suspends when not enough data is available to satisfy the request - expect(1) - - ch.write("1234") - - launch { - expect(3) - - val buf = ByteArray(16) - // should suspend - ch.readFully(buf, length = 8) - - expect(6) - } - - expect(2) - yield() - expect(4) - ch.write("5678") - - expect(5) - yield() - - finish(7) - } - - @Test - fun testReadToEmpty() = runTest { - // test readAvailable() does not suspend when length is zero - // (in practice you wouldn't set 0 but it could happen when combined with an offset) - expect(1) - - val buf = ByteArray(16) - val rc = ch.readAvailable(buf, length = 0) - expect(2) - assertEquals(0, rc) - - finish(3) - } - - @Test - fun testReadToEmptyFromFailedChannel() = runTest { - expect(1) - ch.cancel(TestException()) - val buf = ByteArray(16) - val rc = ch.readAvailable(buf, length = 0) - expect(2) - assertEquals(-1, rc) - finish(3) - } - - @Test - fun testReadToEmptyFromClosedChannel() = runTest { - expect(1) - ch.close() - val buf = ByteArray(16) - val rc = ch.readAvailable(buf, length = 0) - expect(2) - assertEquals(-1, rc) - finish(3) - } - - @Test - fun testReadFullyFromFailedChannel() = runTest { - expect(1) - ch.cancel(TestException()) - assertFails { - val buf = ByteArray(1) - ch.readFully(buf) - } - finish(2) - } - - @Test - fun testReadFullyFromClosedChannel() = runTest { - expect(1) - ch.close() - assertFails { - val buf = ByteArray(1) - ch.readFully(buf) - } - finish(2) - } - - @Test - fun readPartialSegment() = runTest { - expect(1) - ch.write("1234") - launch { - expect(4) - ch.write("5678") - expect(5) - } - expect(2) - val buf = ByteArray(16) - val rc1 = ch.readAvailable(buf, length = 2) - expect(3) - assertEquals(2, rc1) - yield() - expect(6) - // second read should consume the remainder of the first write "34" + the entire second write - val rc2 = ch.readAvailable(buf, offset = 2) - assertEquals(6, rc2) - finish(7) - } - - @Test - fun testReadState() = runTest { - assertFalse(ch.isClosedForWrite) - assertFalse(ch.isClosedForRead) - assertEquals(0, ch.availableForRead) - ch.write("1234") - assertEquals(4, ch.availableForRead) - ch.close() - assertTrue(ch.isClosedForWrite) - assertFalse(ch.isClosedForRead) - - val buf = ByteArray(16) - val rc = ch.readAvailable(buf) - assertEquals(4, rc) - - assertEquals(0, ch.availableForRead) - assertTrue(ch.isClosedForRead) - } - - @Test - fun testReadRemaining() = runTest { - expect(1) - ch.write("1234") - launch { - expect(3) - val buf = ch.readRemaining() - assertEquals("1234", buf.decodeToString()) - expect(5) - } - - expect(2) - yield() - expect(4) - ch.close() - yield() - finish(6) - } - - @Test - fun testReadRemainingLimit() = runTest { - // should test partial segment reading - expect(1) - ch.write("123") - ch.write("456") - ch.write("789") - launch { - expect(3) - // should NOT suspend because of limit - val buf = ch.readRemaining(limit = 5) - assertEquals("12345", buf.decodeToString()) - expect(4) - } - - expect(2) - yield() - - expect(5) - ch.close() - - expect(6) - - assertEquals(4, ch.availableForRead) - // should NOT suspend because the channel is closed - val buf = ch.readRemaining() - assertEquals("6789", buf.decodeToString()) - - finish(7) - } - - @Test - fun testReadInProgress() = runTest { - expect(1) - launch { - expect(3) - val buf = ByteArray(16) - ch.readAvailable(buf) - } - expect(2) - yield() - expect(4) - - assertFailsWith("Read operation already in progress") { - val buf = ByteArray(16) - ch.readAvailable(buf) - } - ch.close() - finish(5) - } - - @Test - fun testReadFullyEof() = runTest { - expect(1) - ch.write("1234") - val buf = ByteArray(16) - launch { - expect(3) - assertFailsWith("Unexpeced EOF: expected 12 more bytes") { - ch.readFully(buf) - } - } - expect(2) - yield() - expect(4) - - ch.close() - finish(5) - } - - @Test - fun testResumeReadFromFailedChannel() = runTest { - expect(1) - - launch { - expect(3) - ch.cancel(TestException()) - } - - expect(2) - val buf = ByteArray(16) - assertFailsWith { - // should suspend and fail with the exception when resumed - ch.readAvailable(buf) - } - finish(4) - } - - @Test - fun testResumeReadAvailableFromClosedChannelNoContent() = runTest { - expect(1) - - launch { - expect(3) - ch.close() - } - - expect(2) - val buf = ByteArray(16) - val rc = ch.readAvailable(buf) - assertEquals(-1, rc) - finish(4) - } - - @Test - fun testReadAndWriteFully() = runTest { - val bytes = byteArrayOf(1, 2, 3, 4, 5) - ch.write(bytes) - val buf = ByteArray(5) - ch.readFully(buf) - assertTrue { buf.contentEquals(bytes) } - - ch.write(bytes) - val buf2 = ByteArray(4) - ch.readFully(buf2) - assertEquals(1, ch.availableForRead) - assertEquals(5, ch.readByte()) - ch.close() - - assertFails { - ch.readFully(buf) - } - } - - @Test - fun testLargeTransfer() = runTest { - val size = 262144 + 512 - launch { - ch.write(ByteArray(size)) - ch.close() - } - - val buf = ch.readRemaining() - assertEquals(size, buf.size) - } - - @OptIn(DelicateCoroutinesApi::class) - @Test - fun testWriteRaceCondition() = runTest { - var totalBytes = 0 - val channel = bufferedReadChannel { size -> totalBytes += size } - val writeJob = GlobalScope.async { - try { - val data = byteArrayOf(2) - repeat(1_000_000) { - channel.write(data) - } - channel.close() - } catch (ex: Exception) { - channel.cancel(ex) - throw ex - } - } - - val readJob = GlobalScope.async { - channel.readRemaining() - } - - writeJob.await() - readJob.await() - assertEquals(1_000_000, totalBytes) - } - - @OptIn(DelicateCoroutinesApi::class) - private suspend fun runReadSuspendIntegrityTest(reader: suspend (BufferedReadChannel, Int) -> String) { - // writer is setup to write random lengths and delay to cause the reader to enter a suspend loop - val data = ByteArray(16 * 1024 * 1024) { it.toByte() } - var totalBytes = 0 - val channel = bufferedReadChannel { size -> totalBytes += size } - val writeSha256 = GlobalScope.async { - var wcRemaining = data.size - var offset = 0 - val checksum = Sha256() - while (wcRemaining > 0) { - // random write sizes - val wc = minOf(wcRemaining, Random.nextInt(256, 8 * 1024)) - val slice = data.sliceArray(offset until offset + wc) - checksum.update(slice) - channel.write(slice) - offset += wc - wcRemaining -= wc - - if (wcRemaining % 256 == 0) { - delay(Random.nextLong(0, 10)) - } - } - - channel.close() - - checksum.digest().encodeToHex() - } - - val readSha256 = GlobalScope.async { reader(channel, data.size) } - - val origSha = data.sha256().encodeToHex() - val writeSha = writeSha256.await() - val readSha = readSha256.await() - assertEquals(origSha, writeSha) - assertEquals(origSha, readSha) - } - - @Test - fun testReadFullyIntegrity() = runTest { - // see https://github.com/awslabs/aws-sdk-kotlin/issues/526 - runReadSuspendIntegrityTest { channel, totalSize -> - val dest = ByteArray(totalSize) - channel.readFully(dest) - dest.sha256().encodeToHex() - } - } - - @Test - fun testReadAvailableIntegrity() = runTest { - runReadSuspendIntegrityTest { channel, totalSize -> - val checksum = Sha256() - var totalRead = 0 - while (!channel.isClosedForRead) { - val chunk = ByteArray(8 * 1024) - val rc = channel.readAvailable(chunk) - if (rc < 0) break - - totalRead += rc - val slice = if (rc != chunk.size) chunk.sliceArray(0 until rc) else chunk - checksum.update(slice) - } - - assertEquals(totalSize, totalRead) - checksum.digest().encodeToHex() - } - } - - @Test - fun testReadRemainingIntegrity() = runTest { - runReadSuspendIntegrityTest { channel, totalSize -> - val data = channel.readRemaining() - assertEquals(totalSize, data.size) - data.sha256().encodeToHex() - } - } -} diff --git a/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/RequestConversionTest.kt b/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/RequestConversionTest.kt deleted file mode 100644 index 8f352fcc3a2..00000000000 --- a/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/RequestConversionTest.kt +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.sdk.kotlin.runtime.crt.ReadChannelBodyStream -import aws.smithy.kotlin.runtime.content.ByteStream -import aws.smithy.kotlin.runtime.http.* -import aws.smithy.kotlin.runtime.http.content.ByteArrayContent -import aws.smithy.kotlin.runtime.http.request.HttpRequest -import aws.smithy.kotlin.runtime.io.SdkByteReadChannel -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel -import kotlin.coroutines.EmptyCoroutineContext -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFalse - -class RequestConversionTest { - private fun byteStreamFromContents(contents: String): ByteStream { - return object : ByteStream.OneShotStream() { - override val contentLength: Long = contents.length.toLong() - override fun readFrom(): SdkByteReadChannel { - return SdkByteReadChannel(contents.encodeToByteArray()) - } - } - } - - @Test - fun testUri() { - val request = HttpRequest( - HttpMethod.GET, - Url.parse("https://test.aws.com?foo=bar"), - Headers.Empty, - HttpBody.Empty - ) - val uri = request.uri - assertEquals("https://test.aws.com", uri.toString()) - } - - @Test - fun testSdkToCrtRequestBytesBody() { - val body = ByteArrayContent("foobar".encodeToByteArray()) - val request = HttpRequest( - HttpMethod.POST, - Url.parse("https://test.aws.com?foo=bar"), - Headers.Empty, - body, - ) - - val crtRequest = request.toCrtRequest(EmptyCoroutineContext) - assertEquals("POST", crtRequest.method) - assertFalse(crtRequest.body is ReadChannelBodyStream) - } - - @Test - fun testSdkToCrtRequestStreamingBody() { - val stream = byteStreamFromContents("foobar") - val body = stream.toHttpBody() - val request = HttpRequest( - HttpMethod.POST, - Url.parse("https://test.aws.com?foo=bar"), - Headers.Empty, - body, - ) - - val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) - assertEquals("POST", crtRequest.method) - val crtBody = crtRequest.body as ReadChannelBodyStream - crtBody.cancel() - } - - @Test - fun testEngineAddsContentLengthHeader() { - val stream = byteStreamFromContents("foobar") - val body = stream.toHttpBody() - val request = HttpRequest( - HttpMethod.POST, - Url.parse("https://test.aws.com?foo=bar"), - Headers.Empty, - body, - ) - - val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) - assertEquals("6", crtRequest.headers["Content-Length"]) - - val crtBody = crtRequest.body as ReadChannelBodyStream - crtBody.cancel() - } - - @Test - fun testEngineDoesNotAddContentLengthHeaderForEmptyBody() { - val request = HttpRequest( - HttpMethod.POST, - Url.parse("https://test.aws.com?foo=bar"), - Headers.Empty, - HttpBody.Empty - ) - - val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) - assertFalse(crtRequest.headers.contains("Content-Length")) - } -} diff --git a/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt b/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt deleted file mode 100644 index eacef65cfb3..00000000000 --- a/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.sdk.kotlin.crt.http.* -import aws.sdk.kotlin.crt.io.byteArrayBuffer -import aws.smithy.kotlin.runtime.ClientException -import aws.smithy.kotlin.runtime.http.HttpBody -import aws.smithy.kotlin.runtime.http.HttpStatusCode -import io.kotest.matchers.string.shouldContain -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.runTest -import kotlin.test.* - -@OptIn(ExperimentalCoroutinesApi::class) -class SdkStreamResponseHandlerTest { - - private class MockHttpStream(override val responseStatusCode: Int) : HttpStream { - var closed: Boolean = false - override fun activate() {} - override fun close() { closed = true } - override fun incrementWindow(size: Int) {} - } - - private class MockHttpClientConnection : HttpClientConnection { - var isClosed: Boolean = false - override fun close() { isClosed = true } - override fun makeRequest(httpReq: HttpRequest, handler: HttpStreamResponseHandler): HttpStream { throw UnsupportedOperationException("not implemented for test") } - override fun shutdown() { } - } - - private val mockConn = MockHttpClientConnection() - - @Test - fun testWaitSuccessResponse() = runTest { - val handler = SdkStreamResponseHandler(mockConn) - val stream = MockHttpStream(200) - launch { - val headers = listOf( - HttpHeader("foo", "bar"), - HttpHeader("baz", "qux"), - ) - handler.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.blockType, headers) - handler.onResponseHeadersDone(stream, HttpHeaderBlock.MAIN.blockType) - } - - // should be signalled as soon as headers are available - val resp = handler.waitForResponse() - assertEquals(HttpStatusCode.OK, resp.status) - - assertTrue(resp.body is HttpBody.Empty) - handler.onResponseComplete(stream, 0) - - assertFalse(mockConn.isClosed) - handler.complete() - assertTrue(mockConn.isClosed) - } - - @Test - fun testWaitNoHeaders() = runTest { - val handler = SdkStreamResponseHandler(mockConn) - val stream = MockHttpStream(200) - launch { - handler.onResponseComplete(stream, 0) - } - - val resp = handler.waitForResponse() - assertEquals(HttpStatusCode.OK, resp.status) - } - - @Test - fun testWaitFailedResponse() = runTest { - val handler = SdkStreamResponseHandler(mockConn) - val stream = MockHttpStream(200) - launch { - handler.onResponseComplete(stream, -1) - } - - // failed engine execution should raise an exception - assertFails { - handler.waitForResponse() - } - } - - @Test - fun testRespBodyCreated() = runTest { - val handler = SdkStreamResponseHandler(mockConn) - val stream = MockHttpStream(200) - launch { - val headers = listOf( - HttpHeader("Content-Length", "72") - ) - handler.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.blockType, headers) - handler.onResponseHeadersDone(stream, HttpHeaderBlock.MAIN.blockType) - } - - // should be signalled as soon as headers are available - val resp = handler.waitForResponse() - assertEquals(HttpStatusCode.OK, resp.status) - - assertEquals(72, resp.body.contentLength) - assertTrue(resp.body is HttpBody.Streaming) - val respChan = (resp.body as HttpBody.Streaming).readFrom() - assertFalse(respChan.isClosedForWrite) - - assertFalse(mockConn.isClosed) - handler.onResponseComplete(stream, 0) - assertTrue(respChan.isClosedForWrite) - } - - @Test - fun testRespBody() = runTest { - val handler = SdkStreamResponseHandler(mockConn) - val stream = MockHttpStream(200) - val data = "Fool of a Took! Throw yourself in next time and rid us of your stupidity!" - launch { - val headers = listOf( - HttpHeader("Content-Length", "${data.length}") - ) - handler.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.blockType, headers) - handler.onResponseHeadersDone(stream, HttpHeaderBlock.MAIN.blockType) - handler.onResponseBody(stream, byteArrayBuffer(data.encodeToByteArray())) - handler.onResponseComplete(stream, 0) - } - - // should be signalled as soon as headers are available - val resp = handler.waitForResponse() - assertEquals(HttpStatusCode.OK, resp.status) - - assertEquals(data.length.toLong(), resp.body.contentLength) - assertTrue(resp.body is HttpBody.Streaming) - val respChan = (resp.body as HttpBody.Streaming).readFrom() - - assertTrue(respChan.isClosedForWrite) - - assertEquals(data, respChan.readRemaining().decodeToString()) - } - - @Test - fun testStreamError() = runTest { - val handler = SdkStreamResponseHandler(mockConn) - val stream = MockHttpStream(200) - val data = "foo bar" - val socketClosedEc = 1051 - launch { - val headers = listOf( - HttpHeader("Content-Length", "${data.length}") - ) - handler.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.blockType, headers) - handler.onResponseHeadersDone(stream, HttpHeaderBlock.MAIN.blockType) - handler.onResponseBody(stream, byteArrayBuffer("foo".encodeToByteArray())) - handler.onResponseComplete(stream, socketClosedEc) - } - - // should be signalled as soon as headers are available - val resp = handler.waitForResponse() - assertEquals(HttpStatusCode.OK, resp.status) - - assertEquals(data.length.toLong(), resp.body.contentLength) - val respChan = (resp.body as HttpBody.Streaming).readFrom() - - assertTrue(respChan.isClosedForWrite) - assertFailsWith { - respChan.readRemaining() - }.message.shouldContain("CrtHttpEngine::response failed: ec=$socketClosedEc") - } -} diff --git a/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/SegmentTest.kt b/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/SegmentTest.kt deleted file mode 100644 index 92d94e4079d..00000000000 --- a/aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/SegmentTest.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.smithy.kotlin.runtime.io.SdkByteBuffer -import aws.smithy.kotlin.runtime.io.decodeToString -import kotlin.test.Test -import kotlin.test.assertEquals - -class SegmentTest { - @Test - fun testCopyToByteArray() { - val segment = newReadableSegment("1234".encodeToByteArray()) - val dest = ByteArray(16) - val rc = segment.copyTo(dest) - assertEquals(4, rc) - assertEquals("1234", dest.decodeToString(0, 4)) - } - - @Test - fun testCopyToSdkBuffer() { - val segment = newReadableSegment("1234".encodeToByteArray()) - val dest = SdkByteBuffer(16u) - val rc = segment.copyTo(dest) - assertEquals(4, rc) - assertEquals(4u, dest.writePosition) - assertEquals(4u, dest.readRemaining) - assertEquals("1234", dest.decodeToString()) - } -} diff --git a/aws-runtime/http-client-engine-crt/jvm/src/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelJVM.kt b/aws-runtime/http-client-engine-crt/jvm/src/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelJVM.kt deleted file mode 100644 index e1c9bc287ce..00000000000 --- a/aws-runtime/http-client-engine-crt/jvm/src/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelJVM.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.smithy.kotlin.runtime.io.SdkByteBuffer -import aws.smithy.kotlin.runtime.io.of -import java.nio.ByteBuffer - -internal actual fun bufferedReadChannel(onBytesRead: (n: Int) -> Unit): BufferedReadChannel = - BufferedReadChannelImpl(onBytesRead) - -internal class BufferedReadChannelImpl( - onBytesRead: (n: Int) -> Unit -) : AbstractBufferedReadChannel(onBytesRead) { - - override suspend fun readAvailable(sink: ByteBuffer): Int { - if (sink.remaining() == 0) return 0 - val sdkSink = SdkByteBuffer.of(sink) - val consumed = readAsMuchAsPossible(sdkSink, sink.remaining()) - return when { - consumed == 0 && closed != null -> -1 - consumed > 0 -> { - sink.position(sink.position() + consumed) - consumed - } - else -> readAvailableSuspend(sink) - } - } - - private suspend fun readAvailableSuspend(dest: ByteBuffer): Int { - if (!readSuspend()) { - return -1 - } - return readAvailable(dest) - } -} diff --git a/aws-runtime/http-client-engine-crt/jvm/test/aws/sdk/kotlin/runtime/http/engine/crt/AsyncStressTest.kt b/aws-runtime/http-client-engine-crt/jvm/test/aws/sdk/kotlin/runtime/http/engine/crt/AsyncStressTest.kt deleted file mode 100644 index 004855409db..00000000000 --- a/aws-runtime/http-client-engine-crt/jvm/test/aws/sdk/kotlin/runtime/http/engine/crt/AsyncStressTest.kt +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.smithy.kotlin.runtime.http.HttpMethod -import aws.smithy.kotlin.runtime.http.Protocol -import aws.smithy.kotlin.runtime.http.readAll -import aws.smithy.kotlin.runtime.http.request.HttpRequestBuilder -import aws.smithy.kotlin.runtime.http.request.url -import aws.smithy.kotlin.runtime.http.response.complete -import aws.smithy.kotlin.runtime.http.sdkHttpClient -import aws.smithy.kotlin.runtime.httptest.TestWithLocalServer -import aws.smithy.kotlin.runtime.testing.IgnoreWindows -import io.ktor.application.* -import io.ktor.response.* -import io.ktor.routing.* -import io.ktor.server.cio.* -import io.ktor.server.engine.* -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.async -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withTimeout -import kotlinx.coroutines.yield -import kotlin.test.Test -import kotlin.time.Duration.Companion.seconds - -@OptIn(ExperimentalCoroutinesApi::class) -class AsyncStressTest : TestWithLocalServer() { - - override val server = embeddedServer(CIO, serverPort) { - routing { - get("/largeResponse") { - // something that fills the stream window... - val respSize = DEFAULT_WINDOW_SIZE_BYTES * 2 - val text = "testing" - call.respondText(text.repeat(respSize / text.length)) - } - } - } - - @Test - fun testConcurrentRequests() = runBlocking { - // https://github.com/awslabs/aws-sdk-kotlin/issues/170 - val client = sdkHttpClient(CrtHttpEngine()) - val request = HttpRequestBuilder().apply { - url { - scheme = Protocol.HTTP - method = HttpMethod.GET - host = testHost - port = serverPort - path = "/largeResponse" - } - } - - repeat(1_000) { - async { - try { - - val call = client.call(request) - yield() - call.response.body.readAll() - call.complete() - } catch (ex: Exception) { - println("exception on $it: $ex") - throw ex - } - } - yield() - } - } - - @IgnoreWindows("FIXME - times out after upgrade to kotlinx.coroutines 1.6.0") - @Test - fun testStreamNotConsumed() = runBlocking { - // test that filling the stream window and not consuming the body stream still cleans up resources - // appropriately and allows requests to proceed (a stream that isn't consumed will be in a stuck state - // if the window is full and never incremented again, this can lead to all connections being consumed - // and the engine to no longer make further requests) - val client = sdkHttpClient(CrtHttpEngine()) - val request = HttpRequestBuilder().apply { - url { - scheme = Protocol.HTTP - method = HttpMethod.GET - host = testHost - port = serverPort - path = "/largeResponse" - } - } - - withTimeout(5.seconds) { - repeat(1_000) { - async { - try { - val call = client.call(request) - yield() - call.complete() - } catch (ex: Exception) { - println("exception on $it: $ex") - throw ex - } - } - yield() - } - } - } -} diff --git a/aws-runtime/http-client-engine-crt/jvm/test/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelByteBufferTest.kt b/aws-runtime/http-client-engine-crt/jvm/test/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelByteBufferTest.kt deleted file mode 100644 index 7f82c990f47..00000000000 --- a/aws-runtime/http-client-engine-crt/jvm/test/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelByteBufferTest.kt +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -package aws.sdk.kotlin.runtime.http.engine.crt - -import aws.smithy.kotlin.runtime.testing.ManualDispatchTestBase -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.runTest -import kotlinx.coroutines.yield -import java.nio.ByteBuffer -import kotlin.test.AfterTest -import kotlin.test.Test -import kotlin.test.assertEquals - -class BufferedReadChannelByteBufferTest : ManualDispatchTestBase() { - private var totalRead: Int = 0 - private val ch by lazy { - bufferedReadChannel { size -> - totalRead += size - } - } - - @AfterTest - fun finish() { - ch.cancel(CancellationException("Test finished")) - } - - @OptIn(ExperimentalCoroutinesApi::class) - @Test - fun testReadBeforeAvailable() = runTest { - // test readAvailable() suspends when no data is available - expect(1) - - val data = "1234" - - launch { - expect(3) - val buf = ByteBuffer.allocate(16) - - // should suspend - val rc = ch.readAvailable(buf) - expect(5) - assertEquals(data.length, rc) - assertEquals(rc, buf.position()) - } - - expect(2) - yield() - - expect(4) - - // read continuation should be queued to resume - ch.write(data) - yield() - - finish(6) - } - - @OptIn(ExperimentalCoroutinesApi::class) - @Test - fun testReadAfterAvailable() = runTest { - // test readAvailable() does NOT suspend when data is available - expect(1) - ch.write("1234") - launch { - expect(3) - - val buf = ByteBuffer.allocate(16) - // should NOT suspend - val rc = ch.readAvailable(buf) - - expect(4) - assertEquals(4, rc) - assertEquals(rc, buf.position()) - - expect(5) - } - - expect(2) - yield() - finish(6) - } -} diff --git a/build.gradle.kts b/build.gradle.kts index 9b1bd61508a..a67071805fe 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -101,7 +101,6 @@ tasks.dokkaHtmlMultiModule.configure { val excludeFromDocumentation = listOf( project(":aws-runtime:testing"), - project(":aws-runtime:crt-util"), ) removeChildTasks(excludeFromDocumentation) } diff --git a/builder.json b/builder.json index 2e18ad6ba66..d6462263567 100644 --- a/builder.json +++ b/builder.json @@ -32,10 +32,10 @@ ], "upstream": [ { - "name": "smithy-kotlin" + "name": "aws-crt-kotlin" }, { - "name": "aws-crt-kotlin" + "name": "smithy-kotlin" } ], "variants": { diff --git a/codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/AwsKotlinDependency.kt b/codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/AwsKotlinDependency.kt index c36322b28f5..e20ac5e71a6 100644 --- a/codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/AwsKotlinDependency.kt +++ b/codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/AwsKotlinDependency.kt @@ -42,7 +42,6 @@ object AwsKotlinDependency { val AWS_TYPES = KotlinDependency(GradleConfiguration.Api, AWS_CLIENT_RT_ROOT_NS, AWS_CLIENT_RT_GROUP, "aws-types", AWS_CLIENT_RT_VERSION) val AWS_JSON_PROTOCOLS = KotlinDependency(GradleConfiguration.Implementation, "$AWS_CLIENT_RT_ROOT_NS.protocol.json", AWS_CLIENT_RT_GROUP, "aws-json-protocols", AWS_CLIENT_RT_VERSION) val AWS_XML_PROTOCOLS = KotlinDependency(GradleConfiguration.Implementation, "$AWS_CLIENT_RT_ROOT_NS.protocol.xml", AWS_CLIENT_RT_GROUP, "aws-xml-protocols", AWS_CLIENT_RT_VERSION) - val AWS_CRT_HTTP_ENGINE = KotlinDependency(GradleConfiguration.Implementation, "$AWS_CLIENT_RT_ROOT_NS.http.engine.crt", AWS_CLIENT_RT_GROUP, "http-client-engine-crt", AWS_CLIENT_RT_VERSION) val AWS_EVENT_STREAM = KotlinDependency(GradleConfiguration.Implementation, "$AWS_CLIENT_RT_ROOT_NS.protocol.eventstream", AWS_CLIENT_RT_GROUP, "aws-event-stream", AWS_CLIENT_RT_VERSION) } @@ -59,7 +58,6 @@ private val sameProjectDeps: Map by lazy { AwsKotlinDependency.AWS_TYPES to """project(":aws-runtime:aws-types")""", AwsKotlinDependency.AWS_JSON_PROTOCOLS to """project(":aws-runtime:protocols:aws-json-protocols")""", AwsKotlinDependency.AWS_XML_PROTOCOLS to """project(":aws-runtime:protocols:aws-xml-protocols")""", - AwsKotlinDependency.AWS_CRT_HTTP_ENGINE to """project(":aws-runtime:http-client-engine-crt")""", AwsKotlinDependency.AWS_EVENT_STREAM to """project(":aws-runtime:protocols:aws-event-stream")""", ) } diff --git a/codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/protocols/core/AwsHttpProtocolClientGenerator.kt b/codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/protocols/core/AwsHttpProtocolClientGenerator.kt index cb3a7c8e2c3..522789ae041 100644 --- a/codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/protocols/core/AwsHttpProtocolClientGenerator.kt +++ b/codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/protocols/core/AwsHttpProtocolClientGenerator.kt @@ -5,7 +5,6 @@ package aws.sdk.kotlin.codegen.protocols.core -import aws.sdk.kotlin.codegen.AwsKotlinDependency import aws.sdk.kotlin.codegen.AwsRuntimeTypes import aws.sdk.kotlin.codegen.protocols.middleware.AwsSignatureVersion4 import aws.sdk.kotlin.codegen.sdkId @@ -36,7 +35,7 @@ open class AwsHttpProtocolClientGenerator( override val defaultHttpClientEngineSymbol: Symbol get() = buildSymbol { name = "CrtHttpEngine" - namespace(AwsKotlinDependency.AWS_CRT_HTTP_ENGINE) + namespace(KotlinDependency.AWS_CRT_HTTP_ENGINE) } override fun render(writer: KotlinWriter) { diff --git a/settings.gradle.kts b/settings.gradle.kts index 7c2eb80b101..aeb2e32a2a0 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -37,11 +37,9 @@ include(":aws-runtime:aws-config") include(":aws-runtime:aws-endpoint") include(":aws-runtime:aws-signing") include(":aws-runtime:aws-http") -include(":aws-runtime:http-client-engine-crt") include(":aws-runtime:protocols:aws-json-protocols") include(":aws-runtime:protocols:aws-xml-protocols") include(":aws-runtime:protocols:aws-event-stream") -include(":aws-runtime:crt-util") include(":tests") include(":tests:codegen:event-stream") diff --git a/tests/codegen/event-stream/build.gradle.kts b/tests/codegen/event-stream/build.gradle.kts index 3f9e57c9172..5e0a8aca364 100644 --- a/tests/codegen/event-stream/build.gradle.kts +++ b/tests/codegen/event-stream/build.gradle.kts @@ -145,7 +145,7 @@ dependencies { implementation(project(":aws-runtime:protocols:aws-event-stream")) implementation(project(":aws-runtime:aws-http")) implementation(project(":aws-runtime:protocols:aws-json-protocols")) - implementation(project(":aws-runtime:http-client-engine-crt")) // because it is hard coded in generated source currently + implementation("aws.smithy.kotlin:http-client-engine-crt:$smithyKotlinVersion") // because it is hard coded in generated source currently implementation(project(":aws-runtime:aws-config")) implementation(project(":aws-runtime:aws-core")) implementation(project(":aws-runtime:aws-endpoint"))