Skip to content
Permalink
Browse files
https://issues.apache.org/activemq/browse/AMQNET-254
Adds more failover handling, basic failover is working now.
  • Loading branch information
Timothy A. Bish committed Jul 8, 2010
1 parent a5e241f commit 14726f39fa399fa84e67a1899ea293d0460528e2
Show file tree
Hide file tree
Showing 18 changed files with 247 additions and 15 deletions.
@@ -19,6 +19,7 @@
using System.Collections;

using Apache.NMS.Util;
using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
@@ -264,6 +265,11 @@ public override bool IsMessage
}
}

public override Response visit(ICommandVisitor visitor)
{
return visitor.processMessage( this );
}

};
}

@@ -17,6 +17,7 @@

using System;
using System.Collections;
using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
@@ -75,6 +76,11 @@ public override bool IsConnectionError
}
}

public override Response visit(ICommandVisitor visitor)
{
return visitor.processConnectionError( this );
}

};
}

@@ -17,6 +17,7 @@

using System;
using System.Collections;
using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
@@ -91,6 +92,11 @@ public override bool IsConnectionInfo
}
}

public override Response visit(ICommandVisitor visitor)
{
return visitor.processAddConnection( this );
}

};
}

@@ -19,6 +19,7 @@
using System.Collections;

using Apache.NMS;
using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
@@ -167,6 +168,11 @@ public override bool IsConsumerInfo
}
}

public override Response visit(ICommandVisitor visitor)
{
return visitor.processAddConsumer( this );
}

};
}

@@ -18,6 +18,8 @@
using System;
using System.Collections;

using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
public class MessageAck : BaseCommand
@@ -115,6 +117,11 @@ public override bool IsMessageAck
}
}

public override Response visit(ICommandVisitor visitor)
{
return visitor.processMessageAck( this );
}

};
}

@@ -18,6 +18,8 @@
using System;
using System.Collections;

using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
public class MessageDispatch : BaseCommand
@@ -133,6 +135,11 @@ public override bool IsMessageDispatch
}
}

public override Response visit(ICommandVisitor visitor)
{
return visitor.processMessageDispatch( this );
}

};
}

@@ -18,6 +18,8 @@
using System;
using System.Collections;

using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
public class ProducerInfo : BaseCommand
@@ -83,6 +85,11 @@ public override bool IsProducerInfo
}
}

public override Response visit(ICommandVisitor visitor)
{
return visitor.processAddProducer( this );
}

};
}

@@ -17,6 +17,7 @@

using System;
using System.Collections;
using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
@@ -61,6 +62,29 @@ public override bool IsRemoveInfo
}
}

///
/// <summery>
/// Allows a Visitor to visit this command and return a response to the
/// command based on the command type being visited. The command will call
/// the proper processXXX method in the visitor.
/// </summery>
///
public override Response visit(ICommandVisitor visitor)
{
switch(objectId.GetDataStructureType())
{
case DataStructureTypes.ConnectionIdType:
return visitor.processRemoveConnection((ConnectionId) objectId);
case DataStructureTypes.SessionIdType:
return visitor.processRemoveSession((SessionId) objectId);
case DataStructureTypes.ConsumerIdType:
return visitor.processRemoveConsumer((ConsumerId) objectId);
case DataStructureTypes.ProducerIdType:
return visitor.processRemoveProducer((ProducerId) objectId);
default:
throw new IOException("Unknown remove command type: " + objectId.GetDataStructureType());
}
}
};
}

@@ -17,6 +17,7 @@

using System;
using System.Collections;
using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
@@ -92,6 +93,11 @@ public override bool IsRemoveSubscriptionInfo
}
}

public override Response visit(ICommandVisitor visitor)
{
return visitor.processRemoveSubscriptionInfo( this );
}

};
}

@@ -18,6 +18,8 @@
using System;
using System.Collections;

using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
public class SessionInfo : BaseCommand
@@ -74,6 +76,11 @@ public override bool IsSessionInfo
}
}

public override Response visit(ICommandVisitor visitor)
{
return visitor.processAddSession( this );
}

};
}

@@ -18,6 +18,8 @@
using System;
using System.Collections;

using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
public class SubscriptionInfo : BaseDataStructure
@@ -18,6 +18,8 @@
using System;
using System.Collections;

