Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group By implementation not thread safe #49

Closed
mattpodwysocki opened this issue Oct 13, 2014 · 3 comments
Closed

Group By implementation not thread safe #49

mattpodwysocki opened this issue Oct 13, 2014 · 3 comments

Comments

@mattpodwysocki
Copy link
Contributor

Copied from https://rx.codeplex.com/workitem/40

Looks like the group by implementation is not thread safe. Internally it uses a Dictionary.Add, in the onnext of the Sink, without using proper locking mechanisms.

 if (!_map.TryGetValue(key, out writer)) {
     writer = new Subject<TElement>();
     _map.Add(key, writer);
    fireNewMapEntry = true;
}

I noticed by subscribing to an event pattern that is invoked by multiple threads at the same time. (In my case 24 threads that potentially call TransportMessageReceived concurrently)

Observable.FromEventPattern<TransportMessageReceivedEventArgs>(
                    h => _bus.Transport.TransportMessageReceived += h,
                    h => _bus.Transport.TransportMessageReceived -= h);

And simply subscribe to it with a group by expression

var messagesReceived = from e in observable
                       group e by "something" into c
                       select c ;

subscription = messagesReceived.Subscribe(r => Console.WriteLine("Triggered") ); 

This will occasionally throw a null reference in Dictionary.Insert ... as that implementation is not thread safe by default.

@mattpodwysocki
Copy link
Contributor Author

davedev wrote Jul 3, 2013 at 7:50 PM

Hi,

That is the correct behavior.

One of Rx's contracts is that notifications must be pushed serially; i.e., you must not invoke OnNext concurrently within a single observable. There are probably many Rx operators that may cause threading bugs if you do not satisfy this contract.

See §4.2 in the Rx Design Guidelines document.
◾Dave

@mattpodwysocki
Copy link
Contributor Author

Bnaya wrote Aug 10, 2013 at 3:15 PM

Isn't it time to re-consider this limitation?
it is true that this is the current contract, but many operatoir can be thread-safe with
a little effort.
I believe that it should change, maybe by adding a set of thread-safe operators.
without doing it there is a need for synchronization and we may not be able to get full
utilization of a multi core environment (see Amdahl's law).

@mattpodwysocki
Copy link
Contributor Author

davedev wrote Aug 10, 2013 at 6:54 PM

I reject the premise. A thread-safe operator is thread safe because it ensures synchronization, so whether Rx does it or you do it yourself (e.g., via the Synchronize operator) it's going to end up being the same thing anyway. A set of thread-safe operators would essentially be, perhaps, identical to the set of existing operators with the addition of the Synchronize operator.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant