From 78c5ac372550835133935a3022a0142880476297 Mon Sep 17 00:00:00 2001
From: Fantasy-Jay <13631435453@163.com>
Date: Mon, 11 Dec 2023 23:22:13 +0800
Subject: [PATCH] HBASE-28190 Add slow sync log rolling test in TestAsyncLogRolling. (#5507)

Signed-off-by: Duo Zhang
---
 .../hbase/regionserver/wal/AbstractFSWAL.java |   4 +
 .../hadoop/hbase/regionserver/wal/FSHLog.java |   8 -
 .../wal/AbstractTestLogRolling.java           | 106 +++++++-
 .../regionserver/wal/TestAsyncLogRolling.java |  65 +++++
 .../regionserver/wal/TestLogRolling.java      | 234 ++++--------
 5 files changed, 218 insertions(+), 199 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index acf3231d4e90..1a5b5384b01f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -2245,6 +2245,10 @@ private static void split(final Configuration conf, final Path p) throws IOExcep
     WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
   }
 
+  W getWriter() {
+    return this.writer;
+  }
+
   private static void usage() {
     System.err.println("Usage: AbstractFSWAL ");
     System.err.println("Arguments:");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index d0d5ce5f2e17..131f284557af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -603,14 +603,6 @@ DatanodeInfo[] getPipeline() {
     return new DatanodeInfo[0];
   }
 
-  Writer getWriter() {
-    return this.writer;
-  }
-
-  void setWriter(Writer writer) {
-    this.writer = writer;
-  }
-
   @Override
   protected Writer createCombinedWriter(Writer localWriter, Writer remoteWriter) {
     // put remote writer first as usually it will cost more time to finish, so we write to it first
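A note on the two main-code hunks above: hoisting getWriter() from FSHLog into AbstractFSWAL (and dropping FSHLog's setWriter()) is what lets the shared test code added below observe a roll on either WAL implementation, because a roll installs a fresh writer instance. A minimal sketch of that idea, assuming the usual mini-cluster test scaffolding (HRegionServer, RegionInfo, JUnit asserts) rather than anything added by this patch:

    // Illustrative sketch only: detect that a log roll replaced the WAL's writer.
    // Works from a test in the org.apache.hadoop.hbase.regionserver.wal package, since
    // getWriter() is package-private.
    static void assertRollReplacesWriter(HRegionServer server, RegionInfo region) throws Exception {
      AbstractFSWAL<?> wal = (AbstractFSWAL<?>) server.getWAL(region);
      WALProvider.WriterBase before = wal.getWriter(); // accessor added in the hunk above
      wal.rollWriter(true);                            // force a roll; the provider creates a new writer
      assertNotSame("a roll should install a fresh writer", before, wal.getWriter());
    }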
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
index 940dbebf614b..2a5aec458828 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
@@ -20,9 +20,13 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
@@ -31,6 +35,7 @@
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
 import org.apache.hadoop.hbase.StartTestingClusterOption;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
@@ -48,8 +53,10 @@
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -59,6 +66,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Test log deletion as logs are rolled.
  */
@@ -74,6 +83,10 @@ public abstract class AbstractTestLogRolling {
   protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
   @Rule
   public final TestName name = new TestName();
+  protected static int syncLatencyMillis;
+  private static int rowNum = 1;
+  private static final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
+  protected static ScheduledExecutorService EXECUTOR;
 
   public AbstractTestLogRolling() {
     this.server = null;
@@ -118,6 +131,17 @@ public static void setUpBeforeClass() throws Exception {
     // disable low replication check for log roller to get a more stable result
     // TestWALOpenAfterDNRollingStart will test this option.
     conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L * 60 * 60 * 1000);
+
+    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
+    conf.setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
+    conf.setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
+    // For slow sync threshold test: roll once after a sync above this threshold
+    conf.setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
+
+    // Slow sync executor.
+    EXECUTOR = Executors
+      .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Slow-sync-%d")
+        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }
 
   @Before
@@ -139,6 +163,11 @@ public void tearDown() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  @AfterClass
+  public static void tearDownAfterClass() {
+    EXECUTOR.shutdownNow();
+  }
+
   private void startAndWriteData() throws IOException, InterruptedException {
     this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
 
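The three settings added to setUpBeforeClass above drive the two SLOW_SYNC roll triggers the new tests exercise: several individually slow syncs inside one interval, or a single sync that exceeds the roll-on-sync-time limit. A deliberately simplified model of that decision, for orientation only (the real logic lives in AbstractFSWAL/FSHLog and is not implemented this way):

    // Simplified model of the two SLOW_SYNC roll triggers; thresholds mirror the test configuration above.
    final class SlowSyncRollModel {
      static final int SLOW_SYNC_ROLL_THRESHOLD = 5;          // slow syncs tolerated per interval
      static final long SLOW_SYNC_ROLL_INTERVAL_MS = 10_000L; // interval length
      static final long ROLL_ON_SYNC_TIME_MS = 5_000L;        // single-sync limit

      private int slowSyncCount;
      private long intervalStartMs = System.currentTimeMillis();

      /** Returns true if a SLOW_SYNC roll should be requested after a sync taking syncMs. */
      boolean onSyncCompleted(long syncMs, long slowSyncThresholdMs) {
        if (syncMs >= ROLL_ON_SYNC_TIME_MS) {
          return true; // one very slow sync is enough (the 5000 ms cases in the tests below)
        }
        long now = System.currentTimeMillis();
        if (now - intervalStartMs > SLOW_SYNC_ROLL_INTERVAL_MS) {
          intervalStartMs = now;
          slowSyncCount = 0;
        }
        if (syncMs > slowSyncThresholdMs) {
          slowSyncCount++;
        }
        return slowSyncCount >= SLOW_SYNC_ROLL_THRESHOLD; // repeated slow syncs within the interval
      }
    }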
@@ -158,6 +187,74 @@ private void startAndWriteData() throws IOException, InterruptedException {
     }
   }
 
+  private static void setSyncLatencyMillis(int latency) {
+    syncLatencyMillis = latency;
+  }
+
+  protected final AbstractFSWAL<?> getWALAndRegisterSlowSyncHook(RegionInfo region)
+    throws IOException {
+    // Get a reference to the wal.
+    final AbstractFSWAL<?> log = (AbstractFSWAL<?>) server.getWAL(region);
+
+    // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
+    log.registerWALActionsListener(new WALActionsListener() {
+      @Override
+      public void logRollRequested(RollRequestReason reason) {
+        switch (reason) {
+          case SLOW_SYNC:
+            slowSyncHookCalled.lazySet(true);
+            break;
+          default:
+            break;
+        }
+      }
+    });
+    return log;
+  }
+
+  protected final void checkSlowSync(AbstractFSWAL<?> log, Table table, int slowSyncLatency,
+    int writeCount, boolean slowSync) throws Exception {
+    if (slowSyncLatency > 0) {
+      setSyncLatencyMillis(slowSyncLatency);
+      setSlowLogWriter(log.conf);
+    } else {
+      setDefaultLogWriter(log.conf);
+    }
+
+    // Set up for test
+    log.rollWriter(true);
+    slowSyncHookCalled.set(false);
+
+    final WALProvider.WriterBase oldWriter = log.getWriter();
+
+    // Write some data
+    for (int i = 0; i < writeCount; i++) {
+      writeData(table, rowNum++);
+    }
+
+    if (slowSync) {
+      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return log.getWriter() != oldWriter;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "Waited too long for our test writer to get rolled out";
+        }
+      });
+
+      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
+    } else {
+      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
+    }
+  }
+
+  protected abstract void setSlowLogWriter(Configuration conf);
+
+  protected abstract void setDefaultLogWriter(Configuration conf);
+
   /**
    * Tests that log rolling doesn't hang when no data is written.
    */
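A note on why checkSlowSync above forces rollWriter(true) right after swapping the writer class: the WAL provider builds its writer from the WRITER_IMPL setting each time a new WAL file is created, so a configuration change only takes effect at the next roll. A sketch of that reconfigure-then-roll pattern (key and class names follow the concrete subclasses added later in this patch; this is not new API):

    // Swap the writer class in the WAL's live Configuration, then roll so the provider
    // builds the next writer from the updated setting.
    Configuration conf = log.conf;                     // same conf the helper above uses
    conf.set(FSHLogProvider.WRITER_IMPL,               // AsyncFSWALProvider.WRITER_IMPL for the async WAL
      SlowSyncLogWriter.class.getName());              // latency-injecting writer (defined below)
    log.rollWriter(true);                              // next writer is created from the new conf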
assertEquals("Should have 1 WALs at the end", 1, AbstractFSWALProvider.getNumRolledLogFiles(log)); - } finally { - if (t != null) t.close(); - if (table != null) table.close(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java index 9dc27a693a7f..804e93eb8f56 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java @@ -20,10 +20,17 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; @@ -36,6 +43,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; + @Category({ VerySlowRegionServerTests.class, LargeTests.class }) public class TestAsyncLogRolling extends AbstractTestLogRolling { @@ -51,6 +61,61 @@ public static void setUpBeforeClass() throws Exception { AbstractTestLogRolling.setUpBeforeClass(); } + public static class SlowSyncLogWriter extends AsyncProtobufLogWriter { + + public SlowSyncLogWriter(EventLoopGroup eventLoopGroup, Class channelClass) { + super(eventLoopGroup, channelClass); + } + + @Override + public CompletableFuture sync(boolean forceSync) { + CompletableFuture future = new CompletableFuture<>(); + super.sync(forceSync).whenCompleteAsync((lengthAfterFlush, error) -> { + EXECUTOR.schedule(() -> { + if (error != null) { + future.completeExceptionally(error); + } else { + future.complete(lengthAfterFlush); + } + }, syncLatencyMillis, TimeUnit.MILLISECONDS); + }); + return future; + } + } + + @Override + protected void setSlowLogWriter(Configuration conf) { + conf.set(AsyncFSWALProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName()); + } + + @Override + protected void setDefaultLogWriter(Configuration conf) { + conf.set(AsyncFSWALProvider.WRITER_IMPL, AsyncProtobufLogWriter.class.getName()); + } + + @Test + public void testSlowSyncLogRolling() throws Exception { + // Create the test table + TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); + admin.createTable(desc); + try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) { + server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); + RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); + final AbstractFSWAL log = getWALAndRegisterSlowSyncHook(region); + + // Set default log writer, no additional latency to any 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
index 9dc27a693a7f..804e93eb8f56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
@@ -20,10 +20,17 @@
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
@@ -36,6 +43,9 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
 @Category({ VerySlowRegionServerTests.class, LargeTests.class })
 public class TestAsyncLogRolling extends AbstractTestLogRolling {
 
@@ -51,6 +61,61 @@ public static void setUpBeforeClass() throws Exception {
     AbstractTestLogRolling.setUpBeforeClass();
   }
 
+  public static class SlowSyncLogWriter extends AsyncProtobufLogWriter {
+
+    public SlowSyncLogWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
+      super(eventLoopGroup, channelClass);
+    }
+
+    @Override
+    public CompletableFuture<Long> sync(boolean forceSync) {
+      CompletableFuture<Long> future = new CompletableFuture<>();
+      super.sync(forceSync).whenCompleteAsync((lengthAfterFlush, error) -> {
+        EXECUTOR.schedule(() -> {
+          if (error != null) {
+            future.completeExceptionally(error);
+          } else {
+            future.complete(lengthAfterFlush);
+          }
+        }, syncLatencyMillis, TimeUnit.MILLISECONDS);
+      });
+      return future;
+    }
+  }
+
+  @Override
+  protected void setSlowLogWriter(Configuration conf) {
+    conf.set(AsyncFSWALProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
+  }
+
+  @Override
+  protected void setDefaultLogWriter(Configuration conf) {
+    conf.set(AsyncFSWALProvider.WRITER_IMPL, AsyncProtobufLogWriter.class.getName());
+  }
+
+  @Test
+  public void testSlowSyncLogRolling() throws Exception {
+    // Create the test table
+    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
+    admin.createTable(desc);
+    try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) {
+      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
+      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
+      final AbstractFSWAL<?> log = getWALAndRegisterSlowSyncHook(region);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+
+      // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold.
+      // Write some data. Should only take one sync.
+      checkSlowSync(log, table, 5000, 1, true);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+    }
+  }
+
   @Test
   public void testLogRollOnDatanodeDeath() throws IOException, InterruptedException {
     dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 3, true, null, null);
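The async SlowSyncLogWriter above injects latency without blocking the WAL's own threads: it lets the real sync() complete and then re-completes its own future on the shared single-thread scheduler after syncLatencyMillis. A self-contained sketch of that delayed-completion pattern, with illustrative names only (nothing here is HBase API):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class DelayedCompletion {
      private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor();

      /** Completes the returned future with the source's outcome, but only after delayMillis. */
      static <T> CompletableFuture<T> delay(CompletableFuture<T> source, long delayMillis) {
        CompletableFuture<T> delayed = new CompletableFuture<>();
        source.whenComplete((result, error) -> SCHEDULER.schedule(() -> {
          if (error != null) {
            delayed.completeExceptionally(error);
          } else {
            delayed.complete(result);
          }
        }, delayMillis, TimeUnit.MILLISECONDS));
        return delayed;
      }

      public static void main(String[] args) {
        long start = System.nanoTime();
        delay(CompletableFuture.completedFuture(42), 200).join();
        System.out.println("completed after ~" + (System.nanoTime() - start) / 1_000_000 + " ms");
        SCHEDULER.shutdown();
      }
    }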
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index f07a02cb25d1..9caa47e8614b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -36,7 +36,6 @@
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -56,10 +55,9 @@
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -98,192 +96,30 @@ public static void setUpBeforeClass() throws Exception {
     conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
     conf.set(WALFactory.WAL_PROVIDER, "filesystem");
     AbstractTestLogRolling.setUpBeforeClass();
-
-    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
-    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
-    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
-    // For slow sync threshold test: roll once after a sync above this threshold
-    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
   }
 
-  @Test
-  public void testSlowSyncLogRolling() throws Exception {
-    // Create the test table
-    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
-    admin.createTable(desc);
-    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
-    int row = 1;
-    try {
-      // Get a reference to the FSHLog
-      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
-      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
-      final FSHLog log = (FSHLog) server.getWAL(region);
-
-      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
-
-      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
-      log.registerWALActionsListener(new WALActionsListener() {
-        @Override
-        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
-          switch (reason) {
-            case SLOW_SYNC:
-              slowSyncHookCalled.lazySet(true);
-              break;
-            default:
-              break;
-          }
-        }
-      });
-
-      // Write some data
-
-      for (int i = 0; i < 10; i++) {
-        writeData(table, row++);
-      }
-
-      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
-
-      // Set up for test
-      slowSyncHookCalled.set(false);
-
-      // Wrap the current writer with the anonymous class below that adds 200 ms of
-      // latency to any sync on the hlog. This should be more than sufficient to trigger
-      // slow sync warnings.
-      final Writer oldWriter1 = log.getWriter();
-      final Writer newWriter1 = new Writer() {
-        @Override
-        public void close() throws IOException {
-          oldWriter1.close();
-        }
-
-        @Override
-        public void sync(boolean forceSync) throws IOException {
-          try {
-            Thread.sleep(200);
-          } catch (InterruptedException e) {
-            InterruptedIOException ex = new InterruptedIOException();
-            ex.initCause(e);
-            throw ex;
-          }
-          oldWriter1.sync(forceSync);
-        }
-
-        @Override
-        public void append(Entry entry) throws IOException {
-          oldWriter1.append(entry);
-        }
-
-        @Override
-        public long getLength() {
-          return oldWriter1.getLength();
-        }
-
-        @Override
-        public long getSyncedLength() {
-          return oldWriter1.getSyncedLength();
-        }
-      };
-      log.setWriter(newWriter1);
-
-      // Write some data.
-      // We need to write at least 5 times, but double it. We should only request
-      // a SLOW_SYNC roll once in the current interval.
-      for (int i = 0; i < 10; i++) {
-        writeData(table, row++);
-      }
-
-      // Wait for our wait injecting writer to get rolled out, as needed.
-
-      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return log.getWriter() != newWriter1;
-        }
-
-        @Override
-        public String explainFailure() throws Exception {
-          return "Waited too long for our test writer to get rolled out";
-        }
-      });
-
-      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
-
-      // Set up for test
-      slowSyncHookCalled.set(false);
-
-      // Wrap the current writer with the anonymous class below that adds 5000 ms of
-      // latency to any sync on the hlog.
-      // This will trip the other threshold.
-      final Writer oldWriter2 = (Writer) log.getWriter();
-      final Writer newWriter2 = new Writer() {
-        @Override
-        public void close() throws IOException {
-          oldWriter2.close();
-        }
-
-        @Override
-        public void sync(boolean forceSync) throws IOException {
-          try {
-            Thread.sleep(5000);
-          } catch (InterruptedException e) {
-            InterruptedIOException ex = new InterruptedIOException();
-            ex.initCause(e);
-            throw ex;
-          }
-          oldWriter2.sync(forceSync);
-        }
-
-        @Override
-        public void append(Entry entry) throws IOException {
-          oldWriter2.append(entry);
-        }
-
-        @Override
-        public long getLength() {
-          return oldWriter2.getLength();
-        }
-
-        @Override
-        public long getSyncedLength() {
-          return oldWriter2.getSyncedLength();
-        }
-      };
-      log.setWriter(newWriter2);
-
-      // Write some data. Should only take one sync.
-
-      writeData(table, row++);
-
-      // Wait for our wait injecting writer to get rolled out, as needed.
-
-      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return log.getWriter() != newWriter2;
-        }
-
-        @Override
-        public String explainFailure() throws Exception {
-          return "Waited too long for our test writer to get rolled out";
-        }
-      });
-
-      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
-
-      // Set up for test
-      slowSyncHookCalled.set(false);
-
-      // Write some data
-      for (int i = 0; i < 10; i++) {
-        writeData(table, row++);
+  public static class SlowSyncLogWriter extends ProtobufLogWriter {
+    @Override
+    public void sync(boolean forceSync) throws IOException {
+      try {
+        Thread.sleep(syncLatencyMillis);
+      } catch (InterruptedException e) {
+        InterruptedIOException ex = new InterruptedIOException();
+        ex.initCause(e);
+        throw ex;
       }
+      super.sync(forceSync);
+    }
+  }
 
-      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
+  @Override
+  protected void setSlowLogWriter(Configuration conf) {
+    conf.set(FSHLogProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
+  }
 
-    } finally {
-      table.close();
-    }
+  @Override
+  protected void setDefaultLogWriter(Configuration conf) {
+    conf.set(FSHLogProvider.WRITER_IMPL, ProtobufLogWriter.class.getName());
   }
 
   void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
@@ -313,6 +149,36 @@ void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect,
     }
   }
 
+  @Test
+  public void testSlowSyncLogRolling() throws Exception {
+    // Create the test table
+    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
+    admin.createTable(desc);
+    try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) {
+      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
+      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
+      final AbstractFSWAL<?> log = getWALAndRegisterSlowSyncHook(region);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+
+      // Adds 200 ms of latency to any sync on the hlog. This should be more than sufficient to
+      // trigger slow sync warnings.
+      // Write some data.
+      // We need to write at least 5 times, but double it. We should only request
+      // a SLOW_SYNC roll once in the current interval.
+      checkSlowSync(log, table, 200, 10, true);
+
+      // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold.
+      // Write some data. Should only take one sync.
+      checkSlowSync(log, table, 5000, 1, true);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+    }
+  }
+
   /**
    * Tests that logs are rolled upon detecting datanode death Requires an HDFS jar with HDFS-826 &
    * syncFs() support (HDFS-200)