using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
public class TransactionInfo : BaseCommand
@@ -87,6 +89,20 @@ public override bool IsTransactionInfo
}
}

public override Response visit(ICommandVisitor visitor)
{
switch(type)
{
case TransactionInfo.BEGIN:
return visitor.processBeginTransaction(this);
case TransactionInfo.COMMIT:
return visitor.processCommitTransaction(this);
case TransactionInfo.ROLLBACK:
return visitor.processRollbackTransaction(this);
default:
throw new IOException("Transaction info type unknown: " + type);
}
}
};
}

@@ -20,6 +20,7 @@
using System.Threading;
using Apache.NMS.Stomp.Commands;
using Apache.NMS.Stomp.Transport;
using Apache.NMS.Stomp.Transport.Failover;
using Apache.NMS.Stomp.Util;
using Apache.NMS.Util;

@@ -61,6 +62,7 @@ public class Connection : IConnection
private ConnectionMetaData metaData = null;
private bool disposed = false;
private IdGenerator clientIdGenerator;
private CountDownLatch transportInterruptionProcessingComplete;

public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
{
@@ -532,6 +534,9 @@ protected void OnCommand(ITransport commandTransport, Command command)
{
if(command is MessageDispatch)
{
// We wait if the Connection is still processing interruption
// code to reset the MessageConsumers.
WaitForTransportInterruptionProcessingToComplete();
DispatchMessage((MessageDispatch) command);
}
else if(command is ConnectionError)
@@ -606,6 +611,12 @@ protected void OnTransportInterrupted(ITransport sender)
{
Tracer.Debug("Transport has been Interrupted.");

this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.Count);
if(Tracer.IsDebugEnabled)
{
Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
}

foreach(Session session in this.sessions)
{
session.ClearMessagesInProgress();
@@ -682,5 +693,28 @@ protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgeme
answer.SessionId = sessionId;
return answer;
}

private void WaitForTransportInterruptionProcessingToComplete()
{
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if(cdl != null)
{
if(!closed && cdl.Remaining > 0)
{
Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " +
"processing (" + cdl.Remaining + ") to complete..");
cdl.await(TimeSpan.FromSeconds(10));
}
}
}

internal void TransportInterruptionProcessingComplete()
{
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if(cdl != null)
{
cdl.countDown();
}
}
}
}
@@ -53,6 +53,7 @@ public class MessageConsumer : IMessageConsumer, IDispatcher
private int dispatchedCount = 0;
private volatile bool synchronizationRegistered = false;
private bool clearDispatchList = false;
private bool inProgressClearRequiredFlag;

private event MessageListener listener;

@@ -314,16 +315,38 @@ public void Stop()
this.unconsumedMessages.Stop();
}

public void ClearMessagesInProgress()
internal void InProgressClearRequired()
{
// we are called from inside the transport reconnection logic
// which involves us clearing all the connections' consumers
// dispatch lists and clearing them
// so rather than trying to grab a mutex (which could be already
// owned by the message listener calling the send) we will just set
// a flag so that the list can be cleared as soon as the
// dispatch thread is ready to flush the dispatch list
this.clearDispatchList = true;
inProgressClearRequiredFlag = true;
// deal with delivered messages async to avoid lock contention with in progress acks
clearDispatchList = true;
}

internal void ClearMessagesInProgress()
{
if(inProgressClearRequiredFlag)
{
// Called from a thread in the ThreadPool, so we wait until we can
// get a lock on the unconsumed list then we clear it.
lock(this.unconsumedMessages)
{
if(inProgressClearRequiredFlag)
{
if(Tracer.IsDebugEnabled)
{
Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
this.unconsumedMessages.Count + ") on transport interrupt");
}

this.unconsumedMessages.Clear();
this.synchronizationRegistered = false;

// allow dispatch on this connection to resume
this.session.Connection.TransportInterruptionProcessingComplete();
this.inProgressClearRequiredFlag = false;
}
}
}
}

public void DeliverAcks()
@@ -744,8 +767,6 @@ private void Rollback()

redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);

// MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;

foreach(MessageDispatch dispatch in this.dispatchedMessages)
{
// Allow the message to update its internal to reflect a Rollback.

0 comments on commit 14726f3

Please sign in to comment.