-
Notifications
You must be signed in to change notification settings - Fork 55
feat(rt): event stream framing support #320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
ianbotsf
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks most good, well done! The biggest question blocking my approval is the interpretation of length fields as signed integers.
| * Read an encoded header from the [buffer] | ||
| */ | ||
| public fun decode(buffer: Buffer): Header { | ||
| check(buffer.readRemaining >= MIN_HEADER_LEN.toULong()) { "Invalid frame header; require at least 2 bytes" } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Reuse the constant in the error message:
check(buffer.readRemaining >= MIN_HEADER_LEN.toULong()) { "Invalid frame header; require at least $MIN_HEADER_LEN bytes" }| fun fromTypeId(value: Byte): HeaderType { | ||
| return when (value.toInt()) { | ||
| 0 -> TRUE | ||
| 1 -> FALSE | ||
| 2 -> BYTE | ||
| 3 -> INT16 | ||
| 4 -> INT32 | ||
| 5 -> INT64 | ||
| 6 -> BYTE_ARRAY | ||
| 7 -> STRING | ||
| 8 -> TIMESTAMP | ||
| 9 -> UUID | ||
| else -> throw IllegalArgumentException("Unknown HeaderType: $value") | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment: We can avoid duplication by using values instead:
fun fromTypeId(value: Byte): HeaderType =
requireNotNull(values().find { it.value == value }) { "Unknown HeaderType: $value" }| is ByteArray -> { | ||
| dest.writeHeader(HeaderType.BYTE_ARRAY) | ||
| check(value.size in Short.MIN_VALUE..Short.MAX_VALUE) { "HeaderValue ByteArray too long" } | ||
| dest.writeShort(value.size.toShort()) | ||
| dest.writeFully(value) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Is any recovery possible at this point? If so, we should probably check the array size before writing the header type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure there is any recovery possible here but agree that the check belongs before starting the write.
| } | ||
| is ByteArray -> { | ||
| dest.writeHeader(HeaderType.BYTE_ARRAY) | ||
| check(value.size in Short.MIN_VALUE..Short.MAX_VALUE) { "HeaderValue ByteArray too long" } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: The lower bound here should probably be 0 instead of Short.MIN_VALUE.
| check(value.size in Short.MIN_VALUE..Short.MAX_VALUE) { "HeaderValue ByteArray too long" } | ||
| dest.writeShort(value.size.toShort()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: It's not clear to me from reading the event streams description that the value byte length is meant to be a signed 16-bit integer. Do we have any other sources that may indicate whether 32KB is the max size for a blob/string? Or are 64K values possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. I think the header value lengths and overall lengths in the prelude ought to be unsigned. Good call out, will fix.
| @Suppress("NOTHING_TO_INLINE") | ||
| private inline fun MutableBuffer.writeHeader(headerType: HeaderType) = writeByte(headerType.value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Why inline this function if the compiler thinks there's nothing to inline?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm smarter of course...will remove inline
| // read headers | ||
| var headerBytesConsumed = 0UL | ||
| while (headerBytesConsumed < prelude.headersLength.toULong()) { | ||
| val start = messageBuffer.readPosition | ||
| val header = Header.decode(messageBuffer) | ||
| headerBytesConsumed += messageBuffer.readPosition - start | ||
| message.addHeader(header) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Should we verify when we're done parsing headers that headerBytesConsumed == prelude.headersLength.toULong()? It seems like that could provide an early warning that a message is malformed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems sensible, will update
| * Encode a message to the [dest] buffer | ||
| */ | ||
| public fun encode(dest: MutableBuffer) { | ||
| val encodedHeaders = SdkByteBuffer(16u) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Why 16u?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No reason, had to pick something. Any suggestion on what a sensible default would be?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the absence of a reason to pick something else, I'd say 0u. In fact, upon further examination of smithy-kotlin#482, I'd suggest adding a default value of 0u to the secondary constructor.
kggilmer
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very clean, nice work.
| */ | ||
|
|
||
| description = "Support for the vnd.amazon.event-stream content type" | ||
| extra["displayName"] = "AWS :: SDK :: Kotlin :: Event Stream" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
seems like it would be more descriptive if the display name has "Protocols" after "Kotlin", also for json and xml.
| is String -> { | ||
| dest.writeHeader(HeaderType.STRING) | ||
| val bytes = value.encodeToByteArray() | ||
| check(bytes.size in Short.MIN_VALUE..Short.MAX_VALUE) { "HeaderValue String too long" } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ian's point above applies here as well
Nit: The lower bound here should probably be 0 instead of Short.MIN_VALUE.
| HeaderType.BYTE_ARRAY, HeaderType.STRING -> { | ||
| val len = buffer.readShort() | ||
| if (len < 0 || buffer.readRemaining < len.toULong()) { | ||
| throw IllegalStateException("Invalid HeaderValue; type=$type, len=$len") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion
Seems like we may also want to know what buffer.readRemaining was at the time of check if this is thrown
| * The decoder will consume the prelude when enough data is available. When it is invoked with enough | ||
| * data it will consume the remaining message bytes. | ||
| */ | ||
| public fun decodeFrame(buffer: Buffer): DecodedFrame { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question
I gather that the intent here that if not enough bytes are in the buffer to decode the next frame that DecodedFrame.Incomplete is returned. Since there is only one valid type that can be returned Message and no state is captured by Incomplete would it not be simpler to represent this case with null instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ya we could probably do that, I'll take a look and see. I'm not even sure we'll keep FrameDecoder in it's current state depending on how it all gets wired up. I was sort of following along with both the Java and Rust implementations that have similar abstractions.
Issue #
N/A
Description of changes
vnd.amazon.event-streamcontent-type.SdkBuffer->SdkByteBufferand related API changesScope
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.