Skip to content
Permalink
Browse files
https://issues.apache.org/activemq/browse/AMQNET-254
Adds a basic FailoverTransport and Inactivity monitor.  Also provides a simple ConnectionStateTracker that can track the Connection and restore all its Consumer subscriptions once a successful failover is completed.
  • Loading branch information
Timothy A. Bish committed Jul 6, 2010
1 parent 1ebbfe1 commit 3ef8780875c744e6018e9751cef8f3ad66ef2aa4
Show file tree
Hide file tree
Showing 16 changed files with 1,475 additions and 11 deletions.
@@ -16,6 +16,7 @@
*/

using System;
using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
@@ -200,6 +201,14 @@ public virtual bool IsErrorCommand
}
}

public virtual bool IsKeepAliveInfo
{
get
{
return false;
}
}

public virtual bool ResponseRequired
{
get
@@ -212,6 +221,11 @@ public virtual bool ResponseRequired
}
}

public virtual Response visit(ICommandVisitor visitor)
{
throw new ApplicationException("BaseCommand.Visit() not implemented");
}

public override Object Clone()
{
// Since we are a derived class use the base's Clone()
@@ -16,6 +16,7 @@
*/

using System;
using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
@@ -81,6 +82,17 @@ bool IsResponse
get;
}

bool IsKeepAliveInfo
{
get;
}

bool IsShutdownInfo
{
get;
}

Response visit(ICommandVisitor visitor);
}
}

@@ -49,6 +49,7 @@ public class DataStructureTypes
public const byte RemoveInfoType = 25;
public const byte RemoveSubscriptionInfoType = 26;
public const byte ErrorResponseType = 27;
public const byte KeepAliveInfoType = 28;

public const byte DestinationType = 48;
public const byte TempDestinationType = 49;
@@ -128,6 +129,9 @@ public static String GetDataStructureTypeAsString(int type)
case ErrorResponseType:
packetTypeStr = "ErrorResponseType";
break;
case KeepAliveInfoType:
packetTypeStr = "KeepAliveInfoType";
break;
case DestinationType:
packetTypeStr = "DestinationType";
break;
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections;

using Apache.NMS.Stomp.State;

namespace Apache.NMS.Stomp.Commands
{
public class KeepAliveInfo : BaseCommand
{
///
/// <summery>
/// Get the unique identifier that this object and its own
/// Marshaler share.
/// </summery>
///
public override byte GetDataStructureType()
{
return DataStructureTypes.KeepAliveInfoType;
}

///
/// <summery>
/// Returns a string containing the information for this DataStructure
/// such as its type and value of its elements.
/// </summery>
///
public override string ToString()
{
return GetType().Name + "[ " +
"commandId = " + this.CommandId + ", " +
"responseRequired = " + this.ResponseRequired + ", " + " ]";
}

///
/// <summery>
/// Return an answer of true to the isKeepAliveInfo() query.
/// </summery>
///
public override bool IsKeepAliveInfo
{
get
{
return true;
}
}

///
/// <summery>
/// Allows a Visitor to visit this command and return a response to the
/// command based on the command type being visited. The command will call
/// the proper processXXX method in the visitor.
/// </summery>
///
public override Response visit(ICommandVisitor visitor)
{
return visitor.processKeepAliveInfo( this );
}

};
}

@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;

using Apache.NMS.Stomp.Commands;
using System.Text;
using Apache.NMS;

namespace Apache.NMS.Stomp
{

/// <summary>
/// Exception thrown when an IO error occurs
/// </summary>
public class IOException : NMSException
{
public IOException()
: base("IO Exception failed with missing exception log")
{
}

public IOException(String msg)
: base(msg)
{
}

public IOException(String msg, Exception inner)
: base(msg, inner)
{
}
}
}


@@ -93,6 +93,10 @@ public void Marshal(Object o, BinaryWriter dataOut)
{
WriteRemoveInfo((RemoveInfo) o, dataOut);
}
else if(o is KeepAliveInfo)
{
WriteKeepAliveInfo((KeepAliveInfo) o, dataOut);
}
else if(o is Command)
{
Command command = o as Command;
@@ -435,7 +439,7 @@ protected virtual void WriteConsumerInfo(ConsumerInfo command, BinaryWriter data
}
else
{
// frame.SetProperty("transformation", "jms-map-xml");
frame.SetProperty("transformation", "jms-xml");
}

frame.SetProperty("activemq.dispatchAsync", command.DispatchAsync);
@@ -465,6 +469,11 @@ protected virtual void WriteConsumerInfo(ConsumerInfo command, BinaryWriter data
frame.ToStream(dataOut);
}

protected virtual void WriteKeepAliveInfo(KeepAliveInfo command, BinaryWriter dataOut)
{
dataOut.Write((byte) '\n' );
}

protected virtual void WriteRemoveInfo(RemoveInfo command, BinaryWriter dataOut)
{
StompFrame frame = new StompFrame("UNSUBSCRIBE");

0 comments on commit 3ef8780

Please sign in to comment.