HADOOP-17296. ABFS: Force reads to be always of buffer size.
Contributed by Sneha Vijayarajan.
snvijaya committed Nov 27, 2020
1 parent 03b4e98 commit 142941b
Showing 11 changed files with 634 additions and 67 deletions.
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -201,6 +201,16 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_READ_AHEAD_QUEUE_DEPTH)
private int readAheadQueueDepth;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_BLOCK_SIZE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
DefaultValue = DEFAULT_READ_AHEAD_BLOCK_SIZE)
private int readAheadBlockSize;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ALWAYS_READ_BUFFER_SIZE,
DefaultValue = DEFAULT_ALWAYS_READ_BUFFER_SIZE)
private boolean alwaysReadBufferSize;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH,
DefaultValue = DEFAULT_ENABLE_FLUSH)
private boolean enableFlush;
@@ -599,6 +609,14 @@ public int getReadAheadQueueDepth() {
return this.readAheadQueueDepth;
}

public int getReadAheadBlockSize() {
return this.readAheadBlockSize;
}

public boolean shouldReadBufferSizeAlways() {
return this.alwaysReadBufferSize;
}

public boolean isFlushEnabled() {
return this.enableFlush;
}
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -644,6 +644,9 @@ private AbfsInputStreamContext populateAbfsInputStreamContext() {
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(
abfsConfiguration.shouldReadBufferSizeAlways())
.withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
.build();
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -75,6 +75,8 @@ public final class ConfigurationKeys {
* Default is empty. **/
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
/** Provides a config control to enable or disable ABFS Flush operations -
* HFlush and HSync. Default is true. **/
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
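For orientation, here is a minimal sketch of how a client might opt into the new behavior through the two read keys added above (fs.azure.read.alwaysReadBufferSize and fs.azure.read.readahead.blocksize). The AbfsConfiguration constructor signature and the account name are assumptions for illustration, not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;

public class AlwaysReadBufferSizeSketch {
  public static void main(String[] args) throws Exception {
    Configuration rawConfig = new Configuration();
    // Force every read miss to fetch a full read buffer (default: false).
    rawConfig.setBoolean("fs.azure.read.alwaysReadBufferSize", true);
    // Size each read-ahead block at 8 MB (default: 4 MB).
    rawConfig.setInt("fs.azure.read.readahead.blocksize", 8 * 1024 * 1024);

    // "myaccount.dfs.core.windows.net" is a placeholder account name.
    AbfsConfiguration abfsConfig =
        new AbfsConfiguration(rawConfig, "myaccount.dfs.core.windows.net");
    System.out.println(abfsConfig.shouldReadBufferSizeAlways()); // true
    System.out.println(abfsConfig.getReadAheadBlockSize());      // 8388608
  }
}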
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -57,6 +57,8 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB
public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB
public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB
@@ -74,6 +76,7 @@ public final class FileSystemConfigurations {
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";

public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;

public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -47,6 +47,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
StreamCapabilities {
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);

private int readAheadBlockSize;
private final AbfsClient client;
private final Statistics statistics;
private final String path;
@@ -56,6 +57,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private final String eTag; // eTag of the path when InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
private final boolean readAheadEnabled; // whether enable readAhead;
private final boolean alwaysReadBufferSize;

// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
@@ -89,9 +91,16 @@ public AbfsInputStream(
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
this.readAheadEnabled = true;
this.alwaysReadBufferSize
= abfsInputStreamContext.shouldReadBufferSizeAlways();
this.cachedSasToken = new CachedSASToken(
abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();

// Propagate the config values to ReadBufferManager so that the first instance
// to initialize can set the readAheadBlockSize
ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
}

public String getPath() {
@@ -178,11 +187,15 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
buffer = new byte[bufferSize];
}

-    // Enable readAhead when reading sequentially
-    if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+    if (alwaysReadBufferSize) {
      bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
    } else {
-      bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
+      // Enable readAhead when reading sequentially
+      if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+        bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
+      } else {
+        bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
+      }
    }

if (bytesRead == -1) {
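The restructured branch above short-circuits the sequential-read heuristic: with alwaysReadBufferSize on, every miss fetches a full bufferSize with read-ahead enabled. A hypothetical helper mirroring that decision (not part of the commit):

final class ReadLengthSketch {
  // Returns the length the stream would pass to readInternal for one miss.
  static int requestedReadLength(boolean alwaysReadBufferSize,
      long fCursorAfterLastRead, long fCursor,
      int callerBufferLength, int bufferSize) {
    if (alwaysReadBufferSize) {
      return bufferSize; // e.g. a 1-byte caller read still fills a 4 MB buffer
    }
    boolean sequential = fCursorAfterLastRead == -1
        || fCursorAfterLastRead == fCursor
        || callerBufferLength >= bufferSize;
    // Sequential reads fill the whole buffer; random reads stay small and
    // bypass read-ahead (the boolean flag passed to readInternal above).
    return sequential ? bufferSize : callerBufferLength;
  }
}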
@@ -223,16 +236,19 @@ private int readInternal(final long position, final byte[] b, final int offset,

// queue read-aheads
int numReadAheads = this.readAheadQueueDepth;
-    long nextSize;
    long nextOffset = position;
+    // First read to queue needs to be of readBufferSize and later
+    // of readAhead Block size
+    long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
    LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
    while (numReadAheads > 0 && nextOffset < contentLength) {
-      nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
      LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
          nextOffset, nextSize);
      ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
      nextOffset = nextOffset + nextSize;
      numReadAheads--;
+      // From next round onwards should be of readahead block size.
+      nextSize = Math.min((long) readAheadBlockSize, contentLength - nextOffset);
}

// try reading from buffers first
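To see the resulting sizing pattern, here is a standalone simulation of the queueing loop above, with illustrative values (a 100 MB file, 4 MB read buffer, 8 MB read-ahead block, queue depth 3):

public class ReadAheadPlanSketch {
  public static void main(String[] args) {
    long contentLength = 100L * 1024 * 1024;
    int bufferSize = 4 * 1024 * 1024;         // fs.azure.read.request.size
    int readAheadBlockSize = 8 * 1024 * 1024; // fs.azure.read.readahead.blocksize
    int numReadAheads = 3;                    // fs.azure.readaheadqueue.depth
    long nextOffset = 0;
    long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
    while (numReadAheads > 0 && nextOffset < contentLength) {
      System.out.printf("queueReadAhead(offset=%d, size=%d)%n", nextOffset, nextSize);
      nextOffset += nextSize;
      numReadAheads--;
      nextSize = Math.min((long) readAheadBlockSize, contentLength - nextOffset);
    }
    // Prints (0, 4 MB), (4 MB, 8 MB), (12 MB, 8 MB).
    // Only the first block is clamped to the read buffer size.
  }
}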
@@ -527,6 +543,21 @@ public long getBytesFromRemoteRead() {
return bytesFromRemoteRead;
}

@VisibleForTesting
public int getBufferSize() {
return bufferSize;
}

@VisibleForTesting
public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
}

@VisibleForTesting
public boolean shouldAlwaysReadBufferSize() {
return alwaysReadBufferSize;
}

/**
* Get the statistics of the stream.
* @return a string value.
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -18,17 +18,26 @@

package org.apache.hadoop.fs.azurebfs.services;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class to hold extra input stream configs.
*/
public class AbfsInputStreamContext extends AbfsStreamContext {
// Retaining logger of AbfsInputStream
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);

private int readBufferSize;

private int readAheadQueueDepth;

private boolean tolerateOobAppends;

private boolean alwaysReadBufferSize;

private int readAheadBlockSize;

private AbfsInputStreamStatistics streamStatistics;

public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
@@ -60,7 +69,27 @@ public AbfsInputStreamContext withStreamStatistics(
return this;
}

public AbfsInputStreamContext withShouldReadBufferSizeAlways(
final boolean alwaysReadBufferSize) {
this.alwaysReadBufferSize = alwaysReadBufferSize;
return this;
}

public AbfsInputStreamContext withReadAheadBlockSize(
final int readAheadBlockSize) {
this.readAheadBlockSize = readAheadBlockSize;
return this;
}

public AbfsInputStreamContext build() {
if (readBufferSize > readAheadBlockSize) {
LOG.debug(
"fs.azure.read.request.size[={}] is configured for higher size than "
+ "fs.azure.read.readahead.blocksize[={}]. Auto-align "
+ "readAhead block size to be same as readRequestSize.",
readBufferSize, readAheadBlockSize);
readAheadBlockSize = readBufferSize;
}
// Validation of parameters to be done here.
return this;
}
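The auto-alignment in build() can be exercised with a small sketch; withReadBufferSize is assumed to be the context's existing setter for fs.azure.read.request.size (it does not appear in this diff):

AbfsInputStreamContext context =
    new AbfsInputStreamContext(0)              // SAS renew period, unused here
        .withReadBufferSize(8 * 1024 * 1024)   // assumed existing setter
        .withReadAheadBlockSize(4 * 1024 * 1024)
        .withShouldReadBufferSizeAlways(true)
        .build();
// context.getReadAheadBlockSize() now returns 8 MB: the block size was
// raised to match the larger read buffer, so one read-ahead buffer can
// hold a full read request.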
@@ -80,4 +109,13 @@ public boolean isTolerateOobAppends() {
public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}

public boolean shouldReadBufferSizeAlways() {
return alwaysReadBufferSize;
}

public int getReadAheadBlockSize() {
return readAheadBlockSize;
}

}
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -28,6 +28,7 @@
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

@@ -36,12 +37,14 @@
*/
final class ReadBufferManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
+  private static final int ONE_KB = 1024;
+  private static final int ONE_MB = ONE_KB * ONE_KB;

  private static final int NUM_BUFFERS = 16;
-  private static final int BLOCK_SIZE = 4 * 1024 * 1024;
  private static final int NUM_THREADS = 8;
  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold

+  private static int blockSize = 4 * ONE_MB;
private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
@@ -50,21 +53,37 @@ final class ReadBufferManager {
private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
-  private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
+  private static ReadBufferManager bufferManager; // singleton, initialized in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();

-  static {
-    BUFFER_MANAGER = new ReadBufferManager();
-    BUFFER_MANAGER.init();
+  static ReadBufferManager getBufferManager() {
+    if (bufferManager == null) {
+      LOCK.lock();
+      try {
+        if (bufferManager == null) {
+          bufferManager = new ReadBufferManager();
+          bufferManager.init();
+        }
+      } finally {
+        LOCK.unlock();
+      }
+    }
+    return bufferManager;
  }

-  static ReadBufferManager getBufferManager() {
-    return BUFFER_MANAGER;
+  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+    if (bufferManager == null) {
+      LOGGER.debug(
+          "ReadBufferManager not initialized yet. Overriding readAheadBlockSize as {}",
+          readAheadBlockSize);
+      blockSize = readAheadBlockSize;
+    }
  }

  private void init() {
    buffers = new byte[NUM_BUFFERS][];
    for (int i = 0; i < NUM_BUFFERS; i++) {
-      buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
+      buffers[i] = new byte[blockSize]; // same buffers are reused. The byte array never goes back to GC
freeList.add(i);
}
for (int i = 0; i < NUM_THREADS; i++) {
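Because the singleton is now created lazily, setReadBufferManagerConfigs is order-sensitive: only calls made before the first getBufferManager() take effect. A minimal sketch, assuming a caller in the same package (the methods are package-private):

package org.apache.hadoop.fs.azurebfs.services;

final class FirstWinsSketch {
  public static void main(String[] args) {
    // The first AbfsInputStream to be constructed wins:
    ReadBufferManager.setReadBufferManagerConfigs(8 * 1024 * 1024); // applied: no instance yet
    ReadBufferManager manager = ReadBufferManager.getBufferManager(); // allocates 16 x 8 MB buffers
    ReadBufferManager.setReadBufferManagerConfigs(2 * 1024 * 1024);  // ignored: already initialized
    System.out.println(manager.getReadAheadBlockSize()); // 8388608
  }
}

Replacing the eager static initializer with double-checked locking is what opens this pre-initialization window; every stream created afterwards shares whichever block size was pinned first.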
@@ -124,10 +143,10 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi
buffer.setBufferindex(bufferIndex);
readAheadQueue.add(buffer);
notifyAll();
-    }
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
-          stream.getPath(), requestedOffset, buffer.getBufferindex());
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
+            stream.getPath(), requestedOffset, buffer.getBufferindex());
+      }
}
}

@@ -272,6 +291,7 @@ private synchronized boolean tryEvict() {
return evict(nodeToEvict);
}

LOGGER.trace("No buffer eligible for eviction");
// nothing can be evicted
return false;
}
@@ -483,6 +503,67 @@ void callTryEvict() {
tryEvict();
}

/**
* Test method that can clean up the current state of readAhead buffers and
* the lists. Will also trigger a fresh init.
*/
@VisibleForTesting
void testResetReadBufferManager() {
synchronized (this) {
ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
for (ReadBuffer buf : completedReadList) {
if (buf != null) {
completedBuffers.add(buf);
}
}

for (ReadBuffer buf : completedBuffers) {
evict(buf);
}

readAheadQueue.clear();
inProgressList.clear();
completedReadList.clear();
freeList.clear();
for (int i = 0; i < NUM_BUFFERS; i++) {
buffers[i] = null;
}
buffers = null;
resetBufferManager();
}
}

/**
* Reset buffer manager to null.
*/
@VisibleForTesting
static void resetBufferManager() {
bufferManager = null;
}

/**
* Reset readAhead buffer to needed readAhead block size and
* thresholdAgeMilliseconds.
* @param readAheadBlockSize
* @param thresholdAgeMilliseconds
*/
@VisibleForTesting
void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds) {
setBlockSize(readAheadBlockSize);
setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
testResetReadBufferManager();
}

@VisibleForTesting
static void setBlockSize(int readAheadBlockSize) {
blockSize = readAheadBlockSize;
}

@VisibleForTesting
int getReadAheadBlockSize() {
return blockSize;
}

/**
* Test method that can mimic no free buffers scenario and also add a ReadBuffer
* into completedReadList. This readBuffer will get picked up by TryEvict()
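A sketch of how a test might drive these hooks, assuming a JUnit 4 test class placed in the same package (the helpers are package-private):

package org.apache.hadoop.fs.azurebfs.services;

import org.junit.Assert;
import org.junit.Test;

public class TestReadBufferManagerReset {
  @Test
  public void testBlockSizeOverride() {
    // Re-initialize the singleton with a 1 MB block size and a short
    // eviction threshold, then verify the override took effect.
    ReadBufferManager.getBufferManager().testResetReadBufferManager(1024 * 1024, 100);
    Assert.assertEquals(1024 * 1024,
        ReadBufferManager.getBufferManager().getReadAheadBlockSize());
  }
}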