Skip to content

Commit

Permalink
Merge pull request #7 from somdoron/master
Browse files Browse the repository at this point in the history
cherry picks fixes from NetMQ4
  • Loading branch information
drewnoakes committed Jul 9, 2016
2 parents 2dbb8f3 + ad5059e commit 9d19a46
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 37 deletions.
71 changes: 71 additions & 0 deletions src/NetMQ.Tests/CleanupTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ.Sockets;
using NUnit.Framework;

namespace NetMQ.Tests
{
[TestFixture]
public class CleanupTests
{
[Test]
public void Block()
{
const int count = 1000;

NetMQConfig.Linger = TimeSpan.FromSeconds(0.5);

using (var client = new DealerSocket(">tcp://localhost:5557"))
{
// Sending a lot of messages
client.Options.SendHighWatermark = count;
for (int i = 0; i < count; i++)
{
client.SendFrame("Hello");
}
}

Stopwatch stopwatch = Stopwatch.StartNew();
NetMQConfig.Cleanup();
stopwatch.Stop();

Assert.Greater(stopwatch.ElapsedMilliseconds, 500);
}

[Test]
public void NoBlock()
{
const int count = 1000;

NetMQConfig.Linger = TimeSpan.FromSeconds(0.5);

using (var client = new DealerSocket(">tcp://localhost:5557"))
{
// Sending a lot of messages
client.Options.SendHighWatermark = count;
for (int i = 0; i < count; i++)
{
client.SendFrame("Hello");
}
}

Stopwatch stopwatch = Stopwatch.StartNew();
NetMQConfig.Cleanup(false);
stopwatch.Stop();

Assert.Less(stopwatch.ElapsedMilliseconds, 500);
}

[Test]
public void NoBlockNoDispose()
{
var client = new DealerSocket(">tcp://localhost:5557");
NetMQConfig.Cleanup(false);
}
}
}
2 changes: 2 additions & 0 deletions src/NetMQ.Tests/NetMQ.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
<Compile Include="ActorTests.cs" />
<Compile Include="BeaconTests.cs" />
<Compile Include="ByteArraySegmentTests.cs" />
<Compile Include="CleanupTests.cs" />
<Compile Include="EventDelegatorTests.cs" />
<Compile Include="ExceptionTests.cs" />
<Compile Include="InProcActors\AccountJSON\Account.cs" />
Expand All @@ -91,6 +92,7 @@
<Compile Include="ReceivingSocketExtensionsTests.cs" />
<Compile Include="RequestWithRetryTests.cs" />
<Compile Include="RouterTests.cs" />
<Compile Include="Setup.cs" />
<Compile Include="XPubSubTests.cs" />
<Compile Include="Devices\DeviceTestBase.cs" />
<Compile Include="Devices\ForwarderDeviceTests.cs" />
Expand Down
2 changes: 2 additions & 0 deletions src/NetMQ.Tests/NetMQ3.5.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
<Compile Include="ActorTests.cs" />
<Compile Include="BeaconTests.cs" />
<Compile Include="ByteArraySegmentTests.cs" />
<Compile Include="CleanupTests.cs" />
<Compile Include="Core\YQueueTests.cs" />
<Compile Include="EventDelegatorTests.cs" />
<Compile Include="ExceptionTests.cs" />
Expand All @@ -84,6 +85,7 @@
<Compile Include="NetMQPollerTest.cs" />
<Compile Include="ReceivingSocketExtensionsTests.cs" />
<Compile Include="RouterTests.cs" />
<Compile Include="Setup.cs" />
<Compile Include="SocketOptionsTests.cs" />
<Compile Include="XPubSubTests.cs" />
<Compile Include="MessageTests.cs" />
Expand Down
11 changes: 7 additions & 4 deletions src/NetMQ.Tests/NetMQMonitorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@

