
HADOOP-16696: ABFS Always read ahead config, to use read ahead even for non sequential reads. #1708

Open

wants to merge 11 commits into base: trunk
@@ -146,6 +146,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_READ_AHEAD_QUEUE_DEPTH)
private int readAheadQueueDepth;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ALWAYS_READ_AHEAD,
DefaultValue = DEFAULT_FS_AZURE_ALWAYS_READ_AHEAD)
private boolean alwaysReadAhead;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH,
DefaultValue = DEFAULT_ENABLE_FLUSH)
private boolean enableFlush;
@@ -435,6 +439,10 @@ public int getReadAheadQueueDepth() {
return this.readAheadQueueDepth;
}

public boolean getAlwaysReadAhead() {
return this.alwaysReadAhead;
}

public boolean isFlushEnabled() {
return this.enableFlush;
}
@@ -446,9 +446,7 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist

// Add statistics for InputStream
return new AbfsInputStream(client, statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
abfsConfiguration.getTolerateOobAppends(), eTag);
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, eTag, abfsConfiguration);
Contributor:

I'm not sure whether passing the full config down is the right approach compared to passing the explicit options in. Can you justify this?

Author:

The number of options we are passing to the stream has increased to 4, which triggers Hadoop checkstyle violations (total arguments > 8). Increasing the number of arguments also doesn't feel like a scalable approach, in my opinion. I agree that passing the whole AbfsConfiguration object isn't very elegant either. How would you feel about a new AbfsInputStreamConfig structure (class) for passing stream-related config options?

Thanks,
Saurabh

Contributor:

I'd prefer passing in some structure like "ReadContext".
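The "ReadContext" suggestion could look something like the sketch below: an immutable holder for the stream-related options, assembled with a fluent builder so the AbfsInputStream constructor keeps a short signature and checkstyle stays happy. All names here (ReadContext, the with* setters) are illustrative, not part of the actual patch.

```java
// Hypothetical ReadContext sketch: bundles the stream-related options that
// AbfsInputStream currently takes as separate constructor arguments.
class ReadContext {
  private final int readBufferSize;
  private final int readAheadQueueDepth;
  private final boolean tolerateOobAppends;
  private final boolean alwaysReadAhead;

  private ReadContext(Builder b) {
    this.readBufferSize = b.readBufferSize;
    // Fall back to the processor count when the depth is unset (< 0),
    // mirroring the existing AbfsInputStream logic.
    this.readAheadQueueDepth = b.readAheadQueueDepth >= 0
        ? b.readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
    this.tolerateOobAppends = b.tolerateOobAppends;
    this.alwaysReadAhead = b.alwaysReadAhead;
  }

  int getReadBufferSize() { return readBufferSize; }
  int getReadAheadQueueDepth() { return readAheadQueueDepth; }
  boolean getTolerateOobAppends() { return tolerateOobAppends; }
  boolean getAlwaysReadAhead() { return alwaysReadAhead; }

  static class Builder {
    private int readBufferSize;
    private int readAheadQueueDepth = -1; // -1 means "derive from CPU count"
    private boolean tolerateOobAppends;
    private boolean alwaysReadAhead;

    Builder withReadBufferSize(int size) { this.readBufferSize = size; return this; }
    Builder withReadAheadQueueDepth(int depth) { this.readAheadQueueDepth = depth; return this; }
    Builder withTolerateOobAppends(boolean v) { this.tolerateOobAppends = v; return this; }
    Builder withAlwaysReadAhead(boolean v) { this.alwaysReadAhead = v; return this; }
    ReadContext build() { return new ReadContext(this); }
  }
}
```

The AbfsInputStream constructor would then take a single ReadContext built by AzureBlobFileSystemStore from AbfsConfiguration, so adding a future option touches the builder rather than every constructor signature.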

}
}

@@ -51,6 +51,7 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_AHEAD = "fs.azure.always.readahead";
sapant-msft marked this conversation as resolved.
/** Provides a config control to enable or disable ABFS Flush operations -
* HFlush and HSync. Default is true. **/
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
@@ -56,6 +56,7 @@ public final class FileSystemConfigurations {
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";

public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
public static final boolean DEFAULT_FS_AZURE_ALWAYS_READ_AHEAD = false;
public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
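With the key and its default in place, the new switch would be enabled like any other fs.azure.* option. A hedged sketch of the core-site.xml entry (the key name and default come from the diff above; the value shown is illustrative):

```xml
<!-- core-site.xml: opt in to read-ahead for every read, including
     non-sequential ones. Defaults to false
     (DEFAULT_FS_AZURE_ALWAYS_READ_AHEAD). -->
<property>
  <name>fs.azure.always.readahead</name>
  <value>true</value>
</property>
```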
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
sapant-msft marked this conversation as resolved.

/**
* The AbfsInputStream for AbfsClient.
@@ -44,6 +45,7 @@ public class AbfsInputStream extends FSInputStream {
private final String eTag; // eTag of the path when the InputStream was created
private final boolean tolerateOobAppends; // whether to tolerate OOB appends
private final boolean readAheadEnabled; // whether readAhead is enabled
private final boolean alwaysReadAhead; // read ahead even if reads are non-sequential

private byte[] buffer = null; // will be initialized on first use

@@ -59,19 +61,19 @@ public AbfsInputStream(
final Statistics statistics,
final String path,
final long contentLength,
final int bufferSize,
final int readAheadQueueDepth,
final boolean tolerateOobAppends,
final String eTag) {
final String eTag,
final AbfsConfiguration abfsConfiguration) {
this.client = client;
this.statistics = statistics;
this.path = path;
this.contentLength = contentLength;
this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.tolerateOobAppends = tolerateOobAppends;
this.bufferSize = abfsConfiguration.getReadBufferSize();
this.readAheadQueueDepth = (abfsConfiguration.getReadAheadQueueDepth() >= 0)
? abfsConfiguration.getReadAheadQueueDepth() : Runtime.getRuntime().availableProcessors();
this.tolerateOobAppends = abfsConfiguration.getTolerateOobAppends();
this.eTag = eTag;
this.readAheadEnabled = true;
this.alwaysReadAhead = abfsConfiguration.getAlwaysReadAhead();
}

public String getPath() {
@@ -144,7 +146,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
}

// Enable readAhead when reading sequentially, or unconditionally when alwaysReadAhead is set
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize || alwaysReadAhead) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
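The modified condition above can be read as a small predicate. A hypothetical extraction, with illustrative names, showing when the full-buffer read-ahead path is taken:

```java
// Sketch of the read-ahead decision in readOneBlock. The full bufferSize
// is read (with read-ahead) when: this is the first read on the stream,
// the read is sequential (the previous read ended where this one starts),
// the caller's buffer is at least bufferSize, or the new always-read-ahead
// switch (HADOOP-16696) is on. Otherwise only b.length bytes are fetched.
class ReadAheadDecision {
  static boolean shouldReadAhead(long fCursorAfterLastRead, long fCursor,
                                 int callerBufferLength, int bufferSize,
                                 boolean alwaysReadAhead) {
    return fCursorAfterLastRead == -1          // first read on this stream
        || fCursorAfterLastRead == fCursor     // sequential read
        || callerBufferLength >= bufferSize    // large read fills the buffer anyway
        || alwaysReadAhead;                    // forced read-ahead, even for random reads
  }
}
```

Without the new switch, a backward seek followed by a read makes the predicate false and the stream fetches only b.length bytes; with fs.azure.always.readahead set, every read goes through the read-ahead path.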