Skip to content

Commit

Permalink
HADOOP-17158. Test timeout for ITestAbfsInputStreamStatistics#testRea…
Browse files Browse the repository at this point in the history
…dAheadCounters (#2272)


Contributed by: Mehakmeet Singh.
  • Loading branch information
mehakmeet committed Sep 8, 2020
1 parent c4fb404 commit 84ed6ad
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,

/** Stream statistics. */
private final AbfsInputStreamStatistics streamStatistics;
private long bytesFromReadAhead; // bytes read from readAhead; for testing
private long bytesFromRemoteRead; // bytes read remotely; for testing

public AbfsInputStream(
final AbfsClient client,
Expand Down Expand Up @@ -235,6 +237,7 @@ private int readInternal(final long position, final byte[] b, final int offset,

// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
bytesFromReadAhead += receivedBytes;
if (receivedBytes > 0) {
incrementReadOps();
LOG.debug("Received data from read ahead, not doing remote read");
Expand Down Expand Up @@ -302,6 +305,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
throw new IOException("Unexpected Content-Length");
}
LOG.debug("HTTP request read bytes = {}", bytesRead);
bytesFromRemoteRead += bytesRead;
return (int) bytesRead;
}

Expand Down Expand Up @@ -503,6 +507,26 @@ public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}

/**
* Getter for bytes read from readAhead buffer that fills asynchronously.
*
* @return value of the counter in long.
*/
@VisibleForTesting
public long getBytesFromReadAhead() {
return bytesFromReadAhead;
}

/**
* Getter for bytes read remotely from the data store.
*
* @return value of the counter in long.
*/
@VisibleForTesting
public long getBytesFromRemoteRead() {
return bytesFromRemoteRead;
}

/**
* Get the statistics of the stream.
* @return a string value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ public class ITestAbfsInputStreamStatistics
private static final int ONE_MB = 1024 * 1024;
private static final int ONE_KB = 1024;
private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024;
private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE;
private static final int THREAD_SLEEP_10_SECONDS = 10;
private static final int TIMEOUT_30_SECONDS = 30000;
private byte[] defBuffer = new byte[ONE_MB];

public ITestAbfsInputStreamStatistics() throws Exception {
Expand Down Expand Up @@ -295,8 +292,8 @@ public void testWithNullStreamStatistics() throws IOException {
/**
* Testing readAhead counters in AbfsInputStream with 30 seconds timeout.
*/
@Test(timeout = TIMEOUT_30_SECONDS)
public void testReadAheadCounters() throws IOException, InterruptedException {
@Test
public void testReadAheadCounters() throws IOException {
describe("Test to check correct values for readAhead counters in "
+ "AbfsInputStream");

Expand Down Expand Up @@ -334,46 +331,35 @@ public void testReadAheadCounters() throws IOException, InterruptedException {
AbfsInputStreamStatisticsImpl stats =
(AbfsInputStreamStatisticsImpl) in.getStreamStatistics();

/*
* Since, readAhead is done in background threads. Sometimes, the
* threads aren't finished in the background and could result in
* inaccurate results. So, we wait till we have the accurate values
* with a limit of 30 seconds as that's when the test times out.
*
*/
while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE
|| stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) {
Thread.sleep(THREAD_SLEEP_10_SECONDS);
}

/*
* Verifying the counter values of readAheadBytesRead and remoteBytesRead.
*
* readAheadBytesRead : Since, we read 1KBs 5 times, that means we go
* from 0 to 5KB in the file. The bufferSize is set to 4KB, and since
* we have 8 blocks of readAhead buffer. We would have 8 blocks of 4KB
* buffer. Our read is till 5KB, hence readAhead would ideally read 2
* blocks of 4KB which is equal to 8KB. But, sometimes to get more than
* one block from readAhead buffer we might have to wait for background
* blocks of 4KB which is equal to 8KB. But, sometimes to get blocks
* from readAhead buffer we might have to wait for background
* threads to fill the buffer and hence we might do remote read which
* would be faster. Therefore, readAheadBytesRead would be equal to or
* greater than 4KB.
* would be faster. Therefore, readAheadBytesRead would be greater than
* or equal to the value of bytesFromReadAhead at the point we measure it.
*
* remoteBytesRead : Since, the bufferSize is set to 4KB and the number
* of blocks or readAheadQueueDepth is equal to 8. We would read 8 * 4
* KB buffer on the first read, which is equal to 32KB. But, if we are not
* able to read some bytes that were in the buffer after doing
* readAhead, we might use remote read again. Thus, the bytes read
* remotely could also be greater than 32Kb.
* remotely would be greater than or equal to the bytesFromRemoteRead
* value that we measure at some point of the operation.
*
*/
Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs(
"Mismatch in readAheadBytesRead counter value")
.isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE);
.isGreaterThanOrEqualTo(in.getBytesFromReadAhead());

Assertions.assertThat(stats.getRemoteBytesRead()).describedAs(
"Mismatch in remoteBytesRead counter value")
.isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE);
.isGreaterThanOrEqualTo(in.getBytesFromRemoteRead());

} finally {
IOUtils.cleanupWithLogger(LOG, out, in);
Expand Down

0 comments on commit 84ed6ad

Please sign in to comment.