Skip to content

Commit

Permalink
Make NetMQPoller implement ISynchronizeInvoke.
Browse files Browse the repository at this point in the history
This interface is used in several APIs where synchronisation is offered, especially those which have been around for a long time in the BCL such as WinForms, WebForms, System.Timers.Timer and System.Diagnostics.Process.

The implementation utilises the TaskScheduler, and as such is not available in the .NET 3.5 build.
  • Loading branch information
drewnoakes authored and somdoron committed Feb 17, 2016
1 parent 23aada2 commit 1fc33af
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 5 deletions.
32 changes: 30 additions & 2 deletions src/NetMQ.Tests/NetMQPollerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void AddSocketDuringWork()
poller.RunAsync();

dealer1.SendFrame("1");
Assert.IsTrue(signal1.WaitOne(300));
Assert.IsTrue(signal1.WaitOne(300));

dealer2.SendFrame("2");
Assert.IsTrue(signal2.WaitOne(300));
Expand Down Expand Up @@ -799,7 +799,7 @@ public void NativeSocket()

#endregion

#region Scheduling tests
#region TaskScheduler tests

#if !NET35
[Test]
Expand Down Expand Up @@ -931,5 +931,33 @@ public void TwoThreads()
#endif

#endregion

#region ISynchronizeInvoke tests

#if !NET35
[Test]
public void ISynchronizeInvokeWorks()
{
using (var poller = new NetMQPoller())
using (var signal = new ManualResetEvent(false))
using (var timer = new System.Timers.Timer { AutoReset = false, Interval = 100, SynchronizingObject = poller, Enabled = true })
{
var isCorrectThread = false;

poller.RunAsync();

timer.Elapsed += (sender, args) =>
{
isCorrectThread = poller.CanExecuteTaskInline;
Assert.True(signal.Set());
};

Assert.True(signal.WaitOne(TimeSpan.FromSeconds(2)));
Assert.True(isCorrectThread);
}
}
#endif

#endregion
}
}
41 changes: 38 additions & 3 deletions src/NetMQ/NetMQPoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using NetMQ.Core.Utils;
#if !NET35
using System.Threading.Tasks;
using System.ComponentModel;
#endif

using Switch = NetMQ.Core.Utils.Switch;
Expand All @@ -17,7 +18,7 @@ namespace NetMQ
{
public sealed class NetMQPoller :
#if !NET35
TaskScheduler,
TaskScheduler, ISynchronizeInvoke,
#endif
INetMQPoller, ISocketPollableCollection, IEnumerable, IDisposable
{
Expand Down Expand Up @@ -109,7 +110,7 @@ protected override void QueueTask(Task task)
m_tasksQueue.Enqueue(task);
}

private void Run(Action action)
private void Run([NotNull] Action action)
{
if (CanExecuteTaskInline)
action();
Expand Down Expand Up @@ -528,7 +529,7 @@ public void Dispose()
m_switch.WaitForOff();
Debug.Assert(!IsRunning);
}

m_stopSignaler.Dispose();
#if !NET35
m_tasksQueue.Dispose();
Expand All @@ -542,6 +543,40 @@ public void Dispose()

#endregion

#region ISynchronizeInvoke

#if !NET35
IAsyncResult ISynchronizeInvoke.BeginInvoke(Delegate method, object[] args)
{
var task = new Task<object>(() => method.DynamicInvoke(args));
task.Start(this);
return task;
}

object ISynchronizeInvoke.EndInvoke(IAsyncResult result)
{
var task = (Task<object>)result;
return task.Result;
}

object ISynchronizeInvoke.Invoke(Delegate method, object[] args)
{
if (CanExecuteTaskInline)
return method.DynamicInvoke(args);

var task = new Task<object>(() => method.DynamicInvoke(args));
task.Start(this);
return task.Result;
}

bool ISynchronizeInvoke.InvokeRequired
{
get { return !CanExecuteTaskInline; }
}
#endif

#endregion

#region Synchronisation context

#if !NET35
Expand Down

0 comments on commit 1fc33af

Please sign in to comment.