Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FileAsyncResponseTransformer - write to position #5241

Merged
merged 4 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ExecutorService;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
Expand All @@ -41,11 +42,19 @@ public final class FileTransformerConfiguration implements ToCopyableBuilder<Fil
private final FileWriteOption fileWriteOption;
private final FailureBehavior failureBehavior;
private final ExecutorService executorService;
private final Long position;

private FileTransformerConfiguration(DefaultBuilder builder) {
this.fileWriteOption = Validate.paramNotNull(builder.fileWriteOption, "fileWriteOption");
this.failureBehavior = Validate.paramNotNull(builder.failureBehavior, "failureBehavior");
this.executorService = builder.executorService;
this.position = builder.position;
if (fileWriteOption != FileWriteOption.WRITE_TO_POSITION && position != null) {
throw new IllegalArgumentException(String.format(
"'position' can only be used with 'WRITE_TO_POSITION' file write option, but was used with '%s'",
fileWriteOption
));
}
}

/**
Expand All @@ -72,6 +81,18 @@ public Optional<ExecutorService> executorService() {
return Optional.ofNullable(executorService);
}

/**
* Exclusively used with {@link FileWriteOption#WRITE_TO_POSITION}. Configures the position, where to start writing to the
* existing file. The location correspond to the first byte where new data will be written. For example, if {@code 128} is
* configured, bytes 0-127 of the existing file will remain untouched and data will be written starting at byte 128. If not
* specified, defaults to 0.
*
* @return The offset at which to start overwriting data in the file.
*/
public Long position() {
return position;
}

/**
* Create a {@link Builder}, used to create a {@link FileTransformerConfiguration}.
*/
Expand Down Expand Up @@ -137,6 +158,9 @@ public boolean equals(Object o) {
if (failureBehavior != that.failureBehavior) {
return false;
}
if (!Objects.equals(position, that.position)) {
return false;
}
return Objects.equals(executorService, that.executorService);
}

Expand All @@ -145,6 +169,7 @@ public int hashCode() {
int result = fileWriteOption != null ? fileWriteOption.hashCode() : 0;
result = 31 * result + (failureBehavior != null ? failureBehavior.hashCode() : 0);
result = 31 * result + (executorService != null ? executorService.hashCode() : 0);
result = 31 * result + (position != null ? position.hashCode() : 0);
return result;
}

Expand All @@ -165,7 +190,15 @@ public enum FileWriteOption {
/**
* Create a new file if it doesn't exist, otherwise append to the existing file.
*/
CREATE_OR_APPEND_TO_EXISTING
CREATE_OR_APPEND_TO_EXISTING,

/**
* Write to an existing file at the specified position, defined by the {@link FileTransformerConfiguration#position()}. If
* the file does not exist, a {@link java.nio.file.NoSuchFileException} will be thrown. If
* {@link FileTransformerConfiguration#position()} is not configured, start overwriting data at the beginning of the file
* (byte 0).
*/
WRITE_TO_POSITION
}

/**
Expand Down Expand Up @@ -209,12 +242,24 @@ public interface Builder extends CopyableBuilder<Builder, FileTransformerConfigu
* @return This object for method chaining.
*/
Builder executorService(ExecutorService executorService);

/**
* Exclusively used with {@link FileWriteOption#WRITE_TO_POSITION}. Configures the position, where to start writing to the
* existing file. The location correspond to the first byte where new data will be written. For example, if {@code 128} is
* configured, bytes 0-127 of the existing file will remain untouched and data will be written starting at byte 128. If
* not specified, defaults to 0.
*
* @param writePosition the position at where to start writing data to the file.
* @return This object for method chaining.
*/
Builder position(Long writePosition);
}

