New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get RxCachedThreadScheduler-n when calling Disposable.dispose() #4863

Closed
enginebai opened this Issue Nov 18, 2016 · 20 comments

Comments

Projects
None yet
7 participants
@enginebai
Copy link

enginebai commented Nov 18, 2016

Requirement

I migrate from 1.x to 2.x, replace Subscription to Disposable, and I'd like to cancel the subscription before a new subscription starts. But I got RxCachedThreadScheduler-n when calling Disposable.dispose(). I've check #4807 and found that it may timeout problem, but I'm sure that my api is pretty fast and won't be timeout at all. How can I resolve this problem??

Exception

E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
                  Process: com.machipopo.swag, PID: 30241
                  java.io.InterruptedIOException: thread interrupted
                      at okio.Timeout.throwIfReached(Timeout.java:145)
                      at okio.Okio$2.read(Okio.java:136)
                      at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
                      at okio.RealBufferedSource.request(RealBufferedSource.java:71)
                      at okio.RealBufferedSource.require(RealBufferedSource.java:64)
                      at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270)
                      at okhttp3.internal.http.Http1xStream$ChunkedSource.readChunkSize(Http1xStream.java:441)
                      at okhttp3.internal.http.Http1xStream$ChunkedSource.read(Http1xStream.java:422)
                      at okio.RealBufferedSource.read(RealBufferedSource.java:50)
                      at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60)
                      at okio.InflaterSource.refill(InflaterSource.java:101)
                      at okio.InflaterSource.read(InflaterSource.java:62)
                      at okio.GzipSource.read(GzipSource.java:80)
                      at okio.RealBufferedSource.request(RealBufferedSource.java:71)
                      at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:225)
                      at okhttp3.RealCall$ApplicationInterceptorChain.proceed(RealCall.java:187)
                      at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:160)
                      at okhttp3.RealCall.execute(RealCall.java:57)
                      at com.test.api.ApiService.get(ApiService.java:145)
                      at com.test.UserApi$1.subscribe(UserApi.java:63)

Old 1.x Code

if (mSubscriptionLoadMe != null && !mSubscriptionLoadMe.isUnsubscribed()) 
	mSubscriptionLoadMe.unsubscribe();
mSubscriptionLoadMe = UserApi.getUserInfo(this, userId)
		.subscribeOn(Schedulers.io())
		.observeOn(AndroidSchedulers.mainThread())
		.subscribe(new Subscriber<String>() {
			onNext();
			onCompleted();
			onError();
		});

New 2.x Code

if (mSubscriptionLoadMe != null && !mSubscriptionLoadMe.isDisposed())
	mSubscriptionLoadMe.dispose();
mSubscriptionLoadMe = UserApi.getUserInfo(this, userId)
		.subscribeOn(Schedulers.io())
		.observeOn(AndroidSchedulers.mainThread())
		.subscribeWith(new DisposableObserver<String>() {
			onNext();
			onCompleted();
			onError();
		});
@enginebai

This comment has been minimized.

Copy link
Author

enginebai commented Nov 18, 2016

I've add timeout setting to OkHttpClient, but no help. (HTTP_TIMEOUT = 180 secs)

new OkHttpClient.Builder()
        .addInterceptor(new HttpLoggingInterceptor().setLevel(debugLevel))
        .addNetworkInterceptor(new StethoInterceptor())
        .connectTimeout(ApiService.HTTP_TIMEOUT, TimeUnit.SECONDS)
        .writeTimeout(ApiService.HTTP_TIMEOUT, TimeUnit.SECONDS)
        .readTimeout(ApiService.HTTP_TIMEOUT, TimeUnit.SECONDS)
        .build();
@akarnokd

This comment has been minimized.

Copy link
Member

akarnokd commented Nov 18, 2016

Maybe your thread is in an interrupted state when you subscribe.

@akarnokd akarnokd added the Question label Nov 18, 2016

@enginebai

This comment has been minimized.

Copy link
Author

enginebai commented Nov 21, 2016

How do I fix that? Or how can I cancel a api call subscription in 2.x?

@akarnokd

This comment has been minimized.

Copy link
Member

akarnokd commented Nov 21, 2016

Thread.currentThread().interrupted() will clear the interrupt state before you subscribe. Otherwise, this seems to be an issue with OkHttp and you have a better chance asking them about the situation.

@enginebai

This comment has been minimized.

Copy link
Author

enginebai commented Nov 21, 2016

