diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java index 2d9e2cc33..ef13bc870 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java @@ -27,21 +27,20 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.function.Consumer; +import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriter.Result; import org.apache.accumulo.core.client.ConditionalWriter.Status; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.ColumnUpdate; @@ -105,6 +104,9 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra private static final Bytes RLOCK_VAL = Bytes.of("special rlock value 94da84e7796ff3b23b779805d820a33f1997cb8b"); + // added to avoid findbugs false positive + private static final Supplier NULLS = () -> null; + private static boolean isWrite(Bytes val) { return val != NTFY_VAL && val != RLOCK_VAL; } @@ -117,7 +119,7 @@ private static boolean isReadLock(Bytes val) { return val == RLOCK_VAL; } - private static enum TxStatus { + private enum TxStatus { OPEN, COMMIT_STARTED, COMMITTED, CLOSED } @@ -136,10 +138,6 @@ private static enum TxStatus { private TxStatus status = TxStatus.OPEN; private boolean commitAttempted = false; - // for testing - private boolean stopAfterPreCommit = false; - private boolean stopAfterPrimaryCommit = false; - public TransactionImpl(Environment env, Notification trigger, long startTs) { Objects.requireNonNull(env, "environment cannot be null"); Preconditions.checkArgument(startTs >= 0, "startTs cannot be negative"); @@ -522,12 +520,13 @@ public boolean preCommit(CommitData cd, RowColumn primary) { checkIfOpen(); status = TxStatus.COMMIT_STARTED; commitAttempted = true; - - stopAfterPreCommit = true; } SyncCommitObserver sco = new SyncCommitObserver(); - beginCommitAsync(cd, sco, primary); + cd = setUpBeginCommitAsync(cd, sco, primary); + if (cd != null) { + beginCommitAsyncTest(cd); + } try { sco.waitForCommit(); } catch (AlreadyAcknowledgedException e) { @@ -554,7 +553,7 @@ public boolean preCommit(CommitData cd, RowColumn primary) { * * @param cd Commit data */ - private void readUnread(CommitData cd, Consumer> locksSeen) throws Exception { + private void readUnread(CommitData cd, Consumer> locksSeen) { // TODO make async // TODO need to keep track of ranges read (not ranges passed in, but actual data read... user // may not iterate over entire range @@ -619,14 +618,13 @@ private void checkForOrphanedReadLocks(CommitData cd, Map> lo if (rowColsToCheck.size() > 0) { - long startTime = System.currentTimeMillis(); long waitTime = SnapshotScanner.INITIAL_WAIT_TIME; boolean resolved = false; List> openReadLocks = LockResolver.getOpenReadLocks(env, rowColsToCheck); - startTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis(); while (!resolved) { resolved = LockResolver.resolveLocks(env, startTs, stats, openReadLocks, startTime); @@ -701,23 +699,6 @@ private boolean checkForAckCollision(ConditionalMutation cm) { return false; } - @VisibleForTesting - public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) { - stopAfterPrimaryCommit = true; - - SyncCommitObserver sco = new SyncCommitObserver(); - cd.commitObserver = sco; - try { - beginSecondCommitPhase(cd, commitStamp); - sco.waitForCommit(); - } catch (CommitException e) { - return false; - } catch (Exception e) { - throw new FluoException(e); - } - return true; - } - public CommitData createCommitData() { CommitData cd = new CommitData(); cd.cw = env.getSharedResources().getConditionalWriter(); @@ -728,9 +709,8 @@ public CommitData createCommitData() { @Override public synchronized void commit() throws CommitException { - SyncCommitObserver sco = null; try { - sco = new SyncCommitObserver(); + SyncCommitObserver sco = new SyncCommitObserver(); commitAsync(sco); sco.waitForCommit(); } finally { @@ -807,7 +787,7 @@ private synchronized void checkIfOpen() { // CHECKSTYLE:OFF @Override - protected void finalize() throws Throwable { + protected void finalize() { // CHECKSTYLE:ON // TODO Log an error if transaction is not closed (See FLUO-486) close(false); @@ -818,34 +798,6 @@ public long getStartTimestamp() { return startTs; } - /** - * Funcitonal interface to provide next step of asynchronous commit on successful completion of - * the previous one - */ - private static interface OnSuccessInterface { - public void onSuccess(V result) throws Exception; - } - - private abstract static class SynchronousCommitTask implements Runnable { - - private CommitData cd; - - SynchronousCommitTask(CommitData cd) { - this.cd = cd; - } - - protected abstract void runCommitStep(CommitData cd) throws Exception; - - @Override - public void run() { - try { - runCommitStep(cd); - } catch (Exception e) { - cd.commitObserver.failed(e); - } - } - } - @Override public int getSize() { // TODO could calculate as items are added/set @@ -874,25 +826,6 @@ public int getSize() { return size; } - private void addCallback(CompletableFuture cfuture, CommitData cd, - OnSuccessInterface onSuccessInterface) { - cfuture.handleAsync((result, exception) -> { - if (exception != null) { - cd.commitObserver.failed(exception); - return null; - } else { - try { - onSuccessInterface.onSuccess(result); - return null; - } catch (Exception e) { - cd.commitObserver.failed(e); - return null; - } - } - }, env.getSharedResources().getAsyncCommitExecutor()); - } - - // TODO exception handling!!!! How????? abstract class CommitStep { private CommitStep nextStep; @@ -927,13 +860,13 @@ CompletableFuture compose(CommitData cd) { abstract class ConditionalStep extends CommitStep { - CommitData cd; - public abstract Collection createMutations(CommitData cd); - public abstract Iterator handleUnknown(CommitData cd, Iterator results); + public abstract Iterator handleUnknown(CommitData cd, Iterator results) + throws Exception; - public abstract boolean processResults(CommitData cd, Iterator results); + public abstract boolean processResults(CommitData cd, Iterator results) + throws Exception; public AsyncConditionalWriter getACW(CommitData cd) { return cd.acw; @@ -942,7 +875,6 @@ public AsyncConditionalWriter getACW(CommitData cd) { @Override CompletableFuture getMainOp(CommitData cd) { // TODO not sure threading is correct - // TODO handle unknown Executor ace = env.getSharedResources().getAsyncCommitExecutor(); return getACW(cd).apply(createMutations(cd)).thenCompose(results -> { // ugh icky that this is an iterator, forces copy to inspect.. could refactor async CW to @@ -951,17 +883,32 @@ CompletableFuture getMainOp(CommitData cd) { Iterators.addAll(resultsList, results); boolean containsUknown = false; for (Result result : resultsList) { - containsUknown |= result.getStatus() == Status.UNKNOWN; + try { + containsUknown |= result.getStatus() == Status.UNKNOWN; + } catch (Exception e) { + throw new CompletionException(e); + } } - if (containsUknown) { // process unknown in sync executor Executor se = env.getSharedResources().getSyncCommitExecutor(); - return CompletableFuture.supplyAsync(() -> handleUnknown(cd, resultsList.iterator()), se); + return CompletableFuture.supplyAsync(() -> { + try { + return handleUnknown(cd, resultsList.iterator()); + } catch (Exception e) { + throw new CompletionException(e); + } + }, se); } else { return CompletableFuture.completedFuture(resultsList.iterator()); } - }).thenApplyAsync(results -> processResults(cd, results), ace); + }).thenApplyAsync(results -> { + try { + return processResults(cd, results); + } catch (Exception e) { + throw new CompletionException(e); + } + }, ace); } @@ -976,7 +923,8 @@ public Collection createMutations(CommitData cd) { } @Override - public Iterator handleUnknown(CommitData cd, Iterator results) { + public Iterator handleUnknown(CommitData cd, Iterator results) + throws Exception { Result result = Iterators.getOnlyElement(results); Status mutationStatus = result.getStatus(); @@ -986,14 +934,20 @@ public Iterator handleUnknown(CommitData cd, Iterator results) { switch (txInfo.status) { case LOCKED: - return Collections.singleton(new Result(Status.ACCEPTED, result.getMutation(), result.getTabletServer())).iterator(); + return Collections + .singleton( + new Result(Status.ACCEPTED, result.getMutation(), result.getTabletServer())) + .iterator(); case ROLLED_BACK: - return Collections.singleton(new Result(Status.REJECTED, result.getMutation(), result.getTabletServer())).iterator(); + return Collections + .singleton( + new Result(Status.REJECTED, result.getMutation(), result.getTabletServer())) + .iterator(); case UNKNOWN: // TODO async Result newResult = cd.cw.write(result.getMutation()); mutationStatus = newResult.getStatus(); - if(mutationStatus != Status.UNKNOWN) { + if (mutationStatus != Status.UNKNOWN) { return Collections.singleton(newResult).iterator(); } // TODO handle case were data other tx has lock @@ -1006,26 +960,30 @@ public Iterator handleUnknown(CommitData cd, Iterator results) { } } - //TODO + // TODO throw new IllegalStateException(); } @Override - public boolean processResults(CommitData cd, Iterator results) { + public boolean processResults(CommitData cd, Iterator results) throws Exception { Result result = Iterators.getOnlyElement(results); return result.getStatus() == Status.ACCEPTED; } @Override CompletableFuture getFailureOp(CommitData cd) { - //TODO can this be simplified by pushing some code to the superclass? + // TODO can this be simplified by pushing some code to the superclass? return CompletableFuture.supplyAsync(() -> { - ConditionalMutation pcm = Iterables.getOnlyElement(createMutations(cd)); + final ConditionalMutation pcm = Iterables.getOnlyElement(createMutations(cd)); cd.addPrimaryToRejected(); getStats().setRejected(cd.getRejected()); // TODO do async - checkForOrphanedLocks(cd); + try { + checkForOrphanedLocks(cd); + } catch (Exception e) { + throw new CompletionException(e); + } if (checkForAckCollision(pcm)) { cd.commitObserver.alreadyAcknowledged(); } else { @@ -1040,6 +998,12 @@ CompletableFuture getFailureOp(CommitData cd) { class LockOtherStep extends ConditionalStep { + @Override + public AsyncConditionalWriter getACW(CommitData cd) { + return cd.bacw; + } + + @Override public Collection createMutations(CommitData cd) { @@ -1072,7 +1036,7 @@ public Iterator handleUnknown(CommitData cd, Iterator results) { } @Override - public boolean processResults(CommitData cd, Iterator results) { + public boolean processResults(CommitData cd, Iterator results) throws Exception { while (results.hasNext()) { Result result = results.next(); @@ -1092,13 +1056,15 @@ public boolean processResults(CommitData cd, Iterator results) { CompletableFuture getFailureOp(CommitData cd) { return CompletableFuture.supplyAsync(() -> { getStats().setRejected(cd.getRejected()); - checkForOrphanedLocks(cd); + try { + // Does this need to be async? + checkForOrphanedLocks(cd); + } catch (Exception e) { + throw new CompletionException(e); + } return null; }, env.getSharedResources().getSyncCommitExecutor()).thenCompose(v -> rollbackLocks(cd)); - - } - } abstract class BatchWriterStep extends CommitStep { @@ -1119,23 +1085,130 @@ CompletableFuture getFailureOp(CommitData cd) { private CompletableFuture rollbackLocks(CommitData cd) { - // TODO - return null; + CommitStep firstStep = new RollbackOtherLocks(); + firstStep.andThen(new RollbackPrimaryLock()); + + return firstStep.compose(cd) + .thenRun(() -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage())); + + } + + + class RollbackOtherLocks extends BatchWriterStep { + + @Override + public Collection createMutations(CommitData cd) { + // roll back locks + + // TODO let rollback be done lazily? this makes GC more difficult + + Flutation m; + + ArrayList mutations = new ArrayList<>(cd.acceptedRows.size()); + for (Bytes row : cd.acceptedRows) { + m = new Flutation(env, row); + for (Entry entry : updates.get(row).entrySet()) { + if (isReadLock(entry.getValue())) { + m.put(entry.getKey(), + ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true), + DelReadLockValue.encodeRollback()); + } else { + m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs, + DelLockValue.encodeRollback(false, true)); + } + } + mutations.add(m); + } + + return mutations; + } + } + + class RollbackPrimaryLock extends BatchWriterStep { + + @Override + public Collection createMutations(CommitData cd) { + // mark transaction as complete for garbage collection purposes + Flutation m = new Flutation(env, cd.prow); + + m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs, + DelLockValue.encodeRollback(startTs, true, true)); + m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY); + + return Collections.singletonList(m); + } + } + + class CommittedTestStep extends CommitStep { + CompletableFuture getMainOp(CommitData cd) { + cd.commitObserver.committed(); + return CompletableFuture.completedFuture(true); + } + + CompletableFuture getFailureOp(CommitData cd) { + throw new IllegalStateException("Failure not expected"); + } + } + + @VisibleForTesting + public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) { + + SyncCommitObserver sco = new SyncCommitObserver(); + cd.commitObserver = sco; + try { + CommitStep firstStep = new GetCommitStampStepTest(commitStamp); + + firstStep.andThen(new WriteNotificationsStep()).andThen(new CommitPrimaryStep()) + .andThen(new CommittedTestStep()); + + firstStep.compose(cd).exceptionally(throwable -> { + cd.commitObserver.failed(throwable); + return null; + }); + sco.waitForCommit(); + } catch (CommitException e) { + return false; + } catch (Exception e) { + throw new FluoException(e); + } + return true; } class GetCommitStampStep extends CommitStep { + protected CompletableFuture getStampOp() { + return env.getSharedResources().getOracleClient().getStampAsync(); + } @Override CompletableFuture getMainOp(CommitData cd) { - // TODO Auto-generated method stub - // TODO set commitTs on commit data - return null; + + return getStampOp().thenApply(commitStamp -> { + if (startTs < commitStamp.getGcTimestamp()) { + return false; + } else { + getStats().setCommitTs(commitStamp.getTxTimestamp()); + return true; + } + }); } @Override CompletableFuture getFailureOp(CommitData cd) { - // TODO Auto-generated method stub - return null; + return rollbackLocks(cd); + } + + } + + class GetCommitStampStepTest extends GetCommitStampStep { + private final Stamp testStamp; + + public GetCommitStampStepTest(Stamp testStamp) { + this.testStamp = testStamp; + } + + @Override + protected CompletableFuture getStampOp() { + return CompletableFuture.completedFuture(testStamp); } } @@ -1144,8 +1217,42 @@ class WriteNotificationsStep extends BatchWriterStep { @Override public Collection createMutations(CommitData cd) { + long commitTs = getStats().getCommitTs(); HashMap mutations = new HashMap<>(); - // TODO copy code from writeNotificationsAsync() + + if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) { + Flutation m = new Flutation(env, cd.prow); + Notification.put(env, m, cd.pcol, commitTs); + mutations.put(cd.prow, m); + } + + for (Entry> rowUpdates : updates.entrySet()) { + + for (Entry colUpdates : rowUpdates.getValue().entrySet()) { + if (observedColumns.contains(colUpdates.getKey())) { + Bytes val = colUpdates.getValue(); + if (isWrite(val) && !isDelete(val)) { + Mutation m = mutations.get(rowUpdates.getKey()); + if (m == null) { + m = new Flutation(env, rowUpdates.getKey()); + mutations.put(rowUpdates.getKey(), m); + } + Notification.put(env, m, colUpdates.getKey(), commitTs); + } + } + } + } + + for (Entry> entry : weakNotifications.entrySet()) { + Mutation m = mutations.get(entry.getKey()); + if (m == null) { + m = new Flutation(env, entry.getKey()); + mutations.put(entry.getKey(), m); + } + for (Column col : entry.getValue()) { + Notification.put(env, m, col, commitTs); + } + } return mutations.values(); } @@ -1155,46 +1262,141 @@ class CommitPrimaryStep extends ConditionalStep { @Override public Collection createMutations(CommitData cd) { - // TODO Auto-generated method stub - return null; + long commitTs = getStats().getCommitTs(); + IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class); + PrewriteIterator.setSnaptime(iterConf, startTs); + boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn()); + + Condition lockCheck = + new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow, + cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID())); + final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck); + + ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval), + isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation); + + return Collections.singletonList(delLockMutation); } @Override - public Iterator handleUnknown(CommitData cd, Iterator results) { - // TODO Auto-generated method stub - return null; + public Iterator handleUnknown(CommitData cd, Iterator results) + throws Exception { + // the code for handing this is synchronous and needs to be handled in another thread pool + // TODO - how do we do the above without return a CF? + long commitTs = getStats().getCommitTs(); + Result result = Iterators.getOnlyElement(results); + Status ms = result.getStatus(); + + while (ms == Status.UNKNOWN) { + + // TODO async + TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs); + + switch (txInfo.status) { + case COMMITTED: + if (txInfo.commitTs != commitTs) { + throw new IllegalStateException( + cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs); + } + ms = Status.ACCEPTED; + break; + case LOCKED: + // TODO async + ConditionalMutation delLockMutation = result.getMutation(); + ms = cd.cw.write(delLockMutation).getStatus(); + break; + default: + ms = Status.REJECTED; + } + } + Result newResult = new Result(ms, result.getMutation(), result.getTabletServer()); + return Collections.singletonList(newResult).iterator(); } @Override - public boolean processResults(CommitData cd, Iterator results) { - // TODO Auto-generated method stub - return false; + public boolean processResults(CommitData cd, Iterator results) throws Exception { + Result result = Iterators.getOnlyElement(results); + return result.getStatus() == Status.ACCEPTED; } @Override CompletableFuture getFailureOp(CommitData cd) { - // TODO Auto-generated method stub - return null; + cd.commitObserver.commitFailed(cd.getShortCollisionMessage()); + return CompletableFuture.completedFuture(NULLS.get()); } } + @VisibleForTesting + public boolean finishCommit(CommitData cd, Stamp commitStamp) { + getStats().setCommitTs(commitStamp.getTxTimestamp()); + + CommitStep firstStep = new DeleteLocksStep(); + firstStep.andThen(new FinishCommitStep()); + firstStep.compose(cd).exceptionally(throwable -> { + System.err.println("Unexpected exception in finish commit test method : "); + throwable.printStackTrace(); + return null; + }); + + return true; + } + + class DeleteLocksStep extends BatchWriterStep { @Override public Collection createMutations(CommitData cd) { - // TODO Auto-generated method stub - return null; + long commitTs = getStats().getCommitTs(); + ArrayList mutations = new ArrayList<>(updates.size() + 1); + for (Entry> rowUpdates : updates.entrySet()) { + Flutation m = new Flutation(env, rowUpdates.getKey()); + boolean isTriggerRow = isTriggerRow(rowUpdates.getKey()); + for (Entry colUpdates : rowUpdates.getValue().entrySet()) { + ColumnUtil.commitColumn(env, + isTriggerRow && colUpdates.getKey().equals(notification.getColumn()), false, + colUpdates.getKey(), isWrite(colUpdates.getValue()), isDelete(colUpdates.getValue()), + isReadLock(colUpdates.getValue()), startTs, commitTs, observedColumns, m); + } + + mutations.add(m); + } + + return mutations; } } class FinishCommitStep extends BatchWriterStep { + @Override + CompletableFuture getMainOp(CommitData cd) { + return super.getMainOp(cd).thenApply(b -> { + Preconditions.checkArgument(b); + cd.commitObserver.committed(); + return true; + }); + } + @Override public Collection createMutations(CommitData cd) { - // TODO Auto-generated method stub - return null; + long commitTs = getStats().getCommitTs(); + ArrayList afterFlushMutations = new ArrayList<>(2); + + Flutation m = new Flutation(env, cd.prow); + // mark transaction as complete for garbage collection purposes + m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY); + afterFlushMutations.add(m); + + if (weakNotification != null) { + afterFlushMutations.add(weakNotification.newDelete(env, startTs)); + } + + if (notification != null) { + afterFlushMutations.add(notification.newDelete(env, startTs)); + } + + return afterFlushMutations; } } @@ -1208,21 +1410,23 @@ public synchronized void commitAsync(AsyncCommitObserver commitCallback) { try { CommitData cd = createCommitData(); - beginCommitAsync(cd, commitCallback, null); + cd = setUpBeginCommitAsync(cd, commitCallback, null); + if (cd != null) { + beginCommitAsync(cd); + } } catch (Exception e) { e.printStackTrace(); commitCallback.failed(e); } } - private void beginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback, + private CommitData setUpBeginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback, RowColumn primary) { - if (updates.size() == 0) { // TODO do async deleteWeakRow(); commitCallback.committed(); - return; + return null; } for (Map cols : updates.values()) { @@ -1257,7 +1461,7 @@ private void beginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback, // there are only read locks, so nothing to write deleteWeakRow(); commitCallback.committed(); - return; + return null; } } @@ -1272,360 +1476,48 @@ private void beginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback, cd.commitObserver = commitCallback; - CommitStep firstStep = new LockPrimaryStep(); - - firstStep.andThen(new LockOtherStep()) - .andThen(new GetCommitStampStep()) - .andThen(new WriteNotificationsStep()) - .andThen(new CommitPrimaryStep()) - .andThen(new DeleteLocksStep()) - .andThen(new FinishCommitStep()); - - firstStep.compose(cd); - } - - private void postLockPrimary(final CommitData cd, final ConditionalMutation pcm, Result result) - throws Exception { - final Status mutationStatus = result.getStatus(); - - if (mutationStatus == Status.ACCEPTED) { - lockOtherColumns(cd); - } else { - env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd) { - @Override - protected void runCommitStep(CommitData cd) throws Exception { - synchronousPostLockPrimary(cd, pcm, mutationStatus); - } - }); - } - } - - private void synchronousPostLockPrimary(CommitData cd, ConditionalMutation pcm, - Status mutationStatus) throws AccumuloException, AccumuloSecurityException, Exception { - // TODO convert this code to async - while (mutationStatus == Status.UNKNOWN) { - TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs); - - switch (txInfo.status) { - case LOCKED: - mutationStatus = Status.ACCEPTED; - break; - case ROLLED_BACK: - mutationStatus = Status.REJECTED; - break; - case UNKNOWN: - // TODO async - mutationStatus = cd.cw.write(pcm).getStatus(); - // TODO handle case were data other tx has lock - break; - case COMMITTED: - default: - throw new IllegalStateException( - "unexpected tx state " + txInfo.status + " " + cd.prow + " " + cd.pcol); - - } - } - - if (mutationStatus != Status.ACCEPTED) { - cd.addPrimaryToRejected(); - getStats().setRejected(cd.getRejected()); - // TODO do async - checkForOrphanedLocks(cd); - if (checkForAckCollision(pcm)) { - cd.commitObserver.alreadyAcknowledged(); - } else { - cd.commitObserver.commitFailed(cd.getShortCollisionMessage()); - } - return; - } - - lockOtherColumns(cd); - } - - private void lockOtherColumns(CommitData cd) { - ArrayList mutations = new ArrayList<>(); - - for (Entry> rowUpdates : updates.entrySet()) { - ConditionalFlutation cm = null; - - for (Entry colUpdates : rowUpdates.getValue().entrySet()) { - if (cm == null) { - cm = prewrite(rowUpdates.getKey(), colUpdates.getKey(), colUpdates.getValue(), cd.prow, - cd.pcol, false); - } else { - prewrite(cm, colUpdates.getKey(), colUpdates.getValue(), cd.prow, cd.pcol, false); - } - } - - mutations.add(cm); - } - - cd.acceptedRows = new HashSet<>(); - - CompletableFuture> cfuture = cd.bacw.apply(mutations); - addCallback(cfuture, cd, results -> postLockOther(cd, results)); - } - - private void postLockOther(final CommitData cd, Iterator results) throws Exception { - while (results.hasNext()) { - Result result = results.next(); - // TODO handle unknown? - Bytes row = Bytes.of(result.getMutation().getRow()); - if (result.getStatus() == Status.ACCEPTED) { - cd.acceptedRows.add(row); - } else { - cd.addToRejected(row, updates.get(row).keySet()); - } - } - - if (cd.getRejected().size() > 0) { - getStats().setRejected(cd.getRejected()); - env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd) { - @Override - protected void runCommitStep(CommitData cd) throws Exception { - checkForOrphanedLocks(cd); - rollbackOtherLocks(cd); - } - }); - } else if (stopAfterPreCommit) { - cd.commitObserver.committed(); - } else { - CompletableFuture cfuture = env.getSharedResources().getOracleClient().getStampAsync(); - addCallback(cfuture, cd, stamp -> beginSecondCommitPhase(cd, stamp)); - } - } - - private void rollbackOtherLocks(CommitData cd) throws Exception { - // roll back locks - - // TODO let rollback be done lazily? this makes GC more difficult - - Flutation m; - - ArrayList mutations = new ArrayList<>(cd.acceptedRows.size()); - for (Bytes row : cd.acceptedRows) { - m = new Flutation(env, row); - for (Entry entry : updates.get(row).entrySet()) { - if (isReadLock(entry.getValue())) { - m.put(entry.getKey(), ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true), - DelReadLockValue.encodeRollback()); - } else { - m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs, - DelLockValue.encodeRollback(false, true)); - } - } - mutations.add(m); - } - - CompletableFuture cfuture = - env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations); - addCallback(cfuture, cd, result -> rollbackPrimaryLock(cd)); - } - - private void rollbackPrimaryLock(CommitData cd) throws Exception { - - // mark transaction as complete for garbage collection purposes - Flutation m = new Flutation(env, cd.prow); - - m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs, - DelLockValue.encodeRollback(startTs, true, true)); - m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY); - - CompletableFuture cfuture = - env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(m); - addCallback(cfuture, cd, - result -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage())); - } - - private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws Exception { - if (startTs < commitStamp.getGcTimestamp()) { - rollbackOtherLocks(cd); - } else { - // Notification are written here for the following reasons : - // * At this point all columns are locked, this guarantees that anything triggering as a - // result of this transaction will see all of this transactions changes. - // * The transaction is not yet committed. If the process dies at this point whatever - // was running this transaction should rerun and recreate all of the notifications. - // The next transactions will rerun because this transaction will have to be rolled back. - // * If notifications are written in the 2nd phase of commit, then when the 2nd phase - // partially succeeds notifications may never be written. Because in the case of failure - // notifications would not be written until a column is read and it may never be read. - // See https://github.com/fluo-io/fluo/issues/642 - // - // Its very important the notifications which trigger an observer are deleted after the 2nd - // phase of commit finishes. - getStats().setCommitTs(commitStamp.getTxTimestamp()); - writeNotificationsAsync(cd, commitStamp.getTxTimestamp()); - } - } - - private void writeNotificationsAsync(CommitData cd, final long commitTs) { - - HashMap mutations = new HashMap<>(); - - if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) { - Flutation m = new Flutation(env, cd.prow); - Notification.put(env, m, cd.pcol, commitTs); - mutations.put(cd.prow, m); - } - - for (Entry> rowUpdates : updates.entrySet()) { - - for (Entry colUpdates : rowUpdates.getValue().entrySet()) { - if (observedColumns.contains(colUpdates.getKey())) { - Bytes val = colUpdates.getValue(); - if (isWrite(val) && !isDelete(val)) { - Mutation m = mutations.get(rowUpdates.getKey()); - if (m == null) { - m = new Flutation(env, rowUpdates.getKey()); - mutations.put(rowUpdates.getKey(), m); - } - Notification.put(env, m, colUpdates.getKey(), commitTs); - } - } - } - } - - for (Entry> entry : weakNotifications.entrySet()) { - Mutation m = mutations.get(entry.getKey()); - if (m == null) { - m = new Flutation(env, entry.getKey()); - mutations.put(entry.getKey(), m); - } - for (Column col : entry.getValue()) { - Notification.put(env, m, col, commitTs); - } - } - - CompletableFuture cfuture = - env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values()); - addCallback(cfuture, cd, result -> commmitPrimary(cd, commitTs)); - } - - private void commmitPrimary(CommitData cd, final long commitTs) { - // try to delete lock and add write for primary column - IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class); - PrewriteIterator.setSnaptime(iterConf, startTs); - boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn()); - - Condition lockCheck = - new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow, - cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID())); - final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck); - - ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval), - isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation); - - CompletableFuture> cfuture = - cd.acw.apply(Collections.singletonList(delLockMutation)); - addCallback(cfuture, cd, result -> handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation, - Iterators.getOnlyElement(result))); + return cd; } - private void handleUnkownStatsAfterPrimary(CommitData cd, final long commitTs, - final ConditionalMutation delLockMutation, Result result) throws Exception { - - final Status mutationStatus = result.getStatus(); - if (mutationStatus == Status.UNKNOWN) { - // the code for handing this is synchronous and needs to be handled in another thread pool - Runnable task = new SynchronousCommitTask(cd) { - @Override - protected void runCommitStep(CommitData cd) throws Exception { - - Status ms = mutationStatus; - - while (ms == Status.UNKNOWN) { - - // TODO async - TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs); - - switch (txInfo.status) { - case COMMITTED: - if (txInfo.commitTs != commitTs) { - throw new IllegalStateException( - cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs); - } - ms = Status.ACCEPTED; - break; - case LOCKED: - // TODO async - ms = cd.cw.write(delLockMutation).getStatus(); - break; - default: - ms = Status.REJECTED; - } - } - - postCommitPrimary(cd, commitTs, ms); - } - }; + private void beginCommitAsync(CommitData cd) { - env.getSharedResources().getSyncCommitExecutor().execute(task); - } else { - postCommitPrimary(cd, commitTs, mutationStatus); - } - } + // Notification are written between GetCommitStampStep and CommitPrimaryStep for the following + // reasons : + // * At this point all columns are locked, this guarantees that anything triggering as a + // result of this transaction will see all of this transactions changes. + // * The transaction is not yet committed. If the process dies at this point whatever + // was running this transaction should rerun and recreate all of the notifications. + // The next transactions will rerun because this transaction will have to be rolled back. + // * If notifications are written in the 2nd phase of commit, then when the 2nd phase + // partially succeeds notifications may never be written. Because in the case of failure + // notifications would not be written until a column is read and it may never be read. + // See https://github.com/fluo-io/fluo/issues/642 + // + // Its very important the notifications which trigger an observer are deleted after the 2nd + // phase of commit finishes. - private void postCommitPrimary(CommitData cd, long commitTs, Status mutationStatus) - throws Exception { - if (mutationStatus != Status.ACCEPTED) { - cd.commitObserver.commitFailed(cd.getShortCollisionMessage()); - } else { - if (stopAfterPrimaryCommit) { - cd.commitObserver.committed(); - } else { - deleteLocks(cd, commitTs); - } - } - } - - private void deleteLocks(CommitData cd, final long commitTs) { - // delete locks and add writes for other columns - ArrayList mutations = new ArrayList<>(updates.size() + 1); - for (Entry> rowUpdates : updates.entrySet()) { - Flutation m = new Flutation(env, rowUpdates.getKey()); - boolean isTriggerRow = isTriggerRow(rowUpdates.getKey()); - for (Entry colUpdates : rowUpdates.getValue().entrySet()) { - ColumnUtil.commitColumn(env, - isTriggerRow && colUpdates.getKey().equals(notification.getColumn()), false, - colUpdates.getKey(), isWrite(colUpdates.getValue()), isDelete(colUpdates.getValue()), - isReadLock(colUpdates.getValue()), startTs, commitTs, observedColumns, m); - } - - mutations.add(m); - } + CommitStep firstStep = new LockPrimaryStep(); - CompletableFuture cfuture = - env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations); - addCallback(cfuture, cd, result -> finishCommit(cd, commitTs)); - } + firstStep.andThen(new LockOtherStep()).andThen(new GetCommitStampStep()) + .andThen(new WriteNotificationsStep()).andThen(new CommitPrimaryStep()) + .andThen(new DeleteLocksStep()).andThen(new FinishCommitStep()); - @VisibleForTesting - public boolean finishCommit(CommitData cd, Stamp commitStamp) - throws TableNotFoundException, MutationsRejectedException { - deleteLocks(cd, commitStamp.getTxTimestamp()); - return true; + firstStep.compose(cd).exceptionally(throwable -> { + cd.commitObserver.failed(throwable); + return null; + }); } - private void finishCommit(CommitData cd, long commitTs) { - ArrayList afterFlushMutations = new ArrayList<>(2); - - Flutation m = new Flutation(env, cd.prow); - // mark transaction as complete for garbage collection purposes - m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY); - afterFlushMutations.add(m); - - if (weakNotification != null) { - afterFlushMutations.add(weakNotification.newDelete(env, startTs)); - } + private void beginCommitAsyncTest(CommitData cd) { - if (notification != null) { - afterFlushMutations.add(notification.newDelete(env, startTs)); - } + CommitStep firstStep = new LockPrimaryStep(); - env.getSharedResources().getBatchWriter().writeMutationsAsync(afterFlushMutations); + firstStep.andThen(new LockOtherStep()).andThen(new CommittedTestStep()); - cd.commitObserver.committed(); + firstStep.compose(cd).exceptionally(throwable -> { + cd.commitObserver.failed(throwable); + return null; + }); } public SnapshotScanner newSnapshotScanner(Span span, Collection columns) { diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java index a17550027..02ed35d6c 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java @@ -20,10 +20,7 @@ import java.util.Map.Entry; import java.util.Set; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Key; @@ -124,12 +121,11 @@ public void close() { tx.close(); } - public CommitData createCommitData() throws TableNotFoundException { + public CommitData createCommitData() { return tx.createCommitData(); } - public boolean preCommit(CommitData cd) throws AlreadyAcknowledgedException, - TableNotFoundException, AccumuloException, AccumuloSecurityException { + public boolean preCommit(CommitData cd) throws AlreadyAcknowledgedException { return tx.preCommit(cd); } @@ -137,13 +133,11 @@ public boolean preCommit(CommitData cd, RowColumn primary) { return tx.preCommit(cd, primary); } - public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) - throws AccumuloException, AccumuloSecurityException { + public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) { return tx.commitPrimaryColumn(cd, commitStamp); } - public void finishCommit(CommitData cd, Stamp commitStamp) - throws MutationsRejectedException, TableNotFoundException { + public void finishCommit(CommitData cd, Stamp commitStamp) { tx.finishCommit(cd, commitStamp); env.getSharedResources().getBatchWriter().waitForAsyncFlush(); }