Skip to content

Commit

Permalink
HBASE-28190 Add slow sync log rolling test in TestAsyncLogRolling. (#…
Browse files Browse the repository at this point in the history
…5507)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
zhuyaogai committed Dec 11, 2023
1 parent 25c639f commit 78c5ac3
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2245,6 +2245,10 @@ private static void split(final Configuration conf, final Path p) throws IOExcep
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}

W getWriter() {
return this.writer;
}

private static void usage() {
System.err.println("Usage: AbstractFSWAL <ARGS>");
System.err.println("Arguments:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,14 +603,6 @@ DatanodeInfo[] getPipeline() {
return new DatanodeInfo[0];
}

Writer getWriter() {
return this.writer;
}

void setWriter(Writer writer) {
this.writer = writer;
}

@Override
protected Writer createCombinedWriter(Writer localWriter, Writer remoteWriter) {
// put remote writer first as usually it will cost more time to finish, so we write to it first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseTestingUtil;
Expand All @@ -31,6 +35,7 @@
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
Expand All @@ -48,8 +53,10 @@
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -59,6 +66,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* Test log deletion as logs are rolled.
*/
Expand All @@ -74,6 +83,10 @@ public abstract class AbstractTestLogRolling {
protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
@Rule
public final TestName name = new TestName();
protected static int syncLatencyMillis;
private static int rowNum = 1;
private static final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
protected static ScheduledExecutorService EXECUTOR;

public AbstractTestLogRolling() {
this.server = null;
Expand Down Expand Up @@ -118,6 +131,17 @@ public static void setUpBeforeClass() throws Exception {
// disable low replication check for log roller to get a more stable result
// TestWALOpenAfterDNRollingStart will test this option.
conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L * 60 * 60 * 1000);

// For slow sync threshold test: roll after 5 slow syncs in 10 seconds
conf.setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
conf.setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
// For slow sync threshold test: roll once after a sync above this threshold
conf.setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);

// Slow sync executor.
EXECUTOR = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Slow-sync-%d")
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

@Before
Expand All @@ -139,6 +163,11 @@ public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}

@AfterClass
public static void tearDownAfterClass() {
EXECUTOR.shutdownNow();
}

private void startAndWriteData() throws IOException, InterruptedException {
this.server = cluster.getRegionServerThreads().get(0).getRegionServer();

Expand All @@ -158,6 +187,74 @@ private void startAndWriteData() throws IOException, InterruptedException {
}
}

private static void setSyncLatencyMillis(int latency) {
syncLatencyMillis = latency;
}

protected final AbstractFSWAL<?> getWALAndRegisterSlowSyncHook(RegionInfo region)
throws IOException {
// Get a reference to the wal.
final AbstractFSWAL<?> log = (AbstractFSWAL<?>) server.getWAL(region);

// Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
log.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRollRequested(RollRequestReason reason) {
switch (reason) {
case SLOW_SYNC:
slowSyncHookCalled.lazySet(true);
break;
default:
break;
}
}
});
return log;
}

protected final void checkSlowSync(AbstractFSWAL<?> log, Table table, int slowSyncLatency,
int writeCount, boolean slowSync) throws Exception {
if (slowSyncLatency > 0) {
setSyncLatencyMillis(slowSyncLatency);
setSlowLogWriter(log.conf);
} else {
setDefaultLogWriter(log.conf);
}

// Set up for test
log.rollWriter(true);
slowSyncHookCalled.set(false);

final WALProvider.WriterBase oldWriter = log.getWriter();

// Write some data
for (int i = 0; i < writeCount; i++) {
writeData(table, rowNum++);
}

if (slowSync) {
TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return log.getWriter() != oldWriter;
}

@Override
public String explainFailure() throws Exception {
return "Waited too long for our test writer to get rolled out";
}
});

assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
} else {
assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
}
}

protected abstract void setSlowLogWriter(Configuration conf);

protected abstract void setDefaultLogWriter(Configuration conf);

/**
* Tests that log rolling doesn't hang when no data is written.
*/
Expand Down Expand Up @@ -239,12 +336,10 @@ void validateData(Table table, int rownum) throws IOException {
*/
@Test
public void testCompactionRecordDoesntBlockRolling() throws Exception {
Table table = null;

// When the hbase:meta table can be opened, the region servers are running
Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
try {
table = createTestTable(getName());
try (Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
Table table = createTestTable(getName())) {

server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
HRegion region = server.getRegions(table.getName()).get(0);
Expand Down Expand Up @@ -286,9 +381,6 @@ public void testCompactionRecordDoesntBlockRolling() throws Exception {
log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
assertEquals("Should have 1 WALs at the end", 1,
AbstractFSWALProvider.getNumRolledLogFiles(log));
} finally {
if (t != null) t.close();
if (table != null) table.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
Expand All @@ -36,6 +43,9 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestAsyncLogRolling extends AbstractTestLogRolling {

Expand All @@ -51,6 +61,61 @@ public static void setUpBeforeClass() throws Exception {
AbstractTestLogRolling.setUpBeforeClass();
}

public static class SlowSyncLogWriter extends AsyncProtobufLogWriter {

public SlowSyncLogWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
super(eventLoopGroup, channelClass);
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
CompletableFuture<Long> future = new CompletableFuture<>();
super.sync(forceSync).whenCompleteAsync((lengthAfterFlush, error) -> {
EXECUTOR.schedule(() -> {
if (error != null) {
future.completeExceptionally(error);
} else {
future.complete(lengthAfterFlush);
}
}, syncLatencyMillis, TimeUnit.MILLISECONDS);
});
return future;
}
}

@Override
protected void setSlowLogWriter(Configuration conf) {
conf.set(AsyncFSWALProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
}

@Override
protected void setDefaultLogWriter(Configuration conf) {
conf.set(AsyncFSWALProvider.WRITER_IMPL, AsyncProtobufLogWriter.class.getName());
}

@Test
public void testSlowSyncLogRolling() throws Exception {
// Create the test table
TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
admin.createTable(desc);
try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) {
server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
final AbstractFSWAL<?> log = getWALAndRegisterSlowSyncHook(region);

// Set default log writer, no additional latency to any sync on the hlog.
checkSlowSync(log, table, -1, 10, false);

// Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold.
// Write some data. Should only take one sync.
checkSlowSync(log, table, 5000, 1, true);

// Set default log writer, no additional latency to any sync on the hlog.
checkSlowSync(log, table, -1, 10, false);
}
}

@Test
public void testLogRollOnDatanodeDeath() throws IOException, InterruptedException {
dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 3, true, null, null);
Expand Down
Loading

0 comments on commit 78c5ac3

Please sign in to comment.