Skip to content

Commit

Permalink
review comments;
Browse files Browse the repository at this point in the history
  • Loading branch information
saxenapranav committed Apr 16, 2024
1 parent b0733c9 commit bd4f396
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -31,6 +32,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -58,6 +61,7 @@
@InterfaceStability.Unstable
public final class FutureIO {

private static final Logger log = LoggerFactory.getLogger(FutureIO.class.getName());
private FutureIO() {
}

Expand Down Expand Up @@ -132,7 +136,7 @@ public static <T> T awaitFuture(final Future<T> future,
* @throws IOException if something went wrong
* @throws RuntimeException any nested RTE thrown
*/
public static <T> List<T> awaitFuture(final Collection<Future<T>> collection)
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection)
throws InterruptedIOException, IOException, RuntimeException {
List<T> results = new ArrayList<>();
try {
Expand All @@ -141,9 +145,11 @@ public static <T> List<T> awaitFuture(final Collection<Future<T>> collection)
}
return results;
} catch (InterruptedException e) {
log.error("Execution of future interrupted ", e);
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (ExecutionException e) {
log.error("Execution of future failed with exception", e.getCause());
return raiseInnerCause(e);
}
}
Expand All @@ -158,30 +164,30 @@ public static <T> List<T> awaitFuture(final Collection<Future<T>> collection)
* </p>
*
* @param collection collection of futures to be evaluated
* @param timeout timeout to wait
* @param unit time unit.
* @param duration timeout duration
* @param <T> type of the result.
* @return the list of future's result, if all went well.
* @throws InterruptedIOException future was interrupted
* @throws IOException if something went wrong
* @throws RuntimeException any nested RTE thrown
* @throws TimeoutException the future timed out.
*/
public static <T> List<T> awaitFuture(final Collection<Future<T>> collection,
final long timeout,
final TimeUnit unit)
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection,
final Duration duration)
throws InterruptedIOException, IOException, RuntimeException,
TimeoutException {
List<T> results = new ArrayList<>();
try {
for (Future<T> future : collection) {
results.add(future.get(timeout, unit));
results.add(future.get(duration.toMillis(), TimeUnit.MILLISECONDS));
}
return results;
} catch (InterruptedException e) {
log.error("Execution of future interrupted ", e);
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (ExecutionException e) {
log.error("Execution of future failed with exception", e.getCause());
return raiseInnerCause(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,14 @@ public AbfsConfiguration getConfiguration() {
return abfsConfig;
}

public AbfsConfiguration getConfiguration(AzureBlobFileSystem fs) {
return fs.getAbfsStore().getAbfsConfiguration();
}

public Map<String, Long> getInstrumentationMap(AzureBlobFileSystem fs) {
return fs.getInstrumentationMap();
}

public Configuration getRawConfiguration() {
return abfsConfig.getRawConfiguration();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,22 @@
import java.util.Random;
import java.util.UUID;

import org.assertj.core.api.Assertions;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;

import static org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest.SHORTENED_GUID_LEN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_OPTIMIZE_FOOTER_READ;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_SMALL_FILES_COMPLETELY;

public class AbfsInputStreamTestUtils {

Expand Down Expand Up @@ -72,24 +74,46 @@ private Path getUniquePath(String filepath) {
.right(UUID.randomUUID().toString(), SHORTENED_GUID_LEN));
}

/**
* Returns AzureBlobFileSystem instance with the required
* readFullFileOptimization configuration.
*
* @param readSmallFilesCompletely whether to read small files completely
* @return AzureBlobFileSystem instance
* @throws IOException exception in creating fileSystem
*/
public AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
throws IOException {
final AzureBlobFileSystem fs = abstractAbfsIntegrationTest.getFileSystem();
abstractAbfsIntegrationTest.getAbfsStore(fs).getAbfsConfiguration()
.setReadSmallFilesCompletely(readSmallFilesCompletely);
abstractAbfsIntegrationTest.getAbfsStore(fs).getAbfsConfiguration()
.setOptimizeFooterRead(false);
abstractAbfsIntegrationTest.getAbfsStore(fs).getAbfsConfiguration()
.setIsChecksumValidationEnabled(true);
return fs;
Configuration configuration = new Configuration(
abstractAbfsIntegrationTest.getRawConfiguration());
configuration.setBoolean(AZURE_READ_SMALL_FILES_COMPLETELY,
readSmallFilesCompletely);
configuration.setBoolean(AZURE_READ_OPTIMIZE_FOOTER_READ, false);
return (AzureBlobFileSystem) FileSystem.newInstance(configuration);
}

/**
* Return array of random bytes of the given length.
*
* @param length length of the byte array
* @return byte array
*/
public byte[] getRandomBytesArray(int length) {
final byte[] b = new byte[length];
new Random().nextBytes(b);
return b;
}

/**
* Create a file on the file system with the given file name and content.
*
* @param fs fileSystem that stores the file
* @param fileName name of the file
* @param fileContent content of the file
*
* @return path of the file created
* @throws IOException exception in writing file on fileSystem
*/
public Path createFileWithContent(FileSystem fs, String fileName,
byte[] fileContent) throws IOException {
Path testFilePath = path(fileName);
Expand All @@ -100,38 +124,52 @@ public Path createFileWithContent(FileSystem fs, String fileName,
return testFilePath;
}

public AzureBlobFileSystemStore getAbfsStore(FileSystem fs)
throws NoSuchFieldException, IllegalAccessException {
AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
Field abfsStoreField = AzureBlobFileSystem.class
.getDeclaredField("abfsStore");
abfsStoreField.setAccessible(true);
return (AzureBlobFileSystemStore) abfsStoreField.get(abfs);
}

public Map<String, Long> getInstrumentationMap(FileSystem fs)
throws NoSuchFieldException, IllegalAccessException {
AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
Field abfsCountersField = AzureBlobFileSystem.class
.getDeclaredField("abfsCounters");
abfsCountersField.setAccessible(true);
AbfsCounters abfsCounters = (AbfsCounters) abfsCountersField.get(abfs);
return abfsCounters.toMap();
}

/**
* Assert that the content read from the subsection of a file is correct.
*
* @param actualFileContent actual content of the file
* @param from start index of the content read
* @param len length of the content read
* @param contentRead content read from the file
* @param testFilePath path of the file
*/
public void assertContentReadCorrectly(byte[] actualFileContent, int from,
int len, byte[] contentRead, Path testFilePath) {
Assertions.assertThat(actualFileContent.length)
.describedAs("From + len should be less than the actual file content length")
.isGreaterThanOrEqualTo(from + len);
for (int i = 0; i < len; i++) {
assertEquals("The test file path is " + testFilePath, contentRead[i],
actualFileContent[i + from]);
Assertions.assertThat(contentRead[i])
.describedAs(
"The test file path is " + testFilePath + ". Equality failed"
+ "at index " + i
+ " of the contentRead array. ActualFileContent is being compared from index "
+ from)
.isEqualTo(actualFileContent[i + from]);
}
}

/**
* Assert that the content read is not equal to the actual content of the file.
*
* @param actualContent actual content of the file
* @param contentRead content read from the file
* @param conf configuration
* @param testFilePath path of the file
*/
public void assertBuffersAreNotEqual(byte[] actualContent,
byte[] contentRead, AbfsConfiguration conf, Path testFilePath) {
assertBufferEquality(actualContent, contentRead, conf, false, testFilePath);
}

/**
* Assert that the content read is equal to the actual content of the file.
*
* @param actualContent actual content of the file
* @param contentRead content read from the file
* @param conf configuration
* @param testFilePath path of the file
*/
public void assertBuffersAreEqual(byte[] actualContent, byte[] contentRead,
AbfsConfiguration conf, Path testFilePath) {
assertBufferEquality(actualContent, contentRead, conf, true, testFilePath);
Expand All @@ -141,41 +179,80 @@ private void assertBufferEquality(byte[] actualContent, byte[] contentRead,
AbfsConfiguration conf, boolean assertEqual, Path testFilePath) {
int bufferSize = conf.getReadBufferSize();
int actualContentSize = actualContent.length;
int n = (actualContentSize < bufferSize) ? actualContentSize : bufferSize;
int n = Math.min(actualContentSize, bufferSize);
int matches = 0;
for (int i = 0; i < n; i++) {
for (int i = 0; i < n && i < contentRead.length; i++) {
if (actualContent[i] == contentRead[i]) {
matches++;
}
}
if (assertEqual) {
assertEquals("The test file path is " + testFilePath, n, matches);
Assertions.assertThat(matches).describedAs(
"The test file path is " + testFilePath).isEqualTo(n);
} else {
assertNotEquals("The test file path is " + testFilePath, n, matches);
Assertions.assertThat(matches).describedAs(
"The test file path is " + testFilePath).isNotEqualTo(n);
}
}

/**
* Seek inputStream to the given seekPos.
*
* @param iStream inputStream to seek
* @param seekPos position to seek
* @throws IOException exception in seeking inputStream
*/
public void seek(FSDataInputStream iStream, long seekPos)
throws IOException {
AbfsInputStream abfsInputStream
= (AbfsInputStream) iStream.getWrappedStream();
verifyBeforeSeek(abfsInputStream);
verifyAbfsInputStreamBaseStateBeforeSeek(abfsInputStream);
iStream.seek(seekPos);
verifyAfterSeek(abfsInputStream, seekPos);
verifyAbfsInputSteramStateAfterSeek(abfsInputStream, seekPos);
}

public void verifyBeforeSeek(AbfsInputStream abfsInputStream) {
assertEquals(0, abfsInputStream.getFCursor());
assertEquals(-1, abfsInputStream.getFCursorAfterLastRead());
assertEquals(0, abfsInputStream.getLimit());
assertEquals(0, abfsInputStream.getBCursor());
/**
* Verifies that the pointers in AbfsInputStream state are unchanged and are
* equal to that of a newly created inputStream.
*
* @param abfsInputStream inputStream to verify
*/
public void verifyAbfsInputStreamBaseStateBeforeSeek(AbfsInputStream abfsInputStream) {
Assertions.assertThat(abfsInputStream.getFCursor())
.describedAs("FCursor should be 0 at the inputStream open")
.isEqualTo(0);
Assertions.assertThat(abfsInputStream.getFCursorAfterLastRead())
.describedAs(
"FCursorAfterLastRead should be -1 at the inputStream open")
.isEqualTo(-1);
Assertions.assertThat(abfsInputStream.getLimit())
.describedAs("Limit should be 0 at the inputStream open")
.isEqualTo(0);
Assertions.assertThat(abfsInputStream.getBCursor())
.describedAs("BCursor should be 0 at the inputStream open")
.isEqualTo(0);
}

public void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos)
throws IOException {
assertEquals(seekPos, abfsInputStream.getPos());
assertEquals(-1, abfsInputStream.getFCursorAfterLastRead());
assertEquals(0, abfsInputStream.getLimit());
assertEquals(0, abfsInputStream.getBCursor());
/**
* Verifies that only the FCursor is updated after seek and all other pointers
* are in their initial state.
*
* @param abfsInputStream inputStream to verify
* @param seekPos position to seek
*/
public void verifyAbfsInputSteramStateAfterSeek(AbfsInputStream abfsInputStream,
long seekPos) {
Assertions.assertThat(abfsInputStream.getFCursor())
.describedAs("FCursor should be " + seekPos + " after seek")
.isEqualTo(seekPos);
Assertions.assertThat(abfsInputStream.getFCursorAfterLastRead())
.describedAs("FCursorAfterLastRead should be -1 after seek")
.isEqualTo(-1);
Assertions.assertThat(abfsInputStream.getLimit())
.describedAs("Limit should be 0 after seek")
.isEqualTo(0);
Assertions.assertThat(abfsInputStream.getBCursor())
.describedAs("BCursor should be 0 after seek")
.isEqualTo(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void testExceptionInOptimization(final FileSystem fs,
any(TracingContext.class));

iStream = new FSDataInputStream(abfsInputStream);
abfsInputStreamTestUtils.verifyBeforeSeek(abfsInputStream);
abfsInputStreamTestUtils.verifyAbfsInputStreamBaseStateBeforeSeek(abfsInputStream);
abfsInputStreamTestUtils.seek(iStream, seekPos);
byte[] buffer = new byte[length];
int bytesRead = iStream.read(buffer, 0, length);
Expand Down

0 comments on commit bd4f396

Please sign in to comment.