Skip to content

Commit

Permalink
Fixed an issue where the master cached the wrong masterId for transac…
Browse files Browse the repository at this point in the history
…tions.

Conflicts:

	kernel/src/main/java/org/neo4j/kernel/impl/transaction/xaframework/XaLogicalLog.java
	kernel/src/main/java/org/neo4j/kernel/impl/transaction/xaframework/XaResourceManager.java
  • Loading branch information
thobe committed Feb 4, 2012
1 parent c064ad3 commit bbf0566
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 26 deletions.
Expand Up @@ -19,9 +19,6 @@
*/
package org.neo4j.kernel.impl.transaction.xaframework;

import static java.lang.Math.max;
import static org.neo4j.kernel.impl.transaction.xaframework.LogExtractor.newLogReaderBuffer;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
Expand All @@ -39,6 +36,8 @@
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;

import static java.lang.Math.max;

import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.Pair;
import org.neo4j.kernel.impl.transaction.xaframework.LogEntry.Commit;
Expand All @@ -51,6 +50,8 @@
import org.neo4j.kernel.impl.util.FileUtils;
import org.neo4j.kernel.impl.util.StringLogger;

import static org.neo4j.kernel.impl.transaction.xaframework.LogExtractor.newLogReaderBuffer;

/**
* <CODE>XaLogicalLog</CODE> is a transaction and logical log combined. In
* this log information about the transaction (such as started, prepared and
Expand Down Expand Up @@ -339,7 +340,7 @@ public synchronized void commitOnePhase( int identifier, long txId )
assert txId != -1;
try
{
cacheTxStartPosition( txId, startEntry.getMasterId(), startEntry );
cacheTxStartPosition( txId, startEntry );
LogIoUtils.writeCommit( false, writeBuffer, identifier, txId, System.currentTimeMillis() );
writeBuffer.force();
}
Expand All @@ -350,21 +351,19 @@ public synchronized void commitOnePhase( int identifier, long txId )
}
}

private synchronized void cacheTxStartPosition( long txId, int masterId,
LogEntry.Start startEntry )
private synchronized void cacheTxStartPosition( long txId, LogEntry.Start startEntry )
{
cacheTxStartPosition( txId, masterId, startEntry, logVersion );
cacheTxStartPosition( txId, startEntry, logVersion );
}

private synchronized TxPosition cacheTxStartPosition( long txId, int masterId,
LogEntry.Start startEntry, long logVersion )
private synchronized TxPosition cacheTxStartPosition( long txId, LogEntry.Start startEntry, long logVersion )
{
if ( startEntry.getStartPosition() == -1 )
{
throw new RuntimeException( "StartEntry.position is " + startEntry.getStartPosition() );
}

TxPosition result = new TxPosition( logVersion, masterId, startEntry.getIdentifier(),
TxPosition result = new TxPosition( logVersion, startEntry.getMasterId(), startEntry.getIdentifier(),
startEntry.getStartPosition(), startEntry.getTimeWritten() );
positionCache.putStartPosition( txId, result );
return result;
Expand Down Expand Up @@ -416,7 +415,7 @@ public synchronized void commitTwoPhase( int identifier, long txId )
assert txId != -1;
try
{
cacheTxStartPosition( txId, startEntry.getMasterId(), startEntry );
cacheTxStartPosition( txId, startEntry );
LogIoUtils.writeCommit( true, writeBuffer, identifier, txId, System.currentTimeMillis() );
writeBuffer.force();
}
Expand Down Expand Up @@ -520,7 +519,7 @@ private void applyOnePhaseCommitEntry( LogEntry.OnePhaseCommit commit )
{
XaTransaction xaTx = xaRm.getXaTransaction( xid );
xaTx.setCommitTxId( txId );
cacheTxStartPosition( txId, startEntry.getMasterId(), startEntry );
cacheTxStartPosition( txId, startEntry );
xaRm.injectOnePhaseCommit( xid );
registerRecoveredTransaction( txId );
}
Expand Down Expand Up @@ -580,7 +579,7 @@ private void applyTwoPhaseCommitEntry( LogEntry.TwoPhaseCommit commit ) throws I
{
XaTransaction xaTx = xaRm.getXaTransaction( xid );
xaTx.setCommitTxId( txId );
cacheTxStartPosition( txId, identifier, startEntry );
cacheTxStartPosition( txId, startEntry );
xaRm.injectTwoPhaseCommit( xid );
registerRecoveredTransaction( txId );
}
Expand Down Expand Up @@ -1207,7 +1206,7 @@ private long[] readLogHeader( ReadableByteChannel source, String message ) throw
}

public synchronized void applyTransactionWithoutTxId( ReadableByteChannel byteChannel,
long nextTxId, int masterId ) throws IOException
long nextTxId ) throws IOException
{
if ( nextTxId != (xaTf.getLastCommittedTx() + 1) )
{
Expand Down Expand Up @@ -1246,7 +1245,7 @@ public synchronized void applyTransactionWithoutTxId( ReadableByteChannel byteCh
{
XaTransaction xaTx = xaRm.getXaTransaction( xid );
xaTx.setCommitTxId( nextTxId );
cacheTxStartPosition( nextTxId, masterId, startEntry );
cacheTxStartPosition( nextTxId, startEntry );
xaRm.commit( xid, true );
LogEntry doneEntry = new LogEntry.Done( startEntry.getIdentifier() );
LogIoUtils.writeLogEntry( doneEntry, writeBuffer );
Expand Down Expand Up @@ -1313,8 +1312,7 @@ public synchronized void applyTransaction( ReadableByteChannel byteChannel )
throw new IOException( "Unable to find start entry" );
}
startEntry.setStartPosition( startEntryPosition );
cacheTxStartPosition( logApplier.getCommitEntry().getTxId(),
startEntry.getMasterId(), startEntry );
cacheTxStartPosition( logApplier.getCommitEntry().getTxId(), startEntry );
// System.out.println( "applyFullTx#end @ pos: " + writeBuffer.getFileChannelPosition() );
checkLogRotation();
}
Expand Down Expand Up @@ -1411,10 +1409,10 @@ public synchronized long rotate() throws IOException
msgLog.logMessage( "Rotate log first start entry @ pos=" +
firstEntryPosition + " out of " + xidIdentMap );
}

LogBuffer newLogBuffer = instantiateCorrectWriteBuffer( newLog );
copyPartiallyWrittenTransactionsToTheNewLog( newLogBuffer );

newLogBuffer.force();
newLog.position( newLogBuffer.getFileChannelPosition() );
msgLog.logMessage( "Rotate: old log scanned, newLog @ pos=" +
Expand Down Expand Up @@ -1476,8 +1474,7 @@ else if ( entry instanceof LogEntry.Commit )
else
{
TxPosition oldPos = positionCache.getStartPosition( commitEntry.getTxId() );
TxPosition newPos = cacheTxStartPosition( commitEntry.getTxId(),
startEntry.getMasterId(), startEntry, logVersion+1 );
TxPosition newPos = cacheTxStartPosition( commitEntry.getTxId(), startEntry, logVersion+1 );
msgLog.logMessage( "Updated tx " + ((LogEntry.Commit) entry ).getTxId() +
" from " + oldPos + " to " + newPos );
}
Expand Down
Expand Up @@ -48,7 +48,7 @@ public class XaResourceManager
private final ArrayMap<Xid,XidStatus> xidMap =
new ArrayMap<Xid,XidStatus>();
private int recoveredTxCount = 0;
private Set<TransactionInfo> recoveredDoneRecords = new HashSet<TransactionInfo>();
private final Set<TransactionInfo> recoveredDoneRecords = new HashSet<TransactionInfo>();

private XaLogicalLog log = null;
private final XaTransactionFactory tf;
Expand Down Expand Up @@ -208,7 +208,7 @@ synchronized void destroy( XAResource xaResource )
private static class XidStatus
{
private boolean active = true;
private TransactionStatus txStatus;
private final TransactionStatus txStatus;

XidStatus( XaTransaction xaTransaction )
{
Expand Down Expand Up @@ -360,7 +360,7 @@ synchronized boolean injectPrepare( Xid xid ) throws IOException
}
}

private Map<Xid,Integer> txOrderMap = new HashMap<Xid,Integer>();
private final Map<Xid,Integer> txOrderMap = new HashMap<Xid,Integer>();
private int nextTxOrder = 0;

// called during recovery
Expand Down Expand Up @@ -735,8 +735,7 @@ public synchronized long applyPreparedTransaction(
ReadableByteChannel transaction ) throws IOException
{
long txId = TxIdGenerator.DEFAULT.generate( dataSource, 0 );
int masterId = txIdGenerator.getCurrentMasterId();
log.applyTransactionWithoutTxId( transaction, txId, masterId );
log.applyTransactionWithoutTxId( transaction, txId );
return txId;
}

Expand Down

0 comments on commit bbf0566

Please sign in to comment.