namespace NetMQ.Tests
{
[TestFixture]
[TestFixture(Category = "Monitor")]
public class NetMQMonitorTests
{
[Test]
public void Monitoring()
{
using (var rep = new ResponseSocket())
using (var req = new RequestSocket())
using (var monitor = new NetMQMonitor(rep, "inproc://rep.inproc", SocketEvents.Accepted | SocketEvents.Listening))
using (var monitor = new NetMQMonitor(rep, $"inproc://rep.inproc", SocketEvents.Accepted | SocketEvents.Listening))
{
var listening = false;
var accepted = false;
Expand All @@ -28,6 +28,8 @@ public void Monitoring()

var monitorTask = Task.Factory.StartNew(monitor.Start);

Thread.Sleep(10);

var port = rep.BindRandomPort("tcp://127.0.0.1");

req.Connect("tcp://127.0.0.1:" + port);
Expand All @@ -47,8 +49,8 @@ public void Monitoring()

Thread.Sleep(200);

Assert.IsTrue(monitorTask.IsCompleted);
}
Assert.IsTrue(monitorTask.IsCompleted);
}
}

#if !NET35
Expand Down Expand Up @@ -102,6 +104,7 @@ public void ErrorCodeTest()
monitor.Timeout = TimeSpan.FromMilliseconds(100);

var monitorTask = Task.Factory.StartNew(monitor.Start);
Thread.Sleep(10);

var port = rep.BindRandomPort("tcp://127.0.0.1");

Expand Down
18 changes: 18 additions & 0 deletions src/NetMQ.Tests/Setup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using NUnit.Framework;

namespace NetMQ.Tests
{
[SetUpFixture]
public class Setup
{
[TearDown]
public void TearDown()
{
NetMQConfig.Cleanup(false);
}
}
}
2 changes: 1 addition & 1 deletion src/NetMQ/Core/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Command([CanBeNull] ZObject destination, CommandType type, [CanBeNull] ob
/// Get the argument to this command.
/// </summary>
[CanBeNull]
public object Arg { get; private set; }
public object Arg { get; private set; }

/// <summary>
/// Override of ToString, which returns a string in the form [ command-type, destination ].
Expand Down
7 changes: 6 additions & 1 deletion src/NetMQ/Core/CommandType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ internal enum CommandType
/// Sent by reaper thread to the term thread when all the sockets
/// have successfully been deallocated.
/// </summary>
Done
Done,

/// <summary>
/// Send to reaper to stop the reaper immediatly
/// </summary>
ForceStop
}
}
15 changes: 9 additions & 6 deletions src/NetMQ/Core/Ctx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ namespace NetMQ.Core
/// <remarks>Internal analog of the public <see cref="NetMQContext"/> class.</remarks>
internal sealed class Ctx
{
private const int DefaultIOThreads = 1;
private const int DefaultMaxSockets = 1024;
internal const int DefaultIOThreads = 1;
internal const int DefaultMaxSockets = 1024;

#region Nested class: Endpoint

Expand Down Expand Up @@ -217,8 +217,12 @@ public void Terminate()
foreach (var socket in m_sockets)
socket.Stop();

if (m_sockets.Count == 0)
m_reaper.Stop();
if (!Block)
{
m_reaper.ForceStop();
}
else if (m_sockets.Count == 0)
m_reaper.Stop();
}
finally
{
Expand All @@ -234,8 +238,7 @@ public void Terminate()

Debug.Assert(found);
Debug.Assert(command.CommandType == CommandType.Done);
Monitor.Enter(m_slotSync);
Debug.Assert(m_sockets.Count == 0);
Monitor.Enter(m_slotSync);
}
else
Monitor.Enter(m_slotSync);
Expand Down
15 changes: 14 additions & 1 deletion src/NetMQ/Core/Reaper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ public void Stop()
SendStop();
}

public void ForceStop()
{
SendForceStop();
}

/// <summary>
/// Handle input-ready events, by receiving and processing any commands
/// that are waiting in the mailbox.
Expand Down Expand Up @@ -168,6 +173,14 @@ protected override void ProcessStop()
}
}

protected override void ProcessForceStop()
{
m_terminating = true;
SendDone();
m_poller.RemoveHandle(m_mailboxHandle);
m_poller.Stop();
}

/// <summary>
/// Add the given socket to the list to be reaped (terminated).
/// </summary>
Expand Down Expand Up @@ -196,6 +209,6 @@ protected override void ProcessReaped()
m_poller.RemoveHandle(m_mailboxHandle);
m_poller.Stop();
}
}
}
}
}
3 changes: 1 addition & 2 deletions src/NetMQ/Core/Utils/Poller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ public void Destroy()
if (!m_stopped)
{
try
{
m_stopping = true;
{
m_workerThread.Join();
}
catch (Exception)
Expand Down
15 changes: 15 additions & 0 deletions src/NetMQ/Core/ZObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ protected void SendStop()
m_ctx.SendCommand(m_threadId, new Command(this, CommandType.Stop));
}

protected void SendForceStop()
{
m_ctx.SendCommand(m_threadId, new Command(this, CommandType.ForceStop));
}

/// <summary>
/// Send the Plug command, incrementing the destinations sequence-number if incSeqnum is true.
/// </summary>
Expand Down Expand Up @@ -322,6 +327,10 @@ public void ProcessCommand([NotNull] Command cmd)
ProcessReaped();
break;

case CommandType.ForceStop:
ProcessForceStop();
break;

default:
throw new ArgumentException();
}
Expand All @@ -333,6 +342,12 @@ protected virtual void ProcessStop()
throw new NotSupportedException();
}

/// <exception cref="NotSupportedException">Not supported on the ZObject class.</exception>
protected virtual void ProcessForceStop()
{
throw new NotSupportedException();
}

/// <exception cref="NotSupportedException">Not supported on the ZObject class.</exception>
protected virtual void ProcessPlug()
{
Expand Down
Loading

0 comments on commit 9d19a46

Please sign in to comment.