Permalink
Browse files

Merge remote-tracking branch 'neo4j/master'

Conflicts:
	ha/src/test/java/slavetest/SingleJvmWithNettyTest.java
  • Loading branch information...
2 parents e963bc9 + dd25ad5 commit 5c68cc82c9fae78d966dda3acc06ff64099eb79d @rickardoberg rickardoberg committed Mar 30, 2012
@@ -207,12 +207,7 @@ public void messageReceived( ChannelHandlerContext ctx, MessageEvent event )
msgLog.logMessage( "Error handling request", e );
ctx.getChannel().close();
tryToFinishOffChannel( ctx.getChannel() );
- e.printStackTrace();
- if ( e instanceof Exception )
- {
- throw (Exception) e;
- }
- throw new RuntimeException( e );
+ throw Exceptions.launderedException( e );
}
}
@@ -265,17 +260,19 @@ protected void tryToFinishOffChannel( Channel channel, SlaveContext slave )
finishOffChannel( channel, slave );
unmapSlave( channel, slave );
}
- catch ( IllegalStateException e ) // From TxManager.resume (if the tx is already active)
- {
- submitSilent( unfinishedTransactionExecutor, newTransactionFinisher( slave ) );
- }
catch ( Throwable failure ) // Unknown error trying to finish off the tx
{
submitSilent( unfinishedTransactionExecutor, newTransactionFinisher( slave ) );
- msgLog.logMessage( "Could not finish off dead channel", failure );
+ if ( shouldLogFailureToFinishOffChannel( failure ) )
+ msgLog.logMessage( "Could not finish off dead channel", failure );
}
}
+ protected boolean shouldLogFailureToFinishOffChannel( Throwable failure )
+ {
+ return true;
+ }
+
private void submitSilent( ExecutorService service, Runnable job )
{
try
@@ -41,12 +41,14 @@
import org.neo4j.com.ObjectSerializer;
import org.neo4j.com.Protocol;
import org.neo4j.com.RequestType;
+import org.neo4j.com.ResourceReleaser;
import org.neo4j.com.Response;
import org.neo4j.com.Serializer;
import org.neo4j.com.SlaveContext;
import org.neo4j.com.StoreIdGetter;
import org.neo4j.com.StoreWriter;
import org.neo4j.com.ToNetworkStoreWriter;
+import org.neo4j.com.TransactionStream;
import org.neo4j.com.TxExtractor;
import org.neo4j.helpers.Pair;
import org.neo4j.kernel.IdType;
@@ -222,13 +224,34 @@ public Long read( ChannelBuffer buffer, ByteBuffer temporaryBuffer ) throws IOEx
public Response<Void> finishTransaction( SlaveContext context, final boolean success )
{
- return sendRequest( HaRequestType.FINISH, context, new Serializer()
+ try
{
- public void write( ChannelBuffer buffer, ByteBuffer readBuffer ) throws IOException
+ return sendRequest( HaRequestType.FINISH, context, new Serializer()
{
- buffer.writeByte( success ? 1 : 0 );
- }
- }, VOID_DESERIALIZER );
+ public void write( ChannelBuffer buffer, ByteBuffer readBuffer ) throws IOException
+ {
+ buffer.writeByte( success ? 1 : 0 );
+ }
+ }, VOID_DESERIALIZER );
+ }
+ catch ( UnableToResumeTransactionException e )
+ {
+ if ( !success )
+ {
+ /* Here we are in a state where the client failed while the request
+ * was processing on the server and the tx.finish() in the usual
+ * try-finally transaction block gets called, only to find that
+ * the transaction is already active... which is totally expected.
+ * The fact that the transaction is already active here shouldn't
+ * hide the original exception on the client, the exception which
+ * cause the client to fail while the request was processing on the master.
+ * This is effectively the use case of awaiting a lock that isn't granted
+ * within the lock read timeout period.
+ */
+ return new Response<Void>( null, getMyStoreId(), TransactionStream.EMPTY, ResourceReleaser.NO_OP );
+ }
+ throw e;
+ }
}
public void rollbackOngoingTransactions( SlaveContext context )
@@ -569,22 +592,19 @@ public boolean isLock()
this.includesSlaveContext = includesSlaveContext;
}
- protected int timeoutForLocking( int defaultTimeout )
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
+ @Override
public ObjectSerializer getObjectSerializer()
{
return serializer;
}
+ @Override
public MasterCaller getMasterCaller()
{
return caller;
}
+ @Override
public byte id()
{
return (byte) ordinal();
@@ -73,6 +73,12 @@ public void shutdown()
super.shutdown();
getMaster().shutdown();
}
+
+ @Override
+ protected boolean shouldLogFailureToFinishOffChannel( Throwable failure )
+ {
+ return !( failure instanceof UnableToResumeTransactionException );
+ }
public Map<Integer, Collection<SlaveContext>> getSlaveInformation()
{
@@ -399,6 +399,39 @@ protected Boolean executeInTransaction( GraphDatabaseAPI db, Transaction tx )
}
}
+ public static class SetNodePropertyWithThrowJob implements Job<Void>
+ {
+ private final long id;
+ private final String key;
+ private final Object value;
+ private final long firstId;
+
+ public SetNodePropertyWithThrowJob( long firstId, long id, String key, Object value )
+ {
+ this.firstId = firstId;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public Void execute( GraphDatabaseSPI db )
+ {
+ Transaction tx = db.beginTx();
+ try
+ {
+ tx.acquireWriteLock( db.getNodeById( firstId ) );
+ db.getNodeById( id ).setProperty( key, value );
+ tx.success();
+ return null;
+ }
+ finally
+ {
+ tx.finish();
+ }
+ }
+ }
+
public static class CreateNodesJob extends TransactionalJob<Long[]>
{
private final int count;
@@ -19,19 +19,34 @@
*/
package slavetest;
+import static java.util.Arrays.asList;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.neo4j.helpers.collection.MapUtil.stringMap;
+import static org.neo4j.kernel.HaConfig.CONFIG_KEY_LOCK_READ_TIMEOUT;
+import static org.neo4j.kernel.HaConfig.CONFIG_KEY_READ_TIMEOUT;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.rmi.RemoteException;
import java.util.Collection;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.neo4j.com.Client;
import org.neo4j.com.Client.ConnectionLostHandler;
+import org.neo4j.com.ComException;
import org.neo4j.com.Protocol;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
@@ -539,6 +554,40 @@ public void run()
executeJob( new CommonJobs.HoldLongLock( nodeId, latchFetcher ), 0 );
}
+
+ @Test
+ public void lockWaitTimeoutShouldHaveSilentTxFinishRollingBackToNotHideOriginalException() throws Exception
+ {
+ final long lockTimeout = 1;
+ initializeDbs( 1, stringMap( CONFIG_KEY_LOCK_READ_TIMEOUT, String.valueOf( lockTimeout ) ) );
+ final Long otherNodeId = executeJob( new CommonJobs.CreateNodeJob( true ), 0 );
+ final Fetcher<DoubleLatch> latchFetcher = getDoubleLatch();
+ ExecutorService executor = newFixedThreadPool( 1 );
+ final long refNodeId = getMasterHaDb().getReferenceNode().getId();
+ Future<Void> lockHolder = executor.submit( new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ executeJobOnMaster( new CommonJobs.HoldLongLock( refNodeId, latchFetcher ) );
+ return null;
+ }
+ } );
+
+ DoubleLatch latch = latchFetcher.fetch();
+ latch.countDownFirst();
+ try
+ {
+ executeJob( new CommonJobs.SetNodePropertyWithThrowJob( otherNodeId.longValue(),
+ refNodeId, "key", "value" ), 0 );
+ fail( "Should've failed" );
+ }
+ catch ( ComException e )
+ { // Good
+ }
+ latch.countDownSecond();
+ assertNull( lockHolder.get() );
+ }
@Ignore
@Test

0 comments on commit 5c68cc8

Please sign in to comment.