Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make NewThreadScheduler create Daemon threads #631

Merged

Conversation

benjchristensen
Copy link
Member

This matches the behavior of Schedulers.COMPUTATION_EXECUTOR and Schedulers.IO_EXECUTOR.

See https://groups.google.com/forum/#!topic/rxjava/Qe1qi0aHtnE and #431 (comment)

This matches the behavior of Schedulers.COMPUTATION_EXECUTOR and Schedulers.IO_EXECUTOR.

See https://groups.google.com/forum/#!topic/rxjava/Qe1qi0aHtnE and ReactiveX#431 (comment)
@cloudbees-pull-request-builder

RxJava-pull-requests #561 FAILURE
Looks like there's a problem with this pull request

@akarnokd
Copy link
Member

This solves the non-termination problem, but should those terminations be so harsh? Perhaps allowing the core threads to timeout might be a better alternative to all of these schedulers, although the tuning of timeout value could be of concern.

@benjchristensen
Copy link
Member Author

I don't understand what you mean. Can you elaborate? For example, what timeout are you referring to?

This code should not prevent the JVM from exiting:

    public static void main(String args[]) {
        Observable.from(1, 2, 3, 4).observeOn(Schedulers.newThread()).toBlockingObservable().forEach(new Action1<Integer>() {

            @Override
            public void call(Integer i) {
                System.out.println("i: " + i);
            }
        });
    }

It should be application logic that chooses to block, not the exit of a JVM.

@akarnokd
Copy link
Member

I was talking about the thread pool and core thread timeouts (aka keepalive time):

private static class EventLoopScheduler extends Scheduler {
    private final ExecutorService executor;

    private EventLoopScheduler() {
        ThreadPoolExecutor e = (ThreadPoolExecutor)Executors.newFixedThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
            }
        });
        e.setKeepAliveTime(1, TimeUnit.SECONDS); // <-- mabe lower, maybe greater time in general 
        e.allowCoreThreadTimeOut(true);
        executor = e;
    }
    ...
}

The problem I encounter with daemon-threaded pools is that when the main thread exists, these threads stop immediately and don't finish their given tasks. This is why there are sleep() calls in various tests. They require a similar subscribe (new pool)/unsubscribe(shutdown pool) pattern to be used. I guess there are pros and cons for both cases.

@benjchristensen
Copy link
Member Author

If something needs to be waited on then toBlockingObservable() (or some other similar approach) should be used to block the main thread until the application completes. We don't want the main thread waiting on Scheduler threads. If an app let's the main thread complete then the app should finish.

Sleep and timeouts are non-deterministic and incorrect for using to keep an application running.

@akarnokd
Copy link
Member

Fair enough. But there is still an issue with NewThreadScheduler. Even if quitting the application removes these threads, performing thousand scheduling operation will yield thousand threads or an exception about the inability to create more threads. So until the application quits, these threads stay. I suggest adding the

e.setKeepAliveTime(50, TimeUnit.MILLISECONDS);
e.allowCoreThreadTimeOut(true);

from my example to the NewThreadScheduler to be safe. 50 milliseconds might be to big since threads are no way to be reused, so decreasing it to a small number (zero is disallowed) is also possible.

Test program:

public static void main(String[] args) throws Exception {
        BlockingObservable<Integer> source = Observable.from(1).observeOn(Schedulers.newThread()).toBlockingObservable();
        for (int i = 0; i < 20000; i++) {
            Thread.sleep(1);
            source.single();
            if (i % 1000 == 0) {
                System.out.println(i);
            }
        }
    }
}

- there is no guarantee for how many threads Interval will use so useless to assert anything on it
@cloudbees-pull-request-builder

RxJava-pull-requests #562 SUCCESS
This pull request looks good

benjchristensen added a commit that referenced this pull request Dec 17, 2013
Make NewThreadScheduler create Daemon threads
@benjchristensen benjchristensen merged commit a252dca into ReactiveX:master Dec 17, 2013
@benjchristensen benjchristensen deleted the NewThreadScheduler-Daemon branch December 17, 2013 19:49
@akarnokd
Copy link
Member

Should I open a new issue for the thread exhaustion problem?

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
…ler-Daemon

Make NewThreadScheduler create Daemon threads
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants