Skip to content

Commit

Permalink
HBASE-28210 There could be holes in stack ids when loading procedures (
Browse files Browse the repository at this point in the history
…#5531)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
  • Loading branch information
Apache9 committed Nov 21, 2023
1 parent 9631af7 commit e88daed
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1695,9 +1695,6 @@ private void execProcedure(RootProcedureState<TEnvironment> procStack,
}
}

// Add the procedure to the stack
procStack.addRollbackStep(procedure);

// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (
Expand All @@ -1715,7 +1712,12 @@ private void execProcedure(RootProcedureState<TEnvironment> procStack,
// Commit the transaction even if a suspend (state may have changed). Note this append
// can take a bunch of time to complete.
if (procedure.needPersistence()) {
updateStoreOnExec(procStack, procedure, subprocs);
// Add the procedure to the stack
// See HBASE-28210 on why we need synchronized here
synchronized (procStack) {
procStack.addRollbackStep(procedure);
updateStoreOnExec(procStack, procedure, subprocs);
}
}

// if the store is not running we are aborting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.AtomicUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
* Testcase for HBASE-28210, where we persist the procedure which has been inserted later to
* {@link RootProcedureState} first and then crash, and then cause holes in stack ids when loading,
* and finally fail the start up of master.
*/
@Category({ MasterTests.class, SmallTests.class })
public class TestStackIdHoles {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestStackIdHoles.class);

private final class DummyProcedureStore extends ProcedureStoreBase {

private int numThreads;

private final LinkedHashMap<Long, ProcedureProtos.Procedure> procMap =
new LinkedHashMap<Long, ProcedureProtos.Procedure>();

private final AtomicLong maxProcId = new AtomicLong(0);

private final AtomicBoolean updated = new AtomicBoolean(false);

@Override
public void start(int numThreads) throws IOException {
this.numThreads = numThreads;
setRunning(true);
}

@Override
public void stop(boolean abort) {
}

@Override
public int getNumThreads() {
return numThreads;
}

@Override
public int setRunningProcedureCount(int count) {
return count;
}

@Override
public void recoverLease() throws IOException {
}

@Override
public void load(ProcedureLoader loader) throws IOException {
loader.setMaxProcId(maxProcId.get());
ProcedureTree tree = ProcedureTree.build(procMap.values());
loader.load(tree.getValidProcs());
loader.handleCorrupted(tree.getCorruptedProcs());
}

@Override
public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
long max = proc.getProcId();
synchronized (procMap) {
try {
procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc));
if (subprocs != null) {
for (Procedure<?> p : subprocs) {
procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p));
max = Math.max(max, p.getProcId());
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
AtomicUtils.updateMax(maxProcId, max);
}

@Override
public void insert(Procedure<?>[] procs) {
long max = -1;
synchronized (procMap) {
try {
for (Procedure<?> p : procs) {
procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p));
max = Math.max(max, p.getProcId());
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
AtomicUtils.updateMax(maxProcId, max);
}

@Override
public void update(Procedure<?> proc) {
// inject a sleep to simulate the scenario in HBASE-28210
if (proc.hasParent() && proc.getStackIndexes() != null) {
int lastStackId = proc.getStackIndexes()[proc.getStackIndexes().length - 1];
try {
// sleep more times if the stack id is smaller
Thread.sleep(100L * (10 - lastStackId));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
// simulate the failure when updating the second sub procedure
if (!updated.compareAndSet(false, true)) {
procExec.stop();
throw new RuntimeException("inject error");
}
}
synchronized (procMap) {
try {
procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

@Override
public void delete(long procId) {
synchronized (procMap) {
procMap.remove(procId);
}
}

@Override
public void delete(Procedure<?> parentProc, long[] subProcIds) {
synchronized (procMap) {
try {
procMap.put(parentProc.getProcId(), ProcedureUtil.convertToProtoProcedure(parentProc));
for (long procId : subProcIds) {
procMap.remove(procId);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

@Override
public void delete(long[] procIds, int offset, int count) {
synchronized (procMap) {
for (int i = 0; i < count; i++) {
long procId = procIds[offset + i];
procMap.remove(procId);
}
}
}
}

private final HBaseCommonTestingUtil HBTU = new HBaseCommonTestingUtil();

private DummyProcedureStore procStore;

private ProcedureExecutor<Void> procExec;

@Before
public void setUp() throws IOException {
procStore = new DummyProcedureStore();
procStore.start(4);
procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore);
procExec.init(4, true);
procExec.startWorkers();
}

@After
public void tearDown() {
procExec.stop();
}

public static class DummyProcedure extends NoopProcedure<Void> {

@Override
protected Procedure<Void>[] execute(Void env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
return new Procedure[] { new NoopProcedure<Void>(), new NoopProcedure<Void>() };
}
}

@Test
public void testLoad() throws IOException {
procExec.submitProcedure(new DummyProcedure());
// wait for the error
HBTU.waitFor(30000, () -> !procExec.isRunning());
procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore);
// make sure there is no error while loading
procExec.init(4, true);
}
}

0 comments on commit e88daed

Please sign in to comment.