private static final class DefaultBuilder implements Builder {
private FileWriteOption fileWriteOption;
private FailureBehavior failureBehavior;
private ExecutorService executorService;
private Long position;

private DefaultBuilder() {
}
Expand All @@ -223,6 +268,7 @@ private DefaultBuilder(FileTransformerConfiguration fileTransformerConfiguration
this.fileWriteOption = fileTransformerConfiguration.fileWriteOption;
this.failureBehavior = fileTransformerConfiguration.failureBehavior;
this.executorService = fileTransformerConfiguration.executorService;
this.position = fileTransformerConfiguration.position;
}

@Override
Expand All @@ -243,10 +289,25 @@ public Builder executorService(ExecutorService executorService) {
return this;
}

@Override
public Builder position(Long position) {
this.position = position;
return this;
}

@Override
public FileTransformerConfiguration build() {
return new FileTransformerConfiguration(this);
}
}

}
@Override
public String toString() {
return ToString.builder("FileTransformerConfiguration")
.add("fileWriteOption", this.fileWriteOption)
.add("failureBehavior", this.failureBehavior)
.add("executorService", this.executorService)
.add("position", this.position)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.core.internal.async;

import static software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption.CREATE_OR_APPEND_TO_EXISTING;
import static software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption.WRITE_TO_POSITION;
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;

import java.io.IOException;
Expand All @@ -42,6 +43,7 @@
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.utils.Validate;

/**
* {@link AsyncResponseTransformer} that writes the data to the specified file.
Expand All @@ -58,19 +60,21 @@ public final class FileAsyncResponseTransformer<ResponseT> implements AsyncRespo
private final FileTransformerConfiguration configuration;

public FileAsyncResponseTransformer(Path path) {
this.path = path;
this.configuration = FileTransformerConfiguration.defaultCreateNew();
this.position = 0L;
this(path, FileTransformerConfiguration.defaultCreateNew(), 0L);
}

public FileAsyncResponseTransformer(Path path, FileTransformerConfiguration fileConfiguration) {
this(path, fileConfiguration, determineFilePositionToWrite(path, fileConfiguration));
}

private FileAsyncResponseTransformer(Path path, FileTransformerConfiguration fileTransformerConfiguration, long position) {
this.path = path;
this.configuration = fileConfiguration;
this.position = determineFilePositionToWrite(path);
this.configuration = fileTransformerConfiguration;
this.position = position;
}

private long determineFilePositionToWrite(Path path) {
if (configuration.fileWriteOption() == CREATE_OR_APPEND_TO_EXISTING) {
private static long determineFilePositionToWrite(Path path, FileTransformerConfiguration fileConfiguration) {
if (fileConfiguration.fileWriteOption() == CREATE_OR_APPEND_TO_EXISTING) {
try {
return Files.size(path);
} catch (NoSuchFileException e) {
Expand All @@ -79,6 +83,9 @@ private long determineFilePositionToWrite(Path path) {
throw SdkClientException.create("Cannot determine the current file size " + path, exception);
}
}
if (fileConfiguration.fileWriteOption() == WRITE_TO_POSITION) {
return Validate.getOrDefault(fileConfiguration.position(), () -> 0L);
}
return 0L;
}

Expand All @@ -95,6 +102,9 @@ private AsynchronousFileChannel createChannel(Path path) throws IOException {
case CREATE_NEW:
Collections.addAll(options, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
break;
case WRITE_TO_POSITION:
Collections.addAll(options, StandardOpenOption.WRITE);
break;
default:
throw new IllegalArgumentException("Unsupported file write option: " + configuration.fileWriteOption());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,33 @@
package software.amazon.awssdk.core;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior.DELETE;
import static software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption.CREATE_NEW;

import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class FileTransformerConfigurationTest {

@ParameterizedTest
@EnumSource(
value = FileTransformerConfiguration.FileWriteOption.class,
names = {"CREATE_NEW", "CREATE_OR_REPLACE_EXISTING", "CREATE_OR_APPEND_TO_EXISTING"})
void position_whenUsedWithNotWriteToPosition_shouldThrowIllegalArgumentException(
FileTransformerConfiguration.FileWriteOption fileWriteOption) {
FileTransformerConfiguration.Builder builder = FileTransformerConfiguration.builder()
.position(123L)
.failureBehavior(DELETE)
.fileWriteOption(fileWriteOption);
assertThatThrownBy(builder::build)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(fileWriteOption.name());
}

@Test
void equalsHashcode() {
EqualsVerifier.forClass(FileTransformerConfiguration.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
Expand All @@ -50,6 +50,7 @@
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.FileTransformerConfiguration;
import software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption;
import software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior;
import software.amazon.awssdk.core.async.SdkPublisher;

/**
Expand Down Expand Up @@ -185,8 +186,11 @@ void createOrAppendExisting_fileExists_shouldAppend() throws Exception {
@MethodSource("configurations")
void exceptionOccurred_deleteFileBehavior(FileTransformerConfiguration configuration) throws Exception {
Path testPath = testFs.getPath("test_file.txt");
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath,
configuration);
if (configuration.fileWriteOption() == FileWriteOption.WRITE_TO_POSITION) {
// file must exist for WRITE_TO_POSITION
Files.write(testPath, "foobar".getBytes(StandardCharsets.UTF_8));
}
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath, configuration);
stubException(RandomStringUtils.random(200), transformer);
if (configuration.failureBehavior() == LEAVE) {
assertThat(testPath).exists();
Expand All @@ -196,28 +200,19 @@ void exceptionOccurred_deleteFileBehavior(FileTransformerConfiguration configura
}

private static List<FileTransformerConfiguration> configurations() {
return Arrays.asList(
FileTransformerConfiguration.defaultCreateNew(),
FileTransformerConfiguration.defaultCreateOrAppend(),
FileTransformerConfiguration.defaultCreateOrReplaceExisting(),
FileTransformerConfiguration.builder()
.fileWriteOption(FileWriteOption.CREATE_NEW)
.failureBehavior(LEAVE).build(),
FileTransformerConfiguration.builder()
.fileWriteOption(FileWriteOption.CREATE_NEW)
.failureBehavior(DELETE).build(),
FileTransformerConfiguration.builder()
.fileWriteOption(FileWriteOption.CREATE_OR_APPEND_TO_EXISTING)
.failureBehavior(DELETE).build(),
FileTransformerConfiguration.builder()
.fileWriteOption(FileWriteOption.CREATE_OR_APPEND_TO_EXISTING)
.failureBehavior(LEAVE).build(),
FileTransformerConfiguration.builder()
.fileWriteOption(FileWriteOption.CREATE_OR_REPLACE_EXISTING)
.failureBehavior(DELETE).build(),
FileTransformerConfiguration.builder()
.fileWriteOption(FileWriteOption.CREATE_OR_REPLACE_EXISTING)
.failureBehavior(LEAVE).build());
List<FileTransformerConfiguration> conf = new ArrayList<>();
conf.add(FileTransformerConfiguration.defaultCreateNew());
conf.add(FileTransformerConfiguration.defaultCreateOrAppend());
conf.add(FileTransformerConfiguration.defaultCreateOrReplaceExisting());
for (FailureBehavior failureBehavior : FailureBehavior.values()) {
for (FileWriteOption fileWriteOption : FileWriteOption.values()) {
conf.add(FileTransformerConfiguration.builder()
.fileWriteOption(fileWriteOption)
.failureBehavior(failureBehavior)
.build());
}
}
return conf;
}

@Test
Expand Down Expand Up @@ -246,6 +241,63 @@ void explicitExecutor_shouldUseExecutor() throws Exception {
}
}

@Test
void writeToPosition_fileExists_shouldAppendFromPosition() throws Exception {
int totalSize = 100;
long prefixSize = 80L;
int newContentLength = 20;

Path testPath = testFs.getPath("test_file.txt");
String contentBeforeRewrite = RandomStringUtils.randomAlphanumeric(totalSize);
byte[] existingBytes = contentBeforeRewrite.getBytes(StandardCharsets.UTF_8);
Files.write(testPath, existingBytes);
String newContent = RandomStringUtils.randomAlphanumeric(newContentLength);
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(
testPath,
FileTransformerConfiguration.builder()
.position(prefixSize)
.failureBehavior(DELETE)
.fileWriteOption(FileWriteOption.WRITE_TO_POSITION)
.build());

stubSuccessfulStreaming(newContent, transformer);

String expectedContent = contentBeforeRewrite.substring(0, 80) + newContent;
assertThat(testPath).hasContent(expectedContent);
}

@Test
void writeToPosition_fileDoesNotExists_shouldThrowException() throws Exception {
Path path = testFs.getPath("this/file/does/not/exists");
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(path);
transformer.prepare();
transformer.onResponse("foobar");
assertThatThrownBy(() -> transformer.onStream(testPublisher("foo-bar-content")))
.hasRootCauseInstanceOf(NoSuchFileException.class);

}

@Test
void writeToPosition_fileExists_positionNotDefined_shouldRewriteFromStart() throws Exception {
int totalSize = 100;
Path testPath = testFs.getPath("test_file.txt");
String contentBeforeRewrite = RandomStringUtils.randomAlphanumeric(totalSize);
byte[] existingBytes = contentBeforeRewrite.getBytes(StandardCharsets.UTF_8);
Files.write(testPath, existingBytes);
String newContent = RandomStringUtils.randomAlphanumeric(totalSize);
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(
testPath,
FileTransformerConfiguration.builder()
.failureBehavior(DELETE)
.fileWriteOption(FileWriteOption.WRITE_TO_POSITION)
.build());

stubSuccessfulStreaming(newContent, transformer);

assertThat(testPath).hasContent(newContent);

}

private static void stubSuccessfulStreaming(String newContent, FileAsyncResponseTransformer<String> transformer) throws Exception {
CompletableFuture<String> future = transformer.prepare();
transformer.onResponse("foobar");
Expand Down
Loading