Skip to content

Commit

Permalink
Merge pull request #602 from afs/jena1746-tdb2-abort
Browse files Browse the repository at this point in the history
JENA-1746: TDB2 abort
  • Loading branch information
afs committed Sep 11, 2019
2 parents 71d23d7 + 4eafab1 commit c1a8403
Show file tree
Hide file tree
Showing 14 changed files with 694 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static Transactional createTransactional(Journal journal, TransactionalCo
return createTransactional(coord, elements);
}

private static Transactional createTransactional(TransactionCoordinator coord, TransactionalComponent[] elements) {
private static Transactional createTransactional(TransactionCoordinator coord, TransactionalComponent... elements) {
for ( TransactionalComponent tc : elements ) {
coord.add(tc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

package org.apache.jena.dboe.transaction.txn;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand All @@ -39,6 +36,10 @@ private ComponentGroup(Map<ComponentId, TransactionalComponent> group) {
this.group.putAll(group);
}

public void addAll(Collection<TransactionalComponent> components) {
components.forEach(this::add);
}

public void add(TransactionalComponent component) {
Objects.requireNonNull(component);
//Log.info(this, "add("+component.getComponentId()+")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public boolean promote(boolean readCommitted) {
checkState(ACTIVE);
if ( txnType == TxnType.READ )
return false;
boolean b = txnMgr.promoteTxn(this, readCommitted);
boolean b = txnMgr.executePromote(this, readCommitted);
if ( !b )
return false;
mode = ReadWrite.WRITE;
Expand Down Expand Up @@ -160,6 +160,7 @@ public void commit() {
prepare();
checkState(PREPARE);
setState(COMMIT);
// Sys abort -> state?
switch(mode) {
case WRITE:
txnMgr.executeCommit(this,
Expand Down Expand Up @@ -195,10 +196,12 @@ public void abort() {
}

public void end() {
// [1746]
// txnMgr.executeEnd(thus, ()->{});
txnMgr.notifyEndStart(this);
if ( isWriteTxn() && getState() == ACTIVE ) {
//Log.warn(this, "Write transaction with no commit() or abort() before end()");
// Just the abort process.
// Just abort process.
abort$();
endInternal();
throw new TransactionException("Write transaction with no commit() or abort() before end() - forced abort");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

import org.apache.jena.atlas.logging.FmtLog;
import org.apache.jena.atlas.logging.Log;
Expand Down Expand Up @@ -75,8 +76,9 @@ public class TransactionCoordinator {
private boolean configurable = true;

private final ComponentGroup components = new ComponentGroup();
private final List<TransactionListener> listeners = new ArrayList<>();

// Components
private ComponentGroup txnComponents = null;
private List<ShutdownHook> shutdownHooks;
private TxnIdGenerator txnIdGenerator = TxnIdFactory.txnIdGenSimple;

Expand Down Expand Up @@ -138,10 +140,8 @@ public TransactionCoordinator(Journal journal, List<TransactionalComponent> comp
private TransactionCoordinator(Journal journal, List<TransactionalComponent> txnComp, List<ShutdownHook> shutdownHooks) {
this.journal = journal;
this.shutdownHooks = new ArrayList<>(shutdownHooks);
if ( txnComp != null ) {
//txnComp.forEach(x-> System.out.println(x.getComponentId().label()+" :: "+Bytes.asHex(x.getComponentId().bytes()) ) );
txnComp.forEach(components::add);
}
if ( txnComp != null )
components.addAll(txnComp);
}

/** Add a {@link TransactionalComponent}.
Expand All @@ -167,6 +167,18 @@ public TransactionCoordinator remove(TransactionalComponent elt) {
return this;
}

public TransactionCoordinator addListener(TransactionListener listener) {
checklAllowModification();
listeners.add(listener);
return this;
}

public TransactionCoordinator removeListener(TransactionListener listener) {
checklAllowModification();
listeners.remove(listener);
return this;
}

/**
* Perform modification of this {@code TransactionCoordiator} after it has been
* started.
Expand All @@ -179,7 +191,7 @@ public TransactionCoordinator remove(TransactionalComponent elt) {
* <p>
* Use with care!
*/
public void modify(Runnable action) {
public void modifyConfig(Runnable action) {
try {
startExclusiveMode();
configurable = true;
Expand All @@ -190,6 +202,11 @@ public void modify(Runnable action) {
}
}

/** Call the action for each listener */
private void listeners(Consumer<TransactionListener> action) {
listeners.forEach(x->action.accept(x));
}

/**
* Add a shutdown hook. Shutdown is not guaranteed to be called
* and hence hooks may not get called.
Expand Down Expand Up @@ -533,6 +550,7 @@ public Transaction begin(TxnType txnType, boolean canBlock) {
Transaction transaction = begin$(txnType);
startActiveTransaction(transaction);
transaction.begin();
notifyBegin(transaction);
return transaction;
}

Expand Down Expand Up @@ -589,17 +607,21 @@ private ComponentGroup chooseComponents(ComponentGroup components, TxnType txnTy
}

/** Attempt to promote a transaction from READ mode to WRITE mode based.
* Whether intevening commits are seen is determined by the boolean flag.
* Return true if the transaction is already a writer.
* Whether intervening commits are seen is determined by the boolean flag.
* Return true if the transaction is already a writer.
*/
/*package*/ boolean promoteTxn(Transaction transaction, boolean readCommittedPromotion) {
/*package*/ boolean executePromote(Transaction transaction, boolean readCommittedPromotion) {
if ( transaction.getMode() == ReadWrite.WRITE )
return true;
// While this code allows promotion of TxnType.READ, this ability is usually rejected
// Even if promotion of TxnType.READ allowed, this ability is usually rejected
// by the transaction system around it. e.g. TransactionalBase.
if ( transaction.getTxnType() == TxnType.READ )
throw new TransactionException("promote: can't promote a READ transaction");
return promoteTxn$(transaction, readCommittedPromotion);

notifyPromoteStart(transaction);
boolean b = promoteTxn$(transaction, readCommittedPromotion);
notifyPromoteFinish(transaction);
return b;
}

private boolean promoteTxn$(Transaction transaction, boolean readCommittedPromotion) {
Expand Down Expand Up @@ -689,8 +711,11 @@ private boolean promotionWaitForWriters() {
/*package*/ void completed(Transaction transaction) {
finishActiveTransaction(transaction);
journal.reset();
notifyEnd(transaction);
}

// Internally, an APi call "commit" is "prepare then commit".

/*package*/ void executePrepare(Transaction transaction) {
// Do here because it needs access to the journal.
notifyPrepareStart(transaction);
Expand All @@ -705,7 +730,13 @@ private boolean promotionWaitForWriters() {
}

/*package*/ void executeCommit(Transaction transaction, Runnable commit, Runnable finish, Runnable sysabort) {
notifyCommitStart(transaction);
if ( transaction.getMode() == ReadWrite.READ ) {

//[1746]
//executeCommitReader();
// No commit on components, all "end".
// Make abort the same?
finish.run();
notifyCommitFinish(transaction);
return;
Expand All @@ -719,6 +750,7 @@ private boolean promotionWaitForWriters() {
} catch (Throwable th) {
throw th;
} finally { journal.endWrite(); }
notifyCommitFinish(transaction);
}

private void executeCommitWriter(Transaction transaction, Runnable commit, Runnable finish, Runnable sysabort) {
Expand Down Expand Up @@ -777,7 +809,6 @@ private void executeCommitWriter(Transaction transaction, Runnable commit, Runna
finish.run();
// Bump global serialization point
advanceDataVersion();
notifyCommitFinish(transaction);
}
}

Expand Down Expand Up @@ -879,38 +910,70 @@ private void finishActiveTransaction(Transaction transaction) {
public long countActive() { return activeTransactionCount.get(); }

// notify*Start/Finish called round each transaction lifecycle step
// Called in cooperation between Transaction and TransactionCoordinator
// depending on who is actually do the work of each step.

/*package*/ void notifyPrepareStart(Transaction transaction) {}
private void notifyBegin(Transaction transaction) {
listeners(x -> x.notifyTxnStart(transaction));
}

/*package*/ void notifyPrepareFinish(Transaction transaction) {}
private void notifyEnd(Transaction transaction) {
listeners(x -> x.notifyTxnFinish(transaction));
}

private void notifyPromoteStart(Transaction transaction) {
listeners(x -> x.notifyPromoteStart(transaction));
}

private void notifyPromoteFinish(Transaction transaction) {
listeners(x -> x.notifyPromoteFinish(transaction));
}

private void notifyPrepareStart(Transaction transaction) {
listeners(x -> x.notifyPrepareStart(transaction));
}

private void notifyPrepareFinish(Transaction transaction) {
listeners(x -> x.notifyPrepareFinish(transaction));
}

// Writers released here - can happen because of commit() or abort().

private void notifyCommitStart(Transaction transaction) {}
private void notifyCommitStart(Transaction transaction) {
listeners(x -> x.notifyCommitStart(transaction));
}

private void notifyCommitFinish(Transaction transaction) {
private void notifyCommitFinish(Transaction transaction) {
listeners(x->x.notifyCommitFinish(transaction));
if ( transaction.getMode() == ReadWrite.WRITE )
releaseWriterLock();
}

private void notifyAbortStart(Transaction transaction) { }
private void notifyAbortStart(Transaction transaction) {
listeners(x->x.notifyAbortStart(transaction));
}

private void notifyAbortFinish(Transaction transaction) {
listeners(x->x.notifyAbortFinish(transaction));
if ( transaction.getMode() == ReadWrite.WRITE )
releaseWriterLock();
}

/*package*/ void notifyEndStart(Transaction transaction) { }
/*package*/ void notifyEndStart(Transaction transaction) {
listeners(x->x.notifyEndStart(transaction));
}

/*package*/ void notifyEndFinish(Transaction transaction) {}
/*package*/ void notifyEndFinish(Transaction transaction) {
listeners(x->x.notifyEndFinish(transaction));
}

// Called by Transaction once at the end of first commit()/abort() or end()

/*package*/ void notifyCompleteStart(Transaction transaction) { }
/*package*/ void notifyCompleteStart(Transaction transaction) {
listeners(x -> x.notifyCompleteStart(transaction));
}

/*package*/ void notifyCompleteFinish(Transaction transaction) { }
/*package*/ void notifyCompleteFinish(Transaction transaction) {
listeners(x -> x.notifyCompleteFinish(transaction));
}

// Coordinator state.
private final AtomicLong countBegin = new AtomicLong(0);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.jena.dboe.transaction.txn;

/** Call backs for the transaction lifecycle. */
public interface TransactionListener {

/** A transaction has started; begin has done the setup. */
public default void notifyTxnStart(Transaction transaction) { }

/** Start a call to promote */
public default void notifyPromoteStart(Transaction transaction) { }
/** Finish a call to promote */
public default void notifyPromoteFinish(Transaction transaction) { }

/** Start prepare during a commit */
public default void notifyPrepareStart(Transaction transaction) { }
/** Finish prepare during a commit */
public default void notifyPrepareFinish(Transaction transaction) { }

/** Start a commit (prepare has been done) */
public default void notifyCommitStart(Transaction transaction) { }
/** Finish a commit (prepare has been done) */
public default void notifyCommitFinish(Transaction transaction) { }

/** Start an abort */
public default void notifyAbortStart(Transaction transaction) { }
/** Start finish an abort */
public default void notifyAbortFinish(Transaction transaction) { }

/** Start an end() */
public default void notifyEndStart(Transaction transaction) { }
/** Finish an end() */
public default void notifyEndFinish(Transaction transaction) { }

/** Start the complete step. */
public default void notifyCompleteStart(Transaction transaction) { }
/** Finish the complete step. */
public default void notifyCompleteFinish(Transaction transaction) { }

/** Transaction has finished. This is called during "complete" */
public default void notifyTxnFinish(Transaction transaction) { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ protected X _begin(ReadWrite readWrite, TxnId txnId) {
return null;
}

@Override
protected X _promote(TxnId txnId, X state) { return null; }

@Override
protected ByteBuffer _commitPrepare(TxnId txnId, X state) {
return null;
Expand All @@ -69,9 +72,5 @@ protected void _complete(TxnId txnId, X state) {}

@Override
protected void _shutdown() {}

@Override
protected X _promote(TxnId txnId, X state) { return null; }

}

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class TestTxnSwitching {

//Transactional transactional = TransactionalFactory.create(jrnl, integer);
TransactionalBase transactional;
TransactionCoordinator txnMgr = new TransactionCoordinator(jrnl); {
TransactionCoordinator txnMgr = new TransactionCoordinator(jrnl); {
txnMgr.add(integer);
transactional = new TransactionalBase(txnMgr);
txnMgr.start();
Expand Down

0 comments on commit c1a8403

Please sign in to comment.