diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 71c117ab36ad..768f2bd08de6 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -260,6 +260,10 @@ private TableProperties() {} public static final String PARQUET_BATCH_SIZE = "read.parquet.vectorization.batch-size"; public static final int PARQUET_BATCH_SIZE_DEFAULT = 5000; + public static final String READ_SINGLE_FETCH_THRESHOLD_BYTES = + "read.single-fetch-threshold-bytes"; + public static final long READ_SINGLE_FETCH_THRESHOLD_BYTES_DEFAULT = 0L; + public static final String ORC_VECTORIZATION_ENABLED = "read.orc.vectorization.enabled"; public static final boolean ORC_VECTORIZATION_ENABLED_DEFAULT = false; diff --git a/core/src/main/java/org/apache/iceberg/io/SingleFetchInputFile.java b/core/src/main/java/org/apache/iceberg/io/SingleFetchInputFile.java new file mode 100644 index 000000000000..feba58dcc32c --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/SingleFetchInputFile.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import java.io.IOException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * A decorator that collapses multiple object-store requests into one by fetching the entire file on + * the first call when the file size is at or below the configured threshold. + */ +public class SingleFetchInputFile implements InputFile { + + private final InputFile delegate; + private final long fileSize; + private final long threshold; + + public SingleFetchInputFile(InputFile delegate, long fileSize, long threshold) { + Preconditions.checkNotNull(delegate, "delegate is null"); + Preconditions.checkArgument(fileSize >= 0, "fileSize is negative: %s", fileSize); + this.delegate = delegate; + this.fileSize = fileSize; + this.threshold = threshold; + } + + @Override + public long getLength() { + return fileSize; + } + + @Override + public String location() { + return delegate.location(); + } + + @Override + public boolean exists() { + return delegate.exists(); + } + + @Override + public SeekableInputStream newStream() { + if (threshold <= 0 || fileSize > threshold || fileSize > Integer.MAX_VALUE) { + return delegate.newStream(); + } + + byte[] bytes = new byte[(int) fileSize]; + try (SeekableInputStream src = delegate.newStream()) { + IOUtil.readFully(src, bytes, 0, bytes.length); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to fetch file: %s", delegate.location()); + } + return new SingleFetchInputStream(bytes); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/SingleFetchInputStream.java b/core/src/main/java/org/apache/iceberg/io/SingleFetchInputStream.java new file mode 100644 index 000000000000..8a67a5fc9526 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/SingleFetchInputStream.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import java.io.EOFException; +import java.io.IOException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** A seekable, range-readable stream backed by a byte array fetched in a single remote call. */ +class SingleFetchInputStream extends SeekableInputStream implements RangeReadable { + + private final byte[] contents; + private int position; + private boolean closed; + + SingleFetchInputStream(byte[] contents) { + Preconditions.checkNotNull(contents, "contents is null"); + this.contents = contents; + } + + @Override + public long getPos() throws IOException { + checkOpen(); + return position; + } + + @Override + public void seek(long newPos) throws IOException { + checkOpen(); + Preconditions.checkArgument(newPos >= 0, "position is negative: %s", newPos); + if (newPos > contents.length) { + throw new EOFException( + "Cannot seek to position " + newPos + ": exceeds stream length " + contents.length); + } + position = (int) newPos; + } + + @Override + public int read() throws IOException { + checkOpen(); + if (position >= contents.length) { + return -1; + } + return contents[position++] & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkOpen(); + Preconditions.checkPositionIndexes(off, off + len, b.length); + if (len == 0) { + return 0; + } + if (position >= contents.length) { + return -1; + } + int bytesToRead = Math.min(len, contents.length - position); + System.arraycopy(contents, position, b, off, bytesToRead); + position += bytesToRead; + return bytesToRead; + } + + @Override + public long skip(long n) throws IOException { + checkOpen(); + if (n <= 0) { + return 0; + } + long bytesToSkip = Math.min(n, (long) contents.length - position); + position += (int) bytesToSkip; + return bytesToSkip; + } + + @Override + public int available() throws IOException { + checkOpen(); + return contents.length - position; + } + + @Override + public void readFully(long pos, byte[] buffer, int offset, int length) throws IOException { + checkOpen(); + Preconditions.checkPositionIndexes(offset, offset + length, buffer.length); + if (pos + length > contents.length) { + throw new EOFException( + "Cannot read " + + length + + " bytes at position " + + pos + + ": exceeds stream length " + + contents.length); + } + System.arraycopy(contents, (int) pos, buffer, offset, length); + } + + @Override + public int readTail(byte[] buffer, int offset, int length) throws IOException { + checkOpen(); + Preconditions.checkPositionIndexes(offset, offset + length, buffer.length); + int bytesToCopy = Math.min(length, contents.length); + System.arraycopy(contents, contents.length - bytesToCopy, buffer, offset, bytesToCopy); + return bytesToCopy; + } + + @Override + public void close() throws IOException { + closed = true; + } + + private void checkOpen() throws IOException { + if (closed) { + throw new IOException("Cannot read: already closed"); + } + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index a4d9766ae713..83191e709787 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -47,14 +47,17 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SingleFetchInputFile; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.SparkExecutorCache; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.util.PartitionUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; import org.slf4j.Logger; @@ -77,6 +80,7 @@ abstract class BaseReader implements Closeable { private final Iterator tasks; private final DeleteCounter counter; private final boolean cacheDeleteFilesOnExecutors; + private final long singleFetchThreshold; private Map lazyInputFiles; private CloseableIterator currentIterator; @@ -102,6 +106,11 @@ abstract class BaseReader implements Closeable { nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null; this.counter = new DeleteCounter(); this.cacheDeleteFilesOnExecutors = cacheDeleteFilesOnExecutors; + this.singleFetchThreshold = + PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.READ_SINGLE_FETCH_THRESHOLD_BYTES, + TableProperties.READ_SINGLE_FETCH_THRESHOLD_BYTES_DEFAULT); } protected abstract CloseableIterator open(TaskT task); @@ -182,9 +191,31 @@ protected InputFile getInputFile(String location) { private Map inputFiles() { if (lazyInputFiles == null) { - this.lazyInputFiles = + Map raw = fileIO.bulkDecrypt( () -> taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator()); + + if (singleFetchThreshold <= 0) { + this.lazyInputFiles = raw; + } else { + Map sizes = Maps.newHashMapWithExpectedSize(raw.size()); + taskGroup.tasks().stream() + .flatMap(this::referencedFiles) + .forEach(file -> sizes.put(file.location(), file.fileSizeInBytes())); + + Map wrapped = Maps.newHashMapWithExpectedSize(raw.size()); + for (Map.Entry entry : raw.entrySet()) { + Long size = sizes.get(entry.getKey()); + if (size == null) { + wrapped.put(entry.getKey(), entry.getValue()); + } else { + wrapped.put( + entry.getKey(), + new SingleFetchInputFile(entry.getValue(), size, singleFetchThreshold)); + } + } + this.lazyInputFiles = wrapped; + } } return lazyInputFiles;