Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Progress Listener Invocation methods to Asynchronous code paths #5044

Open
wants to merge 54 commits into
base: feature/anirudkr-progress-listener
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
588b102
Adding interfaces for Progress Listener and state capturing Progress …
anirudh9391 Dec 4, 2023
da26d3e
Modified Indentation
anirudh9391 Dec 5, 2023
99febb6
Added new change script
anirudh9391 Dec 5, 2023
7e70224
Revert "Added new change script"
anirudh9391 Dec 5, 2023
5e9057e
Added new change script
anirudh9391 Dec 5, 2023
e607d6c
Moved ProgressSnapshot to its own folder
anirudh9391 Dec 5, 2023
3d26526
Added sdkResponse to Progress Snapshot
anirudh9391 Dec 5, 2023
48a6617
Change totalTransferSize to totalBytes
anirudh9391 Dec 5, 2023
b096765
Modify return types of start and elapsedTime to Optional
anirudh9391 Dec 6, 2023
b558e6b
Progress Listener Interface definition
anirudh9391 Dec 8, 2023
07f2bfb
Address PR comments on Progress Listener
anirudh9391 Dec 8, 2023
d2a3253
Fix checkstyle issues
anirudh9391 Dec 11, 2023
eaf9470
Fix checkstyle issues
anirudh9391 Dec 11, 2023
276fb70
Implement Default Progress Snapshot
anirudh9391 Dec 14, 2023
7b24093
Revert "Implement Default Progress Snapshot"
anirudh9391 Dec 14, 2023
a943002
Revert "Revert "Implement Default Progress Snapshot""
anirudh9391 Dec 14, 2023
1d56ce2
Remove changes to pom
anirudh9391 Dec 14, 2023
7c1978b
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Dec 14, 2023
2e66a9a
Add parameterized unit tests
anirudh9391 Dec 20, 2023
3ffbca2
fix failing tests
anirudh9391 Dec 20, 2023
2dfdef7
Defined ProgressListener invoker methods, along with success and fail…
anirudh9391 Dec 27, 2023
713f6c1
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Dec 27, 2023
ba7b059
Address PR comments
anirudh9391 Jan 3, 2024
dbdd27d
Remove SDK Preview API annotations
anirudh9391 Jan 4, 2024
29259df
Fixed a typo with error logging
anirudh9391 Jan 4, 2024
16e06b7
Rectify typo in adding listeners to RequestOverrideConfiguration and …
anirudh9391 Jan 9, 2024
54caa47
Added Progress Updater Class to invoke listener methods
anirudh9391 Jan 29, 2024
ef05f6b
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Jan 30, 2024
631f674
ProgressUpdater class
anirudh9391 Jan 31, 2024
700bd5e
Define ProgressUpdater and LoggingProgressListener
anirudh9391 Feb 1, 2024
06f811b
Added ProgressUpdater into ExecutionContext and removed asynchronous …
anirudh9391 Feb 13, 2024
3faf34e
Fixed checkstyle issues
anirudh9391 Feb 13, 2024
8946bc2
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Feb 13, 2024
8f18496
Merged from master
anirudh9391 Feb 13, 2024
6eacf65
Added tests for attemptFailureResponseBytesReceived
anirudh9391 Feb 14, 2024
af77917
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Feb 14, 2024
43dd6a2
Asynchronous Code Path Progress Listener WIP
anirudh9391 Feb 15, 2024
6b16403
Added Pre and Post Execution stages in Request Pipeline to update th…
joviegas Feb 18, 2024
7a3db5a
Adding Pre, Post execution stages for tracking http requests invoked …
anirudh9391 Mar 20, 2024
3f0db6b
Add Progress Listener lifecycle methods to Asynchronous request path
anirudh9391 Mar 26, 2024
5d1fc49
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Mar 26, 2024
d3efc8e
Fix rebase
anirudh9391 Mar 26, 2024
98a2a85
Fix method call
anirudh9391 Mar 26, 2024
51ac003
Remove unintentionally added test class
anirudh9391 Mar 26, 2024
210c8ed
Remove needles indentation changes
anirudh9391 Mar 26, 2024
6509782
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Apr 22, 2024
944a76d
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Apr 23, 2024
d5f1258
WIP
anirudh9391 Apr 23, 2024
582c724
Merge branch 'master' into feature/anirudkr-progress-listener
anirudh9391 May 6, 2024
55a9bd8
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 May 6, 2024
8756570
Fix PR comments, added an interface and abstract class to denote Uplo…
anirudh9391 Jun 7, 2024
06f2c2d
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Jun 7, 2024
a604bd3
Remove incorrect modification of test class
anirudh9391 Jun 7, 2024
be2f663
Repurposed BytesReadTrackingPublisher
anirudh9391 Jun 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private static <T> CompletableFuture<T> sendAsync(SdkAsyncHttpClient client,
SdkHttpFullRequest request,
HttpResponseHandler<T> handler,
CompletableFuture<?> parentFuture) {
SdkHttpContentPublisher requestContentPublisher = new SimpleHttpContentPublisher(request);
SdkHttpContentPublisher requestContentPublisher = new SimpleHttpContentPublisher(request, null);
TransformingAsyncResponseHandler<T> responseHandler =
new AsyncResponseHandler<>(handler, Function.identity(), new ExecutionAttributes());
CompletableFuture<T> responseHandlerFuture = responseHandler.prepare();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@

package software.amazon.awssdk.core.http;

import java.util.Optional;
import software.amazon.awssdk.annotations.NotThreadSafe;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain;
import software.amazon.awssdk.core.interceptor.InterceptorContext;
import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater;
import software.amazon.awssdk.core.signer.Signer;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
Expand All @@ -38,15 +36,13 @@ public final class ExecutionContext implements ToCopyableBuilder<ExecutionContex
private final ExecutionInterceptorChain interceptorChain;
private final ExecutionAttributes executionAttributes;
private final MetricCollector metricCollector;
private final ProgressUpdater progressUpdater;

private ExecutionContext(final Builder builder) {
this.signer = builder.signer;
this.interceptorContext = builder.interceptorContext;
this.interceptorChain = builder.interceptorChain;
this.executionAttributes = builder.executionAttributes;
this.metricCollector = builder.metricCollector;
this.progressUpdater = builder.progressUpdater;
}

public static ExecutionContext.Builder builder() {
Expand Down Expand Up @@ -80,10 +76,6 @@ public MetricCollector metricCollector() {
return metricCollector;
}

public Optional<ProgressUpdater> progressUpdater() {
return progressUpdater != null ? Optional.of(progressUpdater) : Optional.empty();
}

@Override
public Builder toBuilder() {
return new Builder(this);
Expand All @@ -95,7 +87,6 @@ public static class Builder implements CopyableBuilder<Builder, ExecutionContext
private ExecutionAttributes executionAttributes;
private Signer signer;
private MetricCollector metricCollector;
private ProgressUpdater progressUpdater;

private Builder() {
}
Expand All @@ -106,7 +97,6 @@ private Builder(ExecutionContext executionContext) {
this.interceptorChain = executionContext.interceptorChain;
this.executionAttributes = executionContext.executionAttributes;
this.metricCollector = executionContext.metricCollector;
this.progressUpdater = executionContext.progressUpdater;
}

public Builder interceptorContext(InterceptorContext interceptorContext) {
Expand Down Expand Up @@ -134,11 +124,6 @@ public Builder metricCollector(MetricCollector metricCollector) {
return this;
}

public Builder progressUpdater(ProgressUpdater progressUpdater) {
this.progressUpdater = progressUpdater;
return this;
}

@Override
public ExecutionContext build() {
return new ExecutionContext(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import software.amazon.awssdk.core.http.ExecutionContext;
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AfterExecutionInterceptorsStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AfterExecutionProgressReportingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyTransactionIdStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyUserAgentStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallAttemptMetricCollectionStage;
Expand All @@ -39,6 +40,7 @@
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncSigningStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.BeforeExecutionProgressReportingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.CompressRequestStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.HttpChecksumStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage;
Expand Down Expand Up @@ -201,12 +203,14 @@ public <OutputT> CompletableFuture<OutputT> execute(
.then(() -> new HttpChecksumStage(ClientType.ASYNC))
.then(MakeRequestImmutableStage::new)
.then(RequestPipelineBuilder
.first(AsyncSigningStage::new)
.first(BeforeExecutionProgressReportingStage::new)
.then(AsyncSigningStage::new)
.then(AsyncBeforeTransmissionExecutionInterceptorsStage::new)
.then(d -> new MakeAsyncHttpRequestStage<>(responseHandler, d))
.wrappedWith(AsyncApiCallAttemptMetricCollectionStage::new)
.wrappedWith((deps, wrapped) -> new AsyncRetryableStage<>(responseHandler, deps, wrapped))
.then(async(() -> new UnwrapResponseContainer<>()))
.then(async(() -> new AfterExecutionProgressReportingStage<>()))
.then(async(() -> new AfterExecutionInterceptorsStage<>()))
.wrappedWith(AsyncExecutionFailureExceptionReportingStage::new)
.wrappedWith(AsyncApiCallTimeoutTrackingStage::new)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AfterExecutionInterceptorsStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AfterExecutionProgressReportingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AfterTransmissionExecutionInterceptorsStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyTransactionIdStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyUserAgentStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.BeforeExecutionProgressReportingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.BeforeTransmissionExecutionInterceptorsStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.BeforeUnmarshallingExecutionInterceptorsStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.CompressRequestStage;
Expand Down Expand Up @@ -204,7 +206,8 @@ public <OutputT> OutputT execute(HttpResponseHandler<Response<OutputT>> response
.then(MakeRequestImmutableStage::new)
// End of mutating request
.then(RequestPipelineBuilder
.first(SigningStage::new)
.first(BeforeExecutionProgressReportingStage::new)
.then(SigningStage::new)
.then(BeforeTransmissionExecutionInterceptorsStage::new)
.then(MakeHttpRequestStage::new)
.then(AfterTransmissionExecutionInterceptorsStage::new)
Expand All @@ -218,6 +221,7 @@ public <OutputT> OutputT execute(HttpResponseHandler<Response<OutputT>> response
.wrappedWith(ApiCallTimeoutTrackingStage::new)::build)
.wrappedWith((deps, wrapped) -> new ApiCallMetricCollectionStage<>(wrapped))
.then(() -> new UnwrapResponseContainer<>())
.then(() -> new AfterExecutionProgressReportingStage<>())
.then(() -> new AfterExecutionInterceptorsStage<>())
.wrappedWith(ExecutionFailureExceptionReportingStage::new)
.build(httpClientDependencies)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.core.internal.http;

import java.util.Optional;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.RequestOverrideConfiguration;
import software.amazon.awssdk.core.SdkRequest;
Expand All @@ -26,6 +27,7 @@
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
import software.amazon.awssdk.core.internal.http.timers.TimeoutTracker;
import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater;
import software.amazon.awssdk.core.signer.Signer;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.utils.Validate;
Expand All @@ -44,6 +46,7 @@ public final class RequestExecutionContext {
private TimeoutTracker apiCallTimeoutTracker;
private TimeoutTracker apiCallAttemptTimeoutTracker;
private MetricCollector attemptMetricCollector;
private ProgressUpdater progressUpdater;

private RequestExecutionContext(Builder builder) {
this.requestProvider = builder.requestProvider;
Expand Down Expand Up @@ -127,6 +130,14 @@ public void attemptMetricCollector(MetricCollector metricCollector) {
this.attemptMetricCollector = metricCollector;
}

public Optional<ProgressUpdater> progressUpdater() {
return progressUpdater != null ? Optional.of(progressUpdater) : Optional.empty();
}

public void progressUpdater(ProgressUpdater progressUpdater) {
this.progressUpdater = progressUpdater;
}

/**
* Sets the request body provider.
* Used for transforming the original body provider to sign events for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.utils.IoUtils;
Expand All @@ -35,11 +36,13 @@ public final class SimpleHttpContentPublisher implements SdkHttpContentPublisher

private final byte[] content;
private final int length;
private final ProgressUpdater progressUpdater;

public SimpleHttpContentPublisher(SdkHttpFullRequest request) {
public SimpleHttpContentPublisher(SdkHttpFullRequest request, ProgressUpdater progressUpdater) {
this.content = request.contentStreamProvider().map(p -> invokeSafely(() -> IoUtils.toByteArray(p.newStream())))
.orElseGet(() -> new byte[0]);
this.length = content.length;
this.progressUpdater = progressUpdater;
}

@Override
Expand All @@ -52,7 +55,7 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
s.onSubscribe(new SubscriptionImpl(s));
}

private class SubscriptionImpl implements Subscription {
private final class SubscriptionImpl implements Subscription {
private boolean running = true;
private final Subscriber<? super ByteBuffer> s;

Expand All @@ -68,6 +71,11 @@ public void request(long n) {
s.onError(new IllegalArgumentException("Demand must be positive"));
} else {
s.onNext(ByteBuffer.wrap(content));

if (progressUpdater != null) {
progressUpdater.incrementBytesSent(length);
}

s.onComplete();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.http.pipeline.stages;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
import software.amazon.awssdk.core.internal.util.ProgressListenerUtils;

@SdkInternalApi
public class AfterExecutionProgressReportingStage<OutputT> implements RequestPipeline<OutputT, OutputT> {
@Override
public OutputT execute(OutputT input, RequestExecutionContext context) throws Exception {
if (input instanceof SdkResponse) {
ProgressListenerUtils.updateProgressListenersWithSuccessResponse((SdkResponse) input, context);
}

return input;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.core.internal.http.pipeline.stages;

import static software.amazon.awssdk.core.internal.http.pipeline.stages.utils.ExceptionReportingUtils.reportFailureToInterceptors;
import static software.amazon.awssdk.core.internal.http.pipeline.stages.utils.ExceptionReportingUtils.reportFailureToProgressListeners;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -42,16 +43,19 @@ public CompletableFuture<OutputT> execute(SdkHttpFullRequest input, RequestExecu
CompletableFuture<OutputT> executeFuture = wrappedExecute.handle((o, t) -> {
if (t != null) {
Throwable toReport = t;

if (toReport instanceof CompletionException) {
toReport = toReport.getCause();
}
toReport = reportFailureToInterceptors(context, toReport);

// If Progress Listeners are attached to the request, update them with the throwable
if (context.progressUpdater().isPresent()) {
reportFailureToProgressListeners(context.progressUpdater().get(), toReport);
}

throw CompletableFutureUtils.errorAsCompletionException(ThrowableUtils.asSdkException(toReport));
} else {
return o;
}
return o;
});
return CompletableFutureUtils.forwardExceptionTo(executeFuture, wrappedExecute);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.http.pipeline.stages;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.pipeline.RequestToRequestPipeline;
import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater;
import software.amazon.awssdk.core.internal.util.ProgressListenerUtils;
import software.amazon.awssdk.http.SdkHttpFullRequest;

@SdkInternalApi
public class BeforeExecutionProgressReportingStage implements RequestToRequestPipeline {

@Override
public SdkHttpFullRequest execute(SdkHttpFullRequest input, RequestExecutionContext context) throws Exception {

if (ProgressListenerUtils.progressListenerAttached(context.originalRequest())) {
Long requestContentLength =
context.requestProvider() != null && context.requestProvider().contentLength().isPresent() ?
context.requestProvider().contentLength().get() : null;

ProgressUpdater progressUpdater = new ProgressUpdater(context.originalRequest(), requestContentLength);
progressUpdater.requestPrepared(input);
context.progressUpdater(progressUpdater);
}

return input;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.core.internal.http.pipeline.stages;

import static software.amazon.awssdk.core.internal.http.pipeline.stages.utils.ExceptionReportingUtils.reportFailureToInterceptors;
import static software.amazon.awssdk.core.internal.http.pipeline.stages.utils.ExceptionReportingUtils.reportFailureToProgressListeners;
import static software.amazon.awssdk.core.internal.util.ThrowableUtils.failure;

import software.amazon.awssdk.annotations.SdkInternalApi;
Expand All @@ -37,6 +38,10 @@ public OutputT execute(SdkHttpFullRequest input, RequestExecutionContext context
return wrapped.execute(input, context);
} catch (Exception e) {
Throwable throwable = reportFailureToInterceptors(context, e);

context.progressUpdater().ifPresent(progressUpdater -> {
reportFailureToProgressListeners(progressUpdater, throwable);
});
throw failure(throwable);
}
}
Expand Down