-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Description
Hi,
It would appear to me that there's an issue with using when using Observable.cache() and Schedulers.io(). The issue is that an indefinite amount of threads are created as opposed to having thread re-use.
This can eventually lead the following fatal scenario: "java.lang.OutOfMemoryError: Unable to create new native thread".
I did notice that when using Observable.cache() and Schedulers.from(Executors.newCachedThreadPool()) Rx is able to re-use threads.
Other than thread-caching configuration, it's not clear to me why these 2 implementations produce wildly different thread caching results and behavior. I can only assume that this behavior is not intended. It would be great to hear whether that's the case.
Here's a sample program I've put together that demonstrates the 2 different results. It's a very simple example that creates multiple observables and ensures that they are subscribed to and that result is read. I'm using SettableFuture to mock a lag between calls.
Result with Schedulers.from(Executors.newCachedThreadPool()) (last few items displayed, notice the re-use in threads):
Thread[pool-1-thread-18,5,main]
Thread[pool-1-thread-15,5,main]
Thread[pool-1-thread-20,5,main]
Thread[pool-1-thread-7,5,main]
Thread[pool-1-thread-6,5,main]
Thread[pool-1-thread-11,5,main]
Thread[pool-1-thread-17,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-21,5,main]
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-18,5,main]
Thread[pool-1-thread-10,5,main]
Thread[pool-1-thread-7,5,main]
Thread[pool-1-thread-11,5,main]
Thread[pool-1-thread-20,5,main]
Result with Schedulers.io() (last few items displayed, notice that the number is incremental, no re-use):
Thread[RxCachedThreadScheduler-187,5,main]
Thread[RxCachedThreadScheduler-196,5,main]
Thread[RxCachedThreadScheduler-189,5,main]
Thread[RxCachedThreadScheduler-198,5,main]
Thread[RxCachedThreadScheduler-191,5,main]
Thread[RxCachedThreadScheduler-200,5,main]
Thread[RxCachedThreadScheduler-193,5,main]
Thread[RxCachedThreadScheduler-195,5,main]
Thread[RxCachedThreadScheduler-197,5,main]
Thread[RxCachedThreadScheduler-199,5,main]
public class RxIndefiniateThreads {
static final Scheduler scheduler = Schedulers.from(Executors.newCachedThreadPool());
// static final Scheduler scheduler = Schedulers.io();
public static void main(String[] args) throws Exception {
int i = 0;
final AtomicInteger counter = new AtomicInteger(0);
while (true) {
final SettableFuture<Object> first = SettableFuture.create();
final SettableFuture<Object> second = SettableFuture.create();
Observable<Object> observe = Async.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println(Thread.currentThread());
return first.get();
}
}, scheduler).observeOn(scheduler).flatMap(new Func1<Object, Observable<Object>>() {
@Override
public Observable<Object> call(Object t1) {
System.out.println(Thread.currentThread());
try {
return Observable.just((Object) (t1.toString() + " :: " + second.get()));
} catch (Exception e) {
throw new IllegalStateException("Mock exception for second");
}
}
}).cache();
final Future<Object> future = observe.toBlocking().toFuture();
// Thread.sleep(2);
i++;
new Thread(new Runnable(){
@Override
public void run() {
int count = counter.incrementAndGet();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
first.set("done");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
second.set("done");
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
// System.out.println("doneWith=" + count);
}
}).start();
if (i == 100) {
break;
}
}
System.out.println("done");
}
}
dependencies:
com.google.guava
guava
18.0
io.reactivex
rxjava
1.0.3
io.reactivex
rxjava-async-util
0.21.0
io.reactivex
rxjava