OK, one more question here, is my migration code from 1.x to 2.x correct? I'd like to make sure that there is only one api call at a time.

if (mSubscriptionLoadMe != null && !mSubscriptionLoadMe.isDisposed())
	mSubscriptionLoadMe.dispose();
mSubscriptionLoadMe = UserApi.getUserInfo(this, userId)
		.subscribeOn(Schedulers.io())
		.observeOn(AndroidSchedulers.mainThread())
		.subscribeWith(new DisposableObserver<String>() {
			onNext();
			onCompleted();
			onError();
		});
@akarnokd

This comment has been minimized.

Copy link
Member

akarnokd commented Nov 21, 2016

Yes, the patterns remained largely the same.

@enginebai

This comment has been minimized.

Copy link
Author

enginebai commented Nov 21, 2016

Got it, thanks for your help.

@michaldrabik

This comment has been minimized.

Copy link

michaldrabik commented Nov 21, 2016

  override fun getCities(): Observable<List<City>> {
    return syncCities().publish { Observable.merge(it, databaseManager.getCities().takeUntil(it)) }
        .filter { it.isNotEmpty() }
        .distinct()
        .onErrorResumeNext(databaseManager.getCities())
  }

  private fun syncCities(): Observable<List<City>> {
    return Observable.error { Throwable("Error") }
}

I'm getting

**_FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: *.debug, PID: 27335
java.lang.Throwable: Error

when running above code. Anyone could point me to what is wrong there?

EDIT getCities() is called with subscribeOn(io()) and observeOn(mainThread()) of course

@enginebai

This comment has been minimized.

Copy link
Author

enginebai commented Nov 22, 2016

@michaldrabik What's your full exception stack trace?

@michaldrabik

This comment has been minimized.

Copy link

michaldrabik commented Nov 22, 2016

@enginebai Pasted here: http://pastebin.com/GHjJWtiS

When I remove takeUntil() everything works fine. Might be me missing something...

@personshelldon

This comment has been minimized.

Copy link

personshelldon commented Dec 19, 2016

This is not work for me too, I wrote a simple test app for RxJava2 and next code causes application to crash:

private Disposable subscription;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        subscription = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Thread.sleep(5000);
                if (!e.isDisposed()) {
                    e.onNext(25);
                    e.onComplete();
                }
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("MainActivity","Value: "+integer);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e("MainActivity","Error: "+throwable.toString());
                    }
                });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        subscription.dispose();
    }

I start simple async task in onCreate() method and I want to unsubscribe (dispose) in activity onDestroy call (Android), but subscription.dispose() causes the next crash (it not happens if Thread.sleep() finished):
12-19 15:11:56.797 5041-5058/com.example.don.rxjava2test E/AndroidRuntime: FATAL EXCEPTION:

RxCachedThreadScheduler-1
                                                                           Process: com.example.don.rxjava2test, PID: 5041
                                                                           java.lang.InterruptedException
                                                                               at java.lang.Thread.sleep(Native Method)
                                                                               at java.lang.Thread.sleep(Thread.java:371)
                                                                               at java.lang.Thread.sleep(Thread.java:313)
                                                                               at com.example.don.rxjava2test.MainActivity$3.subscribe(MainActivity.java:28)
                                                                               at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
                                                                               at io.reactivex.Observable.subscribe(Observable.java:10514)
                                                                               at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
                                                                               at io.reactivex.Scheduler$1.run(Scheduler.java:134)
                                                                               at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
                                                                               at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
                                                                               at java.util.concurrent.FutureTask.run(FutureTask.java:237)
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
                                                                               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
                                                                               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
                                                                               at java.lang.Thread.run(Thread.java:761)

I expect to see my error log, but application simply crashes.

@akarnokd

This comment has been minimized.

Copy link
Member

akarnokd commented Dec 19, 2016

@don11995 If you don't care about such exceptions then you can suppress them via:

RxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer());
@personshelldon

This comment has been minimized.

Copy link

personshelldon commented Dec 19, 2016

@akarnokd , thank You, Your solution works, but what about if I want to handle this error in subscriber?
Also this is not called if InterruptedException happens:

RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e("MainActivity","Uncaught: "+throwable.toString());
            }
        });
@akarnokd

This comment has been minimized.

Copy link
Member

akarnokd commented Dec 19, 2016

Such errors happen after the lifecycle of Subscribers and you can't handle them there. Cancelling a Subscriber is an indication that you no longer want to receive any events.

