Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/354c9857-8af1-43b7-9bcf-bff91aa32bc5.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"id": "354c9857-8af1-43b7-9bcf-bff91aa32bc5",
"type": "misc",
"description": "Use explict CoroutineScope for consuming event stream flow"
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import aws.smithy.kotlin.runtime.io.SdkByteChannel
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
import aws.smithy.kotlin.runtime.io.bytes
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlin.coroutines.coroutineContext

/**
* Transform the stream of messages into a stream of raw bytes. Each
Expand All @@ -31,33 +31,37 @@ public fun Flow<Message>.encode(): Flow<ByteArray> = map {

/**
* Transform a stream of encoded messages into an [HttpBody].
* @param scope parent scope to launch a coroutine in that consumes the flow and populates a [SdkByteReadChannel]
*/
@InternalSdkApi
public suspend fun Flow<ByteArray>.asEventStreamHttpBody(): HttpBody {
public suspend fun Flow<ByteArray>.asEventStreamHttpBody(scope: CoroutineScope): HttpBody {
val encodedMessages = this
val ch = SdkByteChannel(true)

// FIXME - we should probably tie this to our own scope (off ExecutionContext) but for now
// tie it to whatever arbitrary scope we are in
val scope = CoroutineScope(coroutineContext)

return object : HttpBody.Streaming() {
override val contentLength: Long? = null
override val isReplayable: Boolean = false
override val isDuplex: Boolean = true

private var job: Job? = null

override fun readFrom(): SdkByteReadChannel {
// FIXME - delaying launch here until the channel is consumed from the HTTP engine is a hacky way
// of enforcing ordering to ensure the ExecutionContext is updated with the
// AwsSigningAttributes.RequestSignature by the time the messages are collected and sign() is called
val job = scope.launch {
encodedMessages.collect {
ch.writeFully(it)

// Although rare, nothing stops downstream consumers from invoking readFrom() more than once.
// Only launch background collection task on first call
if (job == null) {
job = scope.launch {
encodedMessages.collect {
ch.writeFully(it)
}
}
}

job.invokeOnCompletion { cause ->
cause?.let { it.printStackTrace() }
ch.close(cause)
job?.invokeOnCompletion { cause ->
ch.close(cause)
}
}

return ch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class FrameEncoderTest {
"baz",
).map { it.encodeToByteArray() }

val body = messages.asEventStreamHttpBody()
val body = messages.asEventStreamHttpBody(this)
val actual = body.readAll()
val expected = "foobarbaz"
assertEquals(expected, actual?.decodeToString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class EventStreamSerializerGenerator(
}

writer.write("")
writer.write("return messages.#T()", AwsRuntimeTypes.AwsEventStream.asEventStreamHttpBody)
writer.write("return messages.#T(context)", AwsRuntimeTypes.AwsEventStream.asEventStreamHttpBody)
}
Comment on lines 77 to 79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: I'd normally expect a generator change to break some unit tests. Do we not have unit tests for this code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have codegen integration tests for event streams


private fun encodeEventStreamMessage(
Expand Down