Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed a bug in file-based AsyncResponseTransformer that could cause a… #5220

Merged
merged 5 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-bugfix-23ca6e2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS SDK for Java v2",
"contributor": "",
"description": "Fixed a bug in file-based AsyncResponseTransformer that could cause a streaming request to hang if an exception was thrown from `onStream`"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption.CREATE_OR_APPEND_TO_EXISTING;
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -42,6 +43,7 @@
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.utils.Logger;

/**
* {@link AsyncResponseTransformer} that writes the data to the specified file.
Expand All @@ -50,6 +52,7 @@
*/
@SdkInternalApi
public final class FileAsyncResponseTransformer<ResponseT> implements AsyncResponseTransformer<ResponseT, ResponseT> {
private static final Logger log = Logger.loggerFor(FileAsyncResponseTransformer.class);
private final Path path;
private volatile AsynchronousFileChannel fileChannel;
private volatile CompletableFuture<Void> cf;
Expand Down Expand Up @@ -108,7 +111,9 @@ public CompletableFuture<ResponseT> prepare() {
cf = new CompletableFuture<>();
cf.whenComplete((r, t) -> {
if (t != null && fileChannel != null) {
invokeSafely(fileChannel::close);
runAndLogError(log.logger(),
String.format("Failed to close the file %s, resource may be leaked", path),
() -> fileChannel.close());
}
});
return cf.thenApply(ignored -> response);
Expand All @@ -121,21 +126,29 @@ public void onResponse(ResponseT response) {

@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
// onStream may be called multiple times so reset the file channel every time
this.fileChannel = invokeSafely(() -> createChannel(path));
publisher.subscribe(new FileSubscriber(this.fileChannel, path, cf, this::exceptionOccurred,
position));
try {
// onStream may be called multiple times so reset the file channel every time
this.fileChannel = createChannel(path);
publisher.subscribe(new FileSubscriber(this.fileChannel, path, cf, this::exceptionOccurred,
position));
} catch (Throwable e) {
exceptionOccurred(e);
}
}

@Override
public void exceptionOccurred(Throwable throwable) {
try {
if (fileChannel != null) {
invokeSafely(fileChannel::close);
runAndLogError(log.logger(),
String.format("Failed to close the file %s, resource may be leaked", path),
() -> fileChannel.close());
}
} finally {
if (configuration.failureBehavior() == FailureBehavior.DELETE) {
invokeSafely(() -> Files.deleteIfExists(path));
runAndLogError(log.logger(),
String.format("Failed to delete the file %s", path),
() -> Files.deleteIfExists(path));
}
}
cf.completeExceptionally(throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ void noConfiguration_fileAlreadyExists_shouldThrowException() throws Exception {

CompletableFuture<String> future = transformer.prepare();
transformer.onResponse("foobar");
assertThatThrownBy(() -> transformer.onStream(testPublisher(content))).hasRootCauseInstanceOf(FileAlreadyExistsException.class);
transformer.onStream(testPublisher(content));
assertThatThrownBy(() -> future.join()).hasRootCauseInstanceOf(FileAlreadyExistsException.class);
}

@Test
Expand Down Expand Up @@ -246,6 +247,15 @@ void explicitExecutor_shouldUseExecutor() throws Exception {
}
}

@Test
void onStreamFailed_shouldCompleteFutureExceptionally() {
Path testPath = testFs.getPath("test_file.txt");
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath);
CompletableFuture<String> future = transformer.prepare();
transformer.onStream(null);
assertThat(future).isCompletedExceptionally();
}

private static void stubSuccessfulStreaming(String newContent, FileAsyncResponseTransformer<String> transformer) throws Exception {
CompletableFuture<String> future = transformer.prepare();
transformer.onResponse("foobar");
Expand Down
Loading