mainThread() scheduler immediate mode #365

Closed
bishopmatthew opened this issue Feb 17, 2017 · 7 comments

Comments

@bishopmatthew

I know this has been brought up before multiple times (#228, #335), but I'm still not sure what to do in my case and wondered if anyone had an idea.

I have some data I need to restore into a RecyclerView.Adapter synchronously (if available) before the view state gets restored in order to maintain scroll position.

    private RecyclerView recyclerView;
    private Adapter adapter;
    
    @Override
    public View onCreateView (LayoutInflater inflater, ViewGroup container, Bundle savedInstanceState) {
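        // LayoutInflater has no single-argument inflate(); pass the parent and do not attach.
        recyclerView = (RecyclerView) inflater.inflate(R.layout.fragment_sample, container, false);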
        if (adapter == null) {
            adapter = new Adapter();
        }
        recyclerView.setAdapter(adapter);
        return recyclerView;
    }

    @Override
    public void onViewCreated (View view, Bundle savedInstanceState) {
        SampleRepository.getInstance()
                .getData()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Data>() {
                    @Override
                    public void call (Data data) {
                        adapter.setData(data);
                    }
                });
    }

I can't just leave off the observeOn(AndroidSchedulers.mainThread()) because SampleRepository.getData() can return either a cached value from memory or a new value from the network. Any suggestions?

@JakeWharton
Member

Can you just apply observeOn only to the network observable inside of SampleRepository?

@bishopmatthew
Author

Thanks for the suggestion. I tried that, and can't get it to work. Possibly because my SampleRepository is actually a bit more complicated than I let on -- it returns the cached data (if available) but then concats a call to update with fresh data (my data source can't tell me if it has new data 😒 ).

    private LruCache<String, Data> mMemoryCache;

    public Observable<Data> getData(final String key) {
        return Observable.concat(getCachedData(key), requestFreshData(key));
    }

    private Observable<Data> requestFreshData (final String key) {
        return api.getData(key);
    }

    private Observable<Data> getCachedData (final String key) {
        final Data cachedData = mMemoryCache.get(key);
        if (cachedData != null) {
            return Observable.just(cachedData);
        } else {
            return Observable.empty();
        }
    }

@JakeWharton
Member

    private Observable<Data> requestFreshData (final String key) {
        return api.getData(key).observeOn(mainThread());
    }

@YLBFDEV

YLBFDEV commented Feb 21, 2017

import io.reactivex.android.schedulers.AndroidSchedulers;
import rx.Observer;
import rx.schedulers.Schedulers;
observeOn(AndroidSchedulers.mainThread())

Error:(119, 56) error: incompatible types: io.reactivex.Scheduler cannot be converted to rx.Scheduler

    RetrofitManager.getInstance().getData()
                .delay(2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Data>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                       
                    }

                    @Override
                    public void onNext(Data data) {
                        
                    }
                });

@bishopmatthew
Author

@JakeWharton Sorry, I was off the grid for the long weekend. I published an example that (I think) demonstrates the problem.

https://github.com/bishopmatthew/SynchronousSchedulerTest/

If you run it and rotate the device, you'll see a Toast that says whether the state was restored before onViewStateRestored gets called.

https://github.com/bishopmatthew/SynchronousSchedulerTest/blob/master/app/src/main/java/com/example/synchronousschedulertest/SampleRepository.java#L36

    public Observable<Data> getData (final String key) {
        return getCachedData(key)
                .observeOn(Schedulers.io())
                .concatWith(requestFreshData(key))
                .observeOn(AndroidSchedulers.mainThread());
    }

Actually, I tried the FastPathHandlerScheduler from #228 and it didn't help either. It seems like if I have any observeOn's in the chain, it always runs my code via a LooperScheduler.

"main@3972" prio=5 runnable
  java.lang.Thread.State: RUNNABLE
	  at com.example.synchronousschedulertest.MainFragment$1.call(MainFragment.java:41)
	  at com.example.synchronousschedulertest.MainFragment$1.call(MainFragment.java:38)
	  at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39)
	  at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
	  at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
	  at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:224)
	  at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
	  at android.os.Handler.handleCallback(Handler.java:739)
	  at android.os.Handler.dispatchMessage(Handler.java:95)
	  at android.os.Looper.loop(Looper.java:148)
	  at android.app.ActivityThread.main(ActivityThread.java:5417)
	  at java.lang.reflect.Method.invoke(Method.java:-1)
	  at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
	  at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)

You can see the same thing if you put a breakpoint in my Fragment's subscription's onNext.
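
A minimal sketch of why the queued delivery arrives too late, using only the Java standard library rather than RxJava or Android classes. `MainQueueSketch`, `post`, `drain`, and `simulate` are hypothetical names invented for illustration, and the sketch assumes that `onViewCreated` and `onViewStateRestored` run back to back before the main loop gets to anything that was posted in between:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the main thread's message queue; not an RxAndroid class.
class MainQueueSketch {
    // Tasks waiting for the current lifecycle callbacks to return.
    private final Queue<Runnable> pending = new ArrayDeque<>();
    // Records the order in which things actually happen.
    final List<String> events = new ArrayList<>();

    // Models a handler-backed scheduler: the task is always queued, never run inline.
    void post(Runnable task) {
        pending.add(task);
    }

    // Models the main loop reaching the queued tasks after the callbacks have returned.
    void drain() {
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
    }

    // Runs one simulated restore pass; postDelivery selects queued or inline delivery.
    static List<String> simulate(boolean postDelivery) {
        MainQueueSketch main = new MainQueueSketch();
        Runnable setData = () -> main.events.add("adapter.setData");

        // "onViewCreated": deliver the cached value either through the queue or directly.
        if (postDelivery) {
            main.post(setData);
        } else {
            setData.run();
        }

        // "onViewStateRestored" follows before the queue is serviced (assumption).
        main.events.add("onViewStateRestored");
        main.drain();
        return main.events;
    }

    public static void main(String[] args) {
        System.out.println("posted: " + simulate(true));   // posted: [onViewStateRestored, adapter.setData]
        System.out.println("inline: " + simulate(false));  // inline: [adapter.setData, onViewStateRestored]
    }
}
```

In this model the posted variant sets the adapter data only after the view state has been restored, which matches the symptom described above; the inline variant sets it first.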

Whereas if I don't do any observeOn's at all, it just runs synchronously:

"main@3972" prio=5 runnable
  java.lang.Thread.State: RUNNABLE
	  at com.example.synchronousschedulertest.MainFragment$1.call(MainFragment.java:41)
	  at com.example.synchronousschedulertest.MainFragment$1.call(MainFragment.java:38)
	  at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39)
	  at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
	  at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
	  at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
	  at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
	  at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
	  at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
	  at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
	  at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
	  at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
	  at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:276)
	  at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
	  at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
	  at rx.Subscriber.setProducer(Subscriber.java:205)
	  at rx.Subscriber.setProducer(Subscriber.java:205)
	  at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:138)
	  at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:129)
	  at rx.Observable.unsafeSubscribe(Observable.java:10200)
	  at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
	  at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
	  at rx.Observable.unsafeSubscribe(Observable.java:10200)
	  at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	  at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	  at rx.Observable.unsafeSubscribe(Observable.java:10200)
	  at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
	  at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
	  at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
	  at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
	  at rx.Subscriber.setProducer(Subscriber.java:211)
	  at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
	  at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
	  at rx.Observable.unsafeSubscribe(Observable.java:10200)
	  at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
	  at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
	  at rx.Observable.unsafeSubscribe(Observable.java:10200)
	  at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	  at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	  at rx.Observable.unsafeSubscribe(Observable.java:10200)
	  at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	  at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	  at rx.Observable.subscribe(Observable.java:10296)
	  at rx.Observable.subscribe(Observable.java:10263)
	  at rx.Observable.subscribe(Observable.java:10103)
my code--> at com.example.synchronousschedulertest.MainFragment.onViewCreated(MainFragment.java:38) 
	  at android.support.v4.app.FragmentManagerImpl.moveToState(FragmentManager.java:1314)
	  at [... cut for brevity]

@JakeWharton
Member

Replace

    public Observable<Data> getData (final String key) {
        return getCachedData(key)
                .observeOn(Schedulers.io())
                .concatWith(requestFreshData(key))
                .observeOn(AndroidSchedulers.mainThread());
    }

with

    public Observable<Data> getData (final String key) {
        return getCachedData(key)
                .concatWith(requestFreshData(key)
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread()));
    }

@bishopmatthew
Author

Wow it works! I swear I tried that. Er, I guess not with the nested subscribeOn(). I thought subscribeOn was going to go all the way up the chain and change the thread the initial observable runs on. Anyways, a million thanks for your help!
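
For readers who had the same misconception, here is a rough stdlib-only model of the working version: the cached value is handed to the subscriber inline, and only the fresh branch hops to a background thread and then back through a queue. It does not use RxJava; `CacheThenFreshSketch`, `getData`, and `run` are hypothetical names, the `"fresh"` string stands in for the network response, and the `BlockingQueue` stands in for the main thread's message queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical model of "cached inline, fresh via io + main queue"; not RxJava code.
class CacheThenFreshSketch {

    static void getData(String cached,
                        ExecutorService io,
                        BlockingQueue<Runnable> mainQueue,
                        Consumer<String> onNext) {
        // Cached branch: no thread hop, so the caller sees the value before getData returns.
        if (cached != null) {
            onNext.accept(cached);
        }
        // Fresh branch only: work happens on the io executor (the nested subscribeOn),
        // and the result is posted back to the main queue (the nested observeOn).
        io.execute(() -> {
            String fresh = "fresh"; // stand-in for the network response
            mainQueue.add(() -> onNext.accept(fresh));
        });
    }

    static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor();
        BlockingQueue<Runnable> mainQueue = new LinkedBlockingQueue<>();
        List<String> events = new ArrayList<>(); // only touched from the calling thread

        getData("cached", io, mainQueue, value -> events.add("onNext:" + value));
        events.add("getData returned");

        try {
            // Simulate the main loop picking up the posted delivery later.
            mainQueue.take().run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onNext:cached, getData returned, onNext:fresh]
    }
}
```

The point of the model is that the thread hops are scoped to the fresh branch alone, so they cannot delay the cached value.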
