-
Notifications
You must be signed in to change notification settings - Fork 55
Bootstrap event streams #537
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e7a63f4
90ffcd7
ab7cb9a
219e38c
64734c6
9f83729
f26ed77
3d8b101
54deef6
f80d912
80b4906
837d962
1972fa9
61513bc
b857b12
5821d0c
1f535e8
b98f700
77e43a0
4e24020
c82e218
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,8 +5,11 @@ | |
|
|
||
| package aws.sdk.kotlin.runtime.execution | ||
|
|
||
| import aws.sdk.kotlin.runtime.auth.credentials.CredentialsProvider | ||
| import aws.smithy.kotlin.runtime.client.ClientOption | ||
| import aws.smithy.kotlin.runtime.time.Instant | ||
| import aws.smithy.kotlin.runtime.util.AttributeKey | ||
| import aws.smithy.kotlin.runtime.util.InternalApi | ||
|
|
||
| /** | ||
| * Operation (execution) options related to authorization | ||
|
|
@@ -34,4 +37,17 @@ public object AuthAttributes { | |
| * NOTE: This is not a common option. | ||
| */ | ||
| public val SigningDate: ClientOption<Instant> = ClientOption("SigningDate") | ||
|
|
||
| /** | ||
| * The [CredentialsProvider] to complete the signing process with. Defaults to the provider configured | ||
| * on the service client. | ||
| * NOTE: This is not a common option. | ||
| */ | ||
| public val CredentialsProvider: ClientOption<CredentialsProvider> = ClientOption("CredentialsProvider") | ||
|
Comment on lines
+41
to
+46
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: These attributes are intended to be used by end-users? If so, can we offer better guidance than "This is not a common option"? Maybe something like "Changing this option is only required in advanced use cases".
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure we can change it. I'm not convinced all of these are intended for use by end-users but I also don't see a reason why they couldn't be. We need to spend some time looking at per/operation config and interceptors. Originally the execution context was envisioned to be something that end users may influence directly but currently they have no way of accessing it. |
||
|
|
||
| /** | ||
| * The signature of the HTTP request. This will only exist after the request has been signed! | ||
| */ | ||
| @InternalApi | ||
| public val RequestSignature: AttributeKey<ByteArray> = AttributeKey("AWS_HTTP_SIGNATURE") | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -144,7 +144,12 @@ public class AwsSigV4SigningMiddleware(private val config: Config) : ModifyReque | |
| } | ||
| } | ||
|
|
||
| val signedRequest = AwsSigner.signRequest(signableRequest, opSigningConfig.toCrt()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: Does
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you asking should it still exist in aws-crt-kotlin? Probably not if the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, asking about the implementation in aws-crt-kotlin specifically. I didn't see any remaining usage after this removal. |
||
| val signingResult = AwsSigner.sign(signableRequest, opSigningConfig.toCrt()) | ||
| val signedRequest = checkNotNull(signingResult.signedRequest) { "signing result must return a non-null HTTP request" } | ||
|
|
||
| // Add the signature to the request context | ||
| req.context[AuthAttributes.RequestSignature] = signingResult.signature | ||
|
|
||
| req.subject.update(signedRequest) | ||
| req.subject.body.resetStream() | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| /* | ||
| * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * SPDX-License-Identifier: Apache-2.0. | ||
| */ | ||
|
|
||
| package aws.sdk.kotlin.runtime.protocol.eventstream | ||
|
|
||
| import aws.sdk.kotlin.runtime.auth.signing.* | ||
| import aws.sdk.kotlin.runtime.execution.AuthAttributes | ||
| import aws.smithy.kotlin.runtime.client.ExecutionContext | ||
| import aws.smithy.kotlin.runtime.io.SdkByteBuffer | ||
| import aws.smithy.kotlin.runtime.io.bytes | ||
| import aws.smithy.kotlin.runtime.time.Clock | ||
| import aws.smithy.kotlin.runtime.time.Instant | ||
| import aws.smithy.kotlin.runtime.util.InternalApi | ||
| import aws.smithy.kotlin.runtime.util.get | ||
| import kotlinx.coroutines.flow.Flow | ||
| import kotlinx.coroutines.flow.flow | ||
|
|
||
| /** | ||
| * Creates a flow that signs each event stream message with the given signing config. | ||
| * | ||
| * Each message's signature incorporates the signature of the previous message. | ||
| * The very first message incorporates the signature of the initial-request for | ||
| * both HTTP2 and WebSockets. The initial signature comes from the execution context. | ||
| */ | ||
| @InternalApi | ||
| public fun Flow<Message>.sign( | ||
| context: ExecutionContext, | ||
| config: AwsSigningConfig, | ||
| ): Flow<Message> = flow { | ||
| val messages = this@sign | ||
|
|
||
| // NOTE: We need the signature of the initial HTTP request to seed the event stream signatures | ||
| // This is a bit of a chicken and egg problem since the event stream is constructed before the request | ||
| // is signed. The body of the stream shouldn't start being consumed though until after the entire request | ||
| // is built. Thus, by the time we get here the signature will exist in the context. | ||
| var prevSignature = context.getOrNull(AuthAttributes.RequestSignature) ?: error("expected initial HTTP signature to be set before message signing commences") | ||
|
|
||
| // signature date is updated per event message | ||
| val configBuilder = config.toBuilder() | ||
|
|
||
| messages.collect { message -> | ||
| // FIXME - can we get an estimate here on size? | ||
| val buffer = SdkByteBuffer(0U) | ||
| message.encode(buffer) | ||
|
|
||
| // the entire message is wrapped as the payload of the signed message | ||
| val result = signPayload(configBuilder, prevSignature, buffer.bytes()) | ||
| prevSignature = result.signature | ||
| emit(result.output) | ||
| } | ||
| } | ||
|
Comment on lines
+28
to
+53
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: Am I reading correctly that this will collect every message (i.e., the entire event stream) and then emit a new stream with signed messages? Isn't that wasteful and unnecessarily greedy? Could/should we
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is inside of a flow collector so collection is expected and how many of the provided transforms are implemented internally. Note the |
||
|
|
||
| internal suspend fun signPayload( | ||
| configBuilder: AwsSigningConfig.Builder, | ||
| prevSignature: ByteArray, | ||
| messagePayload: ByteArray, | ||
| clock: Clock = Clock.System | ||
| ): SigningResult<Message> { | ||
| val dt = clock.now().truncateSubsecs() | ||
| val config = configBuilder.apply { date = dt }.build() | ||
|
|
||
| val result = sign(messagePayload, prevSignature, config) | ||
| val signature = result.signature | ||
|
|
||
| val signedMessage = buildMessage { | ||
| addHeader(":date", HeaderValue.Timestamp(dt)) | ||
| addHeader(":chunk-signature", HeaderValue.ByteArray(signature)) | ||
| payload = messagePayload | ||
| } | ||
|
|
||
| return SigningResult(signedMessage, signature) | ||
| } | ||
|
|
||
| /** | ||
| * Truncate the sub-seconds from the current time | ||
| */ | ||
| private fun Instant.truncateSubsecs(): Instant = Instant.fromEpochSeconds(epochSeconds, 0) | ||
|
|
||
| /** | ||
| * Create a new signing config for an event stream using the current context to set the operation/service specific | ||
| * configuration (e.g. region, signing service, credentials, etc) | ||
| */ | ||
| @InternalApi | ||
| public fun ExecutionContext.newEventStreamSigningConfig(): AwsSigningConfig = AwsSigningConfig { | ||
| algorithm = AwsSigningAlgorithm.SIGV4 | ||
| signatureType = AwsSignatureType.HTTP_REQUEST_CHUNK | ||
| region = this@newEventStreamSigningConfig[AuthAttributes.SigningRegion] | ||
| service = this@newEventStreamSigningConfig[AuthAttributes.SigningService] | ||
| credentialsProvider = this@newEventStreamSigningConfig[AuthAttributes.CredentialsProvider] | ||
| useDoubleUriEncode = false | ||
| normalizeUriPath = true | ||
|
Comment on lines
+92
to
+93
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: We purposely ignore these values in
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I don't follow...
Unknown. There is still some work left before I can send an input stream. I've validated most of the message serialization locally with some integration tests (that I will add to future PR after I clean them up). Signing is an area that may change in a future PR once I understand what we need to do better.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Sorry, disregard. I misread this code. |
||
| signedBodyHeader = AwsSignedBodyHeaderType.NONE | ||
|
|
||
| // FIXME - needs to be set on the operation for initial request | ||
| // signedBodyHeader = AwsSignedBodyHeaderType.X_AMZ_CONTENT_SHA256 | ||
| // signedBodyValue = AwsSignedBodyValue.STREAMING_AWS4_HMAC_SHA256_PAYLOAD | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment: Yes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll do it either in one of the follow on PRs or separately after event streams is complete