Skip to content
Permalink
Browse files
https://issues.apache.org/activemq/browse/AMQNET-284
Sets the Host field on the v1.1+ Connect Frame.
Tweaks the Frame's keep alive send a bit more.
Updates the Inactivity monitor to better log its state and to no send the keep alive through its own Oneway method which was causing it to miss every other KeepAliveInfo send.  Also sets the Inactivity read check to use a generous window since Apollo seems to take twice as long as you ask it to send you a KeepAlive.
  • Loading branch information
Timothy A. Bish committed Nov 8, 2010
1 parent 4e94698 commit c61878a4d242d2b79acc4d4abf01bab24bfcf1db
Showing 3 changed files with 23 additions and 11 deletions.
@@ -81,6 +81,7 @@ public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdG

this.info = new ConnectionInfo();
this.info.ConnectionId = id;
this.info.Host = brokerUri.Host;

this.messageTransformation = new StompMessageTransformation(this);
}
@@ -139,6 +139,7 @@ public void ToStream(BinaryWriter dataOut)
if(this.Command == KEEPALIVE)
{
dataOut.Write(NEWLINE);
dataOut.Flush();
return;
}

@@ -103,7 +103,7 @@ protected override void Dispose(bool disposing)

public void CheckConnection(object state)
{
Tracer.DebugFormat("Timer Elapsed at {0}", DateTime.Now.ToLocalTime());
Tracer.DebugFormat("CheckConnection: Timer Elapsed at {0}", DateTime.Now.ToLocalTime());

// First see if we have written or can write.
WriteCheck();
@@ -125,16 +125,16 @@ public void WriteCheck()
return;
}

// if(!commandSent.Value)
// {
Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
if(!commandSent.Value)
{
//Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
this.asyncWriteTask.IsPending = true;
this.asyncTasks.Wakeup();
// }
// else
// {
// Tracer.Debug("Message sent since last write check. Resetting flag");
// }
}
else
{
Tracer.Debug("Message sent since last write check. Resetting flag");
}

commandSent.Value = false;
}
@@ -180,7 +180,7 @@ public void ReadCheck()
/// <returns></returns>
public bool AllowReadCheck(TimeSpan elapsed)
{
return (elapsed.TotalMilliseconds > (readCheckTime + readCheckTime * 0.90) );
return (elapsed.TotalMilliseconds > (readCheckTime * 2) );
}
#endregion

@@ -211,6 +211,16 @@ protected override void OnCommand(ITransport sender, Command command)
}
}
}
else if(command.IsKeepAliveInfo)
{
if(Tracer.IsDebugEnabled)
{
Tracer.Debug("InactivityMonitor: New Keep Alive Received at -> " +
DateTime.Now.ToLongTimeString().TrimEnd(" APM".ToCharArray()) +
"." + DateTime.Now.Millisecond);
}
}

base.OnCommand(sender, command);
}
finally
@@ -420,7 +430,7 @@ public bool Iterate()
try
{
KeepAliveInfo info = new KeepAliveInfo();
this.parent.Oneway(info);
this.parent.next.Oneway(info);
}
catch(IOException e)
{

0 comments on commit c61878a

Please sign in to comment.