From 065e6fbe87b1cbbe52a3376f9b9f14fc80ae7d3c Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Tue, 24 Aug 2021 15:37:00 -0400 Subject: [PATCH] fix: ensure CRT doesn't starve the dispatcher --- .../runtime/crt/ReadChannelBodyStream.kt | 71 ++++++++++--------- .../runtime/crt/ReadChannelBodyStreamTest.kt | 8 +-- services/build.gradle.kts | 4 ++ 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStream.kt b/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStream.kt index fb006aff295..ab81bad4c8f 100644 --- a/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStream.kt +++ b/aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStream.kt @@ -35,15 +35,12 @@ public class ReadChannelBodyStream( override val coroutineContext: CoroutineContext = callContext + producerJob private val currBuffer = atomic<SdkBuffer?>(null) - private val bufferChan = Channel<SdkBuffer>(1) + private val bufferChan = Channel<SdkBuffer>(Channel.UNLIMITED) init { producerJob.invokeOnCompletion { cause -> bodyChan.cancel(cause) } - - // launch a coroutine to fill the buffer channel - proxyRequestBody() } // lie - CRT tries to control this via normal seek operations (e.g. when they calculate a hash for signing @@ -52,16 +49,51 @@ public class ReadChannelBodyStream( // and handle these concerns. 
override fun resetPosition(): Boolean = true - @OptIn(ExperimentalCoroutinesApi::class) override fun sendRequestBody(buffer: MutableBuffer): Boolean { + return doSendRequestBody(buffer).also { if (it) producerJob.complete() } + } + + @OptIn(ExperimentalCoroutinesApi::class) + private fun doSendRequestBody(buffer: MutableBuffer): Boolean { + // ensure the request context hasn't been cancelled + callContext.ensureActive() var outgoing = currBuffer.getAndSet(null) ?: bufferChan.tryReceive().getOrNull() + if (bodyChan.availableForRead > 0 && outgoing == null) { + // NOTE: It is critical that the coroutine launched doesn't actually suspend because it will never + // get a chance to resume. The CRT will consume the dispatcher thread until the data has been read + // completely. We could launch one of the coroutines into a different dispatcher but this won't work + // on platforms (e.g. JS) that don't have multiple threads. Essentially the CRT will starve + // the dispatcher and not allow other coroutines to make progress. + // see: https://github.com/awslabs/aws-sdk-kotlin/issues/282 + // + // TODO - we could get rid of this extra copy + coroutine if readAvailable() had a non-suspend version + // see: https://youtrack.jetbrains.com/issue/KTOR-2772 + // + // To get around this, if there is data to read we launch a coroutine UNDISPATCHED so that it runs + // immediately in the current thread. The coroutine will fill the buffer but won't suspend because + // we know data is available. 
+ launch(start = CoroutineStart.UNDISPATCHED) { + val sdkBuffer = SdkBuffer(bodyChan.availableForRead) + bodyChan.readAvailable(sdkBuffer) + bufferChan.send(sdkBuffer) + }.invokeOnCompletion { cause -> + if (cause != null) { + producerJob.completeExceptionally(cause) + bufferChan.close(cause) + } + } + } + + if (bodyChan.availableForRead == 0 && bodyChan.isClosedForRead) { + bufferChan.close() + } + if (outgoing == null) { if (bufferChan.isClosedForReceive) { return true } - // ensure the request context hasn't been cancelled - callContext.ensureActive() + outgoing = bufferChan.tryReceive().getOrNull() ?: return false } @@ -73,29 +105,4 @@ public class ReadChannelBodyStream( return bufferChan.isClosedForReceive && currBuffer.value == null } - - private fun proxyRequestBody() { - // TODO - we could get rid of this extra copy + coroutine if readAvailable() had a non-suspend version - // see: https://youtrack.jetbrains.com/issue/KTOR-2772 - val job = launch(start = CoroutineStart.UNDISPATCHED) { - while (!bodyChan.isClosedForRead) { - bodyChan.awaitContent() - if (bodyChan.isClosedForRead) return@launch - - // TODO - we could pool these - val buffer = SdkBuffer(bodyChan.availableForRead) - bodyChan.readAvailable(buffer) - bufferChan.send(buffer) - } - } - - job.invokeOnCompletion { cause -> - bufferChan.close(cause) - if (cause != null) { - producerJob.completeExceptionally(cause) - } else { - producerJob.complete() - } - } - } } diff --git a/aws-runtime/crt-util/common/test/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStreamTest.kt b/aws-runtime/crt-util/common/test/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStreamTest.kt index f64fe0f7d1f..0ea7d58c8e0 100644 --- a/aws-runtime/crt-util/common/test/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStreamTest.kt +++ b/aws-runtime/crt-util/common/test/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStreamTest.kt @@ -34,17 +34,17 @@ class ReadChannelBodyStreamTest { } @Test - fun testCancellation() = runSuspendTest { + fun 
testCancellation(): Unit = runSuspendTest { val chan = SdkByteChannel() val job = Job() val stream = ReadChannelBodyStream(chan, coroutineContext + job) - yield() job.cancel() - yield() val (sendBuffer, _) = mutableBuffer(16) - assertTrue(stream.sendRequestBody(sendBuffer)) + assertFailsWith<CancellationException> { + stream.sendRequestBody(sendBuffer) + } } @Test diff --git a/services/build.gradle.kts b/services/build.gradle.kts index ebe9b0e6875..7511f747f44 100644 --- a/services/build.gradle.kts +++ b/services/build.gradle.kts @@ -104,6 +104,10 @@ subprojects { classpath = compileDependencyFiles + runtimeDependencyFiles testClassesDirs = output.classesDirs useJUnitPlatform() + testLogging { + events("passed", "skipped", "failed") + showStandardStreams = true + } } } }