Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrency bug in ScheduledObserver #268

Merged
merged 1 commit into from
May 10, 2013

Conversation

benjchristensen
Copy link
Member

public class RunTest {
    public static void main(String[] args) {
        System.out.println("Starting test...");

        final ArrayList<String> strings = new ArrayList<String>(200000);

        int num = 10000;
        while (true) {
            long start = System.currentTimeMillis();
            final AtomicInteger count = new AtomicInteger();
            for (int i = 0; i < num; i++) {
                new TestService1(2, 5).toObservable().forEach(new Action1<Integer>() {

                    @Override
                    public void call(Integer v) {
                        count.addAndGet(v);
                    }
                });

                new TestService2("hello").toObservable().forEach(new Action1<String>() {

                    @Override
                    public void call(String v) {
                        strings.add(v);
                    }

                });
            }
            long time = (System.currentTimeMillis() - start);
            long executions = num * 2;
            System.out.println("Time: " + time + "ms for " + executions + " executions (" + (time * 1000) / executions + " microseconds)");
            System.out.println("   Count: " + count);
            System.out.println("   Strings: " + strings.size());
            strings.clear();
        }
    }
}
  • Also made OperationObserveOn not use ScheduledObserver if the ImmediateScheduler is chosen to allow an optimization. I believe this optimization is safe because ScheduledObserver does not require knowledge of a Scheduler (such as for now()) and all we do is emit data to the Observer on a scheduler and if we know it's Immediate we can go direct and skip the enqueuing step. This allows shaving off a noticable number of microseconds per execution in the loop above.

- found a concurrency bug while working on Netflix/Hystrix#123
- the following code would lock up occasionally due to onCompleted not being delivered:

```java
public class RunTest {
    public static void main(String[] args) {
        System.out.println("Starting test...");

        final ArrayList<String> strings = new ArrayList<String>(200000);

        int num = 10000;
        while (true) {
            long start = System.currentTimeMillis();
            final AtomicInteger count = new AtomicInteger();
            for (int i = 0; i < num; i++) {
                new TestService1(2, 5).toObservable().forEach(new Action1<Integer>() {

                    @OverRide
                    public void call(Integer v) {
                        count.addAndGet(v);
                    }
                });

                new TestService2("hello").toObservable().forEach(new Action1<String>() {

                    @OverRide
                    public void call(String v) {
                        strings.add(v);
                    }

                });
            }
            long time = (System.currentTimeMillis() - start);
            long executions = num * 2;
            System.out.println("Time: " + time + "ms for " + executions + " executions (" + (time * 1000) / executions + " microseconds)");
            System.out.println("   Count: " + count);
            System.out.println("   Strings: " + strings.size());
            strings.clear();
        }
    }
}
```

- Also made OperationObserveOn not use ScheduledObserver if the `ImmediateScheduler` is chosen to allow an optimization. I believe this optimization is safe because ScheduledObserver does not require knowledge of a Scheduler (such as for now()) and all we do is emit data to the Observer on a scheduler and if we know it's Immediate we can go direct and skip the enqueuing step. This allows shaving off a noticable number of microseconds per execution in the loop above.
benjchristensen added a commit that referenced this pull request May 10, 2013
Fix concurrency bug in ScheduledObserver
@benjchristensen benjchristensen merged commit 24b6b37 into ReactiveX:master May 10, 2013
@cloudbees-pull-request-builder

RxJava-pull-requests #140 SUCCESS
This pull request looks good

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
…r configurations and overriding of Spring beans.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants