Skip to content

Conversation

@aajtodd
Copy link
Contributor

@aajtodd aajtodd commented Feb 23, 2022

Issue #

Description of changes

WIP branch for implementing event streams.

This PR mostly covers generating serialization and deserialization for the vnd.amazon.event-stream content type.

  • feat: add chunked signing interfaces required to sign each event (could also be used to sign chunked payloads of any type really)
  • refactor: update event stream encode/decode to use Flow directly to simplify the generated code

Example Codegen

Example for transcribe streaming StartStreamTranscription operation (which has both an input and output stream)

Serialize:

private suspend fun serializeStartStreamTranscriptionOperationBody(context: ExecutionContext, input: StartStreamTranscriptionRequest): HttpBody {
    val stream = input.audioStream ?: return HttpBody.Empty
    val signingConfig = context.newEventStreamSigningConfig()
    val messages = stream.map {
            encodeStartStreamTranscriptionAudioStreamEventMessage(it)
        }
        .sign(context, signingConfig)
        .encode()

    return messages.asEventStreamHttpBody()
}

private fun encodeStartStreamTranscriptionAudioStreamEventMessage(input: AudioStream): Message = buildMessage {
    addHeader(":message-type", HeaderValue.String("event"))
    when(input) {
        is AudioStream.AudioEvent -> {
            addHeader(":event-type", HeaderValue.String("AudioEvent"))
            addHeader(":content-type", HeaderValue.String("application/octet-stream"))
            payload = input.value.audioChunk
        }
        is AudioStream.SdkUnknown -> error("cannot serialize the unknown event type!")
    }
}

Deserialize:

private suspend fun deserializeStartStreamTranscriptionOperationBody(builder: StartStreamTranscriptionResponse.Builder, body: HttpBody) {
    val chan = body.toSdkByteReadChannel() ?: return
    val events = decodeFrames(chan)
        .map { message ->
            when(val mt = message.type()) {
                is MessageType.Event -> when(mt.shapeType) {
                    "TranscriptEvent" -> {
                        val e = deserializeTranscriptEventPayload(message.payload)
                        TranscriptResultStream.TranscriptEvent(e)
                    }
                    else -> TranscriptResultStream.SdkUnknown
                }
                is MessageType.Exception -> when(mt.shapeType){
                    "BadRequestException" -> {
                        val err = deserializeBadRequestExceptionPayload(message.payload)
                        throw err
                    }
                    "LimitExceededException" -> {
                        val err = deserializeLimitExceededExceptionPayload(message.payload)
                        throw err
                    }
                    "InternalFailureException" -> {
                        val err = deserializeInternalFailureExceptionPayload(message.payload)
                        throw err
                    }
                    "ConflictException" -> {
                        val err = deserializeConflictExceptionPayload(message.payload)
                        throw err
                    }
                    "ServiceUnavailableException" -> {
                        val err = deserializeServiceUnavailableExceptionPayload(message.payload)
                        throw err
                    }
                    else -> throw TranscribeStreamingException("error processing event stream, unrecognized errorType: ${mt.shapeType}")
                }
                is MessageType.Error -> throw TranscribeStreamingException("error processing event stream: errorCode=${mt.errorCode}; message=${mt.message}")
                is MessageType.SdkUnknown -> throw ClientException("unrecognized event stream message `:message-type`: ${mt.messageType}")
            }
        }
    builder.transcriptResultStream = events
}

Current state

I've been able to test deserialization (output event streams) for S3 select object and it works fine. Serialization has been tested as part of some local integration testing but the event stream signing needs a bit more work before I can send a real request with an input event stream (e.g. transcribe streaming).

Left to do (follow on PRs)

  • comb through fixme/todo's and triage
  • we need to populate quite a bit more into the execution context for signing. In particular signing of the initial request vs the event stream needs looked at.
  • I have some generated integration tests that I'm working on that aren't quite ready that I'll PR separately. Similar to pagination and waiter tests.
  • more testing of real service calls
  • provide new examples and updated developer guide

I will probably leave RPC event streams as a follow up to this work. Basically an event stream can send/receive an initial request/response. That initial request/response can be tied to the HTTP request OR be sent as the first message on the event stream. The former is what we have now and how S3 and transcribe streaming work. The latter will require a bit more investigation and abstraction to figure out how to implement them. To my knowledge QLDB and kinesis leverage RPC event streams

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Comment on lines +19 to +20
// FIXME - should we just move these into core and get rid of aws-types at this point?
api(project(":aws-runtime:aws-types"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment: Yes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll do it either in one of the follow on PRs or separately after event streams is complete

Comment on lines +41 to +46
/**
* The [CredentialsProvider] to complete the signing process with. Defaults to the provider configured
* on the service client.
* NOTE: This is not a common option.
*/
public val CredentialsProvider: ClientOption<CredentialsProvider> = ClientOption("CredentialsProvider")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: These attributes are intended to be used by end-users? If so, can we offer better guidance than "This is not a common option"? Maybe something like "Changing this option is only required in advanced use cases".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure we can change it. I'm not convinced all of these are intended for use by end-users but I also don't see a reason why they couldn't be. We need to spend some time looking at per/operation config and interceptors. Originally the execution context was envisioned to be something that end users may influence directly but currently they have no way of accessing it.

}
}

val signedRequest = AwsSigner.signRequest(signableRequest, opSigningConfig.toCrt())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Does AwsSigner.signRequest still need to exist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you asking should it still exist in aws-crt-kotlin?

Probably not if the sign() function returning AwsSigningResult supersedes it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, asking about the implementation in aws-crt-kotlin specifically. I didn't see any remaining usage after this removal.

Comment on lines +28 to +53
public fun Flow<Message>.sign(
context: ExecutionContext,
config: AwsSigningConfig,
): Flow<Message> = flow {
val messages = this@sign

// NOTE: We need the signature of the initial HTTP request to seed the event stream signatures
// This is a bit of a chicken and egg problem since the event stream is constructed before the request
// is signed. The body of the stream shouldn't start being consumed though until after the entire request
// is built. Thus, by the time we get here the signature will exist in the context.
var prevSignature = context.getOrNull(AuthAttributes.RequestSignature) ?: error("expected initial HTTP signature to be set before message signing commences")

// signature date is updated per event message
val configBuilder = config.toBuilder()

messages.collect { message ->
// FIXME - can we get an estimate here on size?
val buffer = SdkByteBuffer(0U)
message.encode(buffer)

// the entire message is wrapped as the payload of the signed message
val result = signPayload(configBuilder, prevSignature, buffer.bytes())
prevSignature = result.signature
emit(result.output)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Am I reading correctly that this will collect every message (i.e., the entire event stream) and then emit a new stream with signed messages? Isn't that wasteful and unnecessarily greedy? Could/should we map instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is inside of a flow collector so collection is expected and how many of the provided transforms are implemented internally.

Note the = flow { on line 31. We are creating a new flow that processes elements from the current one. Collection will not happen until the returned flow is collected

Comment on lines +92 to +93
useDoubleUriEncode = false
normalizeUriPath = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: We purposely ignore these values in AwsSigningConfig? Do any services (e.g., S3) need special values here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We purposely ignore these values in AwsSigningConfig?

I don't follow...

Do any services (e.g., S3) need special values here?

Unknown. There is still some work left before I can send an input stream. I've validated most of the message serialization locally with some integration tests (that I will add to future PR after I clean them up). Signing is an area that may change in a future PR once I understand what we need to do better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We purposely ignore these values in AwsSigningConfig?

I don't follow...

Sorry, disregard. I misread this code.

Comment on lines +44 to +54
val job = scope.launch {
encodedMessages.collect {
ch.writeFully(it)
}
}

job.invokeOnCompletion { cause ->
ch.close(cause)
}

return ch.toHttpBody()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Why is a Job necessary here? If invokeOnCompletion waits for the job to complete anyway then it seems like this could be replaced with simply invoking collect directly in this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to collect directly in this method. We are processing a stream and it needs to be done asynchronously since it is of unknown size. It also needs to happen after we have sent the initial request (which now that I look at the generated code may need tweaked a bit, will find out when I get it actually working on a real service).

We launch a coroutine to do this (which returns Job). invokeOnCompletion does not wait for the Job to complete, it simply wires up a completion handler that will fire when the job transitions to the Complete state (which will happen for either success or failed/cancelled jobs).

Comment on lines 98 to 99
// transcribe streaming contains exclusively EventStream operations which are not supported
"transcribestreaming",
// "transcribestreaming",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Can be removed?

Comment on lines 83 to 104
object AwsEventStream {
val decodeFrames = runtimeSymbol("decodeFrames", AwsKotlinDependency.AWS_EVENT_STREAM)
val Message = runtimeSymbol("Message", AwsKotlinDependency.AWS_EVENT_STREAM)
val HeaderValue = runtimeSymbol("HeaderValue", AwsKotlinDependency.AWS_EVENT_STREAM)
val buildMessage = runtimeSymbol("buildMessage", AwsKotlinDependency.AWS_EVENT_STREAM)
val MessageType = runtimeSymbol("MessageType", AwsKotlinDependency.AWS_EVENT_STREAM)
val MessageTypeExt = runtimeSymbol("type", AwsKotlinDependency.AWS_EVENT_STREAM)

val sign = runtimeSymbol("sign", AwsKotlinDependency.AWS_EVENT_STREAM)
val newEventStreamSigningConfig = runtimeSymbol("newEventStreamSigningConfig", AwsKotlinDependency.AWS_EVENT_STREAM)
val encode = runtimeSymbol("encode", AwsKotlinDependency.AWS_EVENT_STREAM)
val asEventStreamHttpBody = runtimeSymbol("asEventStreamHttpBody", AwsKotlinDependency.AWS_EVENT_STREAM)

val expectBool = runtimeSymbol("expectBool", AwsKotlinDependency.AWS_EVENT_STREAM)
val expectByte = runtimeSymbol("expectByte", AwsKotlinDependency.AWS_EVENT_STREAM)
val expectInt16 = runtimeSymbol("expectInt16", AwsKotlinDependency.AWS_EVENT_STREAM)
val expectInt32 = runtimeSymbol("expectInt32", AwsKotlinDependency.AWS_EVENT_STREAM)
val expectInt64 = runtimeSymbol("expectInt64", AwsKotlinDependency.AWS_EVENT_STREAM)
val expectByteArray = runtimeSymbol("expectByteArray", AwsKotlinDependency.AWS_EVENT_STREAM)
val expectString = runtimeSymbol("expectString", AwsKotlinDependency.AWS_EVENT_STREAM)
val expectTimestamp = runtimeSymbol("expectTimestamp", AwsKotlinDependency.AWS_EVENT_STREAM)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: I suggest we sort these lexicographically.

val streamingMember = ioShape.findStreamingMember(model)
streamingMember?.isUnionShape ?: false
val target = streamingMember?.let { model.expectShape(it.target) }
target?.isUnionShape ?: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Why do we still need this integration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may or may not depending on what we do with RPC based event streams. I need to look into how much work it will be to support them (IIRC Kinesis and QLDB are the only ones that use this whereas transcribe streaming and S3 use the HTTP request/response for the initial request/response of the event stream)

Comment on lines 69 to 77
writer.write("val messages = stream.#T {", RuntimeTypes.KotlinxCoroutines.Flow.map)
.indent()
.indent()
.write("#T(it)", encodeFn)
.dedent()
.write("}")
.write(".#T(context, signingConfig)", AwsRuntimeTypes.AwsEventStream.sign)
.write(".#T()", AwsRuntimeTypes.AwsEventStream.encode)
.dedent()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: This indentation is non-standard and the encoding function doesn't need a full lambda body. I suggest:

writer.withBlock("val messages = stream", "") {
    write(".#T(::#T)", RuntimeTypes.KotlinxCoroutines.Flow.map, encodeFn)
    write(".#T(context, signingConfig)", AwsRuntimeTypes.AwsEventStream.sign)
    write(".#T()", AwsRuntimeTypes.AwsEventStream.encode)
}

Yielding:

val messages = stream
    .map(::encodeFooEventMessage)
    .sign(context, signingConfig)
    .encode()

@sonarqubecloud
Copy link

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 20 Code Smells

No Coverage information No Coverage information
0.0% 0.0% Duplication

@aajtodd aajtodd merged commit 7c090bf into feat-event-streams Mar 4, 2022
@aajtodd aajtodd deleted the bootstrap-event-streams branch March 4, 2022 15:47
aajtodd added a commit that referenced this pull request Mar 4, 2022
* fix event stream filtering

* add parsing of common headers

* refractor frame decoder into a Flow

* remove event stream operation filter from customizations

* refactore event stream parsing; implement rough deserialization codegen

* fix warning

* filter out event stream errors

* render deserialization for exception event stream messages

* inject http request signature into the execution context once known

* add support for chunked signing

* add encode transform for message stream

* inline signing config builder

* initial event stream serialize implementation

* fix compile issues

* disable wip integration tests

* suppress test; cleanup codegen
aajtodd added a commit that referenced this pull request Mar 9, 2022
* Bootstrap event streams (#537)

* fix event stream filtering

* add parsing of common headers

* refractor frame decoder into a Flow

* remove event stream operation filter from customizations

* refactore event stream parsing; implement rough deserialization codegen

* fix warning

* filter out event stream errors

* render deserialization for exception event stream messages

* inject http request signature into the execution context once known

* add support for chunked signing

* add encode transform for message stream

* inline signing config builder

* initial event stream serialize implementation

* fix compile issues

* disable wip integration tests

* suppress test; cleanup codegen

* Event Stream Codegen Tests (#542)

* Checkpoint Event Streams (#544)

* fix tests

* increase windows runner memory
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants