From 83b35b045fd523ef4313ee81b0c91cc1ca976426 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Thu, 21 Apr 2022 11:41:54 -0700 Subject: [PATCH 01/15] Add RangeReadable interface for range base FileIO reads --- .../org/apache/iceberg/io/RangeReadable.java | 60 +++++++++++++++++++ .../apache/iceberg/aws/s3/S3InputStream.java | 18 +++++- .../iceberg/aws/s3/TestS3InputStream.java | 45 ++++++++++++++ 3 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 api/src/main/java/org/apache/iceberg/io/RangeReadable.java diff --git a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java new file mode 100644 index 000000000000..c338ea673580 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +/** + * {@code RangeReadable} is an interface that allows for implementations + * of {@link InputFile} streams to perform positional, range-based reads, which + * are more efficient than unbounded reads in many cloud provider object stores. + * + * Thread safety is not a requirement of the interface and is left to the + * implementation. + * + * If the implementation is also a {@link SeekableInputStream}, the position + * of the stream is not required to be updated based on the positional reads + * performed by this interface. Usage of {@link SeekableInputStream} should + * always seek to the appropriate position for {@link java.io.InputStream} + * based reads. + * + */ +public interface RangeReadable extends AutoCloseable { + + /** + * Fill the provided buffer with the contents of the input source starting + * at {@code position} for the given {@code offset} and {@code length}. + * + * @param position start position of the read + * @param buffer target buffer to copy data + * @param offset offset in the buffer to copy the data + * @param length size of the read + */ + void readFully(long position, byte [] buffer, int offset, int length); + + /** + * Fill the entire buffer with the contents of the input source starting + * at {@code position}. + * + * @param position start position of the read + * @param buffer target buffer to copy data + */ + default void readFully(long position, byte [] buffer) { + readFully(position, buffer, 0, buffer.length); + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java index 394449aee147..3c7c5c9c74e9 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java @@ -25,6 +25,7 @@ import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.io.FileIOMetricsContext; import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.metrics.MetricsContext; import org.apache.iceberg.metrics.MetricsContext.Counter; import org.apache.iceberg.metrics.MetricsContext.Unit; @@ -33,11 +34,13 @@ import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; -class S3InputStream extends SeekableInputStream { +class S3InputStream extends SeekableInputStream implements RangeReadable { private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class); private final StackTraceElement[] createStack; @@ -111,6 +114,19 @@ public int read(byte[] b, int off, int len) throws IOException { return bytesRead; } + @Override + public void readFully(long position, byte[] buffer, int offset, int length) { + GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder() + .bucket(location.bucket()) + .key(location.key()) + .range(String.format("bytes=%s-%s", position, position+length)); + + S3RequestUtil.configureEncryption(awsProperties, requestBuilder); + + ResponseBytes response = s3.getObject(requestBuilder.build(), ResponseTransformer.toBytes()); + System.arraycopy(response.asByteArrayUnsafe(), 0, buffer, offset, length); + } + @Override public void close() throws IOException { super.close(); diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java index 9568ac600882..e90cbbb3d5dc 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java @@ -25,6 +25,7 @@ import java.util.Random; import org.apache.commons.io.IOUtils; import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.io.RangeReadable; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -105,6 +106,50 @@ private void readAndCheck(SeekableInputStream in, long rangeStart, int size, byt assertArrayEquals(Arrays.copyOfRange(original, (int) rangeStart, (int) rangeEnd), actual); } + @Test + public void testRangeRead() throws Exception { + S3URI uri = new S3URI("s3://bucket/path/to/range-read.dat"); + int dataSize = 1024 * 1024 * 10; + byte[] expected = randomData(dataSize); + byte[] actual = new byte[dataSize]; + + long position; + int offset; + int length; + + writeS3Data(uri, expected); + + try (RangeReadable in = new S3InputStream(s3, uri)) { + // first 1k + position = 0; + offset = 0; + length = 1024; + readAndCheckRanges(in, expected, position, actual, offset, length); + + // last 1k + position = dataSize - 1024; + offset = dataSize - 1024; + length = 1024; + readAndCheckRanges(in, expected, position, actual, offset, length); + + // middle 2k + position = dataSize/2 - 1024; + offset = dataSize/2 - 1024; + length = 1024*2; + readAndCheckRanges(in, expected, position, actual, offset, length); + } + } + + private void readAndCheckRanges( + RangeReadable in, byte [] original, long position, byte [] buffer, int offset, + int length) { + in.readFully(position, buffer, offset, length); + + assertArrayEquals( + Arrays.copyOfRange(original, offset, offset+length), + Arrays.copyOfRange(buffer, offset, offset+length)); + } + @Test public void testClose() throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/closed.dat"); From c01cfb9e869ffeb43b35f81e3518ed5d24ba3e74 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Thu, 21 Apr 2022 12:14:16 -0700 Subject: [PATCH 02/15] Checkstyle --- .../main/java/org/apache/iceberg/aws/s3/S3InputStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java index 3c7c5c9c74e9..c55041e5c29d 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java @@ -24,8 +24,8 @@ import java.util.Arrays; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.io.FileIOMetricsContext; -import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.io.RangeReadable; +import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.metrics.MetricsContext; import org.apache.iceberg.metrics.MetricsContext.Counter; import org.apache.iceberg.metrics.MetricsContext.Unit; @@ -119,7 +119,7 @@ public void readFully(long position, byte[] buffer, int offset, int length) { GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder() .bucket(location.bucket()) .key(location.key()) - .range(String.format("bytes=%s-%s", position, position+length)); + .range(String.format("bytes=%s-%s", position, position + length)); S3RequestUtil.configureEncryption(awsProperties, requestBuilder); From 4ae673c4da83de4f9db3064ac218c354175ef56b Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Thu, 21 Apr 2022 12:15:41 -0700 Subject: [PATCH 03/15] Style --- api/src/main/java/org/apache/iceberg/io/RangeReadable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java index c338ea673580..3b895460f57c 100644 --- a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java +++ b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java @@ -45,7 +45,7 @@ public interface RangeReadable extends AutoCloseable { * @param offset offset in the buffer to copy the data * @param length size of the read */ - void readFully(long position, byte [] buffer, int offset, int length); + void readFully(long position, byte[] buffer, int offset, int length); /** * Fill the entire buffer with the contents of the input source starting @@ -54,7 +54,7 @@ public interface RangeReadable extends AutoCloseable { * @param position start position of the read * @param buffer target buffer to copy data */ - default void readFully(long position, byte [] buffer) { + default void readFully(long position, byte[] buffer) { readFully(position, buffer, 0, buffer.length); } } From b05b9688eb89702ca60ebff2cbf65667a8592100 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Thu, 21 Apr 2022 12:25:22 -0700 Subject: [PATCH 04/15] Checkstyle --- .../org/apache/iceberg/aws/s3/TestS3InputStream.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java index e90cbbb3d5dc..efa29920e41c 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java @@ -24,8 +24,8 @@ import java.util.Arrays; import java.util.Random; import org.apache.commons.io.IOUtils; -import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.io.RangeReadable; +import org.apache.iceberg.io.SeekableInputStream; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -133,9 +133,9 @@ public void testRangeRead() throws Exception { readAndCheckRanges(in, expected, position, actual, offset, length); // middle 2k - position = dataSize/2 - 1024; - offset = dataSize/2 - 1024; - length = 1024*2; + position = dataSize / 2 - 1024; + offset = dataSize / 2 - 1024; + length = 1024 * 2; readAndCheckRanges(in, expected, position, actual, offset, length); } } @@ -146,8 +146,8 @@ private void readAndCheckRanges( in.readFully(position, buffer, offset, length); assertArrayEquals( - Arrays.copyOfRange(original, offset, offset+length), - Arrays.copyOfRange(buffer, offset, offset+length)); + Arrays.copyOfRange(original, offset, offset + length), + Arrays.copyOfRange(buffer, offset, offset + length)); } @Test From 19c376db1d87f0ecbfc05f4042e055d912653100 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sat, 23 Apr 2022 12:14:55 -0700 Subject: [PATCH 05/15] Add readTail --- .../java/org/apache/iceberg/io/IOUtil.java | 43 +++++++++++++++++++ .../org/apache/iceberg/io/RangeReadable.java | 30 +++++++++++-- .../apache/iceberg/aws/s3/S3InputStream.java | 27 +++++++++--- .../iceberg/aws/s3/TestS3InputStream.java | 5 +-- 4 files changed, 93 insertions(+), 12 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/io/IOUtil.java diff --git a/api/src/main/java/org/apache/iceberg/io/IOUtil.java b/api/src/main/java/org/apache/iceberg/io/IOUtil.java new file mode 100644 index 000000000000..e3658ccc1e55 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/io/IOUtil.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.io.InputStream; + +public class IOUtil { + /** + * Reads a buffer from a stream, making multiple read calls if necessary. + * + * @param stream an InputStream to read from + * @param bytes a buffer + * @param offset starting offset in the buffer for the data + * @param length length of bytes to copy from the input stream to the buffer + * @throws IOException if there is an error while reading + */ + @SuppressWarnings("checkstyle:InnerAssignment") + public static void readFully( + InputStream stream, byte[] bytes, int offset, int length) throws IOException { + int pos = offset; + int bytesRead; + while ((length - pos) > 0 && (bytesRead = stream.read(bytes, pos, length - pos)) > 0) { + pos += bytesRead; + } + + if(pos != length) { + throw new IOException("End of stream reached before completing read"); + } + } +} diff --git a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java index 3b895460f57c..ababa7aacd1f 100644 --- a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java +++ b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java @@ -19,6 +19,9 @@ package org.apache.iceberg.io; +import java.io.Closeable; +import java.io.IOException; + /** * {@code RangeReadable} is an interface that allows for implementations * of {@link InputFile} streams to perform positional, range-based reads, which @@ -34,7 +37,7 @@ * based reads. * */ -public interface RangeReadable extends AutoCloseable { +public interface RangeReadable extends Closeable { /** * Fill the provided buffer with the contents of the input source starting @@ -45,7 +48,7 @@ public interface RangeReadable extends AutoCloseable { * @param offset offset in the buffer to copy the data * @param length size of the read */ - void readFully(long position, byte[] buffer, int offset, int length); + void readFully(long position, byte[] buffer, int offset, int length) throws IOException; /** * Fill the entire buffer with the contents of the input source starting @@ -54,7 +57,28 @@ public interface RangeReadable extends AutoCloseable { * @param position start position of the read * @param buffer target buffer to copy data */ - default void readFully(long position, byte[] buffer) { + default void readFully(long position, byte[] buffer) throws IOException { readFully(position, buffer, 0, buffer.length); } + + /** + * Read the last {@code length} bytes from the file into the provided + * buffer at the given offset. + * + * @param buffer the buffer to write data into + * @param offset the offset in the buffer to start writing + * @param length the number of bytes from the end of the object to read + * @throws IOException if an error occurs while reading + */ + void readTail(byte [] buffer, int offset, int length) throws IOException; + + /** + * Read the full size of the buffer from the end of the file. + * + * @param buffer the buffer to write data into + * @throws IOException if an error occurs while reading + */ + default void readTail(byte [] buffer) throws IOException { + readTail(buffer, 0, buffer.length); + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java index c55041e5c29d..5a9f0cc2d45e 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java @@ -24,6 +24,7 @@ import java.util.Arrays; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.io.FileIOMetricsContext; +import org.apache.iceberg.io.IOUtil; import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.metrics.MetricsContext; @@ -34,11 +35,9 @@ import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; class S3InputStream extends SeekableInputStream implements RangeReadable { private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class); @@ -115,16 +114,32 @@ public int read(byte[] b, int off, int len) throws IOException { } @Override - public void readFully(long position, byte[] buffer, int offset, int length) { + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + Preconditions.checkPositionIndexes(offset, offset + length, buffer.length); + + String range = String.format("bytes=%s-%s", position, position + length - 1); + + IOUtil.readFully(readRange(range), buffer, offset, length); + } + + @Override + public void readTail(byte[] buffer, int offset, int length) throws IOException { + Preconditions.checkPositionIndexes(offset, offset + length, buffer.length); + + String range = String.format("bytes=-%s", length); + + IOUtil.readFully(readRange(range), buffer, offset, length); + } + + private InputStream readRange(String range) { GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder() .bucket(location.bucket()) .key(location.key()) - .range(String.format("bytes=%s-%s", position, position + length)); + .range(range); S3RequestUtil.configureEncryption(awsProperties, requestBuilder); - ResponseBytes response = s3.getObject(requestBuilder.build(), ResponseTransformer.toBytes()); - System.arraycopy(response.asByteArrayUnsafe(), 0, buffer, offset, length); + return s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream()); } @Override diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java index efa29920e41c..17770339be0d 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java @@ -129,7 +129,6 @@ public void testRangeRead() throws Exception { // last 1k position = dataSize - 1024; offset = dataSize - 1024; - length = 1024; readAndCheckRanges(in, expected, position, actual, offset, length); // middle 2k @@ -140,9 +139,9 @@ public void testRangeRead() throws Exception { } } - private void readAndCheckRanges( + private void readAndCheckRanges ( RangeReadable in, byte [] original, long position, byte [] buffer, int offset, - int length) { + int length) throws IOException { in.readFully(position, buffer, offset, length); assertArrayEquals( From 895a77ad635e131a05b1613b5d982a47db85a3d7 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sat, 23 Apr 2022 12:23:06 -0700 Subject: [PATCH 06/15] Checkstyle --- .../java/org/apache/iceberg/io/IOUtil.java | 29 ++++++++++++------- .../org/apache/iceberg/io/RangeReadable.java | 2 +- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/io/IOUtil.java b/api/src/main/java/org/apache/iceberg/io/IOUtil.java index e3658ccc1e55..3b4c6c041156 100644 --- a/api/src/main/java/org/apache/iceberg/io/IOUtil.java +++ b/api/src/main/java/org/apache/iceberg/io/IOUtil.java @@ -1,15 +1,20 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.iceberg.io; @@ -18,6 +23,10 @@ import java.io.InputStream; public class IOUtil { + // not meant to be instantiated + private IOUtil() { + } + /** * Reads a buffer from a stream, making multiple read calls if necessary. * @@ -36,7 +45,7 @@ public static void readFully( pos += bytesRead; } - if(pos != length) { + if (pos != length) { throw new IOException("End of stream reached before completing read"); } } diff --git a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java index ababa7aacd1f..bd3253da620a 100644 --- a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java +++ b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java @@ -74,7 +74,7 @@ default void readFully(long position, byte[] buffer) throws IOException { /** * Read the full size of the buffer from the end of the file. - * + * * @param buffer the buffer to write data into * @throws IOException if an error occurs while reading */ From 298c17e340627910927f0a1c1b44e74f1ea14e5c Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sat, 23 Apr 2022 12:26:32 -0700 Subject: [PATCH 07/15] Relocate IOUtil to core --- {api => core}/src/main/java/org/apache/iceberg/io/IOUtil.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {api => core}/src/main/java/org/apache/iceberg/io/IOUtil.java (100%) diff --git a/api/src/main/java/org/apache/iceberg/io/IOUtil.java b/core/src/main/java/org/apache/iceberg/io/IOUtil.java similarity index 100% rename from api/src/main/java/org/apache/iceberg/io/IOUtil.java rename to core/src/main/java/org/apache/iceberg/io/IOUtil.java From 961e7898b857233feac24b297d125f037c4b9023 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sat, 23 Apr 2022 13:05:12 -0700 Subject: [PATCH 08/15] Fix readFully --- core/src/main/java/org/apache/iceberg/io/IOUtil.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/IOUtil.java b/core/src/main/java/org/apache/iceberg/io/IOUtil.java index 3b4c6c041156..0c2df71871a4 100644 --- a/core/src/main/java/org/apache/iceberg/io/IOUtil.java +++ b/core/src/main/java/org/apache/iceberg/io/IOUtil.java @@ -37,15 +37,15 @@ private IOUtil() { * @throws IOException if there is an error while reading */ @SuppressWarnings("checkstyle:InnerAssignment") - public static void readFully( - InputStream stream, byte[] bytes, int offset, int length) throws IOException { + public static void readFully(InputStream stream, byte[] bytes, int offset, int length) throws IOException { int pos = offset; - int bytesRead; - while ((length - pos) > 0 && (bytesRead = stream.read(bytes, pos, length - pos)) > 0) { + int bytesRead = 0; + + while ((length - bytesRead) > 0 && (bytesRead = stream.read(bytes, pos, length - bytesRead)) > 0) { pos += bytesRead; } - if (pos != length) { + if (bytesRead != length) { throw new IOException("End of stream reached before completing read"); } } From 1f92f2264b7ce7b5d05552417e3c771813ba0f05 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sat, 23 Apr 2022 13:19:54 -0700 Subject: [PATCH 09/15] Update readTail to return actual size read before EOF --- .../org/apache/iceberg/io/RangeReadable.java | 11 +++++----- .../apache/iceberg/aws/s3/S3InputStream.java | 4 ++-- .../java/org/apache/iceberg/io/IOUtil.java | 21 ++++++++++++++++--- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java index bd3253da620a..fafdd7b02037 100644 --- a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java +++ b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java @@ -62,23 +62,24 @@ default void readFully(long position, byte[] buffer) throws IOException { } /** - * Read the last {@code length} bytes from the file into the provided - * buffer at the given offset. + * Read the last {@code length} bytes from the file. * * @param buffer the buffer to write data into * @param offset the offset in the buffer to start writing * @param length the number of bytes from the end of the object to read + * @return the actual number of bytes read * @throws IOException if an error occurs while reading */ - void readTail(byte [] buffer, int offset, int length) throws IOException; + int readTail(byte [] buffer, int offset, int length) throws IOException; /** * Read the full size of the buffer from the end of the file. * * @param buffer the buffer to write data into + * @return the actual number of bytes read * @throws IOException if an error occurs while reading */ - default void readTail(byte [] buffer) throws IOException { - readTail(buffer, 0, buffer.length); + default int readTail(byte [] buffer) throws IOException { + return readTail(buffer, 0, buffer.length); } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java index 5a9f0cc2d45e..b62b49f34676 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java @@ -123,12 +123,12 @@ public void readFully(long position, byte[] buffer, int offset, int length) thro } @Override - public void readTail(byte[] buffer, int offset, int length) throws IOException { + public int readTail(byte[] buffer, int offset, int length) throws IOException { Preconditions.checkPositionIndexes(offset, offset + length, buffer.length); String range = String.format("bytes=-%s", length); - IOUtil.readFully(readRange(range), buffer, offset, length); + return IOUtil.readToEnd(readRange(range), buffer, offset, length); } private InputStream readRange(String range) { diff --git a/core/src/main/java/org/apache/iceberg/io/IOUtil.java b/core/src/main/java/org/apache/iceberg/io/IOUtil.java index 0c2df71871a4..cfda22771259 100644 --- a/core/src/main/java/org/apache/iceberg/io/IOUtil.java +++ b/core/src/main/java/org/apache/iceberg/io/IOUtil.java @@ -38,6 +38,23 @@ private IOUtil() { */ @SuppressWarnings("checkstyle:InnerAssignment") public static void readFully(InputStream stream, byte[] bytes, int offset, int length) throws IOException { + if (readToEnd(stream, bytes, offset, length) != length) { + throw new IOException("End of stream reached before completing read"); + } + } + + /** + * Reads a buffer from a stream, making multiple read calls if necessary + * returning the number of bytes read until end of stream. + * + * @param stream an InputStream to read from + * @param bytes a buffer + * @param offset starting offset in the buffer for the data + * @param length length of bytes to copy from the input stream to the buffer + * @throws IOException if there is an error while reading + */ + @SuppressWarnings("checkstyle:InnerAssignment") + public static int readToEnd(InputStream stream, byte[] bytes, int offset, int length) throws IOException { int pos = offset; int bytesRead = 0; @@ -45,8 +62,6 @@ public static void readFully(InputStream stream, byte[] bytes, int offset, int l pos += bytesRead; } - if (bytesRead != length) { - throw new IOException("End of stream reached before completing read"); - } + return bytesRead; } } From 36032b1eff6651ddeef36c7465f19c38d540a843 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sat, 23 Apr 2022 13:21:35 -0700 Subject: [PATCH 10/15] Cleanup comments --- core/src/main/java/org/apache/iceberg/io/IOUtil.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/IOUtil.java b/core/src/main/java/org/apache/iceberg/io/IOUtil.java index cfda22771259..d60f6983cdd9 100644 --- a/core/src/main/java/org/apache/iceberg/io/IOUtil.java +++ b/core/src/main/java/org/apache/iceberg/io/IOUtil.java @@ -28,10 +28,10 @@ private IOUtil() { } /** - * Reads a buffer from a stream, making multiple read calls if necessary. + * Reads into a buffer from a stream, making multiple read calls if necessary. * * @param stream an InputStream to read from - * @param bytes a buffer + * @param bytes a buffer to write into * @param offset starting offset in the buffer for the data * @param length length of bytes to copy from the input stream to the buffer * @throws IOException if there is an error while reading @@ -44,11 +44,11 @@ public static void readFully(InputStream stream, byte[] bytes, int offset, int l } /** - * Reads a buffer from a stream, making multiple read calls if necessary + * Reads into a buffer from a stream, making multiple read calls if necessary * returning the number of bytes read until end of stream. * * @param stream an InputStream to read from - * @param bytes a buffer + * @param bytes a buffer to write into * @param offset starting offset in the buffer for the data * @param length length of bytes to copy from the input stream to the buffer * @throws IOException if there is an error while reading From ec6c81d9353e7d2356d662f670337320588550b5 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sat, 23 Apr 2022 18:06:17 -0700 Subject: [PATCH 11/15] Checkstyle --- .../test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java index 17770339be0d..417ba16f71ad 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java @@ -139,7 +139,7 @@ public void testRangeRead() throws Exception { } } - private void readAndCheckRanges ( + private void readAndCheckRanges( RangeReadable in, byte [] original, long position, byte [] buffer, int offset, int length) throws IOException { in.readFully(position, buffer, offset, length); From 859602081169c9ef6acd23e52ad3179958e6bcee Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 24 Apr 2022 09:37:45 -0700 Subject: [PATCH 12/15] Replace readFully implementation with one from Parquet. --- .../apache/iceberg/aws/s3/S3InputStream.java | 2 +- .../java/org/apache/iceberg/io/IOUtil.java | 24 ++-- .../apache/iceberg/io/MockInputStream.java | 57 ++++++++ .../org/apache/iceberg/io/TestIOUtil.java | 122 ++++++++++++++++++ 4 files changed, 196 insertions(+), 9 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/io/MockInputStream.java create mode 100644 core/src/test/java/org/apache/iceberg/io/TestIOUtil.java diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java index b62b49f34676..92d0f3f3eae9 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java @@ -128,7 +128,7 @@ public int readTail(byte[] buffer, int offset, int length) throws IOException { String range = String.format("bytes=-%s", length); - return IOUtil.readToEnd(readRange(range), buffer, offset, length); + return IOUtil.readRemaining(readRange(range), buffer, offset, length); } private InputStream readRange(String range) { diff --git a/core/src/main/java/org/apache/iceberg/io/IOUtil.java b/core/src/main/java/org/apache/iceberg/io/IOUtil.java index d60f6983cdd9..0daf4a019358 100644 --- a/core/src/main/java/org/apache/iceberg/io/IOUtil.java +++ b/core/src/main/java/org/apache/iceberg/io/IOUtil.java @@ -19,6 +19,7 @@ package org.apache.iceberg.io; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -34,12 +35,14 @@ private IOUtil() { * @param bytes a buffer to write into * @param offset starting offset in the buffer for the data * @param length length of bytes to copy from the input stream to the buffer + * @throws EOFException if the end of the stream is reached before reading length bytes * @throws IOException if there is an error while reading */ - @SuppressWarnings("checkstyle:InnerAssignment") public static void readFully(InputStream stream, byte[] bytes, int offset, int length) throws IOException { - if (readToEnd(stream, bytes, offset, length) != length) { - throw new IOException("End of stream reached before completing read"); + int bytesRead = readRemaining(stream, bytes, offset, length); + if (bytesRead < length) { + throw new EOFException( + "Reached the end of stream with " + (length - bytesRead) + " bytes left to read"); } } @@ -51,17 +54,22 @@ public static void readFully(InputStream stream, byte[] bytes, int offset, int l * @param bytes a buffer to write into * @param offset starting offset in the buffer for the data * @param length length of bytes to copy from the input stream to the buffer + * @return the number of bytes read * @throws IOException if there is an error while reading */ - @SuppressWarnings("checkstyle:InnerAssignment") - public static int readToEnd(InputStream stream, byte[] bytes, int offset, int length) throws IOException { + public static int readRemaining(InputStream stream, byte[] bytes, int offset, int length) throws IOException { int pos = offset; - int bytesRead = 0; + int remaining = length; + while (remaining > 0) { + int bytesRead = stream.read(bytes, pos, remaining); + if (bytesRead < 0) { + break; + } - while ((length - bytesRead) > 0 && (bytesRead = stream.read(bytes, pos, length - bytesRead)) > 0) { + remaining -= bytesRead; pos += bytesRead; } - return bytesRead; + return length - remaining; } } diff --git a/core/src/test/java/org/apache/iceberg/io/MockInputStream.java b/core/src/test/java/org/apache/iceberg/io/MockInputStream.java new file mode 100644 index 000000000000..03bdc2d97a06 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/MockInputStream.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.ByteArrayInputStream; + +class MockInputStream extends ByteArrayInputStream { + + static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + + private int[] lengths; + private int current = 0; + MockInputStream(int... actualReadLengths) { + super(TEST_ARRAY); + this.lengths = actualReadLengths; + } + + @Override + public synchronized int read(byte[] b, int off, int len) { + if (current < lengths.length) { + if (len <= lengths[current]) { + // when len == lengths[current], the next read will by 0 bytes + int bytesRead = super.read(b, off, len); + lengths[current] -= bytesRead; + return bytesRead; + } else { + int bytesRead = super.read(b, off, lengths[current]); + current += 1; + return bytesRead; + } + } else { + return super.read(b, off, len); + } + } + + public long getPos() { + return this.pos; + } +} + diff --git a/core/src/test/java/org/apache/iceberg/io/TestIOUtil.java b/core/src/test/java/org/apache/iceberg/io/TestIOUtil.java new file mode 100644 index 000000000000..dae92785fc65 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestIOUtil.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; +import org.apache.iceberg.AssertHelpers; +import org.junit.Assert; +import org.junit.Test; + +public class TestIOUtil { + @Test + public void testReadFully() throws Exception { + byte[] buffer = new byte[5]; + + MockInputStream stream = new MockInputStream(); + IOUtil.readFully(stream, buffer, 0, buffer.length); + + Assert.assertArrayEquals("Byte array contents should match", + Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), buffer); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + + @Test + public void testReadFullySmallReads() throws Exception { + byte[] buffer = new byte[5]; + + MockInputStream stream = new MockInputStream(2, 3, 3); + IOUtil.readFully(stream, buffer, 0, buffer.length); + + Assert.assertArrayEquals("Byte array contents should match", + Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), buffer); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + + @Test + public void testReadFullyJustRight() throws Exception { + final byte[] buffer = new byte[10]; + + final MockInputStream stream = new MockInputStream(2, 3, 3); + IOUtil.readFully(stream, buffer, 0, buffer.length); + + Assert.assertArrayEquals("Byte array contents should match", MockInputStream.TEST_ARRAY, buffer); + Assert.assertEquals("Stream position should reflect bytes read", 10, stream.getPos()); + + AssertHelpers.assertThrows("Should throw EOFException if no more bytes left", + EOFException.class, () -> { + IOUtil.readFully(stream, buffer, 0, 1); + return null; + }); + } + + @Test + public void testReadFullyUnderflow() { + final byte[] buffer = new byte[11]; + + final MockInputStream stream = new MockInputStream(2, 3, 3); + + AssertHelpers.assertThrows("Should throw EOFException if no more bytes left", + EOFException.class, () -> { + IOUtil.readFully(stream, buffer, 0, buffer.length); + return null; + }); + + Assert.assertArrayEquals("Should have consumed bytes", + MockInputStream.TEST_ARRAY, Arrays.copyOfRange(buffer, 0, 10)); + Assert.assertEquals("Stream position should reflect bytes read", 10, stream.getPos()); + } + + @Test + public void testReadFullyStartAndLength() throws IOException { + byte[] buffer = new byte[10]; + + MockInputStream stream = new MockInputStream(); + IOUtil.readFully(stream, buffer, 2, 5); + + Assert.assertArrayEquals("Byte array contents should match", + Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), Arrays.copyOfRange(buffer, 2, 7)); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + + @Test + public void testReadFullyZeroByteRead() throws IOException { + byte[] buffer = new byte[0]; + + MockInputStream stream = new MockInputStream(); + IOUtil.readFully(stream, buffer, 0, buffer.length); + + Assert.assertEquals("Stream position should reflect bytes read", 0, stream.getPos()); + } + + @Test + public void testReadFullySmallReadsWithStartAndLength() throws IOException { + byte[] buffer = new byte[10]; + + MockInputStream stream = new MockInputStream(2, 2, 3); + IOUtil.readFully(stream, buffer, 2, 5); + + Assert.assertArrayEquals("Byte array contents should match", + Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), Arrays.copyOfRange(buffer, 2, 7)); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + +} From a532db33067c7b6833ee954155225a9179a95024 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 24 Apr 2022 09:50:25 -0700 Subject: [PATCH 13/15] Update LICENSE. --- LICENSE | 1 + 1 file changed, 1 insertion(+) diff --git a/LICENSE b/LICENSE index 82a12b78272b..064892499911 100644 --- a/LICENSE +++ b/LICENSE @@ -228,6 +228,7 @@ This product includes code from Apache Parquet. * DynMethods.java * DynConstructors.java * AssertHelpers.java +* IOUtil.java readFully and tests Copyright: 2014-2017 The Apache Software Foundation. Home page: https://parquet.apache.org/ From 3960e5b4eaf01c7d3523aa62f8810c36bf09b680 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sun, 24 Apr 2022 14:40:14 -0700 Subject: [PATCH 14/15] Checkstyle --- core/src/test/java/org/apache/iceberg/io/MockInputStream.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/java/org/apache/iceberg/io/MockInputStream.java b/core/src/test/java/org/apache/iceberg/io/MockInputStream.java index 03bdc2d97a06..d022728cb73f 100644 --- a/core/src/test/java/org/apache/iceberg/io/MockInputStream.java +++ b/core/src/test/java/org/apache/iceberg/io/MockInputStream.java @@ -27,6 +27,7 @@ class MockInputStream extends ByteArrayInputStream { private int[] lengths; private int current = 0; + MockInputStream(int... actualReadLengths) { super(TEST_ARRAY); this.lengths = actualReadLengths; From 51c292ee45e8a570252d1124615c093a59fb2478 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 24 Apr 2022 15:08:35 -0700 Subject: [PATCH 15/15] Remove trailing whitespace. --- core/src/test/java/org/apache/iceberg/io/MockInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/iceberg/io/MockInputStream.java b/core/src/test/java/org/apache/iceberg/io/MockInputStream.java index d022728cb73f..7cc251ebd641 100644 --- a/core/src/test/java/org/apache/iceberg/io/MockInputStream.java +++ b/core/src/test/java/org/apache/iceberg/io/MockInputStream.java @@ -27,7 +27,7 @@ class MockInputStream extends ByteArrayInputStream { private int[] lengths; private int current = 0; - + MockInputStream(int... actualReadLengths) { super(TEST_ARRAY); this.lengths = actualReadLengths;