Skip to content
Permalink
Browse files
https://issues.apache.org/activemq/browse/AMQNET-284
Improved Inactivity Monitor code to work with Apollo, and StompWireFormat and StompFrame logging for easier debug.
  • Loading branch information
Timothy A. Bish committed Nov 8, 2010
1 parent 987dbcb commit 4e94698a4d1577891911e21b319743c7e619e905
Showing 3 changed files with 104 additions and 36 deletions.
@@ -31,6 +31,8 @@ public class StompFrame
public const String SEPARATOR = ":";
/// Used to mark the End of the Frame.
public const byte FRAME_TERMINUS = (byte) 0;
/// Used to denote a Special KeepAlive command that consists of a single newline.
public const String KEEPALIVE = "KEEPALIVE";

private string command;
private IDictionary properties = new Hashtable();
@@ -113,8 +115,33 @@ public void ClearProperties()
this.properties.Clear();
}

public override string ToString()
{
StringBuilder builder = new StringBuilder();

builder.Append(GetType().Name + "[ ");
builder.Append("Command=" + Command);
builder.Append(", Properties={");
foreach(string key in this.properties.Keys)
{
builder.Append(" " + key + "=" + this.properties[key]);
}

builder.Append("}, ");
builder.Append("Content=" + this.content ?? this.content.ToString());
builder.Append("]");

return builder.ToString();
}

public void ToStream(BinaryWriter dataOut)
{
if(this.Command == KEEPALIVE)
{
dataOut.Write(NEWLINE);
return;
}

StringBuilder builder = new StringBuilder();

builder.Append(this.Command);
@@ -142,8 +169,12 @@ public void ToStream(BinaryWriter dataOut)
public void FromStream(BinaryReader dataIn)
{
this.ReadCommandHeader(dataIn);
this.ReadHeaders(dataIn);
this.ReadContent(dataIn);

if(this.command != KEEPALIVE)
{
this.ReadHeaders(dataIn);
this.ReadContent(dataIn);
}
}

private void ReadCommandHeader(BinaryReader dataIn)
@@ -125,7 +125,12 @@ public Object Unmarshal(BinaryReader dataIn)
protected virtual Object CreateCommand(StompFrame frame)
{
string command = frame.Command;


if(Tracer.IsDebugEnabled)
{
Tracer.Debug("StompWireFormat - Received " + frame.ToString());
}

if(command == "RECEIPT")
{
string text = frame.RemoveProperty("receipt-id");
@@ -137,15 +142,12 @@ protected virtual Object CreateCommand(StompFrame frame)
text = text.Substring("ignore:".Length);
}

Tracer.Debug("StompWireFormat - Received RESPONSE command: CorrelationId = " + text);

answer.CorrelationId = Int32.Parse(text);
return answer;
}
}
else if(command == "CONNECTED")
{
Tracer.Debug("StompWireFormat - Received CONNECTED command");
return ReadConnected(frame);
}
else if(command == "ERROR")
@@ -154,7 +156,6 @@ protected virtual Object CreateCommand(StompFrame frame)

if(text != null && text.StartsWith("ignore:"))
{
Tracer.Debug("StompWireFormat - Received ERROR Response command: correlationId = " + text);
Response answer = new Response();
answer.CorrelationId = Int32.Parse(text.Substring("ignore:".Length));
return answer;
@@ -170,18 +171,15 @@ protected virtual Object CreateCommand(StompFrame frame)
BrokerError error = new BrokerError();
error.Message = frame.RemoveProperty("message");
answer.Exception = error;
Tracer.Debug("StompWireFormat - Received ERROR command: " + error.Message);
return answer;
}
}
else if(command == "KEEPALIVE")
{
Tracer.Debug("StompWireFormat - Received KEEPALIVE command");
return new KeepAliveInfo();
}
else if(command == "MESSAGE")
{
Tracer.Debug("StompWireFormat - Received MESSAGE command");
return ReadMessage(frame);
}

@@ -192,12 +190,6 @@ protected virtual Object CreateCommand(StompFrame frame)

protected virtual Command ReadConnected(StompFrame frame)
{
Tracer.Debug("CONNECTED command: " + frame.Command + " headers: ");
foreach(string key in frame.Properties.Keys)
{
Tracer.DebugFormat(" property[{0}] = {1}", key, frame.Properties[key]);
}

string responseId = frame.RemoveProperty("response-id");

this.remoteWireFormatInfo = new WireFormatInfo();
@@ -415,6 +407,11 @@ protected virtual void WriteMessage(Message command, BinaryWriter dataOut)
frame.SetProperty(key, map[key]);
}

if(Tracer.IsDebugEnabled)
{
Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
}

frame.ToStream(dataOut);
}

@@ -428,13 +425,16 @@ protected virtual void WriteMessageAck(MessageAck command, BinaryWriter dataOut)

frame.SetProperty("message-id", command.LastMessageId.ToString());

Tracer.Debug("ACK - Outbound MessageId = " + frame.GetProperty("message-id"));

if(command.TransactionId != null)
{
frame.SetProperty("transaction", command.TransactionId.ToString());
}

if(Tracer.IsDebugEnabled)
{
Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
}

frame.ToStream(dataOut);
}

@@ -456,14 +456,26 @@ protected virtual void WriteConnectionInfo(ConnectionInfo command, BinaryWriter
frame.SetProperty("heart-beat", command.WriteCheckInterval + "," + command.ReadCheckInterval);
}

if(Tracer.IsDebugEnabled)
{
Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
}

frame.ToStream(dataOut);
}

protected virtual void WriteShutdownInfo(ShutdownInfo command, BinaryWriter dataOut)
{
System.Diagnostics.Debug.Assert(!command.ResponseRequired);

new StompFrame("DISCONNECT").ToStream(dataOut);

StompFrame frame = new StompFrame("DISCONNECT");

if(Tracer.IsDebugEnabled)
{
Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
}

frame.ToStream(dataOut);
}

protected virtual void WriteConsumerInfo(ConsumerInfo command, BinaryWriter dataOut)
@@ -481,8 +493,6 @@ protected virtual void WriteConsumerInfo(ConsumerInfo command, BinaryWriter data
frame.SetProperty("selector", command.Selector);
frame.SetProperty("ack", StompHelper.ToStomp(command.AckMode));

Tracer.Debug("SUBSCRIBE : Outbound AckMode = " + frame.GetProperty("ack"));

if(command.NoLocal)
{
frame.SetProperty("no-local", command.NoLocal.ToString());
@@ -523,12 +533,24 @@ protected virtual void WriteConsumerInfo(ConsumerInfo command, BinaryWriter data
frame.SetProperty("activemq.retroactive", command.Retroactive);
}

if(Tracer.IsDebugEnabled)
{
Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
}

frame.ToStream(dataOut);
}

protected virtual void WriteKeepAliveInfo(KeepAliveInfo command, BinaryWriter dataOut)
{
dataOut.Write((byte) '\n' );
StompFrame frame = new StompFrame(StompFrame.KEEPALIVE);

if(Tracer.IsDebugEnabled)
{
Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
}

frame.ToStream(dataOut);
}

protected virtual void WriteRemoveInfo(RemoveInfo command, BinaryWriter dataOut)
@@ -544,6 +566,12 @@ protected virtual void WriteRemoveInfo(RemoveInfo command, BinaryWriter dataOut)
frame.SetProperty("receipt", command.CommandId);
}
frame.SetProperty("id", consumerId.ToString() );

if(Tracer.IsDebugEnabled)
{
Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
}

frame.ToStream(dataOut);
}
}
@@ -574,6 +602,12 @@ protected virtual void WriteTransactionInfo(TransactionInfo command, BinaryWrite
}

frame.SetProperty("transaction", command.TransactionId.ToString());

if(Tracer.IsDebugEnabled)
{
Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
}

frame.ToStream(dataOut);
}

@@ -49,21 +49,21 @@ public class InactivityMonitor : TransportFilter

private DateTime lastReadCheckTime;

private long readCheckTime;
private long readCheckTime = 30000;
public long ReadCheckTime
{
get { return this.readCheckTime; }
set { this.readCheckTime = value; }
}

private long writeCheckTime;
private long writeCheckTime = 10000;
public long WriteCheckTime
{
get { return this.writeCheckTime; }
set { this.writeCheckTime = value; }
}

private long initialDelayTime;
private long initialDelayTime = 0;
public long InitialDelayTime
{
get { return this.initialDelayTime; }
@@ -103,6 +103,8 @@ protected override void Dispose(bool disposing)

public void CheckConnection(object state)
{
Tracer.DebugFormat("Timer Elapsed at {0}", DateTime.Now.ToLocalTime());

// First see if we have written or can write.
WriteCheck();

@@ -123,16 +125,16 @@ public void WriteCheck()
return;
}

if(!commandSent.Value)
{
// if(!commandSent.Value)
// {
Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
this.asyncWriteTask.IsPending = true;
this.asyncTasks.Wakeup();
}
else
{
Tracer.Debug("Message sent since last write check. Resetting flag");
}
// }
// else
// {
// Tracer.Debug("Message sent since last write check. Resetting flag");
// }

commandSent.Value = false;
}
@@ -178,7 +180,7 @@ public void ReadCheck()
/// <returns></returns>
public bool AllowReadCheck(TimeSpan elapsed)
{
return (elapsed.TotalMilliseconds > (readCheckTime * 9 / 10));
return (elapsed.TotalMilliseconds > (readCheckTime + readCheckTime * 0.90) );
}
#endregion

@@ -316,16 +318,19 @@ private void StartMonitorThreads()

if(this.asyncErrorTask != null)
{
Tracer.Debug("Inactivity: Adding the Async Read Check Task to the Runner.");
this.asyncTasks.AddTask(this.asyncErrorTask);
}

if(this.asyncWriteTask != null)
{
Tracer.Debug("Inactivity: Adding the Async Write Check Task to the Runner.");
this.asyncTasks.AddTask(this.asyncWriteTask);
}

if(this.asyncErrorTask != null || this.asyncWriteTask != null)
{
Tracer.Debug("Inactivity: Starting the Monitor Timer.");
monitorStarted.Value = true;

this.connectionCheckTimer = new Timer(
@@ -410,12 +415,10 @@ public bool IsPending

public bool Iterate()
{
Tracer.Debug("AsyncWriteTask perparing for another Write Check");
if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
{
try
{
Tracer.Debug("AsyncWriteTask Write Check required sending KeepAlive.");
KeepAliveInfo info = new KeepAliveInfo();
this.parent.Oneway(info);
}

0 comments on commit 4e94698

Please sign in to comment.