From c28e285cb5cefd456963d6948ed6ad647c1847a9 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Mon, 5 Feb 2024 08:26:31 -0500 Subject: [PATCH] HBASE-28338 Bounded leak of FSDataInputStream buffers from checksum switching (#5660) Signed-off-by: Duo Zhang --- .../hbase/io/FSDataInputStreamWrapper.java | 46 ++++--------------- .../io/TestFSDataInputStreamWrapper.java | 28 +++++------ 2 files changed, 23 insertions(+), 51 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java index cb9dc84b94be..33eace47d632 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java @@ -19,17 +19,13 @@ import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; @@ -40,8 +36,6 @@ */ @InterfaceAudience.Private public class FSDataInputStreamWrapper implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class); - private static final boolean isLogTraceEnabled = LOG.isTraceEnabled(); private final HFileSystem hfs; private final Path path; @@ -94,9 +88,6 @@ private static class ReadStatistics { long totalZeroCopyBytesRead; } - private Boolean instanceOfCanUnbuffer = null; - private CanUnbuffer unbuffer = null; - protected Path readerPath; public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { @@ -314,41 +305,22 @@ public HFileSystem getHfs() { * stream, the current socket will be closed and a new socket will be opened to serve the * requests. */ - @SuppressWarnings({ "rawtypes" }) public void unbuffer() { + // todo: it may make sense to always unbuffer both streams. we'd need to carefully + // research the usages to know if that is safe. for now just do the current. FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum()); if (stream != null) { - InputStream wrappedStream = stream.getWrappedStream(); - // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop - // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the - // CanUnbuffer interface or not and based on that call the unbuffer api. - final Class streamClass = wrappedStream.getClass(); - if (this.instanceOfCanUnbuffer == null) { - // To ensure we compute whether the stream is instance of CanUnbuffer only once. - this.instanceOfCanUnbuffer = false; - if (wrappedStream instanceof CanUnbuffer) { - this.unbuffer = (CanUnbuffer) wrappedStream; - this.instanceOfCanUnbuffer = true; - } - } - if (this.instanceOfCanUnbuffer) { - try { - this.unbuffer.unbuffer(); - } catch (UnsupportedOperationException e) { - if (isLogTraceEnabled) { - LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass - + " . So there may be the stream does not support unbuffering.", e); - } - } - } else { - if (isLogTraceEnabled) { - LOG.trace("Failed to find 'unbuffer' method in class " + streamClass); - } - } + stream.unbuffer(); } } public Path getReaderPath() { return readerPath; } + + // For tests + void setShouldUseHBaseChecksum() { + useHBaseChecksumConfigured = true; + useHBaseChecksum = true; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java index bb476670f10f..77aa00ef91f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.io.ByteBufferPool; @@ -48,22 +50,22 @@ public class TestFSDataInputStreamWrapper { @Test public void testUnbuffer() throws Exception { InputStream pc = new ParentClass(); - FSDataInputStreamWrapper fsdisw1 = new FSDataInputStreamWrapper(new FSDataInputStream(pc)); + InputStream noChecksumPc = new ParentClass(); + FSDataInputStreamWrapper fsdisw1 = + new FSDataInputStreamWrapper(new FSDataInputStream(pc), new FSDataInputStream(noChecksumPc)); fsdisw1.unbuffer(); - // parent class should be true + // should have called main stream unbuffer, but not no-checksum assertTrue(((ParentClass) pc).getIsCallUnbuffer()); + assertFalse(((ParentClass) noChecksumPc).getIsCallUnbuffer()); + // switch to checksums and call unbuffer again. should unbuffer the nochecksum stream now + fsdisw1.setShouldUseHBaseChecksum(); + fsdisw1.unbuffer(); + assertTrue(((ParentClass) noChecksumPc).getIsCallUnbuffer()); fsdisw1.close(); - - InputStream cc1 = new ChildClass1(); - FSDataInputStreamWrapper fsdisw2 = new FSDataInputStreamWrapper(new FSDataInputStream(cc1)); - fsdisw2.unbuffer(); - // child1 class should be true - assertTrue(((ChildClass1) cc1).getIsCallUnbuffer()); - fsdisw2.close(); } private class ParentClass extends FSInputStream implements ByteBufferReadable, CanSetDropBehind, - CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer { + CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities { public boolean isCallUnbuffer = false; @@ -122,12 +124,10 @@ public long getPos() throws IOException { public boolean seekToNewSource(long paramLong) throws IOException { return false; } - } - private class ChildClass1 extends ParentClass { @Override - public void unbuffer() { - isCallUnbuffer = true; + public boolean hasCapability(String s) { + return s.equals(StreamCapabilities.UNBUFFER); } } }