Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 158 additions & 158 deletions jena-arq/src/main/java/org/apache/jena/query/ARQ.java

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions jena-arq/src/main/java/org/apache/jena/system/Txn.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static <T extends Transactional, X> X calculate(T txn, Supplier<X> r) {
/** Execute application code in a transaction with the given {@link TxnType trasnaction type}. */
public static <T extends Transactional> void exec(T txn, TxnType txnType, Runnable r) {
boolean b = txn.isInTransaction() ;
if ( b )
if ( b )
checkCompatible(txn, txnType);
else
txn.begin(txnType) ;
Expand All @@ -92,7 +92,7 @@ public static <T extends Transactional> void exec(T txn, TxnType txnType, Runnab
/** Execute and return a value in a transaction with the given {@link TxnType trasnaction type}. */
public static <T extends Transactional, X> X calc(T txn, TxnType txnType, Supplier<X> r) {
boolean b = txn.isInTransaction() ;
if ( b )
if ( b )
checkCompatible(txn, txnType);
else
txn.begin(txnType) ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void prepare() {
}

public void commit() {
// XXX Split into READ and WRITE forms.
// The commit point is in TransactionCoordinator.executeCommit().
TxnState s = getState();
if ( s == ACTIVE )
// Auto exec prepare().
Expand All @@ -164,13 +164,17 @@ public void commit() {
case WRITE:
txnMgr.executeCommit(this,
()->{components.forEach((c) -> c.commit()) ; } ,
()->{components.forEach((c) -> c.commitEnd()) ; } ) ;
()->{components.forEach((c) -> c.commitEnd()) ; },
()->{components.forEach((c) -> c.abort()) ; }
) ;
break ;
case READ:
// Different lifecycle?
txnMgr.executeCommit(this,
()->{components.forEach((c) -> c.commit()) ; } ,
()->{components.forEach((c) -> c.commitEnd()) ; } ) ;
()->{components.forEach((c) -> c.commitEnd()) ; } ,
()->{components.forEach((c) -> c.abort()) ; }
) ;
break ;
}
setState(COMMITTED) ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import static org.apache.jena.dboe.transaction.txn.journal.JournalEntryType.UNDO;

import java.io.IOException;
import java.nio.ByteBuffer ;
import java.nio.channels.ClosedByInterruptException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.Semaphore ;
Expand All @@ -30,6 +32,7 @@

import org.apache.jena.atlas.logging.FmtLog;
import org.apache.jena.atlas.logging.Log ;
import org.apache.jena.dboe.base.file.FileException;
import org.apache.jena.dboe.base.file.Location;
import org.apache.jena.dboe.sys.Sys;
import org.apache.jena.dboe.transaction.txn.journal.Journal;
Expand Down Expand Up @@ -64,7 +67,8 @@
*/
final
public class TransactionCoordinator {
private static Logger log = Sys.syslog ;
private static Logger SysLog = Sys.syslog ;
private static Logger SysErr = Sys.errlog;

private final Journal journal ;
// Lock on configuration changes.
Expand Down Expand Up @@ -220,7 +224,7 @@ public void start() {
return ;
}

log.info("Journal recovery start") ;
SysLog.info("Journal recovery start") ;
components.forEachComponent(c -> c.startRecovery()) ;

// Group to commit
Expand All @@ -244,7 +248,7 @@ public void start() {

components.forEachComponent(c -> c.finishRecovery()) ;
journal.reset() ;
log.info("Journal recovery end") ;
SysLog.info("Journal recovery end") ;
}

private void recover(List<JournalEntry> entries) {
Expand Down Expand Up @@ -296,7 +300,7 @@ public void shutdown() {
if ( coordinatorLock == null )
return ;
if ( countActive() > 0 )
FmtLog.warn(log, "Transactions active: W=%d, R=%d", countActiveWriter(), countActiveReaders());
FmtLog.warn(SysErr, "Transactions active: W=%d, R=%d", countActiveWriter(), countActiveReaders());
components.forEach((id, c) -> c.shutdown()) ;
shutdownHooks.forEach((h)-> h.shutdown()) ;
coordinatorLock = null ;
Expand Down Expand Up @@ -569,10 +573,10 @@ private ComponentGroup chooseComponents(ComponentGroup components, TxnType txnTy
cg.forEach((id, c) -> {
TransactionalComponent tcx = components.findComponent(id) ;
if ( ! tcx.equals(c) )
log.warn("TransactionalComponent not in TransactionCoordinator's ComponentGroup") ;
SysLog.warn("TransactionalComponent not in TransactionCoordinator's ComponentGroup") ;
}) ;
if ( log.isDebugEnabled() )
log.debug("Custom ComponentGroup for transaction "+txnType+": size="+cg.size()+" of "+components.size()) ;
if ( SysLog.isDebugEnabled() )
SysLog.debug("Custom ComponentGroup for transaction "+txnType+": size="+cg.size()+" of "+components.size()) ;
return cg ;
}

Expand Down Expand Up @@ -626,9 +630,9 @@ private ComponentGroup chooseComponents(ComponentGroup components, TxnType txnTy

// Now a proto-writer. We need to confirm when inside the synchronized.
synchronized(coordinatorLock) {
// Not read commited.
// Not read committed.
// Need to check the data version once we are the writer and all previous
// writers have commited or aborted.
// writers have committed or aborted.
// Has there been an writer active since the transaction started?

if ( ! checkNoInterveningCommits(transaction) ) {
Expand Down Expand Up @@ -683,28 +687,81 @@ private boolean promotionWaitForWriters() {
// Do here because it needs access to the journal.
notifyPrepareStart(transaction);
transaction.getComponents().forEach(sysTrans -> {
TransactionalComponent c = sysTrans.getComponent() ;
ByteBuffer data = c.commitPrepare(transaction) ;
ByteBuffer data = sysTrans.commitPrepare() ;
if ( data != null ) {
PrepareState s = new PrepareState(c.getComponentId(), data) ;
PrepareState s = new PrepareState(sysTrans.getComponentId(), data) ;
journal.write(s) ;
}
}) ;
notifyPrepareFinish(transaction);
}

/*package*/ void executeCommit(Transaction transaction, Runnable commit, Runnable finish) {
/*package*/ void executeCommit(Transaction transaction, Runnable commit, Runnable finish, Runnable sysabort) {
if ( transaction.getMode() == ReadWrite.READ ) {
finish.run();
notifyCommitFinish(transaction);
return;
}

// This is the commit for a write transaction
journal.startWrite();
try {
executeCommitWriter(transaction, commit, finish, sysabort);
journal.commitWrite();
} catch (TransactionException ex) {
throw ex;
} catch (Throwable th) {
throw th;
} finally { journal.endWrite(); }
}

private void executeCommitWriter(Transaction transaction, Runnable commit, Runnable finish, Runnable sysabort) {
synchronized(coordinatorLock) {
// *** COMMIT POINT
journal.sync() ;
// *** COMMIT POINT
try {
// Simulate a Thread.interrupt during I/O.
// if ( true )
// throw new FileException(new ClosedByInterruptException());

// *** COMMIT POINT
journal.writeJournal(JournalEntry.COMMIT);
journal.sync();
// *** COMMIT POINT
}
// catch (ClosedByInterruptException ex) {}
// Some low level system error - probably a sign of something serious like disk error.
catch(FileException ex) {
if ( ex.getCause() instanceof ClosedByInterruptException ) {
// Thread interrupt during java I/O.
// File was closed by java.nio.
// Reopen - this truncates to the last write start position.
journal.reopen();
// This call should clear up the transaction state.
rollback(transaction, sysabort);
SysLog.warn("Thread interrupt during I/O in 'commit' : executed transaction rollback: "+ex.getMessage());
throw new TransactionException("Thread interrupt during I/O in 'commit' : transaction rollback.", ex);
}
if ( isIOException(ex) )
SysErr.warn("IOException during 'commit' : transaction may have committed. Attempting rollback: "+ex.getMessage());
else
SysErr.warn("Exception during 'commit' : transaction may have committed. Attempting rollback. Details:",ex);
if ( abandonTxn(transaction, sysabort) ) {
SysErr.warn("Transaction rollback");
throw new TransactionException("Exception during 'commit' - transaction rollback.", ex);
}
// Very bad. (This have been dealt with already and should get to here.)
SysErr.error("Transaction rollback failed. System unstable."+
"\nPlease contact users@jena.apache.org, giving details of the environment and this incident.");
throw new Error("Exception during 'rollback' - System unstable.", ex);
}
catch (Throwable ex) {
SysErr.warn("Unexpected Throwable during 'commit' : transaction may have committed. Attempting rollback: ",ex);
if ( abandonTxn(transaction, sysabort) ) {
SysErr.warn("Transaction rollback");
throw new TransactionException("Exception during 'commit' - transaction rollback.", ex);
}
// Very bad. (This should not happen.)
SysErr.error("Transaction rollback failed. System unstable.");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

throw new TransactionException("Exception during 'rollback' - System unstable.", ex);
}

// Now run the Transactions commit actions.
commit.run() ;
journal.truncate(0) ;
Expand All @@ -713,13 +770,53 @@ private boolean promotionWaitForWriters() {
// Bump global serialization point
advanceDataVersion() ;
notifyCommitFinish(transaction) ;
}
}
}

// Inside the global transaction start/commit lock.
private void advanceDataVersion() {
dataVersion.incrementAndGet();
}

/** Test whether the thread is interrupted and if it is, abort the transaction. */
private void abandonIfInterruped(Transaction txn, Runnable sysabort, String msg) {
// Clears interrupted status
if (Thread.interrupted()) {
abandonTxn(txn, sysabort);
Thread.currentThread().interrupt();
throw new TransactionException(msg);
}
}

/**
* Try to abort, including removing the journal entries (including commit if written)
* Return true for succeeded and false for throwable, state unknown.
*/
private boolean abandonTxn(Transaction txn, Runnable sysabort ) {
try {
journal.abortWrite();
rollback(txn, sysabort);
return true;
} catch (Throwable th) {
SysErr.warn("Exception during system rollback", th);
return false;
}
}

private void rollback(Transaction txn, Runnable sysabort) {
txn.setState(TxnState.ACTIVE);
sysabort.run();
txn.setState(TxnState.ABORTED);
}

private boolean isIOException(Throwable ex) {
while (ex != null) {
if ( ex instanceof IOException )
return true;
ex = ex.getCause();
}
return false;
}

/*package*/ void executeAbort(Transaction transaction, Runnable abort) {
notifyAbortStart(transaction) ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ public void commitPrepare() {
@Override
public void commitExec() {
Transaction txn = getValidTransaction() ;
txn.commit() ;
_end() ;
try { txn.commit(); }
finally { _end(); }
}

// /** Signal end of commit phase */
Expand Down
Loading