diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java index c3fda19d8d73b..2f043b6499795 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java @@ -21,6 +21,10 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -34,6 +38,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Future IO Helper methods. *

@@ -55,6 +62,7 @@ @InterfaceStability.Unstable public final class FutureIO { + private static final Logger LOG = LoggerFactory.getLogger(FutureIO.class.getName()); private FutureIO() { } @@ -114,6 +122,77 @@ public static T awaitFuture(final Future future, } } + /** + * Evaluates a collection of futures and returns their results as a list. + *

+ * This method blocks until all futures in the collection have completed. + * If any future throws an exception during its execution, this method + * extracts and rethrows that exception. + *

+ * + * @param collection collection of futures to be evaluated + * @param type of the result. + * @return the list of future's result, if all went well. + * @throws InterruptedIOException future was interrupted + * @throws IOException if something went wrong + * @throws RuntimeException any nested RTE thrown + */ + public static List awaitAllFutures(final Collection> collection) + throws InterruptedIOException, IOException, RuntimeException { + List results = new ArrayList<>(); + try { + for (Future future : collection) { + results.add(future.get()); + } + return results; + } catch (InterruptedException e) { + LOG.debug("Execution of future interrupted ", e); + throw (InterruptedIOException) new InterruptedIOException(e.toString()) + .initCause(e); + } catch (ExecutionException e) { + LOG.debug("Execution of future failed with exception", e.getCause()); + return raiseInnerCause(e); + } + } + + /** + * Evaluates a collection of futures and returns their results as a list, + * but only waits up to the specified timeout for each future to complete. + *

+ * This method blocks until all futures in the collection have completed or + * the timeout expires, whichever happens first. If any future throws an + * exception during its execution, this method extracts and rethrows that exception. + *

+ * + * @param collection collection of futures to be evaluated + * @param duration timeout duration + * @param type of the result. + * @return the list of future's result, if all went well. + * @throws InterruptedIOException future was interrupted + * @throws IOException if something went wrong + * @throws RuntimeException any nested RTE thrown + * @throws TimeoutException the future timed out. + */ + public static List awaitAllFutures(final Collection> collection, + final Duration duration) + throws InterruptedIOException, IOException, RuntimeException, + TimeoutException { + List results = new ArrayList<>(); + try { + for (Future future : collection) { + results.add(future.get(duration.toMillis(), TimeUnit.MILLISECONDS)); + } + return results; + } catch (InterruptedException e) { + LOG.debug("Execution of future interrupted ", e); + throw (InterruptedIOException) new InterruptedIOException(e.toString()) + .initCause(e); + } catch (ExecutionException e) { + LOG.debug("Execution of future failed with exception", e.getCause()); + return raiseInnerCause(e); + } + } + /** * From the inner cause of an execution exception, extract the inner cause * if it is an IOE or RTE. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index c1ddfb0447c8e..a1b6fc12a5ce1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -1230,11 +1230,6 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) { this.optimizeFooterRead = optimizeFooterRead; } - @VisibleForTesting - public void setFooterReadBufferSize(int footerReadBufferSize) { - this.footerReadBufferSize = footerReadBufferSize; - } - @VisibleForTesting public void setEnableAbfsListIterator(boolean enableAbfsListIterator) { this.enableAbfsListIterator = enableAbfsListIterator; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 8b6bc337fb21c..51ba90f8e0a95 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -279,7 +279,7 @@ private FSDataInputStream open(final Path path, try { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener); - InputStream inputStream = abfsStore + InputStream inputStream = getAbfsStore() .openFileForRead(qualifiedPath, parameters, statistics, tracingContext); return new FSDataInputStream(inputStream); } catch (AzureBlobFileSystemException ex) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 484b1ed9e10b7..5c8a3acbcb023 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -898,21 +898,21 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( .map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false)) .orElse(false); int footerReadBufferSize = options.map(c -> c.getInt( - AZURE_FOOTER_READ_BUFFER_SIZE, abfsConfiguration.getFooterReadBufferSize())) - .orElse(abfsConfiguration.getFooterReadBufferSize()); - return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) - .withReadBufferSize(abfsConfiguration.getReadBufferSize()) - .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) - .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) - .isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled()) - .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely()) - .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead()) + AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize())) + .orElse(getAbfsConfiguration().getFooterReadBufferSize()); + return new AbfsInputStreamContext(getAbfsConfiguration().getSasTokenRenewPeriodForStreamsInSeconds()) + .withReadBufferSize(getAbfsConfiguration().getReadBufferSize()) + .withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth()) + .withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends()) + .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled()) + .withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely()) + .withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead()) .withFooterReadBufferSize(footerReadBufferSize) - .withReadAheadRange(abfsConfiguration.getReadAheadRange()) + .withReadAheadRange(getAbfsConfiguration().getReadAheadRange()) .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) .withShouldReadBufferSizeAlways( - abfsConfiguration.shouldReadBufferSizeAlways()) - .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize()) + getAbfsConfiguration().shouldReadBufferSizeAlways()) + .withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize()) .withBufferedPreadDisabled(bufferedPreadDisabled) .withEncryptionAdapter(contextEncryptionAdapter) .withAbfsBackRef(fsBackRef) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index af82c3f128413..19c67a8358867 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -141,7 +141,10 @@ public AbfsInputStream( this.path = path; this.contentLength = contentLength; this.bufferSize = abfsInputStreamContext.getReadBufferSize(); - this.footerReadSize = abfsInputStreamContext.getFooterReadBufferSize(); + /* + * FooterReadSize should not be more than bufferSize. + */ + this.footerReadSize = Math.min(bufferSize, abfsInputStreamContext.getFooterReadBufferSize()); this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth(); this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 3ae3f878a19fa..00d853175108d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -89,7 +89,7 @@ public abstract class AbstractAbfsIntegrationTest extends private AuthType authType; private boolean useConfiguredFileSystem = false; private boolean usingFilesystemForSASTests = false; - private static final int SHORTENED_GUID_LEN = 12; + public static final int SHORTENED_GUID_LEN = 12; protected AbstractAbfsIntegrationTest() throws Exception { fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString(); @@ -366,6 +366,14 @@ public AbfsConfiguration getConfiguration() { return abfsConfig; } + public AbfsConfiguration getConfiguration(AzureBlobFileSystem fs) { + return fs.getAbfsStore().getAbfsConfiguration(); + } + + public Map getInstrumentationMap(AzureBlobFileSystem fs) { + return fs.getInstrumentationMap(); + } + public Configuration getRawConfiguration() { return abfsConfig.getRawConfiguration(); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java new file mode 100644 index 0000000000000..388e662115ed2 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.util.Random; +import java.util.UUID; + +import org.assertj.core.api.Assertions; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.utils.UriUtils; + +import static org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest.SHORTENED_GUID_LEN; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_OPTIMIZE_FOOTER_READ; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_SMALL_FILES_COMPLETELY; + +public class AbfsInputStreamTestUtils { + + public static final int HUNDRED = 100; + + private final AbstractAbfsIntegrationTest abstractAbfsIntegrationTest; + + public AbfsInputStreamTestUtils(AbstractAbfsIntegrationTest abstractAbfsIntegrationTest) { + this.abstractAbfsIntegrationTest = abstractAbfsIntegrationTest; + } + + private Path path(String filepath) throws IOException { + return abstractAbfsIntegrationTest.getFileSystem().makeQualified( + new Path(getTestPath(), getUniquePath(filepath))); + } + + private Path getTestPath() { + Path path = new Path(UriUtils.generateUniqueTestPath()); + return path; + } + + /** + * Generate a unique path using the given filepath. + * @param filepath path string + * @return unique path created from filepath and a GUID + */ + private Path getUniquePath(String filepath) { + if (filepath.equals("/")) { + return new Path(filepath); + } + return new Path(filepath + StringUtils + .right(UUID.randomUUID().toString(), SHORTENED_GUID_LEN)); + } + + /** + * Returns AzureBlobFileSystem instance with the required + * readFullFileOptimization configuration. + * + * @param readSmallFilesCompletely whether to read small files completely + * @return AzureBlobFileSystem instance + * @throws IOException exception in creating fileSystem + */ + public AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely) + throws IOException { + Configuration configuration = new Configuration( + abstractAbfsIntegrationTest.getRawConfiguration()); + configuration.setBoolean(AZURE_READ_SMALL_FILES_COMPLETELY, + readSmallFilesCompletely); + configuration.setBoolean(AZURE_READ_OPTIMIZE_FOOTER_READ, false); + return (AzureBlobFileSystem) FileSystem.newInstance(configuration); + } + + /** + * Return array of random bytes of the given length. + * + * @param length length of the byte array + * @return byte array + */ + public byte[] getRandomBytesArray(int length) { + final byte[] b = new byte[length]; + new Random().nextBytes(b); + return b; + } + + /** + * Create a file on the file system with the given file name and content. + * + * @param fs fileSystem that stores the file + * @param fileName name of the file + * @param fileContent content of the file + * + * @return path of the file created + * @throws IOException exception in writing file on fileSystem + */ + public Path createFileWithContent(FileSystem fs, String fileName, + byte[] fileContent) throws IOException { + Path testFilePath = path(fileName); + try (FSDataOutputStream oStream = fs.create(testFilePath)) { + oStream.write(fileContent); + oStream.flush(); + } + return testFilePath; + } + + /** + * Assert that the content read from the subsection of a file is correct. + * + * @param actualFileContent actual content of the file + * @param from start index of the content read + * @param len length of the content read + * @param contentRead content read from the file + * @param testFilePath path of the file + */ + public void assertContentReadCorrectly(byte[] actualFileContent, int from, + int len, byte[] contentRead, Path testFilePath) { + Assertions.assertThat(actualFileContent.length) + .describedAs("From + len should be less than or equal to " + + "the actual file content length") + .isGreaterThanOrEqualTo(from + len); + Assertions.assertThat(contentRead.length) + .describedAs("Content read length should be greater than or " + + "equal to the len") + .isGreaterThanOrEqualTo(len); + for (int i = 0; i < len; i++) { + Assertions.assertThat(contentRead[i]) + .describedAs( + "The test file path is " + testFilePath + ". Equality failed" + + "at index " + i + + " of the contentRead array. ActualFileContent is being compared from index " + + from) + .isEqualTo(actualFileContent[i + from]); + } + } + + /** + * Assert that the readBuffer in AbfsInputStream contain the correct starting + * subsequence of the file content. + * + * @param actualContent actual content of the file + * @param abfsInputStream abfsInputStream whose buffer to be asserted + * @param conf configuration + * @param testFilePath path of the file + */ + public void assertAbfsInputStreamBufferEqualToContentStartSubsequence(byte[] actualContent, + AbfsInputStream abfsInputStream, + AbfsConfiguration conf, + Path testFilePath) { + Assertions.assertThat(abfsInputStream.getBuffer().length) + .describedAs("ReadBuffer should be lesser than or equal to " + + "readBufferSize") + .isLessThanOrEqualTo(conf.getReadBufferSize()); + assertAbfsInputStreamBufferEqualityWithContentStartingSubSequence( + actualContent, abfsInputStream.getBuffer(), conf, + false, testFilePath); + } + + /** + * Assert that the readBuffer in AbfsInputStream contain the incorrect starting + * subsequence of the file content. + * + * @param actualContent actual content of the file + * @param abfsInputStream abfsInputStream whose buffer to be asserted + * @param conf configuration + * @param testFilePath path of the file + */ + public void assertAbfsInputStreamBufferNotEqualToContentStartSubsequence(byte[] actualContent, + AbfsInputStream abfsInputStream, + AbfsConfiguration conf, + Path testFilePath) { + Assertions.assertThat(abfsInputStream.getBuffer().length) + .describedAs("ReadBuffer should be lesser than or equal to " + + "readBufferSize") + .isLessThanOrEqualTo(conf.getReadBufferSize()); + assertAbfsInputStreamBufferEqualityWithContentStartingSubSequence( + actualContent, abfsInputStream.getBuffer(), conf, true, + testFilePath); + } + + /** + * Assert the equality or inequality of abfsInputStreamReadBuffer with the + * starting subsequence of the fileContent. + * + * @param actualContent actual content of the file + * @param abfsInputStreamReadBuffer buffer read from the abfsInputStream + * @param conf configuration + * @param assertEqual whether to assert equality or inequality + * @param testFilePath path of the file + */ + private void assertAbfsInputStreamBufferEqualityWithContentStartingSubSequence( + byte[] actualContent, + byte[] abfsInputStreamReadBuffer, + AbfsConfiguration conf, + boolean assertEqual, + Path testFilePath) { + int bufferSize = conf.getReadBufferSize(); + int actualContentSize = actualContent.length; + int n = Math.min(actualContentSize, bufferSize); + int matches = 0; + for (int i = 0; i < n && i < abfsInputStreamReadBuffer.length; i++) { + if (actualContent[i] == abfsInputStreamReadBuffer[i]) { + matches++; + } + } + if (assertEqual) { + Assertions.assertThat(matches).describedAs( + "The test file path is " + testFilePath).isEqualTo(n); + } else { + Assertions.assertThat(matches).describedAs( + "The test file path is " + testFilePath).isNotEqualTo(n); + } + } + + /** + * Seek inputStream to the given seekPos. + * + * @param iStream inputStream to seek + * @param seekPos position to seek + * @throws IOException exception in seeking inputStream + */ + public void seek(FSDataInputStream iStream, long seekPos) + throws IOException { + AbfsInputStream abfsInputStream + = (AbfsInputStream) iStream.getWrappedStream(); + verifyAbfsInputStreamBaseStateBeforeSeek(abfsInputStream); + iStream.seek(seekPos); + verifyAbsInputStreamStateAfterSeek(abfsInputStream, seekPos); + } + + /** + * Verifies that the pointers in AbfsInputStream state are unchanged and are + * equal to that of a newly created inputStream. + * + * @param abfsInputStream inputStream to verify + */ + public void verifyAbfsInputStreamBaseStateBeforeSeek(AbfsInputStream abfsInputStream) { + Assertions.assertThat(abfsInputStream.getFCursor()) + .describedAs("FCursor should be 0 at the inputStream open") + .isEqualTo(0); + Assertions.assertThat(abfsInputStream.getFCursorAfterLastRead()) + .describedAs( + "FCursorAfterLastRead should be -1 at the inputStream open") + .isEqualTo(-1); + Assertions.assertThat(abfsInputStream.getLimit()) + .describedAs("Limit should be 0 at the inputStream open") + .isEqualTo(0); + Assertions.assertThat(abfsInputStream.getBCursor()) + .describedAs("BCursor should be 0 at the inputStream open") + .isEqualTo(0); + } + + /** + * Verifies that only the FCursor is updated after seek and all other pointers + * are in their initial state. + * + * @param abfsInputStream inputStream to verify + * @param seekPos position to seek + * + * @throws IOException exception in inputStream operations + */ + public void verifyAbsInputStreamStateAfterSeek(AbfsInputStream abfsInputStream, + long seekPos) throws IOException { + Assertions.assertThat(abfsInputStream.getPos()) + .describedAs("InputStream's pos should be " + seekPos + " after seek") + .isEqualTo(seekPos); + Assertions.assertThat(abfsInputStream.getFCursorAfterLastRead()) + .describedAs("FCursorAfterLastRead should be -1 after seek") + .isEqualTo(-1); + Assertions.assertThat(abfsInputStream.getLimit()) + .describedAs("Limit should be 0 after seek") + .isEqualTo(0); + Assertions.assertThat(abfsInputStream.getBCursor()) + .describedAs("BCursor should be 0 after seek") + .isEqualTo(0); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index b27d92c319ce5..d14ac05d5f5aa 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -19,24 +19,20 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; -import java.lang.reflect.Field; -import java.util.Map; -import java.util.Random; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; -import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.assertj.core.api.Assertions; import org.junit.Test; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamTestUtils.HUNDRED; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -45,9 +41,9 @@ public class ITestAbfsInputStream extends AbstractAbfsIntegrationTest { - protected static final int HUNDRED = 100; - + private final AbfsInputStreamTestUtils abfsInputStreamTestUtils; public ITestAbfsInputStream() throws Exception { + this.abfsInputStreamTestUtils = new AbfsInputStreamTestUtils(this); } @Test @@ -56,8 +52,8 @@ public void testWithNoOptimization() throws Exception { int fileSize = i * ONE_MB; final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize); String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); + Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent); testWithNoOptimization(fs, testFilePath, HUNDRED, fileContent); } } @@ -71,7 +67,7 @@ protected void testWithNoOptimization(final FileSystem fs, .getWrappedStream(); iStream = new FSDataInputStream(abfsInputStream); - seek(iStream, seekPos); + abfsInputStreamTestUtils.seek(iStream, seekPos); long totalBytesRead = 0; int length = HUNDRED * HUNDRED; do { @@ -82,7 +78,7 @@ protected void testWithNoOptimization(final FileSystem fs, length = (fileContent.length - seekPos) % length; } assertEquals(length, bytesRead); - assertContentReadCorrectly(fileContent, + abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, (int) (seekPos + totalBytesRead - length), length, buffer, testFilePath); assertTrue(abfsInputStream.getFCursor() >= seekPos + totalBytesRead); @@ -101,8 +97,8 @@ public void testExceptionInOptimization() throws Exception { int fileSize = i * ONE_MB; final AzureBlobFileSystem fs = getFileSystem(true, true, fileSize); String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); + Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent); testExceptionInOptimization(fs, testFilePath, fileSize - HUNDRED, fileSize / 4, fileContent); } @@ -144,8 +140,8 @@ private void testExceptionInOptimization(final FileSystem fs, any(TracingContext.class)); iStream = new FSDataInputStream(abfsInputStream); - verifyBeforeSeek(abfsInputStream); - seek(iStream, seekPos); + abfsInputStreamTestUtils.verifyAbfsInputStreamBaseStateBeforeSeek(abfsInputStream); + abfsInputStreamTestUtils.seek(iStream, seekPos); byte[] buffer = new byte[length]; int bytesRead = iStream.read(buffer, 0, length); long actualLength = length; @@ -154,7 +150,8 @@ private void testExceptionInOptimization(final FileSystem fs, actualLength = length - delta; } assertEquals(bytesRead, actualLength); - assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer, testFilePath); + abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, seekPos, + (int) actualLength, buffer, testFilePath); assertEquals(fileContent.length, abfsInputStream.getFCursor()); assertEquals(fileContent.length, abfsInputStream.getFCursorAfterLastRead()); assertEquals(actualLength, abfsInputStream.getBCursor()); @@ -164,18 +161,6 @@ private void testExceptionInOptimization(final FileSystem fs, } } - protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely) - throws IOException { - final AzureBlobFileSystem fs = getFileSystem(); - getAbfsStore(fs).getAbfsConfiguration() - .setReadSmallFilesCompletely(readSmallFilesCompletely); - getAbfsStore(fs).getAbfsConfiguration() - .setOptimizeFooterRead(false); - getAbfsStore(fs).getAbfsConfiguration() - .setIsChecksumValidationEnabled(true); - return fs; - } - private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead, boolean readSmallFileCompletely, int fileSize) throws IOException { final AzureBlobFileSystem fs = getFileSystem(); @@ -190,96 +175,4 @@ private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead, } return fs; } - - protected byte[] getRandomBytesArray(int length) { - final byte[] b = new byte[length]; - new Random().nextBytes(b); - return b; - } - - protected Path createFileWithContent(FileSystem fs, String fileName, - byte[] fileContent) throws IOException { - Path testFilePath = path(fileName); - try (FSDataOutputStream oStream = fs.create(testFilePath)) { - oStream.write(fileContent); - oStream.flush(); - } - return testFilePath; - } - - protected AzureBlobFileSystemStore getAbfsStore(FileSystem fs) - throws NoSuchFieldException, IllegalAccessException { - AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs; - Field abfsStoreField = AzureBlobFileSystem.class - .getDeclaredField("abfsStore"); - abfsStoreField.setAccessible(true); - return (AzureBlobFileSystemStore) abfsStoreField.get(abfs); - } - - protected Map getInstrumentationMap(FileSystem fs) - throws NoSuchFieldException, IllegalAccessException { - AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs; - Field abfsCountersField = AzureBlobFileSystem.class - .getDeclaredField("abfsCounters"); - abfsCountersField.setAccessible(true); - AbfsCounters abfsCounters = (AbfsCounters) abfsCountersField.get(abfs); - return abfsCounters.toMap(); - } - - protected void assertContentReadCorrectly(byte[] actualFileContent, int from, - int len, byte[] contentRead, Path testFilePath) { - for (int i = 0; i < len; i++) { - assertEquals("The test file path is " + testFilePath, contentRead[i], actualFileContent[i + from]); - } - } - - protected void assertBuffersAreNotEqual(byte[] actualContent, - byte[] contentRead, AbfsConfiguration conf, Path testFilePath) { - assertBufferEquality(actualContent, contentRead, conf, false, testFilePath); - } - - protected void assertBuffersAreEqual(byte[] actualContent, byte[] contentRead, - AbfsConfiguration conf, Path testFilePath) { - assertBufferEquality(actualContent, contentRead, conf, true, testFilePath); - } - - private void assertBufferEquality(byte[] actualContent, byte[] contentRead, - AbfsConfiguration conf, boolean assertEqual, Path testFilePath) { - int bufferSize = conf.getReadBufferSize(); - int actualContentSize = actualContent.length; - int n = (actualContentSize < bufferSize) ? actualContentSize : bufferSize; - int matches = 0; - for (int i = 0; i < n; i++) { - if (actualContent[i] == contentRead[i]) { - matches++; - } - } - if (assertEqual) { - assertEquals("The test file path is " + testFilePath, n, matches); - } else { - assertNotEquals("The test file path is " + testFilePath, n, matches); - } - } - - protected void seek(FSDataInputStream iStream, long seekPos) - throws IOException { - AbfsInputStream abfsInputStream = (AbfsInputStream) iStream.getWrappedStream(); - verifyBeforeSeek(abfsInputStream); - iStream.seek(seekPos); - verifyAfterSeek(abfsInputStream, seekPos); - } - - private void verifyBeforeSeek(AbfsInputStream abfsInputStream){ - assertEquals(0, abfsInputStream.getFCursor()); - assertEquals(-1, abfsInputStream.getFCursorAfterLastRead()); - assertEquals(0, abfsInputStream.getLimit()); - assertEquals(0, abfsInputStream.getBCursor()); - } - - private void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos) throws IOException { - assertEquals(seekPos, abfsInputStream.getPos()); - assertEquals(-1, abfsInputStream.getFCursorAfterLastRead()); - assertEquals(0, abfsInputStream.getLimit()); - assertEquals(0, abfsInputStream.getBCursor()); - } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java index bf205879cb4a0..c7c9da94ab2ed 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java @@ -19,14 +19,23 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FutureDataInputStreamBuilder; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsScaleTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.assertj.core.api.Assertions; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -34,12 +43,15 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.util.functional.FutureIO; import static java.lang.Math.max; import static java.lang.Math.min; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FOOTER_READ_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamTestUtils.HUNDRED; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -49,44 +61,112 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; -public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream { +public class ITestAbfsInputStreamReadFooter extends AbstractAbfsScaleTest { private static final int TEN = 10; private static final int TWENTY = 20; + private static ExecutorService executorService; + + private static final int SIZE_256_KB = 256 * ONE_KB; + + private static final Integer[] FILE_SIZES = { + SIZE_256_KB, + 2 * SIZE_256_KB, + ONE_MB, + 4 * ONE_MB + }; + + private static final Integer[] READ_BUFFER_SIZE = { + SIZE_256_KB, + 2 * SIZE_256_KB, + ONE_MB, + 4 * ONE_MB + }; + + private static final Integer[] FOOTER_READ_BUFFER_SIZE = { + SIZE_256_KB, + 2 * SIZE_256_KB, + ONE_MB + }; + + private final AbfsInputStreamTestUtils abfsInputStreamTestUtils; + public ITestAbfsInputStreamReadFooter() throws Exception { + this.abfsInputStreamTestUtils = new AbfsInputStreamTestUtils(this); + } + + @BeforeClass + public static void init() { + executorService = Executors.newFixedThreadPool( + 2 * Runtime.getRuntime().availableProcessors()); + } + + @AfterClass + public static void close() { + executorService.shutdown(); } @Test public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception { - testNumBackendCalls(true); + validateNumBackendCalls(true); } @Test public void testMultipleServerCallsAreMadeWhenTheConfIsFalse() throws Exception { - testNumBackendCalls(false); + validateNumBackendCalls(false); } - private void testNumBackendCalls(boolean optimizeFooterRead) + + /** + * For different combination of file sizes, read buffer sizes and footer read + * buffer size, assert the number of server calls made when the optimization + * is enabled and disabled. + *

+ * If the footer optimization is on, if the first read on the file is within the + * footer range (given by {@link AbfsInputStream#FOOTER_SIZE}, then the last block + * of size footerReadBufferSize is read from the server, and then subsequent + * inputStream reads from that block is returned from the buffer maintained by the + * AbfsInputStream. So, those reads will not result in server calls. + */ + private void validateNumBackendCalls(boolean optimizeFooterRead) throws Exception { int fileIdx = 0; - for (int i = 0; i <= 4; i++) { - for (int j = 0; j <= 2; j++) { - int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; - int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; - final AzureBlobFileSystem fs = getFileSystem( - optimizeFooterRead, fileSize); - Path testFilePath = createPathAndFileWithContent( - fs, fileIdx++, fileSize); + final List> futureList = new ArrayList<>(); + for (int fileSize : FILE_SIZES) { + final int fileId = fileIdx++; + Future future = executorService.submit(() -> { + try (AzureBlobFileSystem spiedFs = createSpiedFs( + getRawConfiguration())) { + Path testPath = createPathAndFileWithContent( + spiedFs, fileId, fileSize); + validateNumBackendCalls(spiedFs, optimizeFooterRead, fileSize, + testPath); + return null; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }); + futureList.add(future); + } + FutureIO.awaitAllFutures(futureList); + } + + private void validateNumBackendCalls(final AzureBlobFileSystem spiedFs, + final boolean optimizeFooterRead, final int fileSize, final Path testFilePath) throws Exception { + for (int readBufferSize : READ_BUFFER_SIZE) { + for (int footerReadBufferSize : FOOTER_READ_BUFFER_SIZE) { + changeFooterConfigs(spiedFs, optimizeFooterRead, fileSize, readBufferSize); int length = AbfsInputStream.FOOTER_SIZE; FutureDataInputStreamBuilder builder = getParameterizedBuilder( - testFilePath, fs, footerReadBufferSize); + testFilePath, spiedFs, footerReadBufferSize); try (FSDataInputStream iStream = builder.build().get()) { verifyConfigValueInStream(iStream, footerReadBufferSize); byte[] buffer = new byte[length]; - Map metricMap = getInstrumentationMap(fs); + Map metricMap = + getInstrumentationMap(spiedFs); long requestsMadeBeforeTest = metricMap .get(CONNECTIONS_MADE.getStatName()); @@ -99,7 +179,7 @@ private void testNumBackendCalls(boolean optimizeFooterRead) iStream.seek(fileSize - (TWENTY * ONE_KB)); iStream.read(buffer, 0, length); - metricMap = getInstrumentationMap(fs); + metricMap = getInstrumentationMap(spiedFs); long requestsMadeAfterTest = metricMap .get(CONNECTIONS_MADE.getStatName()); @@ -117,74 +197,104 @@ private void testNumBackendCalls(boolean optimizeFooterRead) @Test public void testSeekToBeginAndReadWithConfTrue() throws Exception { - testSeekAndReadWithConf(true, SeekTo.BEGIN); + validateSeekAndReadWithConf(true, SeekTo.BEGIN); } @Test public void testSeekToBeginAndReadWithConfFalse() throws Exception { - testSeekAndReadWithConf(false, SeekTo.BEGIN); + validateSeekAndReadWithConf(false, SeekTo.BEGIN); } @Test public void testSeekToBeforeFooterAndReadWithConfTrue() throws Exception { - testSeekAndReadWithConf(true, SeekTo.BEFORE_FOOTER_START); + validateSeekAndReadWithConf(true, SeekTo.BEFORE_FOOTER_START); } @Test public void testSeekToBeforeFooterAndReadWithConfFalse() throws Exception { - testSeekAndReadWithConf(false, SeekTo.BEFORE_FOOTER_START); + validateSeekAndReadWithConf(false, SeekTo.BEFORE_FOOTER_START); } @Test public void testSeekToFooterAndReadWithConfTrue() throws Exception { - testSeekAndReadWithConf(true, SeekTo.AT_FOOTER_START); + validateSeekAndReadWithConf(true, SeekTo.AT_FOOTER_START); } @Test public void testSeekToFooterAndReadWithConfFalse() throws Exception { - testSeekAndReadWithConf(false, SeekTo.AT_FOOTER_START); + validateSeekAndReadWithConf(false, SeekTo.AT_FOOTER_START); } @Test public void testSeekToAfterFooterAndReadWithConfTrue() throws Exception { - testSeekAndReadWithConf(true, SeekTo.AFTER_FOOTER_START); + validateSeekAndReadWithConf(true, SeekTo.AFTER_FOOTER_START); } @Test public void testSeekToToAfterFooterAndReadWithConfFalse() throws Exception { - testSeekAndReadWithConf(false, SeekTo.AFTER_FOOTER_START); + validateSeekAndReadWithConf(false, SeekTo.AFTER_FOOTER_START); } @Test public void testSeekToEndAndReadWithConfTrue() throws Exception { - testSeekAndReadWithConf(true, SeekTo.END); + validateSeekAndReadWithConf(true, SeekTo.END); } @Test public void testSeekToEndAndReadWithConfFalse() throws Exception { - testSeekAndReadWithConf(false, SeekTo.END); + validateSeekAndReadWithConf(false, SeekTo.END); } - private void testSeekAndReadWithConf(boolean optimizeFooterRead, + /** + * For different combination of file sizes, read buffer sizes and footer read + * buffer size, and read from different seek positions, validate the internal + * state of AbfsInputStream. + */ + private void validateSeekAndReadWithConf(boolean optimizeFooterRead, SeekTo seekTo) throws Exception { + int fileIdx = 0; + List> futureList = new ArrayList<>(); + for (int fileSize : FILE_SIZES) { + final int fileId = fileIdx++; + futureList.add(executorService.submit(() -> { + try (AzureBlobFileSystem spiedFs = createSpiedFs( + getRawConfiguration())) { + String fileName = methodName.getMethodName() + fileId; + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); + Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName, + fileContent); + for (int readBufferSize : READ_BUFFER_SIZE) { + validateSeekAndReadWithConf(spiedFs, optimizeFooterRead, seekTo, + readBufferSize, fileSize, testFilePath, fileContent); + } + return null; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + })); + } + FutureIO.awaitAllFutures(futureList); + } + + private void validateSeekAndReadWithConf(final AzureBlobFileSystem spiedFs, + final boolean optimizeFooterRead, + final SeekTo seekTo, + final int readBufferSize, + final int fileSize, + final Path testFilePath, + final byte[] fileContent) + throws Exception { // Running the test for file sizes ranging from 256 KB to 4 MB with // Footer Read Buffer size ranging from 256 KB to 1 MB // This will cover files less than footer read buffer size, // Files between footer read buffer and read buffer size // Files bigger than read buffer size - int fileIdx = 0; - for (int i = 0; i <= 4; i++) { - for (int j = 0; j <= 2; j++) { - int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; - int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; - final AzureBlobFileSystem fs = getFileSystem( - optimizeFooterRead, fileSize); - String fileName = methodName.getMethodName() + fileIdx++; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED, - fileContent, footerReadBufferSize); - } + for (int footerReadBufferSize : FOOTER_READ_BUFFER_SIZE) { + changeFooterConfigs(spiedFs, optimizeFooterRead, fileSize, + readBufferSize); + + seekReadAndTest(spiedFs, testFilePath, seekPos(seekTo, fileSize), HUNDRED, + fileContent, footerReadBufferSize); } } @@ -216,7 +326,7 @@ private void seekReadAndTest(final AzureBlobFileSystem fs, AbfsInputStream abfsInputStream = (AbfsInputStream) iStream.getWrappedStream(); verifyConfigValueInStream(iStream, footerReadBufferSize); long readBufferSize = abfsInputStream.getBufferSize(); - seek(iStream, seekPos); + abfsInputStreamTestUtils.seek(iStream, seekPos); byte[] buffer = new byte[length]; long bytesRead = iStream.read(buffer, 0, length); @@ -260,13 +370,13 @@ private void seekReadAndTest(final AzureBlobFileSystem fs, assertEquals(expectedBCursor, abfsInputStream.getBCursor()); assertEquals(actualLength, bytesRead); // Verify user-content read - assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer, testFilePath); + abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer, testFilePath); // Verify data read to AbfsInputStream buffer int from = seekPos; if (optimizationOn) { from = (int) max(0, actualContentLength - footerReadBufferSize); } - assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(), + abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(), abfsInputStream.getBuffer(), testFilePath); } } @@ -274,44 +384,67 @@ private void seekReadAndTest(final AzureBlobFileSystem fs, @Test public void testPartialReadWithNoData() throws Exception { int fileIdx = 0; - for (int i = 0; i <= 4; i++) { - for (int j = 0; j <= 2; j++) { - int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; - int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; - final AzureBlobFileSystem fs = getFileSystem( - true, fileSize, footerReadBufferSize); - String fileName = methodName.getMethodName() + fileIdx++; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - testPartialReadWithNoData(fs, testFilePath, - fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, - fileContent, footerReadBufferSize); + List> futureList = new ArrayList<>(); + for (int fileSize : FILE_SIZES) { + final int fileId = fileIdx++; + final String fileName = methodName.getMethodName() + fileId; + futureList.add(executorService.submit(() -> { + try (AzureBlobFileSystem spiedFs = createSpiedFs( + getRawConfiguration())) { + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); + Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName, + fileContent); + validatePartialReadWithNoData(spiedFs, fileSize, fileContent, + testFilePath); + return null; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + })); + FutureIO.awaitAllFutures(futureList); + } + } + + private void validatePartialReadWithNoData(final AzureBlobFileSystem spiedFs, + final int fileSize, + final byte[] fileContent, + Path testFilePath) throws IOException { + for (int readBufferSize : READ_BUFFER_SIZE) { + for (int footerReadBufferSize : FOOTER_READ_BUFFER_SIZE) { + changeFooterConfigs(spiedFs, true, fileSize, + footerReadBufferSize, readBufferSize); + + validatePartialReadWithNoData(spiedFs, testFilePath, + fileSize - AbfsInputStream.FOOTER_SIZE, + AbfsInputStream.FOOTER_SIZE, + fileContent, footerReadBufferSize, readBufferSize); } } } - private void testPartialReadWithNoData(final FileSystem fs, + private void validatePartialReadWithNoData(final FileSystem fs, final Path testFilePath, final int seekPos, final int length, - final byte[] fileContent, int footerReadBufferSize) throws IOException { + final byte[] fileContent, int footerReadBufferSize, final int readBufferSize) throws IOException { FSDataInputStream iStream = fs.open(testFilePath); try { AbfsInputStream abfsInputStream = (AbfsInputStream) iStream .getWrappedStream(); + int footerBufferSizeAssert = Math.min(readBufferSize, footerReadBufferSize); Assertions.assertThat(abfsInputStream.getFooterReadBufferSize()) .describedAs("Footer Read Buffer Size Should be same as what set in builder") - .isEqualTo(footerReadBufferSize); + .isEqualTo(footerBufferSizeAssert); abfsInputStream = spy(abfsInputStream); doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream) .readRemote(anyLong(), any(), anyInt(), anyInt(), any(TracingContext.class)); iStream = new FSDataInputStream(abfsInputStream); - seek(iStream, seekPos); + abfsInputStreamTestUtils.seek(iStream, seekPos); byte[] buffer = new byte[length]; int bytesRead = iStream.read(buffer, 0, length); assertEquals(length, bytesRead); - assertContentReadCorrectly(fileContent, seekPos, length, buffer, testFilePath); + abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, seekPos, length, buffer, testFilePath); assertEquals(fileContent.length, abfsInputStream.getFCursor()); assertEquals(length, abfsInputStream.getBCursor()); assertTrue(abfsInputStream.getLimit() >= length); @@ -322,28 +455,51 @@ private void testPartialReadWithNoData(final FileSystem fs, @Test public void testPartialReadWithSomeData() throws Exception { - for (int i = 0; i <= 4; i++) { - for (int j = 0; j <= 2; j++) { - int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; - int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; - final AzureBlobFileSystem fs = getFileSystem(true, - fileSize, footerReadBufferSize); - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - testPartialReadWithSomeData(fs, testFilePath, - fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, - fileContent, footerReadBufferSize); + int fileIdx = 0; + List> futureList = new ArrayList<>(); + for (int fileSize : FILE_SIZES) { + final int fileId = fileIdx++; + futureList.add(executorService.submit(() -> { + try (AzureBlobFileSystem spiedFs = createSpiedFs( + getRawConfiguration())) { + String fileName = methodName.getMethodName() + fileId; + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); + Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName, + fileContent); + validatePartialReadWithSomeData(spiedFs, fileSize, testFilePath, + fileContent); + return null; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + })); + } + FutureIO.awaitAllFutures(futureList); + } + + private void validatePartialReadWithSomeData(final AzureBlobFileSystem spiedFs, + final int fileSize, final Path testFilePath, final byte[] fileContent) + throws IOException { + for (int readBufferSize : READ_BUFFER_SIZE) { + for (int footerReadBufferSize : FOOTER_READ_BUFFER_SIZE) { + changeFooterConfigs(spiedFs, true, + fileSize, footerReadBufferSize, readBufferSize); + + validatePartialReadWithSomeData(spiedFs, testFilePath, + fileSize - AbfsInputStream.FOOTER_SIZE, + AbfsInputStream.FOOTER_SIZE, + fileContent, footerReadBufferSize, readBufferSize); } } } - private void testPartialReadWithSomeData(final FileSystem fs, + private void validatePartialReadWithSomeData(final FileSystem fs, final Path testFilePath, final int seekPos, final int length, - final byte[] fileContent, final int footerReadBufferSize) throws IOException { + final byte[] fileContent, final int footerReadBufferSize, + final int readBufferSize) throws IOException { FSDataInputStream iStream = fs.open(testFilePath); try { - verifyConfigValueInStream(iStream, footerReadBufferSize); + verifyConfigValueInStream(iStream, Math.min(footerReadBufferSize, readBufferSize)); AbfsInputStream abfsInputStream = spy((AbfsInputStream) iStream .getWrappedStream()); // first readRemote, will return first 10 bytes @@ -358,7 +514,7 @@ private void testPartialReadWithSomeData(final FileSystem fs, any(TracingContext.class)); iStream = new FSDataInputStream(abfsInputStream); - seek(iStream, seekPos); + abfsInputStreamTestUtils.seek(iStream, seekPos); byte[] buffer = new byte[length]; int bytesRead = iStream.read(buffer, 0, length); @@ -379,7 +535,7 @@ private void testPartialReadWithSomeData(final FileSystem fs, public void testFooterReadBufferSizeConfiguration() throws Exception { Configuration config = new Configuration(this.getRawConfiguration()); config.unset(AZURE_FOOTER_READ_BUFFER_SIZE); - try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(config)){ + try (AzureBlobFileSystem fs = createSpiedFs(config)){ Path testFilePath = createPathAndFileWithContent(fs, 0, ONE_KB); final int footerReadBufferSizeConfig = 4 * ONE_KB; final int footerReadBufferSizeBuilder = 5 * ONE_KB; @@ -389,13 +545,13 @@ public void testFooterReadBufferSizeConfiguration() throws Exception { verifyConfigValueInStream(iStream, DEFAULT_FOOTER_READ_BUFFER_SIZE); // Verify that value set in config is used if builder is not used - getAbfsStore(fs).getAbfsConfiguration() - .setFooterReadBufferSize(footerReadBufferSizeConfig); + AbfsConfiguration spiedConfig = fs.getAbfsStore().getAbfsConfiguration(); + Mockito.doReturn(footerReadBufferSizeConfig).when(spiedConfig).getFooterReadBufferSize(); iStream = fs.open(testFilePath); verifyConfigValueInStream(iStream, footerReadBufferSizeConfig); // Verify that when builder is used value set in parameters is used - getAbfsStore(fs).getAbfsConfiguration().unset(AZURE_FOOTER_READ_BUFFER_SIZE); + spiedConfig.unset(AZURE_FOOTER_READ_BUFFER_SIZE); FutureDataInputStreamBuilder builder = fs.openFile(testFilePath); builder.opt(AZURE_FOOTER_READ_BUFFER_SIZE, footerReadBufferSizeBuilder); @@ -404,15 +560,13 @@ public void testFooterReadBufferSizeConfiguration() throws Exception { // Verify that when builder is used value set in parameters is used // even if config is set - getAbfsStore(fs).getAbfsConfiguration() - .setFooterReadBufferSize(footerReadBufferSizeConfig); + Mockito.doReturn(footerReadBufferSizeConfig).when(spiedConfig).getFooterReadBufferSize(); iStream = builder.build().get(); verifyConfigValueInStream(iStream, footerReadBufferSizeBuilder); // Verify that when the builder is used and parameter in builder is not set, // the value set in configuration is used - getAbfsStore(fs).getAbfsConfiguration() - .setFooterReadBufferSize(footerReadBufferSizeConfig); + Mockito.doReturn(footerReadBufferSizeConfig).when(spiedConfig).getFooterReadBufferSize(); builder = fs.openFile(testFilePath); iStream = builder.build().get(); verifyConfigValueInStream(iStream, footerReadBufferSizeConfig); @@ -431,8 +585,8 @@ private void verifyConfigValueInStream(final FSDataInputStream inputStream, private Path createPathAndFileWithContent(final AzureBlobFileSystem fs, final int fileIdx, final int fileSize) throws Exception { String fileName = methodName.getMethodName() + fileIdx; - byte[] fileContent = getRandomBytesArray(fileSize); - return createFileWithContent(fs, fileName, fileContent); + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); + return abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent); } private FutureDataInputStreamBuilder getParameterizedBuilder(final Path path, @@ -443,27 +597,45 @@ private FutureDataInputStreamBuilder getParameterizedBuilder(final Path path, return builder; } - private AzureBlobFileSystem getFileSystem(final boolean optimizeFooterRead, - final int fileSize) throws IOException { - final AzureBlobFileSystem fs = getFileSystem(); - AzureBlobFileSystemStore store = getAbfsStore(fs); - store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead); - if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) { - store.getAbfsConfiguration().setReadSmallFilesCompletely(false); + private void changeFooterConfigs(final AzureBlobFileSystem spiedFs, + final boolean optimizeFooterRead, final int fileSize, + final int readBufferSize) { + AbfsConfiguration configuration = spiedFs.getAbfsStore() + .getAbfsConfiguration(); + Mockito.doReturn(optimizeFooterRead) + .when(configuration) + .optimizeFooterRead(); + if (fileSize <= readBufferSize) { + Mockito.doReturn(false).when(configuration).readSmallFilesCompletely(); } - return fs; } - private AzureBlobFileSystem getFileSystem(final boolean optimizeFooterRead, - final int fileSize, final int footerReadBufferSize) throws IOException { - final AzureBlobFileSystem fs = getFileSystem(); - AzureBlobFileSystemStore store = getAbfsStore(fs); - store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead); - store.getAbfsConfiguration().setFooterReadBufferSize(footerReadBufferSize); - if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) { - store.getAbfsConfiguration().setReadSmallFilesCompletely(false); + private AzureBlobFileSystem createSpiedFs(Configuration configuration) + throws IOException { + AzureBlobFileSystem spiedFs = Mockito.spy( + (AzureBlobFileSystem) FileSystem.newInstance(configuration)); + AzureBlobFileSystemStore store = Mockito.spy(spiedFs.getAbfsStore()); + Mockito.doReturn(store).when(spiedFs).getAbfsStore(); + AbfsConfiguration spiedConfig = Mockito.spy(store.getAbfsConfiguration()); + Mockito.doReturn(spiedConfig).when(store).getAbfsConfiguration(); + return spiedFs; + } + + private void changeFooterConfigs(final AzureBlobFileSystem spiedFs, + final boolean optimizeFooterRead, final int fileSize, + final int footerReadBufferSize, final int readBufferSize) { + AbfsConfiguration configuration = spiedFs.getAbfsStore() + .getAbfsConfiguration(); + Mockito.doReturn(optimizeFooterRead) + .when(configuration) + .optimizeFooterRead(); + Mockito.doReturn(footerReadBufferSize) + .when(configuration) + .getFooterReadBufferSize(); + Mockito.doReturn(readBufferSize).when(configuration).getReadBufferSize(); + if (fileSize <= readBufferSize) { + Mockito.doReturn(false).when(configuration).readSmallFilesCompletely(); } - return fs; } private enum SeekTo { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java index baa664d6e7fee..64fac9ca94ed8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; @@ -40,9 +41,12 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; -public class ITestAbfsInputStreamSmallFileReads extends ITestAbfsInputStream { +public class ITestAbfsInputStreamSmallFileReads extends + AbstractAbfsIntegrationTest { + private final AbfsInputStreamTestUtils abfsInputStreamTestUtils; public ITestAbfsInputStreamSmallFileReads() throws Exception { + this.abfsInputStreamTestUtils = new AbfsInputStreamTestUtils(this); } @Test @@ -58,12 +62,20 @@ public void testMultipleServerCallsAreMadeWhenTheConfIsFalse() private void testNumBackendCalls(boolean readSmallFilesCompletely) throws Exception { - final AzureBlobFileSystem fs = getFileSystem(readSmallFilesCompletely); + try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem( + readSmallFilesCompletely)) { + validateNumBackendCalls(readSmallFilesCompletely, fs); + } + } + + private void validateNumBackendCalls(final boolean readSmallFilesCompletely, + final AzureBlobFileSystem fs) + throws IOException, NoSuchFieldException, IllegalAccessException { for (int i = 1; i <= 4; i++) { String fileName = methodName.getMethodName() + i; int fileSize = i * ONE_MB; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); + Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent); int length = ONE_KB; try (FSDataInputStream iStream = fs.open(testFilePath)) { byte[] buffer = new byte[length]; @@ -158,12 +170,23 @@ public void testSeekToMiddleAndReadBigFileWithConfFalse() throws Exception { private void testSeekAndReadWithConf(SeekTo seekTo, int startFileSizeInMB, int endFileSizeInMB, boolean readSmallFilesCompletely) throws Exception { - final AzureBlobFileSystem fs = getFileSystem(readSmallFilesCompletely); + try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem( + readSmallFilesCompletely)) { + validateSeekAndReadWithConf(seekTo, startFileSizeInMB, endFileSizeInMB, + fs); + } + } + + private void validateSeekAndReadWithConf(final SeekTo seekTo, + final int startFileSizeInMB, + final int endFileSizeInMB, + final AzureBlobFileSystem fs) + throws IOException, NoSuchFieldException, IllegalAccessException { for (int i = startFileSizeInMB; i <= endFileSizeInMB; i++) { String fileName = methodName.getMethodName() + i; int fileSize = i * ONE_MB; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); + Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent); int length = ONE_KB; int seekPos = seekPos(seekTo, fileSize, length); seekReadAndTest(fs, testFilePath, seekPos, length, fileContent); @@ -183,13 +206,13 @@ private int seekPos(SeekTo seekTo, int fileSize, int length) { private void seekReadAndTest(FileSystem fs, Path testFilePath, int seekPos, int length, byte[] fileContent) throws IOException, NoSuchFieldException, IllegalAccessException { - AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration(); + AbfsConfiguration conf = getConfiguration((AzureBlobFileSystem) fs); try (FSDataInputStream iStream = fs.open(testFilePath)) { - seek(iStream, seekPos); + abfsInputStreamTestUtils.seek(iStream, seekPos); byte[] buffer = new byte[length]; int bytesRead = iStream.read(buffer, 0, length); assertEquals(bytesRead, length); - assertContentReadCorrectly(fileContent, seekPos, length, buffer, testFilePath); + abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, seekPos, length, buffer, testFilePath); AbfsInputStream abfsInputStream = (AbfsInputStream) iStream .getWrappedStream(); @@ -199,15 +222,15 @@ private void seekReadAndTest(FileSystem fs, Path testFilePath, int seekPos, int expectedLimit, expectedFCursor; int expectedBCursor; if (conf.readSmallFilesCompletely() && smallFile) { - assertBuffersAreEqual(fileContent, abfsInputStream.getBuffer(), conf, testFilePath); + abfsInputStreamTestUtils.assertAbfsInputStreamBufferNotEqualToContentStartSubsequence(fileContent, abfsInputStream, conf, testFilePath); expectedFCursor = fileContentLength; expectedLimit = fileContentLength; expectedBCursor = seekPos + length; } else { if ((seekPos == 0)) { - assertBuffersAreEqual(fileContent, abfsInputStream.getBuffer(), conf, testFilePath); + abfsInputStreamTestUtils.assertAbfsInputStreamBufferNotEqualToContentStartSubsequence(fileContent, abfsInputStream, conf, testFilePath); } else { - assertBuffersAreNotEqual(fileContent, abfsInputStream.getBuffer(), + abfsInputStreamTestUtils.assertAbfsInputStreamBufferEqualToContentStartSubsequence(fileContent, abfsInputStream, conf, testFilePath); } expectedBCursor = length; @@ -229,12 +252,16 @@ private void seekReadAndTest(FileSystem fs, Path testFilePath, int seekPos, public void testPartialReadWithNoData() throws Exception { for (int i = 2; i <= 4; i++) { int fileSize = i * ONE_MB; - final AzureBlobFileSystem fs = getFileSystem(true); - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - partialReadWithNoData(fs, testFilePath, fileSize / 2, fileSize / 4, - fileContent); + try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem( + true)) { + String fileName = methodName.getMethodName() + i; + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray( + fileSize); + Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, + fileName, fileContent); + partialReadWithNoData(fs, testFilePath, fileSize / 2, fileSize / 4, + fileContent); + } } } @@ -256,11 +283,11 @@ private void partialReadWithNoData(final FileSystem fs, any(TracingContext.class)); iStream = new FSDataInputStream(abfsInputStream); - seek(iStream, seekPos); + abfsInputStreamTestUtils.seek(iStream, seekPos); byte[] buffer = new byte[length]; int bytesRead = iStream.read(buffer, 0, length); assertEquals(bytesRead, length); - assertContentReadCorrectly(fileContent, seekPos, length, buffer, testFilePath); + abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, seekPos, length, buffer, testFilePath); assertEquals(fileContent.length, abfsInputStream.getFCursor()); assertEquals(fileContent.length, abfsInputStream.getFCursorAfterLastRead()); @@ -275,12 +302,16 @@ private void partialReadWithNoData(final FileSystem fs, public void testPartialReadWithSomeData() throws Exception { for (int i = 2; i <= 4; i++) { int fileSize = i * ONE_MB; - final AzureBlobFileSystem fs = getFileSystem(true); - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - partialReadWithSomeData(fs, testFilePath, fileSize / 2, - fileSize / 4, fileContent); + try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem( + true)) { + String fileName = methodName.getMethodName() + i; + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray( + fileSize); + Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, + fileName, fileContent); + partialReadWithSomeData(fs, testFilePath, fileSize / 2, + fileSize / 4, fileContent); + } } } @@ -307,7 +338,7 @@ private void partialReadWithSomeData(final FileSystem fs, any(TracingContext.class)); iStream = new FSDataInputStream(abfsInputStream); - seek(iStream, seekPos); + abfsInputStreamTestUtils.seek(iStream, seekPos); byte[] buffer = new byte[length]; int bytesRead = iStream.read(buffer, 0, length);