Skip to content

Commit

Permalink
Fixed a problem where copying over active transactions to the new log…
Browse files Browse the repository at this point in the history
… (when rotating) would accidentally mark all copied transactions as active again, by putting them into xidIdentMap. This would completely mess up the next rotation in that virtually all transactions would have to be copied, filling the new log immediately. Fixed by only putting in xidIdentMap if it already was active

(cherry picked from commit bbd4318)
  • Loading branch information
tinwelint committed Jan 28, 2012
1 parent 7472088 commit f0b7243
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -1410,45 +1408,45 @@ public synchronized long rotate() throws IOException
long firstEntryPosition = getFirstStartEntry( endPosition );
fileChannel.position( firstEntryPosition );
msgLog.logMessage( "Rotate log first start entry @ pos=" +
firstEntryPosition );
firstEntryPosition + " out of " + xidIdentMap );
}
LogEntry entry;
// Set<Integer> startEntriesWritten = new HashSet<Integer>();
LogBuffer newLogBuffer = instantiateCorrectWriteBuffer( newLog );
boolean foundFirstStrayTx = false;
Set<Integer> identsWithStartEntry = new HashSet<Integer>();
while ((entry = LogIoUtils.readEntry( sharedBuffer, fileChannel, cf )) != null )
{
if ( !foundFirstStrayTx && xidIdentMap.get( entry.getIdentifier() ) != null )
{
foundFirstStrayTx = true;
}
if ( foundFirstStrayTx )

// Copy over active transactions to the new log
boolean foundFirstActiveTx = false;
Map<Integer,LogEntry.Start> transactionsToCopy = new HashMap<Integer,LogEntry.Start>();
for ( LogEntry entry = null; (entry = LogIoUtils.readEntry( sharedBuffer, fileChannel, cf )) != null; )
{
Integer identifier = entry.getIdentifier();
boolean isActive = xidIdentMap.get( identifier ) != null;
if ( !foundFirstActiveTx && isActive ) foundFirstActiveTx = true;
if ( foundFirstActiveTx )
{
if ( entry instanceof LogEntry.Start )
{
identsWithStartEntry.add( entry.getIdentifier() );
LogEntry.Start startEntry = (LogEntry.Start) entry;
transactionsToCopy.put( identifier, startEntry );
startEntry.setStartPosition( newLogBuffer.getFileChannelPosition() ); // newLog.position() );
// overwrite old start entry with new that has updated position
xidIdentMap.put( startEntry.getIdentifier(), startEntry );
// startEntriesWritten.add( entry.getIdentifier() );
// If the transaction is active then update it with the new one
if ( isActive ) xidIdentMap.put( identifier, startEntry );
}
if ( identsWithStartEntry.contains( entry.getIdentifier() ) )
if ( transactionsToCopy.containsKey( identifier ) )
{
if ( entry instanceof LogEntry.Commit )
{
LogEntry.Start startEntry = xidIdentMap.get( entry.getIdentifier() );
LogEntry.Start startEntry = transactionsToCopy.get( identifier );
LogEntry.Commit commitEntry = (LogEntry.Commit) entry;
TxPosition oldPos = positionCache.getStartPosition( commitEntry.getTxId() );
TxPosition newPos = cacheTxStartPosition( commitEntry.getTxId(), startEntry.getMasterId(), startEntry, logVersion+1 );
TxPosition newPos = cacheTxStartPosition( commitEntry.getTxId(),
startEntry.getMasterId(), startEntry, logVersion+1 );
msgLog.logMessage( "Updated tx " + ((LogEntry.Commit) entry ).getTxId() +
" from " + oldPos + " to " + newPos );
}
LogIoUtils.writeLogEntry( entry, newLogBuffer );
}
}
}

newLogBuffer.force();
newLog.position( newLogBuffer.getFileChannelPosition() );
msgLog.logMessage( "Rotate: old log scanned, newLog @ pos=" +
Expand Down
36 changes: 35 additions & 1 deletion neo4j/src/test/java/recovery/TestMoveTxToNewLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.neo4j.helpers.Exceptions.launderedException;

import java.io.IOException;
import java.lang.reflect.Field;
Expand All @@ -33,6 +34,7 @@
import org.neo4j.kernel.AbstractGraphDatabase;
import org.neo4j.kernel.Config;
import org.neo4j.kernel.impl.transaction.TxManager;
import org.neo4j.kernel.impl.transaction.xaframework.XaLogicalLog;
import org.neo4j.kernel.impl.transaction.xaframework.XaResourceManager;
import org.neo4j.kernel.impl.util.ArrayMap;
import org.neo4j.test.AbstractSubProcessTestBase;
Expand Down Expand Up @@ -194,6 +196,34 @@ public void run( AbstractGraphDatabase graphdb )
}
}

/* Verifies the number of active transactions in XaLogicalLog */
private static class VerifyActiveTransactions implements Task
{
private final int count;

public VerifyActiveTransactions( int count )
{
this.count = count;
}

@SuppressWarnings( "rawtypes" )
@Override
public void run( AbstractGraphDatabase graphdb )
{
try
{
XaLogicalLog log = graphdb.getConfig().getTxModule().getXaDataSourceManager().getXaDataSource(
Config.DEFAULT_DATA_SOURCE_NAME ).getXaContainer().getLogicalLog();
ArrayMap activeXids = (ArrayMap) inaccessibleField( log, "xidIdentMap" ).get( log );
assertEquals( count, activeXids.size() );
}
catch ( Exception e )
{
throw launderedException( e );
}
}
}

/* Verifies that the data is in a correct state despite the steps performed in this test
* to try to break data consistency. */
private static class VerifyTask implements Task
Expand Down Expand Up @@ -286,11 +316,15 @@ public void executeTx_A_Then_B_Where_A_FailsBeforeDoneRecordThenRotateAndRecover
run( new CreateNamedNodeTask() );
waitForDone.await();

run( new VerifyActiveTransactions( 1 ) );

/* Here we should have ended up with an active nioneo logical log that contains tx A (w/o DONE record)
* and then B (w/ a DONE record). Verify that somehow?
* Now rotate the logical log (this will make tx A to be copied over to new log) */
* Now rotate the logical log (this should make A and everything after that to be copied over to new log) */
run( new RotateTask() );

run( new VerifyActiveTransactions( 1 ) );

/* Restart the db. Here a recovery will take place and should apply transaction A in
* that process so that the relationship created in transaction B is lost. */
restart();
Expand Down

0 comments on commit f0b7243

Please sign in to comment.