Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-25905 Shutdown of WAL stuck at waitForSafePoint #3898

Merged
merged 1 commit into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,20 @@ private int finishSync() {
}
}

// Returns the txid of the entry at the tail of the given queue. Callers must confirm that the
// queue is non-empty before calling, as peekLast yields null for an empty deque.
private static long getLastTxid(Deque<FSWALEntry> queue) {
  FSWALEntry tail = queue.peekLast();
  return tail.getTxid();
}

private void appendAndSync() {
final AsyncWriter writer = this.writer;
// maybe a sync request is not queued when we issue a sync, so check here to see if we could
// finish some.
finishSync();
long newHighestProcessedAppendTxid = -1L;
// this is used to avoid calling peekLast every time on unackedAppends, appendAndSync is single
// threaded, this could save us some cycles
boolean addedToUnackedAppends = false;
for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
FSWALEntry entry = iter.next();
boolean appended;
Expand All @@ -467,10 +475,21 @@ private void appendAndSync() {
if (appended) {
// This is possible, when we fail to sync, we will add the unackedAppends back to
// toWriteAppends, so here we may get an entry which is already in the unackedAppends.
if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) {
if (addedToUnackedAppends || unackedAppends.isEmpty() ||
getLastTxid(unackedAppends) < entry.getTxid()) {
unackedAppends.addLast(entry);
addedToUnackedAppends = true;
}
if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
// See HBASE-25905. Here we need to make sure that we always write out all the entries in
// unackedAppends. The code in the consume method assumes that the entries in unackedAppends
// have all been sent out, so if there is a roll request and unackedAppends is not empty, it can
// just return, as later there will be a syncCompleted call to clear unackedAppends, or a
// syncFailed call to lead us to another state.
// There could be other ways to fix this, such as changing the logic in the consume method, but
// that would break the assumption and then (may) lead to a big refactoring. So let's use this
// fix first; we can optimize later.
if (writer.getLength() - fileLengthAtLastSync >= batchSize &&
(addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))) {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {

private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class);

/**
 * Configuration key naming the {@code AsyncWriter} implementation class that
 * {@code createAsyncWriter} looks up; {@code AsyncProtobufLogWriter} is passed as the default.
 */
public static final String WRITER_IMPL = "hbase.regionserver.hlog.async.writer.impl";

// Only public so classes back in regionserver.wal can access
public interface AsyncWriter extends WALProvider.AsyncWriter {
/**
Expand Down Expand Up @@ -98,7 +100,7 @@ public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, P
Class<? extends Channel> channelClass) throws IOException {
// Configuration already does caching for the Class lookup.
Class<? extends AsyncWriter> logWriterClass = conf.getClass(
"hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class);
try {
AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
.newInstance(eventLoopGroup, channelClass);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
* Testcase for HBASE-25905
*/
@Category({ RegionServerTests.class, MediumTests.class })
public class TestAsyncFSWALRollStuck {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncFSWALRollStuck.class);

private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class);

private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

private static EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();

private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;

private static ScheduledExecutorService EXECUTOR;

private static BlockingQueue<CompletableFuture<Long>> FUTURES = new ArrayBlockingQueue<>(3);

private static AtomicInteger SYNC_COUNT = new AtomicInteger(0);

private static CountDownLatch ARRIVE = new CountDownLatch(1);

private static CountDownLatch RESUME = new CountDownLatch(1);

public static final class TestAsyncWriter extends AsyncProtobufLogWriter {

public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
super(eventLoopGroup, channelClass);
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
int count = SYNC_COUNT.incrementAndGet();
if (count < 3) {
// we will mark these two futures as failure, to make sure that we have 2 edits in
// unackedAppends, and trigger a WAL roll
CompletableFuture<Long> f = new CompletableFuture<>();
FUTURES.offer(f);
return f;
} else if (count == 3) {
// for this future, we will mark it as succeeded, but before returning from this method, we
// need to request a roll, to enter the special corner case, where we have left some edits
// in unackedAppends but never tries to write them out which leads to a hang
ARRIVE.countDown();
try {
RESUME.await();
} catch (InterruptedException e) {
}
return super.sync(forceSync);
} else {
return super.sync(forceSync);
}
}
}

private static TableName TN;

private static RegionInfo RI;

private static MultiVersionConcurrencyControl MVCC;

private static AsyncFSWAL WAL;

private static ExecutorService ROLL_EXEC;

@BeforeClass
public static void setUp() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setClass(AsyncFSWALProvider.WRITER_IMPL, TestAsyncWriter.class, AsyncWriter.class);
// set a very small size so we will reach the batch size when writing out a single edit
conf.setLong(AsyncFSWAL.WAL_BATCH_SIZE, 1);

TN = TableName.valueOf("test");
RI = RegionInfoBuilder.newBuilder(TN).build();
MVCC = new MultiVersionConcurrencyControl();

EXECUTOR =
Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build());

Path rootDir = UTIL.getDataTestDir();
ROLL_EXEC =
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
WALActionsListener listener = new WALActionsListener() {

@Override
public void logRollRequested(RollRequestReason reason) {
ROLL_EXEC.execute(() -> {
try {
WAL.rollWriter();
} catch (Exception e) {
LOG.warn("failed to roll writer", e);
}
});
}

};
WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), rootDir, "log", "oldlog", conf,
Arrays.asList(listener), true, null, null, EVENT_LOOP_GROUP, CHANNEL_CLASS);
WAL.init();
}

@AfterClass
public static void tearDown() throws Exception {
EXECUTOR.shutdownNow();
ROLL_EXEC.shutdownNow();
Closeables.close(WAL, true);
UTIL.cleanupTestDir();
}

@Test
public void testRoll() throws Exception {
byte[] row = Bytes.toBytes("family");
WALEdit edit = new WALEdit();
edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setFamily(row)
.setQualifier(row).setRow(row).setValue(row)
.setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Type.Put).build());
WALKeyImpl key1 =
new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC);
WAL.appendData(RI, key1, edit);

WALKeyImpl key2 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, key1.getWriteTime() + 1, MVCC);
long txid = WAL.appendData(RI, key2, edit);

// we need to make sure the two edits have both been added unackedAppends, so we have two syncs
UTIL.waitFor(10000, () -> FUTURES.size() == 2);
FUTURES.poll().completeExceptionally(new IOException("inject error"));
FUTURES.poll().completeExceptionally(new IOException("inject error"));
ARRIVE.await();
// resume after 1 seconds, to give us enough time to enter the roll state
EXECUTOR.schedule(() -> RESUME.countDown(), 1, TimeUnit.SECONDS);
// let's roll the wal, before the fix in HBASE-25905, it will hang forever inside
// waitForSafePoint
WAL.rollWriter();
// make sure we can finally succeed
WAL.sync(txid);
}
}