Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19102. FooterReadBufferSize should not be greater than readBufferSize #6617

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
458f9bf
test method fixture; set of footerReadBufferSize
saxenapranav Mar 6, 2024
0ec18da
test method refactors
saxenapranav Mar 6, 2024
12ccd7f
made tests more parallelised and faster
saxenapranav Mar 7, 2024
dbca78b
formatting
saxenapranav Mar 8, 2024
0a1491a
set and unset executorservice; magic num for 256 KB
saxenapranav Mar 8, 2024
fd7189a
review refactors
saxenapranav Mar 11, 2024
e0108f8
static consts
saxenapranav Mar 11, 2024
18d88aa
FutureIO APIs added
saxenapranav Mar 13, 2024
6ed1297
test method refactor; FutureIO.awaitFuture new APIs use
saxenapranav Mar 13, 2024
b19fbed
Merge branch 'trunk' into saxenapranav/footerBufferSizeFix
saxenapranav Mar 13, 2024
eaa5550
javadocs fixture
saxenapranav Mar 15, 2024
514699b
Merge branch 'trunk' into saxenapranav/footerBufferSizeFix
saxenapranav Mar 26, 2024
2b4f68e
AbfsInputStreamTestUtil to have common methods
saxenapranav Mar 27, 2024
5a06792
removed ITestAbfsInputStream inheritence from ITestAbfsInputStreamSma…
saxenapranav Mar 27, 2024
b0733c9
Merge branch 'trunk' into saxenapranav/footerBufferSizeFix
saxenapranav Apr 16, 2024
bd4f396
review comments;
saxenapranav Apr 16, 2024
5bf2321
refactor in verifyAbsInputStreamStateAfterSeek
saxenapranav Apr 16, 2024
fbbf9fa
assertion on content read.
saxenapranav Apr 16, 2024
3cf5c72
refactored tests testing readBuffer: javadocs + explained that its fo…
saxenapranav Apr 16, 2024
8aad995
checkstyle
saxenapranav Apr 16, 2024
2354260
import block refactor; log.debug instead of error
saxenapranav Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -34,6 +38,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Future IO Helper methods.
* <p>
Expand All @@ -55,6 +62,7 @@
@InterfaceStability.Unstable
public final class FutureIO {

private static final Logger LOG = LoggerFactory.getLogger(FutureIO.class.getName());
private FutureIO() {
}

Expand Down Expand Up @@ -114,6 +122,77 @@ public static <T> T awaitFuture(final Future<T> future,
}
}

/**
* Evaluates a collection of futures and returns their results as a list.
* <p>
* This method blocks until all futures in the collection have completed.
* If any future throws an exception during its execution, this method
* extracts and rethrows that exception.
* </p>
*
* @param collection collection of futures to be evaluated
* @param <T> type of the result.
* @return the list of future's result, if all went well.
* @throws InterruptedIOException future was interrupted
* @throws IOException if something went wrong
* @throws RuntimeException any nested RTE thrown
*/
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection)
throws InterruptedIOException, IOException, RuntimeException {
List<T> results = new ArrayList<>();
try {
for (Future<T> future : collection) {
results.add(future.get());
}
return results;
} catch (InterruptedException e) {
LOG.debug("Execution of future interrupted ", e);
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (ExecutionException e) {
LOG.debug("Execution of future failed with exception", e.getCause());
return raiseInnerCause(e);
steveloughran marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Evaluates a collection of futures and returns their results as a list,
* but only waits up to the specified timeout for each future to complete.
* <p>
* This method blocks until all futures in the collection have completed or
* the timeout expires, whichever happens first. If any future throws an
* exception during its execution, this method extracts and rethrows that exception.
* </p>
*
* @param collection collection of futures to be evaluated
* @param duration timeout duration
* @param <T> type of the result.
* @return the list of future's result, if all went well.
* @throws InterruptedIOException future was interrupted
* @throws IOException if something went wrong
* @throws RuntimeException any nested RTE thrown
* @throws TimeoutException the future timed out.
*/
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection,
final Duration duration)
throws InterruptedIOException, IOException, RuntimeException,
TimeoutException {
List<T> results = new ArrayList<>();
try {
for (Future<T> future : collection) {
results.add(future.get(duration.toMillis(), TimeUnit.MILLISECONDS));
}
return results;
} catch (InterruptedException e) {
LOG.debug("Execution of future interrupted ", e);
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (ExecutionException e) {
LOG.debug("Execution of future failed with exception", e.getCause());
return raiseInnerCause(e);
}
}

/**
* From the inner cause of an execution exception, extract the inner cause
* if it is an IOE or RTE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1230,11 +1230,6 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) {
this.optimizeFooterRead = optimizeFooterRead;
}

@VisibleForTesting
public void setFooterReadBufferSize(int footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
}

@VisibleForTesting
public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
this.enableAbfsListIterator = enableAbfsListIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private FSDataInputStream open(final Path path,
try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener);
InputStream inputStream = abfsStore
InputStream inputStream = getAbfsStore()
.openFileForRead(qualifiedPath, parameters, statistics, tracingContext);
return new FSDataInputStream(inputStream);
} catch (AzureBlobFileSystemException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,21 +898,21 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
.map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false))
.orElse(false);
int footerReadBufferSize = options.map(c -> c.getInt(
AZURE_FOOTER_READ_BUFFER_SIZE, abfsConfiguration.getFooterReadBufferSize()))
.orElse(abfsConfiguration.getFooterReadBufferSize());
return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize()))
.orElse(getAbfsConfiguration().getFooterReadBufferSize());
return new AbfsInputStreamContext(getAbfsConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
.withReadBufferSize(getAbfsConfiguration().getReadBufferSize())
.withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
.withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
.isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
.withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
.withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
.withFooterReadBufferSize(footerReadBufferSize)
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
.withReadAheadRange(getAbfsConfiguration().getReadAheadRange())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(
abfsConfiguration.shouldReadBufferSizeAlways())
.withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
getAbfsConfiguration().shouldReadBufferSizeAlways())
.withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
.withBufferedPreadDisabled(bufferedPreadDisabled)
.withEncryptionAdapter(contextEncryptionAdapter)
.withAbfsBackRef(fsBackRef)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ public AbfsInputStream(
this.path = path;
this.contentLength = contentLength;
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
this.footerReadSize = abfsInputStreamContext.getFooterReadBufferSize();
/*
* FooterReadSize should not be more than bufferSize.
*/
this.footerReadSize = Math.min(bufferSize, abfsInputStreamContext.getFooterReadBufferSize());
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public abstract class AbstractAbfsIntegrationTest extends
private AuthType authType;
private boolean useConfiguredFileSystem = false;
private boolean usingFilesystemForSASTests = false;
private static final int SHORTENED_GUID_LEN = 12;
public static final int SHORTENED_GUID_LEN = 12;

protected AbstractAbfsIntegrationTest() throws Exception {
fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
Expand Down Expand Up @@ -366,6 +366,14 @@ public AbfsConfiguration getConfiguration() {
return abfsConfig;
}

public AbfsConfiguration getConfiguration(AzureBlobFileSystem fs) {
return fs.getAbfsStore().getAbfsConfiguration();
}

public Map<String, Long> getInstrumentationMap(AzureBlobFileSystem fs) {
return fs.getInstrumentationMap();
}

public Configuration getRawConfiguration() {
return abfsConfig.getRawConfiguration();
}
Expand Down
Loading
Loading