Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,12 @@ public class ReadChannelBodyStream(
override val coroutineContext: CoroutineContext = callContext + producerJob

private val currBuffer = atomic<SdkBuffer?>(null)
private val bufferChan = Channel<SdkBuffer>(1)
private val bufferChan = Channel<SdkBuffer>(Channel.UNLIMITED)

init {
producerJob.invokeOnCompletion { cause ->
bodyChan.cancel(cause)
}

// launch a coroutine to fill the buffer channel
proxyRequestBody()
}

// lie - CRT tries to control this via normal seek operations (e.g. when they calculate a hash for signing
Expand All @@ -52,16 +49,51 @@ public class ReadChannelBodyStream(
// and handle these concerns.
override fun resetPosition(): Boolean = true

@OptIn(ExperimentalCoroutinesApi::class)
override fun sendRequestBody(buffer: MutableBuffer): Boolean {
return doSendRequestBody(buffer).also { if (it) producerJob.complete() }
}

@OptIn(ExperimentalCoroutinesApi::class)
private fun doSendRequestBody(buffer: MutableBuffer): Boolean {
// ensure the request context hasn't been cancelled
callContext.ensureActive()
var outgoing = currBuffer.getAndSet(null) ?: bufferChan.tryReceive().getOrNull()

if (bodyChan.availableForRead > 0 && outgoing == null) {
// NOTE: It is critical that the coroutine launched doesn't actually suspend because it will never
// get a chance to resume. The CRT will consume the dispatcher thread until the data has been read
// completely. We could launch one of the coroutines into a different dispatcher but this won't work
// on platforms (e.g. JS) that don't have multiple threads. Essentially the CRT will starve
// the dispatcher and not allow other coroutines to make progress.
// see: https://github.com/awslabs/aws-sdk-kotlin/issues/282
//
// TODO - we could get rid of this extra copy + coroutine if readAvailable() had a non-suspend version
// see: https://youtrack.jetbrains.com/issue/KTOR-2772
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side question

doesn't look like this ticket is getting much love. perhaps something to bring up in our next meeting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps, we could also probably go submit a PR if we felt the need

//
// To get around this, if there is data to read we launch a coroutine UNDISPATCHED so that it runs
// immediately in the current thread. The coroutine will fill the buffer but won't suspend because
// we know data is available.
launch(start = CoroutineStart.UNDISPATCHED) {
val sdkBuffer = SdkBuffer(bodyChan.availableForRead)
bodyChan.readAvailable(sdkBuffer)
bufferChan.send(sdkBuffer)
}.invokeOnCompletion { cause ->
if (cause != null) {
producerJob.completeExceptionally(cause)
bufferChan.close(cause)
}
}
}

if (bodyChan.availableForRead == 0 && bodyChan.isClosedForRead) {
bufferChan.close()
}

if (outgoing == null) {
if (bufferChan.isClosedForReceive) {
return true
}
// ensure the request context hasn't been cancelled
callContext.ensureActive()

outgoing = bufferChan.tryReceive().getOrNull() ?: return false
}

Expand All @@ -73,29 +105,4 @@ public class ReadChannelBodyStream(

return bufferChan.isClosedForReceive && currBuffer.value == null
}

private fun proxyRequestBody() {
// TODO - we could get rid of this extra copy + coroutine if readAvailable() had a non-suspend version
// see: https://youtrack.jetbrains.com/issue/KTOR-2772
val job = launch(start = CoroutineStart.UNDISPATCHED) {
while (!bodyChan.isClosedForRead) {
bodyChan.awaitContent()
if (bodyChan.isClosedForRead) return@launch

// TODO - we could pool these
val buffer = SdkBuffer(bodyChan.availableForRead)
bodyChan.readAvailable(buffer)
bufferChan.send(buffer)
}
}

job.invokeOnCompletion { cause ->
bufferChan.close(cause)
if (cause != null) {
producerJob.completeExceptionally(cause)
} else {
producerJob.complete()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ class ReadChannelBodyStreamTest {
}

@Test
fun testCancellation() = runSuspendTest {
fun testCancellation(): Unit = runSuspendTest {
val chan = SdkByteChannel()
val job = Job()
val stream = ReadChannelBodyStream(chan, coroutineContext + job)
yield()

job.cancel()
yield()

val (sendBuffer, _) = mutableBuffer(16)
assertTrue(stream.sendRequestBody(sendBuffer))
assertFailsWith<CancellationException> {
stream.sendRequestBody(sendBuffer)
}
}

@Test
Expand Down
4 changes: 4 additions & 0 deletions services/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ subprojects {
classpath = compileDependencyFiles + runtimeDependencyFiles
testClassesDirs = output.classesDirs
useJUnitPlatform()
testLogging {
events("passed", "skipped", "failed")
showStandardStreams = true
}
Comment on lines +107 to +110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix: Generally we shouldn't show full output on passed tests. Was this left in here for debug?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the same settings we use everywhere else:

I'm in favor of leaving it since most of our tests aren't printing anything anyway and it allows easier debugging if you do without changing build settings.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears there is precedent. Very well, my concern for this PR is addressed.

}
}
}
Expand Down