Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import java.io.IOException;
import java.io.InputStream;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.io.SdkLengthAwareInputStream;
import software.amazon.awssdk.utils.io.LengthAwareInputStream;

/**
* A wrapped stream to represent a "chunk" of data
*/
@SdkInternalApi
public final class ChunkInputStream extends SdkLengthAwareInputStream {
public final class ChunkInputStream extends LengthAwareInputStream {

public ChunkInputStream(InputStream inputStream, long length) {
super(inputStream, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.InputStreamConsumingPublisher;
import software.amazon.awssdk.utils.io.SdkLengthAwareInputStream;
import software.amazon.awssdk.utils.io.LengthAwareInputStream;

/**
* An implementation of {@link AsyncRequestBody} that allows performing a blocking write of an input stream to a downstream
Expand Down Expand Up @@ -89,7 +89,7 @@ public long writeInputStream(InputStream inputStream) {
try {
waitForSubscriptionIfNeeded();
if (contentLength != null) {
return delegate.doBlockingWrite(new SdkLengthAwareInputStream(inputStream, contentLength));
return delegate.doBlockingWrite(new LengthAwareInputStream(inputStream, contentLength));
}

return delegate.doBlockingWrite(inputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.utils.Pair;
import software.amazon.awssdk.utils.StringUtils;
import software.amazon.awssdk.utils.io.SdkLengthAwareInputStream;
import software.amazon.awssdk.utils.io.LengthAwareInputStream;

@SdkInternalApi
public abstract class BaseClientHandler {
Expand Down Expand Up @@ -136,7 +136,7 @@ private static RequestBody getBody(SdkHttpFullRequest request) {
ContentStreamProvider streamProvider = contentStreamProviderOptional.get();
if (contentLengthOptional.isPresent()) {
ContentStreamProvider toWrap = contentStreamProviderOptional.get();
streamProvider = () -> new SdkLengthAwareInputStream(toWrap.newStream(), contentLength);
streamProvider = () -> new LengthAwareInputStream(toWrap.newStream(), contentLength);
}

return new SdkInternalOnlyRequestBody(streamProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Pair;
import software.amazon.awssdk.utils.io.SdkLengthAwareInputStream;
import software.amazon.awssdk.utils.io.LengthAwareInputStream;

/**
* Delegate to the HTTP implementation to make an HTTP request and receive the response.
Expand Down Expand Up @@ -119,8 +119,8 @@ private static SdkHttpFullRequest enforceContentLengthIfPresent(SdkHttpFullReque
}

ContentStreamProvider requestContentProvider = requestContentStreamProviderOptional.get();
ContentStreamProvider lengthVerifyingProvider = () -> new SdkLengthAwareInputStream(requestContentProvider.newStream(),
contentLength.get());
ContentStreamProvider lengthVerifyingProvider = () -> new LengthAwareInputStream(requestContentProvider.newStream(),
contentLength.get());
return request.toBuilder()
.contentStreamProvider(lengthVerifyingProvider)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.timers.TimeoutTracker;
import software.amazon.awssdk.utils.io.SdkLengthAwareInputStream;
import software.amazon.awssdk.utils.io.LengthAwareInputStream;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.SdkHttpClient;
Expand Down Expand Up @@ -135,9 +135,9 @@ public void execute_testLengthChecking(String description,
InputStream requestContentStream = capturedRequest.contentStreamProvider().get().newStream();

if (expectLengthAware) {
assertThat(requestContentStream).isInstanceOf(SdkLengthAwareInputStream.class);
assertThat(requestContentStream).isInstanceOf(LengthAwareInputStream.class);
} else {
assertThat(requestContentStream).isNotInstanceOf(SdkLengthAwareInputStream.class);
assertThat(requestContentStream).isNotInstanceOf(LengthAwareInputStream.class);
}
} else {
assertThat(capturedRequest.contentStreamProvider()).isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@
* has less bytes (i.e. reaches EOF) before the expected length is reached, it will throw {@code IOException}.
*/
@SdkProtectedApi
public class SdkLengthAwareInputStream extends FilterInputStream {
private static final Logger LOG = Logger.loggerFor(SdkLengthAwareInputStream.class);
public class LengthAwareInputStream extends FilterInputStream {
private static final Logger LOG = Logger.loggerFor(LengthAwareInputStream.class);
private final long length;
private long remaining;
private long markedRemaining;

public SdkLengthAwareInputStream(InputStream in, long length) {
public LengthAwareInputStream(InputStream in, long length) {
super(in);
this.length = Validate.isNotNegative(length, "length");
this.remaining = this.length;
this.markedRemaining = this.remaining;
}

@Override
public int read() throws IOException {
public final int read() throws IOException {
if (!hasMoreBytes()) {
LOG.debug(() -> String.format("Specified InputStream length of %d has been reached. Returning EOF.", length));
return -1;
Expand All @@ -66,7 +66,7 @@ public int read() throws IOException {
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
public final int read(byte[] b, int off, int len) throws IOException {
if (!hasMoreBytes()) {
LOG.debug(() -> String.format("Specified InputStream length of %d has been reached. Returning EOF.", length));
return -1;
Expand All @@ -90,33 +90,33 @@ public int read(byte[] b, int off, int len) throws IOException {
}

@Override
public long skip(long requestedBytesToSkip) throws IOException {
public final long skip(long requestedBytesToSkip) throws IOException {
requestedBytesToSkip = Math.min(requestedBytesToSkip, remaining);
long skippedActual = super.skip(requestedBytesToSkip);
remaining -= skippedActual;
return skippedActual;
}

@Override
public int available() throws IOException {
public final int available() throws IOException {
int streamAvailable = super.available();
return Math.min(streamAvailable, saturatedCast(remaining));
}

@Override
public void mark(int readlimit) {
public final void mark(int readlimit) {
super.mark(readlimit);
// Store the current remaining bytes to restore on reset()
markedRemaining = remaining;
}

@Override
public void reset() throws IOException {
public final void reset() throws IOException {
super.reset();
remaining = markedRemaining;
}

public long remaining() {
public final long remaining() {
return remaining;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class SdkLengthAwareInputStreamTest {
class LengthAwareInputStreamTest {
private InputStream delegateStream;

@BeforeEach
Expand All @@ -40,7 +40,7 @@ void setup() {
void read_lengthIs0_returnsEof() throws IOException {
when(delegateStream.available()).thenReturn(Integer.MAX_VALUE);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegateStream, 0);
LengthAwareInputStream is = new LengthAwareInputStream(delegateStream, 0);

assertThat(is.read()).isEqualTo(-1);
assertThat(is.read(new byte[16], 0, 16)).isEqualTo(-1);
Expand All @@ -51,7 +51,7 @@ void read_lengthNonZero_delegateEof_returnsEof() throws IOException {
when(delegateStream.read()).thenReturn(-1);
when(delegateStream.read(any(byte[].class), any(int.class), any(int.class))).thenReturn(-1);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegateStream, 0);
LengthAwareInputStream is = new LengthAwareInputStream(delegateStream, 0);

assertThat(is.read()).isEqualTo(-1);
assertThat(is.read(new byte[16], 0, 16)).isEqualTo(-1);
Expand All @@ -61,7 +61,7 @@ void read_lengthNonZero_delegateEof_returnsEof() throws IOException {
void readByte_lengthNonZero_delegateHasAvailable_returnsDelegateData() throws IOException {
when(delegateStream.read()).thenReturn(42);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegateStream, 16);
LengthAwareInputStream is = new LengthAwareInputStream(delegateStream, 16);

assertThat(is.read()).isEqualTo(42);
}
Expand All @@ -70,7 +70,7 @@ void readByte_lengthNonZero_delegateHasAvailable_returnsDelegateData() throws IO
void readArray_lengthNonZero_delegateHasAvailable_returnsDelegateData() throws IOException {
when(delegateStream.read(any(byte[].class), any(int.class), any(int.class))).thenReturn(8);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegateStream, 16);
LengthAwareInputStream is = new LengthAwareInputStream(delegateStream, 16);

assertThat(is.read(new byte[16], 0, 16)).isEqualTo(8);
}
Expand All @@ -79,7 +79,7 @@ void readArray_lengthNonZero_delegateHasAvailable_returnsDelegateData() throws I
void readArray_lengthNonZero_propagatesCallToDelegate() throws IOException {
when(delegateStream.read(any(byte[].class), any(int.class), any(int.class))).thenReturn(8);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegateStream, 16);
LengthAwareInputStream is = new LengthAwareInputStream(delegateStream, 16);
byte[] buff = new byte[16];
is.read(buff, 0, 16);

Expand All @@ -90,7 +90,7 @@ void readArray_lengthNonZero_propagatesCallToDelegate() throws IOException {
void read_markAndReset_availableReflectsNewLength() throws IOException {
delegateStream = new ByteArrayInputStream(new byte[32]);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegateStream, 16);
LengthAwareInputStream is = new LengthAwareInputStream(delegateStream, 16);

for (int i = 0; i < 4; ++i) {
is.read();
Expand All @@ -113,7 +113,7 @@ void read_markAndReset_availableReflectsNewLength() throws IOException {
void skip_markAndReset_availableReflectsNewLength() throws IOException {
delegateStream = new ByteArrayInputStream(new byte[32]);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegateStream, 16);
LengthAwareInputStream is = new LengthAwareInputStream(delegateStream, 16);

is.skip(4);

Expand Down Expand Up @@ -141,7 +141,7 @@ void skip_delegateSkipsLessThanRequested_availableUpdatedCorrectly() throws IOEx

when(delegateStream.read(any(byte[].class), any(int.class), any(int.class))).thenReturn(1);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegateStream, 16);
LengthAwareInputStream is = new LengthAwareInputStream(delegateStream, 16);

long skipped = is.skip(4);

Expand All @@ -156,7 +156,7 @@ void readArray_delegateReadsLessThanRequested_availableUpdatedCorrectly() throws
return n / 2;
});

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegateStream, 16);
LengthAwareInputStream is = new LengthAwareInputStream(delegateStream, 16);

long read = is.read(new byte[16], 0, 8);

Expand All @@ -169,7 +169,7 @@ void readArray_delegateShorterThanExpected_throws() {
int delegateLength = 16;
ByteArrayInputStream delegate = new ByteArrayInputStream(new byte[delegateLength]);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegate, delegateLength + 1);
LengthAwareInputStream is = new LengthAwareInputStream(delegate, delegateLength + 1);

assertThatThrownBy(() -> {
int read;
Expand All @@ -186,7 +186,7 @@ void readArray_readExactLength_doesNotThrow() throws IOException {
int delegateLength = 16;
ByteArrayInputStream delegate = new ByteArrayInputStream(new byte[delegateLength]);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegate, delegateLength);
LengthAwareInputStream is = new LengthAwareInputStream(delegate, delegateLength);

int total = 0;
int read;
Expand All @@ -204,7 +204,7 @@ void readArray_delegateLongerThanRequired_truncated() throws IOException {
int length = 16;
ByteArrayInputStream delegate = new ByteArrayInputStream(new byte[delegateLength]);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegate, length);
LengthAwareInputStream is = new LengthAwareInputStream(delegate, length);

int total = 0;
int read;
Expand All @@ -221,7 +221,7 @@ void readByte_delegateShorterThanExpected_throws() {
int delegateLength = 16;
ByteArrayInputStream delegate = new ByteArrayInputStream(new byte[delegateLength]);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegate, delegateLength + 1);
LengthAwareInputStream is = new LengthAwareInputStream(delegate, delegateLength + 1);

assertThatThrownBy(() -> {
int read;
Expand All @@ -237,7 +237,7 @@ void readByte_readExactLength_doesNotThrow() throws IOException {
int delegateLength = 16;
ByteArrayInputStream delegate = new ByteArrayInputStream(new byte[delegateLength]);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegate, delegateLength);
LengthAwareInputStream is = new LengthAwareInputStream(delegate, delegateLength);

int total = 0;
while (total != delegateLength && is.read() != -1) {
Expand All @@ -253,7 +253,7 @@ void readBytePartialThenMark_doesNotResetContentLength() throws IOException {
int expectedContentLength = delegateLength + 1;
ByteArrayInputStream delegate = new ByteArrayInputStream(new byte[delegateLength]);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegate, expectedContentLength);
LengthAwareInputStream is = new LengthAwareInputStream(delegate, expectedContentLength);
is.read(); // read one byte
is.mark(1024);
// read another byte and reset, the length should not be reset based on the byte that was already read
Expand All @@ -274,7 +274,7 @@ void readByte_delegateLongerThanRequired_truncated() throws IOException {
int length = 16;
ByteArrayInputStream delegate = new ByteArrayInputStream(new byte[delegateLength]);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegate, length);
LengthAwareInputStream is = new LengthAwareInputStream(delegate, length);

int total = 0;
while (total != delegateLength && is.read() != -1) {
Expand All @@ -290,7 +290,7 @@ public void skip_thenReadByteUntilEof_doesNotThrowLengthMismatch() throws IOExce

ByteArrayInputStream delegate = new ByteArrayInputStream(new byte[delegateLength]);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegate, delegateLength);
LengthAwareInputStream is = new LengthAwareInputStream(delegate, delegateLength);

int bytesToSkip = 8;
int skippedBytes = 0;
Expand All @@ -315,7 +315,7 @@ public void skip_thenReadArrayUntilEof_doesNotThrowLengthMismatch() throws IOExc

ByteArrayInputStream delegate = new ByteArrayInputStream(new byte[delegateLength]);

SdkLengthAwareInputStream is = new SdkLengthAwareInputStream(delegate, delegateLength);
LengthAwareInputStream is = new LengthAwareInputStream(delegate, delegateLength);

int bytesToSkip = 8;
int skippedBytes = 0;
Expand Down