Skip to content

Commit

Permalink
port router handover from libzmq
Browse files Browse the repository at this point in the history
Conflicts:
	src/NetMQ/Core/ZmqSocketOption.cs
  • Loading branch information
somdoron committed Feb 22, 2016
1 parent 5bd0807 commit 9d6a983
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 9 deletions.
40 changes: 40 additions & 0 deletions src/NetMQ.Tests/RouterTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using NUnit.Framework;
using System.Text;
using System.Threading;
using NetMQ.Sockets;

// ReSharper disable AccessToDisposedClosure
Expand Down Expand Up @@ -80,5 +81,44 @@ public void TwoMessagesFromRouterToDealer()
poller.Run();
}
}

[Test]
public void Handover()
{
using (var router = new RouterSocket())
using (var dealer1 = new DealerSocket())
{
router.Options.RouterHandover = true;
router.Bind("inproc://127.0.0.1:5555");
dealer1.Options.Identity = Encoding.ASCII.GetBytes("ID");
dealer1.Connect("inproc://127.0.0.1:5555");
dealer1.SendMoreFrame("Hello").SendFrame("World");

var identity = router.ReceiveFrameString();
Assert.AreEqual("ID", identity);

using (var dealer2 = new DealerSocket())
{
dealer2.Options.Identity = Encoding.ASCII.GetBytes("ID");
dealer2.Connect("inproc://127.0.0.1:5555");

// We have new peer which should take over, however we are still reading a message
var message = router.ReceiveFrameString();
Assert.AreEqual("Hello", message);
message = router.ReceiveFrameString();
Assert.AreEqual("World", message);

dealer2.SendMoreFrame("Hello").SendFrame("World");
identity = router.ReceiveFrameString();
Assert.AreEqual("ID", identity);

message = router.ReceiveFrameString();
Assert.AreEqual("Hello", message);

message = router.ReceiveFrameString();
Assert.AreEqual("World", message);
}
}
}
}
}
82 changes: 73 additions & 9 deletions src/NetMQ/Core/Patterns/Router.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class RouterSession : SessionBase
{
public RouterSession([NotNull] IOThread ioThread, bool connect, [NotNull] SocketBase socket, [NotNull] Options options, [NotNull] Address addr)
: base(ioThread, connect, socket, options, addr)
{}
{ }
}

/// <summary>
Expand Down Expand Up @@ -106,6 +106,14 @@ public Outpipe([NotNull] Pipe pipe, bool active)
/// </summary>
private Pipe m_currentOut;

/// <summary>
/// The pipe we are currently reading from.
/// </summary>
private Pipe m_currentIn;


private bool m_closingCurrentIn;

/// <summary>
/// If true, more outgoing message parts are expected.
/// </summary>
Expand All @@ -125,6 +133,11 @@ public Outpipe([NotNull] Pipe pipe, bool active)

private bool m_rawSocket;

/// <summary>
/// When enabled new router connections with same identity take over old ones
/// </summary>
private bool m_handover;

/// <summary>
/// Create a new Router instance with the given parent-Ctx, thread-id, and socket-id.
/// </summary>
Expand Down Expand Up @@ -185,6 +198,9 @@ protected override bool XSetSocketOption(ZmqSocketOption option, object optval)
case ZmqSocketOption.RouterMandatory:
m_mandatory = (bool)optval;
return true;
case ZmqSocketOption.RouterHandover:
m_handover = (bool)optval;
return true;
}

return false;
Expand Down Expand Up @@ -278,7 +294,7 @@ protected override bool XSend(ref Msg msg)
// If there's no such pipe just silently ignore the message, unless
// mandatory is set.

var identity = msg.Size == msg.Data.Length
var identity = msg.Size == msg.Data.Length
? msg.Data
: msg.CloneData();

Expand Down Expand Up @@ -375,6 +391,17 @@ protected override bool XRecv(ref Msg msg)
m_prefetched = false;
}
m_moreIn = msg.HasMore;

