Skip to content

Commit

Permalink
HBASE-27230 RegionServer should be aborted when WAL.sync throws Timeo… (
Browse files Browse the repository at this point in the history
#4641)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
comnetwork committed Jul 24, 2022
1 parent 31fc97e commit 0ae42dd
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
Expand Down Expand Up @@ -1367,7 +1368,9 @@ public RegionInfo getRegionInfo() {
return this.fs.getRegionInfo();
}

/** Returns Instance of {@link RegionServerServices} used by this HRegion. Can be null. */
/**
* Returns Instance of {@link RegionServerServices} used by this HRegion. Can be null.
*/
RegionServerServices getRegionServerServices() {
return this.rsServices;
}
Expand Down Expand Up @@ -2863,7 +2866,7 @@ private boolean writeCanNotFlushMarkerToWAL(WriteEntry flushOpSeqIdMVCCEntry, WA
if (sink != null && !writeFlushWalMarker) {
/**
* Here for replication to secondary region replica could use {@link FlushAction#CANNOT_FLUSH}
* to recover writeFlushWalMarker is false, we create {@link WALEdit} for
* to recover when writeFlushWalMarker is false, we create {@link WALEdit} for
* {@link FlushDescriptor} and attach the {@link RegionReplicationSink#add} to the
* flushOpSeqIdMVCCEntry,see HBASE-26960 for more details.
*/
Expand Down Expand Up @@ -3694,7 +3697,7 @@ public void doPostOpCleanupForMiniBatch(
* @param familyMap Map of Cells by family
*/
protected void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,
MemStoreSizing memstoreAccounting) throws IOException {
MemStoreSizing memstoreAccounting) {
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<Cell> cells = e.getValue();
Expand Down Expand Up @@ -5231,7 +5234,7 @@ public void setReadsEnabled(boolean readsEnabled) {
* scenario but that do not make sense otherwise.
*/
private void applyToMemStore(HStore store, List<Cell> cells, boolean delta,
MemStoreSizing memstoreAccounting) throws IOException {
MemStoreSizing memstoreAccounting) {
// Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!!
boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
if (upsert) {
Expand Down Expand Up @@ -8047,6 +8050,19 @@ private WriteEntry doWALAppend(WALEdit walEdit, BatchOperation<?> batchOp,
if (walKey.getWriteEntry() != null) {
mvcc.complete(walKey.getWriteEntry());
}

/**
* If {@link WAL#sync} get a timeout exception, the only correct way is to abort the region
* server, as the design of {@link WAL#sync}, is to succeed or die, there is no 'failure'. It
* is usually not a big deal is because we set a very large default value(5 minutes) for
* {@link AbstractFSWAL#WAL_SYNC_TIMEOUT_MS}, usually the WAL system will abort the region
* server if it can not finish the sync within 5 minutes.
*/
if (ioe instanceof WALSyncTimeoutIOException) {
if (rsServices != null) {
rsServices.abort("WAL sync timeout,forcing server shutdown", ioe);
}
}
throw ioe;
}
}
Expand All @@ -8057,7 +8073,7 @@ private WriteEntry doWALAppend(WALEdit walEdit, BatchOperation<?> batchOp,
*/
private void attachRegionReplicationInWALAppend(BatchOperation<?> batchOp,
MiniBatchOperationInProgress<Mutation> miniBatchOp, WALKeyImpl walKey, WALEdit walEdit,
WriteEntry writeEntry) throws IOException {
WriteEntry writeEntry) {
if (!regionReplicationSink.isPresent()) {
return;
}
Expand Down Expand Up @@ -8086,7 +8102,7 @@ private void attachRegionReplicationInWALAppend(BatchOperation<?> batchOp,
* replica.
*/
private void doAttachReplicateRegionReplicaAction(WALKeyImpl walKey, WALEdit walEdit,
WriteEntry writeEntry) throws IOException {
WriteEntry writeEntry) {
if (walEdit == null || walEdit.isEmpty()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1880,8 +1880,7 @@ public long getSmallestReadPoint() {
* across all of them.
* @param readpoint readpoint below which we can safely remove duplicate KVs
*/
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing)
throws IOException {
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
this.storeEngine.readLock();
try {
this.memstore.upsert(cells, readpoint, memstoreSizing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
"hbase.regionserver.wal.slowsync.roll.interval.ms";
protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute

protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min

public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";
Expand Down Expand Up @@ -881,7 +881,7 @@ protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
}
}
} catch (TimeoutIOException tioe) {
throw tioe;
throw new WALSyncTimeoutIOException(tioe);
} catch (InterruptedException ie) {
LOG.warn("Interrupted", ie);
throw convertInterruptedExceptionToIOException(ie);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Thrown when {@link WAL#sync} timeout.
*/
@InterfaceAudience.Private
public class WALSyncTimeoutIOException extends HBaseIOException {

private static final long serialVersionUID = 5067699288291906985L;

public WALSyncTimeoutIOException() {
super();
}

public WALSyncTimeoutIOException(String message, Throwable cause) {
super(message, cause);
}

public WALSyncTimeoutIOException(String message) {
super(message);
}

public WALSyncTimeoutIOException(Throwable cause) {
super(cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
if (walKey.getWriteEntry() != null) {
mvcc.complete(walKey.getWriteEntry());
}
/**
* Here we do not abort the RegionServer for {@link WALSyncTimeoutIOException} as
* {@link HRegion#doWALAppend} does,because WAL Marker just records the internal state and
* seems it is no need to always abort the RegionServer when {@link WAL#sync} timeout,it is
* the internal state transition that determines whether RegionServer is aborted or not.
*/
throw ioe;
}
return walKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,15 @@ void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long
StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
}

private EventLoopGroup eventLoopGroup;
/**
* Protected visibility for used in tests.
*/
protected EventLoopGroup eventLoopGroup;

private Class<? extends Channel> channelClass;
/**
* Protected visibility for used in tests.
*/
protected Class<? extends Channel> channelClass;

@Override
protected AsyncFSWAL createWAL() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
Expand Down Expand Up @@ -133,18 +134,21 @@ void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,

/**
* Sync what we have in the WAL.
* @throws when timeout, it would throw {@link WALSyncTimeoutIOException}.
*/
void sync() throws IOException;

/**
* Sync the WAL if the txId was not already sync'd.
* @param txid Transaction id to sync to.
* @throws when timeout, it would throw {@link WALSyncTimeoutIOException}.
*/
void sync(long txid) throws IOException;

/**
* @param forceSync Flag to force sync rather than flushing to the buffer. Example - Hadoop hflush
* vs hsync.
* @throws when timeout, it would throw {@link WALSyncTimeoutIOException}.
*/
default void sync(boolean forceSync) throws IOException {
sync();
Expand All @@ -154,6 +158,7 @@ default void sync(boolean forceSync) throws IOException {
* @param txid Transaction id to sync to.
* @param forceSync Flag to force sync rather than flushing to the buffer. Example - Hadoop hflush
* vs hsync.
* @throws when timeout, it would throw {@link WALSyncTimeoutIOException}.
*/
default void sync(long txid, boolean forceSync) throws IOException {
sync(txid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.wal;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
Expand Down Expand Up @@ -505,4 +506,10 @@ public final WALProvider getMetaWALProvider() {
public ExcludeDatanodeManager getExcludeDatanodeManager() {
return excludeDatanodeManager;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public String getFactoryId() {
return this.factoryId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.CompactionState;
Expand Down Expand Up @@ -54,7 +52,6 @@
* named for the method and does its stuff against that.
*/
@Category({ MasterTests.class, LargeTests.class })
@SuppressWarnings("deprecation")
public class TestWarmupRegion {

@ClassRule
Expand All @@ -66,20 +63,17 @@ public class TestWarmupRegion {
protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static byte[] ROW = Bytes.toBytes("testRow");
private static byte[] FAMILY = Bytes.toBytes("testFamily");
private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte[] VALUE = Bytes.toBytes("testValue");
private static byte[] COLUMN = Bytes.toBytes("column");
private static int numRows = 10000;
protected static int SLAVES = 3;
private static SingleProcessHBaseCluster myCluster;
private static Table table;

/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniCluster(SLAVES);
}

Expand Down
Loading

0 comments on commit 0ae42dd

Please sign in to comment.