Skip to content

Commit

Permalink
Revert "HBASE-16092 Procedure v2 - complete child procedure support"
Browse files Browse the repository at this point in the history
This reverts commit 96c4054.
  • Loading branch information
Matteo Bertozzi committed Jun 24, 2016
1 parent e8599a2 commit 1d06850
Show file tree
Hide file tree
Showing 15 changed files with 78 additions and 678 deletions.
Expand Up @@ -589,11 +589,6 @@ protected synchronized boolean childrenCountDown() {
return --childrenLatch == 0; return --childrenLatch == 0;
} }


@InterfaceAudience.Private
protected synchronized boolean hasChildren() {
return childrenLatch > 0;
}

/** /**
* Called by the RootProcedureState on procedure execution. * Called by the RootProcedureState on procedure execution.
* Each procedure store its stack-index positions. * Each procedure store its stack-index positions.
Expand All @@ -611,7 +606,7 @@ protected synchronized void addStackIndex(final int index) {


@InterfaceAudience.Private @InterfaceAudience.Private
protected synchronized boolean removeStackIndex() { protected synchronized boolean removeStackIndex() {
if (stackIndexes != null && stackIndexes.length > 1) { if (stackIndexes.length > 1) {
stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1); stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
return false; return false;
} else { } else {
Expand Down
Expand Up @@ -18,18 +18,16 @@


package org.apache.hadoop.hbase.procedure2; package org.apache.hadoop.hbase.procedure2;


import com.google.common.base.Preconditions;

import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -57,6 +55,8 @@
import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;


import com.google.common.base.Preconditions;

/** /**
* Thread Pool that executes the submitted procedures. * Thread Pool that executes the submitted procedures.
* The executor has a ProcedureStore associated. * The executor has a ProcedureStore associated.
Expand Down Expand Up @@ -314,7 +314,7 @@ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
corruptedCount++; corruptedCount++;
} }
if (abortOnCorruption && corruptedCount > 0) { if (abortOnCorruption && corruptedCount > 0) {
throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay"); throw new IOException("found " + corruptedCount + " procedures on replay");
} }
} }
}); });
Expand Down Expand Up @@ -388,10 +388,10 @@ private void loadProcedures(final ProcedureIterator procIter,
continue; continue;
} }


if (proc.hasParent()) { if (proc.hasParent() && !proc.isFinished()) {
Procedure parent = procedures.get(proc.getParentProcId()); Procedure parent = procedures.get(proc.getParentProcId());
// corrupted procedures are handled later at step 3 // corrupted procedures are handled later at step 3
if (parent != null && !proc.isFinished()) { if (parent != null) {
parent.incChildrenLatch(); parent.incChildrenLatch();
} }
} }
Expand All @@ -403,11 +403,6 @@ private void loadProcedures(final ProcedureIterator procIter,
case RUNNABLE: case RUNNABLE:
runnableList.add(proc); runnableList.add(proc);
break; break;
case WAITING:
if (!proc.hasChildren()) {
runnableList.add(proc);
}
break;
case WAITING_TIMEOUT: case WAITING_TIMEOUT:
if (waitingSet == null) { if (waitingSet == null) {
waitingSet = new HashSet<Procedure>(); waitingSet = new HashSet<Procedure>();
Expand All @@ -418,8 +413,8 @@ private void loadProcedures(final ProcedureIterator procIter,
if (proc.hasException()) { if (proc.hasException()) {
// add the proc to the runnables to perform the rollback // add the proc to the runnables to perform the rollback
runnables.addBack(proc); runnables.addBack(proc);
break;
} }
break;
case ROLLEDBACK: case ROLLEDBACK:
case INITIALIZING: case INITIALIZING:
String msg = "Unexpected " + proc.getState() + " state for " + proc; String msg = "Unexpected " + proc.getState() + " state for " + proc;
Expand All @@ -438,7 +433,7 @@ private void loadProcedures(final ProcedureIterator procIter,
RootProcedureState procStack = entry.getValue(); RootProcedureState procStack = entry.getValue();
if (procStack.isValid()) continue; if (procStack.isValid()) continue;


for (Procedure proc: procStack.getSubproceduresStack()) { for (Procedure proc: procStack.getSubprocedures()) {
LOG.error("corrupted procedure: " + proc); LOG.error("corrupted procedure: " + proc);
procedures.remove(proc.getProcId()); procedures.remove(proc.getProcId());
runnableList.remove(proc); runnableList.remove(proc);
Expand Down Expand Up @@ -945,7 +940,7 @@ private boolean executeRollback(final long rootProcId, final RootProcedureState
store.update(rootProc); store.update(rootProc);
} }


List<Procedure> subprocStack = procStack.getSubproceduresStack(); List<Procedure> subprocStack = procStack.getSubprocedures();
assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;


int stackTail = subprocStack.size(); int stackTail = subprocStack.size();
Expand Down Expand Up @@ -1027,12 +1022,7 @@ private boolean executeRollback(final Procedure proc) {
store.delete(proc.getProcId()); store.delete(proc.getProcId());
procedures.remove(proc.getProcId()); procedures.remove(proc.getProcId());
} else { } else {
final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds(); store.update(proc);
if (childProcIds != null) {
store.delete(proc, childProcIds);
} else {
store.update(proc);
}
} }
} else { } else {
store.update(proc); store.update(proc);
Expand Down Expand Up @@ -1112,7 +1102,6 @@ private void execProcedure(final RootProcedureState procStack, final Procedure p
assert subproc.getState() == ProcedureState.INITIALIZING : subproc; assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
subproc.setParentProcId(procedure.getProcId()); subproc.setParentProcId(procedure.getProcId());
subproc.setProcId(nextProcId()); subproc.setProcId(nextProcId());
procStack.addSubProcedure(subproc);
} }


if (!procedure.isFailed()) { if (!procedure.isFailed()) {
Expand Down Expand Up @@ -1149,7 +1138,17 @@ private void execProcedure(final RootProcedureState procStack, final Procedure p
} }


// Commit the transaction // Commit the transaction
updateStoreOnExec(procStack, procedure, subprocs); if (subprocs != null && !procedure.isFailed()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
}
store.insert(procedure, subprocs);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Store update " + procedure);
}
store.update(procedure);
}


// if the store is not running we are aborting // if the store is not running we are aborting
if (!store.isRunning()) { if (!store.isRunning()) {
Expand Down Expand Up @@ -1199,34 +1198,6 @@ private void execProcedure(final RootProcedureState procStack, final Procedure p
} }
} }


private void updateStoreOnExec(final RootProcedureState procStack,
final Procedure procedure, final Procedure[] subprocs) {
if (subprocs != null && !procedure.isFailed()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
}
store.insert(procedure, subprocs);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Store update " + procedure);
}
if (procedure.isFinished() && !procedure.hasParent()) {
// remove child procedures
final long[] childProcIds = procStack.getSubprocedureIds();
if (childProcIds != null) {
store.delete(procedure, childProcIds);
for (int i = 0; i < childProcIds.length; ++i) {
procedures.remove(childProcIds[i]);
}
} else {
store.update(procedure);
}
} else {
store.update(procedure);
}
}
}

private void handleInterruptedException(final Procedure proc, final InterruptedException e) { private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e); LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
Expand Down
Expand Up @@ -19,9 +19,7 @@
package org.apache.hadoop.hbase.procedure2; package org.apache.hadoop.hbase.procedure2;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;


import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -51,8 +49,7 @@ private enum State {
ROLLINGBACK, // The Procedure failed and the execution was rolledback ROLLINGBACK, // The Procedure failed and the execution was rolledback
} }


private Set<Procedure> subprocs = null; private ArrayList<Procedure> subprocedures = null;
private ArrayList<Procedure> subprocStack = null;
private State state = State.RUNNING; private State state = State.RUNNING;
private int running = 0; private int running = 0;


Expand Down Expand Up @@ -90,23 +87,13 @@ protected synchronized void unsetRollback() {
state = State.FAILED; state = State.FAILED;
} }


protected synchronized long[] getSubprocedureIds() { protected synchronized List<Procedure> getSubprocedures() {
if (subprocs == null) return null; return subprocedures;
int index = 0;
final long[] subIds = new long[subprocs.size()];
for (Procedure proc: subprocs) {
subIds[index++] = proc.getProcId();
}
return subIds;
}

protected synchronized List<Procedure> getSubproceduresStack() {
return subprocStack;
} }


protected synchronized RemoteProcedureException getException() { protected synchronized RemoteProcedureException getException() {
if (subprocStack != null) { if (subprocedures != null) {
for (Procedure proc: subprocStack) { for (Procedure proc: subprocedures) {
if (proc.hasException()) { if (proc.hasException()) {
return proc.getException(); return proc.getException();
} }
Expand Down Expand Up @@ -146,19 +133,11 @@ protected synchronized void addRollbackStep(final Procedure proc) {
if (proc.isFailed()) { if (proc.isFailed()) {
state = State.FAILED; state = State.FAILED;
} }
if (subprocStack == null) { if (subprocedures == null) {
subprocStack = new ArrayList<Procedure>(); subprocedures = new ArrayList<Procedure>();
}
proc.addStackIndex(subprocStack.size());
subprocStack.add(proc);
}

protected synchronized void addSubProcedure(final Procedure proc) {
if (!proc.hasParent()) return;
if (subprocs == null) {
subprocs = new HashSet<Procedure>();
} }
subprocs.add(proc); proc.addStackIndex(subprocedures.size());
subprocedures.add(proc);
} }


/** /**
Expand All @@ -169,19 +148,18 @@ protected synchronized void addSubProcedure(final Procedure proc) {
* on load we recreate the full stack by aggregating each procedure stack-positions. * on load we recreate the full stack by aggregating each procedure stack-positions.
*/ */
protected synchronized void loadStack(final Procedure proc) { protected synchronized void loadStack(final Procedure proc) {
addSubProcedure(proc);
int[] stackIndexes = proc.getStackIndexes(); int[] stackIndexes = proc.getStackIndexes();
if (stackIndexes != null) { if (stackIndexes != null) {
if (subprocStack == null) { if (subprocedures == null) {
subprocStack = new ArrayList<Procedure>(); subprocedures = new ArrayList<Procedure>();
} }
int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocStack.size(); int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocedures.size();
if (diff > 0) { if (diff > 0) {
subprocStack.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]); subprocedures.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]);
while (diff-- > 0) subprocStack.add(null); while (diff-- > 0) subprocedures.add(null);
} }
for (int i = 0; i < stackIndexes.length; ++i) { for (int i = 0; i < stackIndexes.length; ++i) {
subprocStack.set(stackIndexes[i], proc); subprocedures.set(stackIndexes[i], proc);
} }
} }
if (proc.getState() == ProcedureState.ROLLEDBACK) { if (proc.getState() == ProcedureState.ROLLEDBACK) {
Expand All @@ -195,8 +173,8 @@ protected synchronized void loadStack(final Procedure proc) {
* Called on store load by the ProcedureExecutor to validate the procedure stack. * Called on store load by the ProcedureExecutor to validate the procedure stack.
*/ */
protected synchronized boolean isValid() { protected synchronized boolean isValid() {
if (subprocStack != null) { if (subprocedures != null) {
for (Procedure proc: subprocStack) { for (Procedure proc: subprocedures) {
if (proc == null) { if (proc == null) {
return false; return false;
} }
Expand Down
Expand Up @@ -70,9 +70,4 @@ public void update(Procedure proc) {
public void delete(long procId) { public void delete(long procId) {
// no-op // no-op
} }

@Override
public void delete(Procedure proc, long[] subprocs) {
// no-op
}
} }
Expand Up @@ -20,9 +20,9 @@


import java.io.IOException; import java.io.IOException;


import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.Procedure;


/** /**
Expand Down Expand Up @@ -188,12 +188,4 @@ public interface ProcedureLoader {
* @param procId the ID of the procedure to remove. * @param procId the ID of the procedure to remove.
*/ */
void delete(long procId); void delete(long procId);

/**
* The parent procedure completed.
* Update the state and mark all the child deleted.
* @param parentProc the parent procedure to serialize and write to the store.
* @param subProcIds the IDs of the sub-procedure to remove.
*/
void delete(Procedure parentProc, long[] subProcIds);
} }
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
Expand Down Expand Up @@ -395,14 +394,6 @@ public void delete(long procId) {
trackProcIds(procId); trackProcIds(procId);
} }


public void delete(long[] procIds) {
// TODO: optimize
Arrays.sort(procIds);
for (int i = 0; i < procIds.length; ++i) {
delete(procIds[i]);
}
}

private void trackProcIds(long procId) { private void trackProcIds(long procId) {
minUpdatedProcId = Math.min(minUpdatedProcId, procId); minUpdatedProcId = Math.min(minUpdatedProcId, procId);
maxUpdatedProcId = Math.max(maxUpdatedProcId, procId); maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
Expand Down
Expand Up @@ -18,8 +18,6 @@


package org.apache.hadoop.hbase.procedure2.store.wal; package org.apache.hadoop.hbase.procedure2.store.wal;


import com.google.protobuf.InvalidProtocolBufferException;

import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
Expand All @@ -38,6 +36,8 @@
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;


import com.google.protobuf.InvalidProtocolBufferException;

/** /**
* Helper class that contains the WAL serialization utils. * Helper class that contains the WAL serialization utils.
*/ */
Expand Down Expand Up @@ -231,18 +231,4 @@ public static void writeDelete(ByteSlot slot, long procId)
builder.setProcId(procId); builder.setProcId(procId);
builder.build().writeDelimitedTo(slot); builder.build().writeDelimitedTo(slot);
} }

public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs)
throws IOException {
ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
builder.setProcId(proc.getProcId());
if (subprocs != null) {
builder.addProcedure(Procedure.convert(proc));
for (int i = 0; i < subprocs.length; ++i) {
builder.addChildId(subprocs[i]);
}
}
builder.build().writeDelimitedTo(slot);
}
} }

0 comments on commit 1d06850

Please sign in to comment.