Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package aws.sdk.kotlin.runtime.crt
import aws.sdk.kotlin.crt.http.HttpRequestBodyStream
import aws.sdk.kotlin.crt.io.MutableBuffer
import aws.sdk.kotlin.runtime.InternalSdkApi
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
import aws.smithy.kotlin.runtime.io.readAvailable
import kotlinx.atomicfu.atomic
Expand All @@ -19,7 +19,7 @@ import kotlin.coroutines.CoroutineContext
/**
* write as much of [outgoing] to [dest] as possible
*/
internal expect fun transferRequestBody(outgoing: SdkBuffer, dest: MutableBuffer)
internal expect fun transferRequestBody(outgoing: SdkByteBuffer, dest: MutableBuffer)

/**
* Implement's [HttpRequestBodyStream] which proxies an SDK request body channel [SdkByteReadChannel]
Expand All @@ -34,8 +34,8 @@ public class ReadChannelBodyStream(
private val producerJob = Job(callContext.job)
override val coroutineContext: CoroutineContext = callContext + producerJob

private val currBuffer = atomic<SdkBuffer?>(null)
private val bufferChan = Channel<SdkBuffer>(Channel.UNLIMITED)
private val currBuffer = atomic<SdkByteBuffer?>(null)
private val bufferChan = Channel<SdkByteBuffer>(Channel.UNLIMITED)

init {
producerJob.invokeOnCompletion { cause ->
Expand Down Expand Up @@ -74,7 +74,7 @@ public class ReadChannelBodyStream(
// immediately in the current thread. The coroutine will fill the buffer but won't suspend because
// we know data is available.
launch(start = CoroutineStart.UNDISPATCHED) {
val sdkBuffer = SdkBuffer(bodyChan.availableForRead)
val sdkBuffer = SdkByteBuffer(bodyChan.availableForRead.toULong())
bodyChan.readAvailable(sdkBuffer)
bufferChan.send(sdkBuffer)
}.invokeOnCompletion { cause ->
Expand All @@ -99,7 +99,7 @@ public class ReadChannelBodyStream(

transferRequestBody(outgoing, buffer)

if (outgoing.readRemaining > 0) {
if (outgoing.readRemaining > 0u) {
currBuffer.value = outgoing
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
package aws.sdk.kotlin.runtime.crt

import aws.sdk.kotlin.crt.io.MutableBuffer
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
import aws.smithy.kotlin.runtime.io.readAvailable

internal actual fun transferRequestBody(outgoing: SdkBuffer, dest: MutableBuffer) {
internal actual fun transferRequestBody(outgoing: SdkByteBuffer, dest: MutableBuffer) {
outgoing.readAvailable(dest.buffer)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package aws.sdk.kotlin.runtime.http.engine.crt

import aws.sdk.kotlin.crt.io.Buffer
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
import aws.smithy.kotlin.runtime.io.bytes
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
Expand Down Expand Up @@ -92,7 +92,7 @@ internal abstract class AbstractBufferedReadChannel(
}

override suspend fun readRemaining(limit: Int): ByteArray {
val buffer = SdkBuffer(minOf(availableForRead, limit))
val buffer = SdkByteBuffer(minOf(availableForRead, limit).toULong())

val consumed = readAsMuchAsPossible(buffer, limit)

Expand All @@ -103,7 +103,7 @@ internal abstract class AbstractBufferedReadChannel(
}
}

protected fun readAsMuchAsPossible(dest: SdkBuffer, limit: Int): Int {
protected fun readAsMuchAsPossible(dest: SdkByteBuffer, limit: Int): Int {
var consumed = 0
var remaining = limit

Expand All @@ -116,15 +116,15 @@ internal abstract class AbstractBufferedReadChannel(

markBytesConsumed(rc)

if (segment.readRemaining > 0) {
if (segment.readRemaining > 0u) {
currSegment.update { segment }
}
}

return consumed
}

private suspend fun readRemainingSuspend(buffer: SdkBuffer, limit: Int): ByteArray {
private suspend fun readRemainingSuspend(buffer: SdkByteBuffer, limit: Int): ByteArray {
check(currSegment.value == null) { "current segment should be drained already" }

var consumed = 0
Expand All @@ -137,7 +137,7 @@ internal abstract class AbstractBufferedReadChannel(
markBytesConsumed(rc)

if (remaining <= 0) {
if (segment.readRemaining > 0) {
if (segment.readRemaining > 0u) {
currSegment.update { segment }
}
break
Expand All @@ -162,7 +162,7 @@ internal abstract class AbstractBufferedReadChannel(

markBytesConsumed(rc)

if (segment.readRemaining > 0) {
if (segment.readRemaining > 0u) {
currSegment.update { segment }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,26 @@

package aws.sdk.kotlin.runtime.http.engine.crt

import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
import aws.smithy.kotlin.runtime.io.readFully

internal typealias Segment = SdkBuffer
internal typealias Segment = SdkByteBuffer

/**
* Create a segment from the given src [ByteArray] and mark the entire contents readable
*/
internal fun newReadableSegment(src: ByteArray): Segment = Segment.of(src).apply { commitWritten(src.size) }
internal fun newReadableSegment(src: ByteArray): Segment = Segment.of(src).apply { advance(src.size.toULong()) }

