Skip to content

Commit

Permalink
HBASE-28338 Bounded leak of FSDataInputStream buffers from checksum s…
Browse files Browse the repository at this point in the history
…witching (#5660)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
bbeaudreault committed Feb 5, 2024
1 parent 972471b commit c28e285
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

Expand All @@ -40,8 +36,6 @@
*/
@InterfaceAudience.Private
public class FSDataInputStreamWrapper implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class);
private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();

private final HFileSystem hfs;
private final Path path;
Expand Down Expand Up @@ -94,9 +88,6 @@ private static class ReadStatistics {
long totalZeroCopyBytesRead;
}

private Boolean instanceOfCanUnbuffer = null;
private CanUnbuffer unbuffer = null;

protected Path readerPath;

public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
Expand Down Expand Up @@ -314,41 +305,22 @@ public HFileSystem getHfs() {
* stream, the current socket will be closed and a new socket will be opened to serve the
* requests.
*/
@SuppressWarnings({ "rawtypes" })
public void unbuffer() {
// todo: it may make sense to always unbuffer both streams. we'd need to carefully
// research the usages to know if that is safe. for now just do the current.
FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
if (stream != null) {
InputStream wrappedStream = stream.getWrappedStream();
// CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop
// 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the
// CanUnbuffer interface or not and based on that call the unbuffer api.
final Class<? extends InputStream> streamClass = wrappedStream.getClass();
if (this.instanceOfCanUnbuffer == null) {
// To ensure we compute whether the stream is instance of CanUnbuffer only once.
this.instanceOfCanUnbuffer = false;
if (wrappedStream instanceof CanUnbuffer) {
this.unbuffer = (CanUnbuffer) wrappedStream;
this.instanceOfCanUnbuffer = true;
}
}
if (this.instanceOfCanUnbuffer) {
try {
this.unbuffer.unbuffer();
} catch (UnsupportedOperationException e) {
if (isLogTraceEnabled) {
LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
+ " . So there may be the stream does not support unbuffering.", e);
}
}
} else {
if (isLogTraceEnabled) {
LOG.trace("Failed to find 'unbuffer' method in class " + streamClass);
}
}
stream.unbuffer();
}
}

public Path getReaderPath() {
return readerPath;
}

// For tests
void setShouldUseHBaseChecksum() {
useHBaseChecksumConfigured = true;
useHBaseChecksum = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
Expand All @@ -31,6 +32,7 @@
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.io.ByteBufferPool;
Expand All @@ -48,22 +50,22 @@ public class TestFSDataInputStreamWrapper {
@Test
public void testUnbuffer() throws Exception {
InputStream pc = new ParentClass();
FSDataInputStreamWrapper fsdisw1 = new FSDataInputStreamWrapper(new FSDataInputStream(pc));
InputStream noChecksumPc = new ParentClass();
FSDataInputStreamWrapper fsdisw1 =
new FSDataInputStreamWrapper(new FSDataInputStream(pc), new FSDataInputStream(noChecksumPc));
fsdisw1.unbuffer();
// parent class should be true
// should have called main stream unbuffer, but not no-checksum
assertTrue(((ParentClass) pc).getIsCallUnbuffer());
assertFalse(((ParentClass) noChecksumPc).getIsCallUnbuffer());
// switch to checksums and call unbuffer again. should unbuffer the nochecksum stream now
fsdisw1.setShouldUseHBaseChecksum();
fsdisw1.unbuffer();
assertTrue(((ParentClass) noChecksumPc).getIsCallUnbuffer());
fsdisw1.close();

InputStream cc1 = new ChildClass1();
FSDataInputStreamWrapper fsdisw2 = new FSDataInputStreamWrapper(new FSDataInputStream(cc1));
fsdisw2.unbuffer();
// child1 class should be true
assertTrue(((ChildClass1) cc1).getIsCallUnbuffer());
fsdisw2.close();
}

private class ParentClass extends FSInputStream implements ByteBufferReadable, CanSetDropBehind,
CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer {
CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {

public boolean isCallUnbuffer = false;

Expand Down Expand Up @@ -122,12 +124,10 @@ public long getPos() throws IOException {
public boolean seekToNewSource(long paramLong) throws IOException {
return false;
}
}

private class ChildClass1 extends ParentClass {
@Override
public void unbuffer() {
isCallUnbuffer = true;
public boolean hasCapability(String s) {
return s.equals(StreamCapabilities.UNBUFFER);
}
}
}

0 comments on commit c28e285

Please sign in to comment.