Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Progress Listener Invocation methods to Asynchronous and Synchronous code paths #5044

Open
wants to merge 65 commits into
base: feature/anirudkr-progress-listener
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
588b102
Adding interfaces for Progress Listener and state capturing Progress …
anirudh9391 Dec 4, 2023
da26d3e
Modified Indentation
anirudh9391 Dec 5, 2023
99febb6
Added new change script
anirudh9391 Dec 5, 2023
7e70224
Revert "Added new change script"
anirudh9391 Dec 5, 2023
5e9057e
Added new change script
anirudh9391 Dec 5, 2023
e607d6c
Moved ProgressSnapshot to its own folder
anirudh9391 Dec 5, 2023
3d26526
Added sdkResponse to Progress Snapshot
anirudh9391 Dec 5, 2023
48a6617
Change totalTransferSize to totalBytes
anirudh9391 Dec 5, 2023
b096765
Modify return types of start and elapsedTime to Optional
anirudh9391 Dec 6, 2023
b558e6b
Progress Listener Interface definition
anirudh9391 Dec 8, 2023
07f2bfb
Address PR comments on Progress Listener
anirudh9391 Dec 8, 2023
d2a3253
Fix checkstyle issues
anirudh9391 Dec 11, 2023
eaf9470
Fix checkstyle issues
anirudh9391 Dec 11, 2023
276fb70
Implement Default Progress Snapshot
anirudh9391 Dec 14, 2023
7b24093
Revert "Implement Default Progress Snapshot"
anirudh9391 Dec 14, 2023
a943002
Revert "Revert "Implement Default Progress Snapshot""
anirudh9391 Dec 14, 2023
1d56ce2
Remove changes to pom
anirudh9391 Dec 14, 2023
7c1978b
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Dec 14, 2023
2e66a9a
Add parameterized unit tests
anirudh9391 Dec 20, 2023
3ffbca2
fix failing tests
anirudh9391 Dec 20, 2023
2dfdef7
Defined ProgressListener invoker methods, along with success and fail…
anirudh9391 Dec 27, 2023
713f6c1
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Dec 27, 2023
ba7b059
Address PR comments
anirudh9391 Jan 3, 2024
dbdd27d
Remove SDK Preview API annotations
anirudh9391 Jan 4, 2024
29259df
Fixed a typo with error logging
anirudh9391 Jan 4, 2024
16e06b7
Rectify typo in adding listeners to RequestOverrideConfiguration and …
anirudh9391 Jan 9, 2024
54caa47
Added Progress Updater Class to invoke listener methods
anirudh9391 Jan 29, 2024
ef05f6b
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Jan 30, 2024
631f674
ProgressUpdater class
anirudh9391 Jan 31, 2024
700bd5e
Define ProgressUpdater and LoggingProgressListener
anirudh9391 Feb 1, 2024
06f811b
Added ProgressUpdater into ExecutionContext and removed asynchronous …
anirudh9391 Feb 13, 2024
3faf34e
Fixed checkstyle issues
anirudh9391 Feb 13, 2024
8946bc2
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Feb 13, 2024
8f18496
Merged from master
anirudh9391 Feb 13, 2024
6eacf65
Added tests for attemptFailureResponseBytesReceived
anirudh9391 Feb 14, 2024
af77917
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Feb 14, 2024
43dd6a2
Asynchronous Code Path Progress Listener WIP
anirudh9391 Feb 15, 2024
6b16403
Added Pre and Post Execution stages in Request Pipeline to update th…
joviegas Feb 18, 2024
7a3db5a
Adding Pre, Post execution stages for tracking http requests invoked …
anirudh9391 Mar 20, 2024
3f0db6b
Add Progress Listener lifecycle methods to Asynchronous request path
anirudh9391 Mar 26, 2024
5d1fc49
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Mar 26, 2024
d3efc8e
Fix rebase
anirudh9391 Mar 26, 2024
98a2a85
Fix method call
anirudh9391 Mar 26, 2024
51ac003
Remove unintentionally added test class
anirudh9391 Mar 26, 2024
210c8ed
Remove needles indentation changes
anirudh9391 Mar 26, 2024
6509782
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Apr 22, 2024
944a76d
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Apr 23, 2024
d5f1258
WIP
anirudh9391 Apr 23, 2024
582c724
Merge branch 'master' into feature/anirudkr-progress-listener
anirudh9391 May 6, 2024
55a9bd8
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 May 6, 2024
8756570
Fix PR comments, added an interface and abstract class to denote Uplo…
anirudh9391 Jun 7, 2024
06f2c2d
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Jun 7, 2024
a604bd3
Remove incorrect modification of test class
anirudh9391 Jun 7, 2024
be2f663
Repurposed BytesReadTrackingPublisher
anirudh9391 Jun 10, 2024
a04b6eb
Merge branch 'master' into dev/progress-listener
anirudh9391 Jul 5, 2024
dea870b
Added testing for catching attempt failures
anirudh9391 Jul 5, 2024
e0e5e23
Added Tests for async exception reporting
anirudh9391 Jul 5, 2024
89b9f0a
Fixed checkstyle
anirudh9391 Jul 5, 2024
13a8461
Modified method call signatures
anirudh9391 Jul 5, 2024
fa580e4
Added comments to update BytesReadTrackingPublishee class
anirudh9391 Jul 5, 2024
2d9cfe7
Merge branch 'feature/anirudkr-progress-listener' into dev/progress-l…
anirudh9391 Jul 5, 2024
b40b3e1
Revert "Merge branch 'feature/anirudkr-progress-listener' into dev/pr…
anirudh9391 Jul 5, 2024
ab9445d
Improve code quality
anirudh9391 Jul 9, 2024
3eb5af9
Progress Listener for sync codepath
anirudh9391 Jul 15, 2024
f6f2c25
Add response content length
anirudh9391 Jul 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestMutableStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MergeCustomHeadersStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MergeCustomQueryParamsStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.PostExecutionUpdateProgressStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.PreExecutionUpdateProgressStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.QueryParametersToBodyStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.UnwrapResponseContainer;
import software.amazon.awssdk.core.internal.util.ThrowableUtils;
Expand Down Expand Up @@ -201,12 +203,14 @@ public <OutputT> CompletableFuture<OutputT> execute(
.then(() -> new HttpChecksumStage(ClientType.ASYNC))
.then(MakeRequestImmutableStage::new)
.then(RequestPipelineBuilder
.first(AsyncSigningStage::new)
.first(PreExecutionUpdateProgressStage::new)
anirudh9391 marked this conversation as resolved.
Show resolved Hide resolved
.then(AsyncSigningStage::new)
.then(AsyncBeforeTransmissionExecutionInterceptorsStage::new)
.then(d -> new MakeAsyncHttpRequestStage<>(responseHandler, d))
.wrappedWith(AsyncApiCallAttemptMetricCollectionStage::new)
.wrappedWith((deps, wrapped) -> new AsyncRetryableStage<>(responseHandler, deps, wrapped))
.then(async(() -> new UnwrapResponseContainer<>()))
.then(() -> new PostExecutionUpdateProgressStage<>())
anirudh9391 marked this conversation as resolved.
Show resolved Hide resolved
.then(async(() -> new AfterExecutionInterceptorsStage<>()))
.wrappedWith(AsyncExecutionFailureExceptionReportingStage::new)
.wrappedWith(AsyncApiCallTimeoutTrackingStage::new)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestMutableStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MergeCustomHeadersStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MergeCustomQueryParamsStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.PostExecutionUpdateProgressStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.PreExecutionUpdateProgressStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.QueryParametersToBodyStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage;
Expand Down Expand Up @@ -204,7 +206,8 @@ public <OutputT> OutputT execute(HttpResponseHandler<Response<OutputT>> response
.then(MakeRequestImmutableStage::new)
// End of mutating request
.then(RequestPipelineBuilder
.first(SigningStage::new)
.first(PreExecutionUpdateProgressStage::new)
.then(SigningStage::new)
.then(BeforeTransmissionExecutionInterceptorsStage::new)
.then(MakeHttpRequestStage::new)
.then(AfterTransmissionExecutionInterceptorsStage::new)
Expand All @@ -219,6 +222,7 @@ public <OutputT> OutputT execute(HttpResponseHandler<Response<OutputT>> response
.wrappedWith((deps, wrapped) -> new ApiCallMetricCollectionStage<>(wrapped))
.then(() -> new UnwrapResponseContainer<>())
.then(() -> new AfterExecutionInterceptorsStage<>())
.then(() -> new PostExecutionUpdateProgressStage<>())
.wrappedWith(ExecutionFailureExceptionReportingStage::new)
.build(httpClientDependencies)
.execute(request, createRequestExecutionDependencies());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ public CompletableFuture<OutputT> execute(SdkHttpFullRequest input, RequestExecu
CompletableFuture<OutputT> executeFuture = wrappedExecute.handle((o, t) -> {
if (t != null) {
Throwable toReport = t;

if (toReport instanceof CompletionException) {
toReport = toReport.getCause();
}
toReport = reportFailureToInterceptors(context, toReport);

context.executionContext().progressUpdater().ifPresent(progressUpdater -> {
progressUpdater.attemptFailure(t);
anirudh9391 marked this conversation as resolved.
Show resolved Hide resolved
});

throw CompletableFutureUtils.errorAsCompletionException(ThrowableUtils.asSdkException(toReport));
} else {
return o;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public OutputT execute(SdkHttpFullRequest input, RequestExecutionContext context
return wrapped.execute(input, context);
} catch (Exception e) {
Throwable throwable = reportFailureToInterceptors(context, e);

context.executionContext().progressUpdater().ifPresent(progressUpdater -> {
progressUpdater.attemptFailure(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be throwable. Can we add a test for this?

});
throw failure(throwable);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import software.amazon.awssdk.core.internal.http.timers.TimeoutTracker;
import software.amazon.awssdk.core.internal.http.timers.TimerUtils;
import software.amazon.awssdk.core.internal.metrics.BytesReadTrackingPublisher;
import software.amazon.awssdk.core.internal.metrics.BytesSentTrackingPublisher;
import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater;
import software.amazon.awssdk.core.internal.util.MetricUtils;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.http.SdkHttpFullRequest;
Expand All @@ -58,6 +60,7 @@
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.StringUtils;

/**
* Delegate to the HTTP implementation to make an HTTP request and receive the response.
Expand Down Expand Up @@ -137,6 +140,21 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque

MetricCollector httpMetricCollector = MetricUtils.createHttpMetricsCollector(context);

//If Progress Listening is enabled, wrap around BytesSentTrackingPublisher to track progress on bytes sent
if (context.executionContext().progressUpdater().isPresent()) {
anirudh9391 marked this conversation as resolved.
Show resolved Hide resolved

ProgressUpdater progressUpdater = context.executionContext().progressUpdater().get();

if (shouldSetContentLength(request, requestProvider)) {
progressUpdater.updateRequestContentLength(requestProvider.contentLength().get());
}

requestProvider = new BytesSentTrackingPublisher(requestProvider,
progressUpdater,
requestProvider.contentLength());

}

AsyncExecuteRequest.Builder executeRequestBuilder = AsyncExecuteRequest.builder()
.request(requestWithContentLength)
.requestContentPublisher(requestProvider)
Expand Down Expand Up @@ -303,13 +321,27 @@ public void onHeaders(SdkHttpResponse headers) {
long d = now - startTime;
context.attemptMetricCollector().reportMetric(CoreMetric.TIME_TO_FIRST_BYTE, Duration.ofNanos(d));
super.onHeaders(headers);

context.executionContext().progressUpdater().ifPresent(progressUpdater -> {
progressUpdater.responseHeaderReceived();
headers.firstMatchingHeader(CONTENT_LENGTH).ifPresent(value -> {
if (!StringUtils.isNotBlank(value)) {
progressUpdater.updateResponseContentLength(Long.parseLong(value));
}
});
});
}

@Override
public void onStream(Publisher<ByteBuffer> stream) {
BytesReadTrackingPublisher bytesReadTrackingPublisher;
AtomicLong bytesReadCounter = context.executionAttributes()
.getAttribute(SdkInternalExecutionAttribute.RESPONSE_BYTES_READ);
BytesReadTrackingPublisher bytesReadTrackingPublisher = new BytesReadTrackingPublisher(stream, bytesReadCounter);

bytesReadTrackingPublisher = new BytesReadTrackingPublisher(stream,
anirudh9391 marked this conversation as resolved.
Show resolved Hide resolved
bytesReadCounter,
context.executionContext().progressUpdater());

super.onStream(bytesReadTrackingPublisher);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.http.pipeline.stages;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;

@SdkInternalApi
public class PostExecutionUpdateProgressStage<OutputT> implements RequestPipeline<OutputT, OutputT> {
@Override
public OutputT execute(OutputT input, RequestExecutionContext context) throws Exception {

context.executionContext().progressUpdater().ifPresent(progressUpdater -> {
progressUpdater.executionSuccess((SdkResponse) input);
});
return input;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.http.pipeline.stages;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.pipeline.RequestToRequestPipeline;
import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater;
import software.amazon.awssdk.http.SdkHttpFullRequest;

@SdkInternalApi
public class PreExecutionUpdateProgressStage implements RequestToRequestPipeline {

@Override
public SdkHttpFullRequest execute(SdkHttpFullRequest input, RequestExecutionContext context) throws Exception {

if (progressListenerAttached(context.originalRequest())) {
Long requestContentLength =
context.requestProvider() != null && context.requestProvider().contentLength().isPresent() ?
context.requestProvider().contentLength().get() : null;

if (context.executionContext().progressUpdater().isPresent()) {
context.executionContext().progressUpdater().get().requestPrepared(input);
anirudh9391 marked this conversation as resolved.
Show resolved Hide resolved
} else {
ProgressUpdater progressUpdater = new ProgressUpdater(context.originalRequest(), requestContentLength);
progressUpdater.requestPrepared(input);
context.executionContext().toBuilder().progressUpdater(progressUpdater);
anirudh9391 marked this conversation as resolved.
Show resolved Hide resolved
}
}

return input;
}

public boolean progressListenerAttached(SdkRequest request) {
if (request.overrideConfiguration().isPresent() &&
!request.overrideConfiguration().get().progressListeners().isEmpty()) {
return true;
}
return false;
anirudh9391 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.Response;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;

Expand All @@ -31,6 +32,9 @@
public class UnwrapResponseContainer<OutputT> implements RequestPipeline<Response<OutputT>, OutputT> {
@Override
public OutputT execute(Response<OutputT> input, RequestExecutionContext context) throws Exception {
context.executionContext().progressUpdater().ifPresent(progressUpdater -> {
progressUpdater.executionSuccess((SdkResponse) input.response());
});
return input.response();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package software.amazon.awssdk.core.internal.metrics;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater;

/**
* Publisher that tracks how many bytes are published from the wrapped publisher to the downstream subscriber.
Expand All @@ -29,15 +31,20 @@
public final class BytesReadTrackingPublisher implements Publisher<ByteBuffer> {
private final Publisher<ByteBuffer> upstream;
private final AtomicLong bytesRead;
private ProgressUpdater progressUpdater;

public BytesReadTrackingPublisher(Publisher<ByteBuffer> upstream, AtomicLong bytesRead) {
public BytesReadTrackingPublisher(Publisher<ByteBuffer> upstream, AtomicLong bytesRead,
Optional<ProgressUpdater> progressUpdater) {
this.upstream = upstream;
this.bytesRead = bytesRead;
progressUpdater.ifPresent(value -> {
this.progressUpdater = value;
});
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
upstream.subscribe(new BytesReadTracker(subscriber, bytesRead));
upstream.subscribe(new BytesReadTracker(subscriber, bytesRead, progressUpdater));
}

public long bytesRead() {
Expand All @@ -47,21 +54,30 @@ public long bytesRead() {
private static final class BytesReadTracker implements Subscriber<ByteBuffer> {
private final Subscriber<? super ByteBuffer> downstream;
private final AtomicLong bytesRead;
private final ProgressUpdater progressUpdater;

private BytesReadTracker(Subscriber<? super ByteBuffer> downstream, AtomicLong bytesRead) {
private BytesReadTracker(Subscriber<? super ByteBuffer> downstream,
AtomicLong bytesRead, ProgressUpdater progressUpdater) {
this.downstream = downstream;
this.bytesRead = bytesRead;
this.progressUpdater = progressUpdater;
}

@Override
public void onSubscribe(Subscription subscription) {
downstream.onSubscribe(subscription);
if (progressUpdater != null) {
progressUpdater.resetBytesReceived();
}
}

@Override
public void onNext(ByteBuffer byteBuffer) {
bytesRead.addAndGet(byteBuffer.remaining());
downstream.onNext(byteBuffer);
if (progressUpdater != null) {
progressUpdater.incrementBytesReceived(bytesRead.get());
}
}

@Override
Expand Down
Loading
Loading