internal fun Segment.copyTo(dest: SdkBuffer, limit: Int = Int.MAX_VALUE): Int {
check(readRemaining > 0) { "nothing left to read from segment" }
val wc = minOf(readRemaining, limit)
internal fun Segment.copyTo(dest: SdkByteBuffer, limit: Int = Int.MAX_VALUE): Int {
check(readRemaining > 0u) { "nothing left to read from segment" }
val wc = minOf(readRemaining, limit.toULong())
readFully(dest, wc)
return wc
return wc.toInt()
}

internal fun Segment.copyTo(dest: ByteArray, offset: Int = 0, length: Int = dest.size - offset): Int {
check(readRemaining > 0) { "nothing left to read from segment" }
val wc = minOf(length, readRemaining)
check(readRemaining > 0u) { "nothing left to read from segment" }
val wc = minOf(length.toULong(), readRemaining).toInt()
readFully(dest, offset, wc)
return wc
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package aws.sdk.kotlin.runtime.http.engine.crt

import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
import aws.smithy.kotlin.runtime.io.decodeToString
import kotlin.test.Test
import kotlin.test.assertEquals
Expand All @@ -23,11 +23,11 @@ class SegmentTest {
@Test
fun testCopyToSdkBuffer() {
val segment = newReadableSegment("1234".encodeToByteArray())
val dest = SdkBuffer(16)
val dest = SdkByteBuffer(16u)
val rc = segment.copyTo(dest)
assertEquals(4, rc)
assertEquals(4, dest.writePosition)
assertEquals(4, dest.readRemaining)
assertEquals(4u, dest.writePosition)
assertEquals(4u, dest.readRemaining)
assertEquals("1234", dest.decodeToString())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package aws.sdk.kotlin.runtime.http.engine.crt

import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
import aws.smithy.kotlin.runtime.io.of
import java.nio.ByteBuffer

Expand All @@ -18,7 +18,7 @@ internal class BufferedReadChannelImpl(

override suspend fun readAvailable(sink: ByteBuffer): Int {
if (sink.remaining() == 0) return 0
val sdkSink = SdkBuffer.of(sink)
val sdkSink = SdkByteBuffer.of(sink)
val consumed = readAsMuchAsPossible(sdkSink, sink.remaining())
return when {
consumed == 0 && closed != null -> -1
Expand Down
25 changes: 25 additions & 0 deletions aws-runtime/protocols/event-stream/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

description = "Support for the vnd.amazon.event-stream content type"
extra["displayName"] = "AWS :: SDK :: Kotlin :: Protocols :: Event Stream"
extra["moduleName"] = "aws.sdk.kotlin.runtime.protocol.eventstream"

kotlin {
sourceSets {
commonMain {
dependencies {
api(project(":aws-runtime:aws-core"))
}
}

commonTest {
dependencies {
implementation(project(":aws-runtime:testing"))
}
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package aws.sdk.kotlin.runtime.protocol.eventstream

import aws.sdk.kotlin.runtime.InternalSdkApi
import aws.smithy.kotlin.runtime.io.Buffer
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
import aws.smithy.kotlin.runtime.io.readFully

@InternalSdkApi
public class FrameDecoder {
private var prelude: Prelude? = null

/**
* Reset the decoder discarding any intermediate state
*/
public fun reset() { prelude = null }

private fun isFrameAvailable(buffer: Buffer): Boolean {
val totalLen = prelude?.totalLen ?: return false
val remaining = totalLen - PRELUDE_BYTE_LEN_WITH_CRC
return buffer.readRemaining >= remaining.toULong()
}

/**
* Attempt to decode a [Message] from the buffer. This function expects to be called over and over again
* with more data in the buffer each time its called. When there is not enough data to decode this function
* returns null.
* The decoder will consume the prelude when enough data is available. When it is invoked with enough
* data it will consume the remaining message bytes.
*/
public fun decodeFrame(buffer: Buffer): Message? {
if (prelude == null && buffer.readRemaining >= PRELUDE_BYTE_LEN_WITH_CRC.toULong()) {
prelude = Prelude.decode(buffer)
}

return when (isFrameAvailable(buffer)) {
true -> {
val currPrelude = checkNotNull(prelude)
val messageBuf = SdkByteBuffer(currPrelude.totalLen.toULong())
currPrelude.encode(messageBuf)
buffer.readFully(messageBuf)
reset()
Message.decode(messageBuf)
}
else -> null
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package aws.sdk.kotlin.runtime.protocol.eventstream

import aws.sdk.kotlin.runtime.InternalSdkApi
import aws.smithy.kotlin.runtime.io.*

private const val MIN_HEADER_LEN = 2
private const val MAX_HEADER_NAME_LEN = 255

/*
Header Wire Format

+--------------------+
|Hdr Name Len (8) |
+--------------------+-----------------------------------------------+
| Header Name (*) ... |
+--------------------+-----------------------------------------------+
|Hdr Value Type (8) |
+--------------------+-----------------------------------------------+
| Header Value (*) ... |
+--------------------------------------------------------------------+
*/

/**
* An event stream frame header
*/
@InternalSdkApi
public data class Header(val name: String, val value: HeaderValue) {
public companion object {
/**
* Read an encoded header from the [buffer]
*/
public fun decode(buffer: Buffer): Header {
check(buffer.readRemaining >= MIN_HEADER_LEN.toULong()) { "Invalid frame header; require at least $MIN_HEADER_LEN bytes" }
val nameLen = buffer.readByte().toInt()
check(nameLen > 0) { "Invalid header name length: $nameLen" }
val nameBytes = ByteArray(nameLen)
buffer.readFully(nameBytes)
val value = HeaderValue.decode(buffer)
return Header(nameBytes.decodeToString(), value)
}
}

/**
* Encode a header to [dest] buffer
*/
public fun encode(dest: MutableBuffer) {
val bytes = name.encodeToByteArray()
check(bytes.size < MAX_HEADER_NAME_LEN) { "Header name too long" }
dest.writeByte(bytes.size.toByte())
dest.writeFully(bytes)
value.encode(dest)
}
}
Loading