Skip to content

Commit

Permalink
always read ahead config added to abfs, to use read ahead even for no…
Browse files Browse the repository at this point in the history
…n sequential reads
  • Loading branch information
saurabhpant93 authored and sapant-msft committed Nov 20, 2019
1 parent 3aa737f commit 14444a8
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 5 deletions.
Expand Up @@ -146,6 +146,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_READ_AHEAD_QUEUE_DEPTH)
private int readAheadQueueDepth;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ALWAYS_READ_AHEAD,
DefaultValue = DEFAULT_FS_AZURE_ALWAYS_READ_AHEAD)
private boolean alwaysReadAhead;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH,
DefaultValue = DEFAULT_ENABLE_FLUSH)
private boolean enableFlush;
Expand Down Expand Up @@ -435,6 +439,10 @@ public int getReadAheadQueueDepth() {
return this.readAheadQueueDepth;
}

public boolean getAlwaysReadAhead(){
return this.alwaysReadAhead;
}

public boolean isFlushEnabled() {
return this.enableFlush;
}
Expand Down
Expand Up @@ -446,9 +446,9 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist

// Add statistics for InputStream
return new AbfsInputStream(client, statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
abfsConfiguration.getTolerateOobAppends(), eTag);
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
abfsConfiguration.getTolerateOobAppends(), eTag, abfsConfiguration.getAlwaysReadAhead());
}
}

Expand Down
Expand Up @@ -51,6 +51,7 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_AHEAD = "fs.azure.always.readahead";
/** Provides a config control to enable or disable ABFS Flush operations -
* HFlush and HSync. Default is true. **/
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
Expand Down
Expand Up @@ -56,6 +56,7 @@ public final class FileSystemConfigurations {
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";

public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
public static final boolean DEFAULT_FS_AZURE_ALWAYS_READ_AHEAD = false;
public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
Expand Down
Expand Up @@ -44,6 +44,7 @@ public class AbfsInputStream extends FSInputStream {
private final String eTag; // eTag of the path when InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
private final boolean readAheadEnabled; // whether enable readAhead;
private final boolean alwaysReadAhead; // read ahead even if reads are non sequential

private byte[] buffer = null; // will be initialized on first use

Expand All @@ -62,7 +63,8 @@ public AbfsInputStream(
final int bufferSize,
final int readAheadQueueDepth,
final boolean tolerateOobAppends,
final String eTag) {
final String eTag,
final boolean alwaysReadAhead) {
this.client = client;
this.statistics = statistics;
this.path = path;
Expand All @@ -72,6 +74,7 @@ public AbfsInputStream(
this.tolerateOobAppends = tolerateOobAppends;
this.eTag = eTag;
this.readAheadEnabled = true;
this.alwaysReadAhead = alwaysReadAhead;
}

public String getPath() {
Expand Down Expand Up @@ -144,7 +147,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
}

// Enable readAhead when reading sequentially
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize || alwaysReadAhead) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
Expand Down

0 comments on commit 14444a8

Please sign in to comment.