MessageBroker internal subscription(s) list is not thread safe #108
Comments
ops 😈 |
:-P |
now...we have a failing test :-) so far so good. Possible solutions that come to my mind:
Comments? |
@micdenny take a look at the current fix: https://github.com/RadicalFx/radical/blob/293a96144802735bc7f3cc7ec04ecd153a10080e/src/net35/Radical/Messaging/MessageBroker.cs There is a branch for the hot-fix. In the "develop" there is the failing test, in the "hotfix/issue-108-broker-thread-safety" the test passes. Two things:
|
And here is the horrible test :-) |
Flagged for review, I'll come back to it in the next few days |
👍 |
I'm on the issue.... will update you soon! |
I want to show you this other horrible test before I commit 😄 I've tried to make the test run faster while keeping it deterministic: it takes some metrics at startup, then adds as many subscriptions as needed to keep a single broadcast running for at least 100 ms. In the meantime, the other thread starts subscribing, changing the underlying collection and creating the race condition.

```csharp
[TestMethod]
[TestCategory( "MessageBroker" )]
public void MessageBroker_broadcast_from_multiple_thread_should_not_fail()
{
    Exception failure = null;
    var wh = new ManualResetEvent( false );
    var run = true;

    var dispatcher = new NullDispatcher();
    var broker = new MessageBroker( dispatcher );

    // take some metrics to find how many subscriptions make a single broadcast long running
    const int metricsCount = 100;
    for (int i = 0; i < metricsCount; i++) broker.Subscribe<PocoTestMessage>(this, (sender, msg) => { });

    var sw = Stopwatch.StartNew();
    broker.Broadcast(this, new PocoTestMessage());
    sw.Stop();

    const int longRunningTiming = 100;
    var subCountForLongRunning = (longRunningTiming / sw.ElapsedMilliseconds) * metricsCount;
    Trace.WriteLine(string.Format("Need {0} subscriptions to run for at least {1} ms. {2} took {3} ms.", subCountForLongRunning, longRunningTiming, metricsCount, sw.ElapsedMilliseconds));

    for (int i = 0; i < subCountForLongRunning - metricsCount; i++)
    {
        broker.Subscribe<PocoTestMessage>(this, (sender, msg) => { });
    }

    // a single broadcast will now take approximately 100 ms
    var broadcastThread1 = new Thread( payload =>
    {
        while( run )
        {
            try
            {
                broker.Broadcast( this, new PocoTestMessage() );
            }
            catch( Exception e )
            {
                lock( this )
                {
                    failure = e;
                    wh.Set();
                }

                break;
            }
        }
    } );
    broadcastThread1.IsBackground = true;
    broadcastThread1.Start();

    // this should instantly throw an error, because the broadcast is enumerating the subscriptions and should take 100 ms to enumerate them all
    var subscriberThread1 = new Thread(payload =>
    {
        while (run)
        {
            try
            {
                broker.Subscribe<PocoTestMessage>(this, (sender, msg) =>
                {
                    Thread.Sleep(10);
                });
            }
            catch (Exception e)
            {
                lock (this)
                {
                    failure = e;
                    wh.Set();
                    break;
                }
            }
        }
    });
    subscriberThread1.IsBackground = true;
    subscriberThread1.Start();

    var timeout = 1;
    var signaled = wh.WaitOne( TimeSpan.FromSeconds( timeout ) );
    if( !signaled )
    {
        Trace.WriteLine( String.Format( "Run without any issue for {0} seconds.", timeout ) );
    }

    run = false;
    subscriberThread1.Join();

    Assert.IsNull( failure, failure != null ? failure.ToString() : "--" );
}
```

Unit testing concurrency is a real PITA!! |
I actually didn't mean to push the changes directly, but git automatically configured the push to the upstream instead of my origin (fork). Anyway, I made the changes in the right order and place: I changed the test in develop and merged that into the hotfix branch, then improved the locking style in the hotfix branch. I've only pushed the hotfix branch to the remote, so you can tell me if you don't like the changes. |
I've also pushed the development branch to avoid strange and inconsistent merges between branches. If you want to revert some or all of my changes, just tell me 😄 |
On my VM the test always fails with the expected exception: "Collection was modified; ..." |
@micdenny ignore the previous. I just noticed that I was running the test in the "develop" branch where it is obvious that it fails :-) |
ahahahah :-D in "release" the broker can be so fast that the broadcast to the first 100 subscriptions, the one used to set up the metrics, takes 0 ms, so the test fails with a DivideByZeroException :-D |
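
One possible guard for the zero-elapsed case, as a minimal sketch: `BroadcastMetrics` and `SubscriptionsFor` are hypothetical names for illustration, not the guards actually committed to the hotfix branch. It clamps the measured time to at least 1 ms before dividing, and multiplies before dividing so that a measurement above the target does not truncate to zero subscriptions.

```csharp
using System;

// Hypothetical helper, not part of Radical: computes how many subscriptions
// are needed for one broadcast to last roughly targetMs, given that
// sampleCount subscriptions took elapsedMs to broadcast.
public static class BroadcastMetrics
{
    public static long SubscriptionsFor( long targetMs, long elapsedMs, int sampleCount )
    {
        // A stopwatch can legitimately report 0 ms; clamp to avoid dividing by zero.
        long safeElapsed = Math.Max( 1L, elapsedMs );

        // Multiply first so integer division does not round the ratio down to 0.
        long scaled = ( targetMs * sampleCount ) / safeElapsed;

        // Never ask for fewer subscriptions than the sample already registered.
        return Math.Max( sampleCount, scaled );
    }
}
```

When the sample really takes less than 1 ms, the clamped estimate may still be too small to reach the target duration, so a test built on it stays best-effort rather than fully deterministic.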
Eheheh, a bug in the test, OMG :) It was to be expected, it could never be really deterministic... Actually I don't have any other ideas... Maybe roll back to your version and leave it as a manual test? What about the use of ReaderWriterLock, are you happy with that? It should reduce contention on reads, which should be 90% of the usage while publishing. |
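
To illustrate the reader/writer idea discussed here, a minimal sketch follows. It assumes `ReaderWriterLockSlim` (the comment above only says "ReaderWriterLock"), and `SubscriptionList` is a hypothetical, simplified stand-in rather than the actual `MessageBroker` code: subscribing takes the exclusive write lock, while broadcasting takes the shared read lock just long enough to copy the handlers.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, simplified stand-in for the broker's subscription store;
// this is not the actual Radical MessageBroker implementation.
public sealed class SubscriptionList
{
    private readonly List<Action<object>> handlers = new List<Action<object>>();
    private readonly ReaderWriterLockSlim sync = new ReaderWriterLockSlim();

    public int Count
    {
        get
        {
            sync.EnterReadLock();
            try { return handlers.Count; }
            finally { sync.ExitReadLock(); }
        }
    }

    // Writers (subscribe/unsubscribe) need exclusive access to the list.
    public void Subscribe( Action<object> handler )
    {
        sync.EnterWriteLock();
        try { handlers.Add( handler ); }
        finally { sync.ExitWriteLock(); }
    }

    // Readers (broadcast) can run concurrently with each other. The lock is
    // held only while copying the list; handlers run outside the lock, so a
    // handler that subscribes does not trigger a lock recursion error.
    public void Broadcast( object message )
    {
        Action<object>[] snapshot;
        sync.EnterReadLock();
        try { snapshot = handlers.ToArray(); }
        finally { sync.ExitReadLock(); }

        foreach( var handler in snapshot )
        {
            handler( message );
        }
    }
}
```

Copying under the read lock trades one allocation per broadcast for short lock hold times; whether that is the right trade-off depends on how many subscriptions the broker holds and how often it publishes.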
The fix to me sounds perfect. |
the test should have enough guards now. |
can you review? and if ok we should merge the hot-fix and release immediately |
Reviewed, that's a go for me 👍 |
ok, I'll merge |
MessageBroker is not thread safe: the internal subscriptions list is a shared resource, and concurrent access to it from subscribe/unsubscribe/dispatch/broadcast can cause race conditions.
Introducing a lock means a performance penalty in all the scenarios.
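
The failure mode can be reproduced without any threads, which may help when reasoning about it. The sketch below is illustrative only (`EnumerationRaceDemo` is a hypothetical name and a plain `List<int>` stands in for the subscriptions list): adding to a `List<T>` while a `foreach` is enumerating it makes the enumerator throw `InvalidOperationException`, the same "Collection was modified" error a concurrent subscribe causes during a broadcast.

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: a plain list stands in for the broker's subscriptions.
public static class EnumerationRaceDemo
{
    // Returns true when mutating the list mid-enumeration raises
    // InvalidOperationException.
    public static bool MutatingWhileEnumeratingThrows()
    {
        var subscriptions = new List<int> { 1, 2, 3 };
        try
        {
            foreach( var item in subscriptions )
            {
                // Stands in for a Subscribe call arriving during a broadcast.
                subscriptions.Add( item );
            }
        }
        catch( InvalidOperationException )
        {
            return true;
        }

        return false;
    }
}
```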