Version
RxJava 3.x
(Based on source code analysis of io.reactivex.rxjava3.internal.schedulers.IoScheduler)
Description
I am conducting static analysis on open-source Java projects to detect concurrency misuse patterns. I identified a potential "Unbounded Number of Threads" risk in IoScheduler.java.
Problem Location
- Class:
io.reactivex.rxjava3.internal.schedulers.IoScheduler
- Inner Class:
CachedWorkerPool
- Method:
get()
Code Analysis
In the get() method of the internal CachedWorkerPool, the logic attempts to reuse an existing idle worker from expiringWorkerQueue. However, if the queue is empty (which happens frequently during a burst of concurrent tasks), it unconditionally creates a new thread without checking against any maximum pool size limit.
// Logic from IoScheduler.java
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
// 1. Try to reuse an idle worker
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// 2. [Risk] No upper limit check here.
// No cached worker found, so create a new one immediately.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
Risk Scenario
While Schedulers.io() is documented as appropriate for I/O-bound work, the lack of a "safety net" (Defense in Depth) can lead to severe stability issues. A common misuse pattern involves developers triggering a large number of concurrent blocking tasks using flatMap, often referred to as a "Concurrency Bomb":
// Example of misuse leading to Thread Explosion
Observable.range(1, 10000)
.flatMap(i -> doBlockingIO(i).subscribeOn(Schedulers.io()))
.subscribe();
In this scenario, IoScheduler will attempt to create 10,000 native threads almost instantly because the expiringWorkerQueue is drained faster than workers are returned. This rapidly leads to java.lang.OutOfMemoryError: unable to create new native thread and crashes the JVM.
Suggestion
To improve resilience and prevent application crashes due to misuse:
-
Soft Limit / Cap: Consider introducing a system property (e.g., rx3.io-max-threads) to enforce an optional upper limit in the get() method. If the limit is reached, the scheduler could queue the task or fallback to a rejection policy.
-
Documentation: If strict bounding is contrary to the design philosophy, adding a prominent warning in the IoScheduler or Schedulers.io() Javadoc regarding the "Unbounded" nature and the specific risk of flatMap misuse would help developers avoid this pitfall.
Thank you for maintaining this essential library!
Version
RxJava 3.x
(Based on source code analysis of
io.reactivex.rxjava3.internal.schedulers.IoScheduler)Description
I am conducting static analysis on open-source Java projects to detect concurrency misuse patterns. I identified a potential "Unbounded Number of Threads" risk in
IoScheduler.java.Problem Location
io.reactivex.rxjava3.internal.schedulers.IoSchedulerCachedWorkerPoolget()Code Analysis
In the
get()method of the internalCachedWorkerPool, the logic attempts to reuse an existing idle worker fromexpiringWorkerQueue. However, if the queue is empty (which happens frequently during a burst of concurrent tasks), it unconditionally creates a new thread without checking against any maximum pool size limit.Risk Scenario
While Schedulers.io() is documented as appropriate for I/O-bound work, the lack of a "safety net" (Defense in Depth) can lead to severe stability issues. A common misuse pattern involves developers triggering a large number of concurrent blocking tasks using flatMap, often referred to as a "Concurrency Bomb":
In this scenario, IoScheduler will attempt to create 10,000 native threads almost instantly because the expiringWorkerQueue is drained faster than workers are returned. This rapidly leads to java.lang.OutOfMemoryError: unable to create new native thread and crashes the JVM.
Suggestion
To improve resilience and prevent application crashes due to misuse:
Soft Limit / Cap: Consider introducing a system property (e.g., rx3.io-max-threads) to enforce an optional upper limit in the get() method. If the limit is reached, the scheduler could queue the task or fallback to a rejection policy.
Documentation: If strict bounding is contrary to the design philosophy, adding a prominent warning in the IoScheduler or Schedulers.io() Javadoc regarding the "Unbounded" nature and the specific risk of flatMap misuse would help developers avoid this pitfall.
Thank you for maintaining this essential library!