if (!m_moreIn)
{
if (m_closingCurrentIn)
{
m_currentIn.Terminate(true);
m_closingCurrentIn = false;
}
m_currentIn = null;
}

return true;
}

Expand All @@ -397,7 +424,19 @@ protected override bool XRecv(ref Msg msg)

// If we are in the middle of reading a message, just return the next part.
if (m_moreIn)
{
m_moreIn = msg.HasMore;

if (!m_moreIn)
{
if (m_closingCurrentIn)
{
m_currentIn.Terminate(true);
m_closingCurrentIn = false;
}
m_currentIn = null;
}
}
else
{
// We are at the beginning of a message.
Expand All @@ -406,6 +445,7 @@ protected override bool XRecv(ref Msg msg)
m_prefetchedMsg.Move(ref msg);

m_prefetched = true;
m_currentIn = pipe[0];

byte[] identity = pipe[0].Identity;
msg.InitPool(identity.Length);
Expand Down Expand Up @@ -468,6 +508,7 @@ protected override bool XHasIn()

m_prefetched = true;
m_identitySent = false;
m_currentIn = pipe[0];

return true;
}
Expand Down Expand Up @@ -517,15 +558,38 @@ private bool IdentifyPeer([NotNull] Pipe pipe)
else
{
identity = msg.CloneData();
msg.Close();

// Ignore peers with duplicate ID.
if (m_outpipes.ContainsKey(identity))
{
msg.Close();
return false;
}
Outpipe existPipe;

msg.Close();
if (m_outpipes.TryGetValue(identity, out existPipe))
{
if (!m_handover)
{
// Ignore peers with duplicate ID.
return false;
}
else
{
// We will allow the new connection to take over this
// identity. Temporarily assign a new identity to the
// existing pipe so we can terminate it asynchronously.
var newIdentity = new byte[5];
byte[] result = BitConverter.GetBytes(m_nextPeerId++);
Buffer.BlockCopy(result, 0, newIdentity, 1, 4);
existPipe.Pipe.Identity = newIdentity;
m_outpipes.Add(newIdentity, existPipe);

// Remove the existing identity entry to allow the new
// connection to take the identity.
m_outpipes.Remove(identity);

if (existPipe.Pipe == m_currentIn)
m_closingCurrentIn = true;
else
existPipe.Pipe.Terminate(true);
}
}
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/NetMQ/Core/ZmqSocketOption.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,25 @@ internal enum ZmqSocketOption
/// </summary>
XPublisherBroadcast = 45,

/// <summary>
/// The low-water mark for message transmission. This is the number of messages that should be processed
/// before transmission is unblocked (in case it was blocked by reaching high-watermark). The default value is
/// calculated using relevant high-watermark (HWM): HWM > 2048 ? HWM - 1024 : (HWM + 1) / 2
/// </summary>
SendLowWatermark = 46,

/// <summary>
/// The low-water mark for message reception. This is the number of messages that should be processed
/// before reception is unblocked (in case it was blocked by reaching high-watermark). The default value is
/// calculated using relevant high-watermark (HWM): HWM > 2048 ? HWM - 1024 : (HWM + 1) / 2
/// </summary>
ReceiveLowWatermark = 47,

/// <summary>
/// When enabled new router connections with same identity take over old ones
/// </summary>
RouterHandover = 48,

/// <summary>
/// Specifies the byte-order: big-endian, vs little-endian.
/// </summary>
Expand Down
8 changes: 8 additions & 0 deletions src/NetMQ/SocketOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,14 @@ public bool RouterRawSocket
set { m_socket.SetSocketOption(ZmqSocketOption.RouterRawSocket, value); }
}

/// <summary>
/// When enabled new router connections with same identity take over old ones
/// </summary>
public bool RouterHandover
{
set { m_socket.SetSocketOption(ZmqSocketOption.RouterHandover, value); }
}

/// <summary>
/// Get or set the byte-order: big-endian, vs little-endian.
/// </summary>
Expand Down

0 comments on commit 9d6a983

Please sign in to comment.