@JakeWharton

This comment has been minimized.

Copy link
Member

JakeWharton commented Dec 20, 2016

That signal should be used to suppress the errors instead of conflating them with fundamentally underliverable ones (exception during onError, after onComplete, etc.). Sending the two different types of exceptions to a single callback means that you can neither crash the app to indicate a programming problem because it might just be someone that unsubscribed but you also cannot simple ignore all errors to the plugin because it might be a programming problem.

@akarnokd

This comment has been minimized.

Copy link
Member

akarnokd commented Feb 14, 2017

Closing via #5075 for now. Let us know if 2.0.6 fixes this for you (tomorrow).

@akarnokd akarnokd closed this Feb 14, 2017

@crazyhitty

This comment has been minimized.

Copy link

crazyhitty commented Feb 28, 2017

@akarnokd 2.0.6 still doesn't resolve this issue for me. I just disposed an okhttp call but instead of unsubscribing normally, it just crashed.

Here is the crash log.

FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: com.crazyhitty.chdev.ks.rssmanagerlib.demo, PID: 27708
io.reactivex.exceptions.UndeliverableException: java.io.InterruptedIOException: thread interrupted
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:43)
    at io.reactivex.Observable.subscribe(Observable.java:10700)
    at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.subscribe(ObservableZip.java:110)
    at io.reactivex.internal.operators.observable.ObservableZip.subscribeActual(ObservableZip.java:72)
    at io.reactivex.Observable.subscribe(Observable.java:10700)
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
    at io.reactivex.Scheduler$1.run(Scheduler.java:138)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)
 Caused by: java.io.InterruptedIOException: thread interrupted
    at okio.Timeout.throwIfReached(Timeout.java:146)
    at okio.Okio$2.read(Okio.java:135)
    at okio.AsyncTimeout$2.read(AsyncTimeout.java:236)
    at okio.RealBufferedSource.request(RealBufferedSource.java:66)
    at okio.RealBufferedSource.require(RealBufferedSource.java:59)
    at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:284)
    at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444)
    at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425)
    at okio.RealBufferedSource.read(RealBufferedSource.java:45)
    at okio.RealBufferedSource.request(RealBufferedSource.java:66)
    at okio.RealBufferedSource.require(RealBufferedSource.java:59)
    at okio.GzipSource.consumeHeader(GzipSource.java:114)
    at okio.GzipSource.read(GzipSource.java:73)
    at okio.RealBufferedSource.request(RealBufferedSource.java:66)
    at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:387)
    at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:371)
    at okhttp3.internal.Util.bomAwareCharset(Util.java:412)
    at okhttp3.ResponseBody.string(ResponseBody.java:173)
    at com.crazyhitty.chdev.ks.rssmanager.RssReader$1.subscribe(RssReader.java:48)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10700) 
    at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.subscribe(ObservableZip.java:110) 
    at io.reactivex.internal.operators.observable.ObservableZip.subscribeActual(ObservableZip.java:72) 
    at io.reactivex.Observable.subscribe(Observable.java:10700) 
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39) 
    at io.reactivex.Scheduler$1.run(Scheduler.java:138) 
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59) 
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) 
    at java.lang.Thread.run(Thread.java:761) 
@akarnokd

This comment has been minimized.

Copy link
Member

akarnokd commented Feb 28, 2017

@crazyhitty looks like you have an Observable.create() around the okhttp blocking call. You have to check for cancellation before emitting any error if you don't want to receive such errors:

Observable.create(emitter -> {
    // ...
    try {
         responsebody.string();
    } catch (InterruptedException ex) {
        if (!emitter.isDisposed()) {
            emitter.onError(ex);
            return;
        }
    }
})
@crazyhitty

This comment has been minimized.

Copy link

crazyhitty commented Feb 28, 2017

@akarnokd You are a genius mate, thanks for the such an easy solution 👍

@Yazazzello

This comment has been minimized.

Copy link

Yazazzello commented Oct 7, 2017

@crazyhitty since version 2.1.1 tryOnError is available:

The emitter API (such as FlowableEmitter, SingleEmitter, etc.) now features a new method, tryOnError that tries to emit the Throwable if the sequence is not cancelled/disposed. Unlike the regular onError, if the downstream is no longer willing to accept events, the method returns false and doesn't signal an UndeliverableException.

https://github.com/ReactiveX/RxJava/blob/2.x/CHANGES.md

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment