How to implement polling using Observables? #448

Closed
lexer opened this Issue Oct 22, 2013 · 20 comments

8 participants

lexer commented Oct 22, 2013

I have a parameterized REST call that returns an Observable:

   api.updateAppState(params);

I want to repeat it after a delay, so I created a delayed version of it:

    Observable<AppState> delayedApiCall = Observable.interval(delay, TimeUnit.SECONDS)
            .first()
            .flatMap(new Func1<Long, Observable<AppState>>() {
                @Override
                public Observable<AppState> call(Long seconds) {
                    return lyftApi.updateAppState(params);
                }
            });

Now I want a polling observable that recreates "delayedApiCall" with params and produces a continuous stream of results. I also want the observable to keep producing results even if "delayedApiCall" returns an error.

    pollingObservable.subscribe(new Observer<AppState>() {
        @Override
        public void onNext(AppState appState) {
            // ...
        }
        @Override
        public void onError(Throwable e) {
            // ...
        }
        @Override
        public void onCompleted() {
            // ...
        }
    });

Author

lexer commented Oct 22, 2013

I am thinking about this implementation:

    Observable.create(new Observable.OnSubscribeFunc<AppState>() {
        public Subscription currentRequest;

        @Override
        public Subscription onSubscribe(final Observer<? super AppState> observer) {
            currentRequest = createDelayedApiCall().subscribe(createRequestObserver(observer));

            return new Subscription() {
                @Override
                public void unsubscribe() {
                    currentRequest.unsubscribe();
                }
            };
        }

        private Observer<AppState> createRequestObserver(final Observer<? super AppState> observer) {
            return new Observer<AppState>() {
                @Override
                public void onCompleted() {
                    currentRequest = createDelayedApiCall().subscribe(createRequestObserver(observer));
                }

                @Override
                public void onError(Throwable throwable) {
                    observer.onError(throwable);
                }

                @Override
                public void onNext(AppState appState) {
                    observer.onNext(appState);
                }
            };
        }
    });

But this subscribes recursively, which is probably a misuse of observables. Could you share your opinion?

Contributor

samuelgruetter commented Oct 23, 2013

What about this?

    Observable.interval(delay, TimeUnit.SECONDS)
            .flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
                @Override
                public Observable<Notification<AppState>> call(Long seconds) {
                    return lyftApi.updateAppState(params).materialize();
                }
            });

Author

lexer commented Oct 23, 2013

@samuelgruetter interval won't wait until the operation has completed. I also guess that the same params would be used for every request in this case.

Contributor

samuelgruetter commented Oct 23, 2013

Yes. 

  1. What will params depend on?
  2. Somehow you should make sure that if an updateAppState request is lost in the network and you never get any onNext, onCompleted or onError, you still keep polling. That's why I suggested the above approach with interval. But you could also combine your recursive approach with the timeout operator.
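
To make the second point concrete without depending on any particular Rx version, here is a minimal plain-Java sketch of bounding a single poll with a timeout. It is not the Rx timeout operator; the class name `PollWithTimeout`, the method `pollOnce`, the fallback value, and the durations are all assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of RxJava.
class PollWithTimeout {

    /**
     * Runs one poll and waits at most timeoutMillis for its result.
     * Returns the polled value, or the fallback if the call timed out or failed.
     */
    public static String pollOnce(Callable<String> request, long timeoutMillis, String fallback) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> pending = executor.submit(request);
        try {
            return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            pending.cancel(true); // interrupt the request that never answered
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the request itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A request that answers well within the timeout.
        System.out.println(pollOnce(() -> "fresh-state", 1000, "no-answer")); // prints fresh-state
        // A request that hangs far longer than the timeout.
        System.out.println(pollOnce(() -> {
            Thread.sleep(5000);
            return "too-late";
        }, 100, "no-answer")); // prints no-answer
    }
}
```

Because a lost request now always yields some result, a polling loop built on top of `pollOnce` can move on to its next iteration instead of waiting forever.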

PS I had to delete one post because I accidentally hit 'comment' too early on my smartphone

Member

benjchristensen commented Dec 6, 2013

Any conclusion on this?

Author

lexer commented Dec 6, 2013

@benjchristensen Probably yes, if we use @samuelgruetter's implementation together with the recently implemented TimeInterval operator.

    Observable.interval(delay, TimeUnit.SECONDS).timeInterval()
            .flatMap(new Func1<TimeInterval<Long>, Observable<Notification<AppState>>>() {
                @Override
                public Observable<Notification<AppState>> call(TimeInterval<Long> tick) {
                    return lyftApi.updateAppState(params).materialize();
                }
            });

In my case I still have a hacky implementation, since my delay is dynamic and I need the Delay operator to be implemented.

Member

benjchristensen commented Jan 2, 2014

How about something like this?

        Observable.create({ observer ->
            Schedulers.newThread().schedulePeriodically({
                observer.onNext("application-state-from-network");
            }, 0, 1000, TimeUnit.MILLISECONDS);
        }).take(10).subscribe({ v -> println(v) });

Member

benjchristensen commented Jan 2, 2014

Here is Java using manual recursion instead of schedulePeriodically:

        Observable.create(new OnSubscribeFunc<String>() {

            @Override
            public Subscription onSubscribe(final Observer<? super String> o) {

                return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {

                    @Override
                    public Subscription call(Scheduler inner, Long t2) {
                        o.onNext("data-from-polling");
                        return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
                    }
                });
            }

        }).toBlockingObservable().forEach(new Action1<String>() {

            @Override
            public void call(String v) {
                System.out.println("output: " + v);
            }

        });

Author

lexer commented Jan 2, 2014

@benjchristensen

I think the primary problem is that schedulePeriodically does not wait until the operation has completed.

Java's ScheduledExecutorService (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html) has two different methods:

  1. scheduleWithFixedDelay
  2. scheduleAtFixedRate

So I think we need to add an analog of scheduleWithFixedDelay to the Rx scheduler.
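
The difference between the two methods is easiest to see by timing them. The following is a small sketch using only `java.util.concurrent`; the class name `FixedRateVsFixedDelay`, the helper `startGaps`, and the 100 ms / 200 ms durations are illustrative assumptions, with the sleep standing in for the network call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration.
class FixedRateVsFixedDelay {

    /**
     * Runs a task lasting workMillis three times and returns the gaps (in ms)
     * between consecutive task starts.
     */
    public static List<Long> startGaps(boolean fixedDelay, long workMillis, long periodMillis) {
        List<Long> starts = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(3);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        Runnable poll = () -> {
            if (done.getCount() == 0) {
                return; // ignore any run that slips in after the third one
            }
            starts.add(System.nanoTime());
            sleep(workMillis); // stands in for the network call
            done.countDown();
        };
        if (fixedDelay) {
            // Next run starts periodMillis after the previous run ENDS.
            executor.scheduleWithFixedDelay(poll, 0, periodMillis, TimeUnit.MILLISECONDS);
        } else {
            // Next run starts periodMillis after the previous run STARTED.
            executor.scheduleAtFixedRate(poll, 0, periodMillis, TimeUnit.MILLISECONDS);
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < starts.size(); i++) {
            gaps.add(TimeUnit.NANOSECONDS.toMillis(starts.get(i) - starts.get(i - 1)));
        }
        return gaps;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed rate : " + startGaps(false, 100, 200));
        System.out.println("fixed delay: " + startGaps(true, 100, 200));
    }
}
```

With these numbers the fixed-rate gaps should come out near 200 ms and the fixed-delay gaps near 300 ms (work time plus delay), which is the "wait until the operation has completed" behaviour asked for here.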

Author

lexer commented Jan 11, 2014

@benjchristensen I hadn't noticed your second message. Thanks a lot for this manual recursion implementation!

roman-ku commented Jul 17, 2016

@benjchristensen

How can I poll periodically, but wait for each poll to complete before scheduling the next one, using today's RxJava? Your code snippet uses several things that have since been deprecated, so I am wondering what an alternative would look like.

Member

akarnokd commented Jul 17, 2016

Use Observable.interval() as the starting point and forget that manual recursion.

    Observable.interval(1, TimeUnit.SECONDS, Schedulers.io())
            .map(time -> pollSource())
            .toBlocking()
            .subscribe(System.out::println, Throwable::printStackTrace);

roman-ku commented Jul 18, 2016

@akarnokd

The problem with that is that the "interval" operator is dumb. It will keep pumping out a pulse every X seconds without any regard for what happens downstream. That is not what I want, and "toBlocking" does nothing to fix this.

The behavior I want is a new tick 20 seconds after the poll has finished, not every 20 seconds:

What your code does:
0 (seconds): tick generated
0: poll started
5: poll finished
20: new tick generated

What I want:
0 (seconds): tick generated
0: poll started
5: poll finished
25: new tick generated

Member

akarnokd commented Jul 18, 2016

Okay then, how about:

    Observable.fromCallable(() -> pollValue())
            .repeatWhen(o -> o.concatMap(v -> Observable.timer(20, TimeUnit.SECONDS)));
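
For readers who want to see the timing behaviour of this pattern spelled out, here is a rough plain-Java analogue: poll, wait a fixed delay after the poll has finished, then poll again, with a handle that stops the loop. It is only a sketch of the behaviour, not of the repeatWhen operator, and it deliberately differs in one respect: a failed poll is reported and polling continues, as the original question asked, whereas the Rx snippet above would terminate on an error. The names `RepeatAfterCompletion`, `poll`, `Cancellable`, and `demo` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not an RxJava class.
class RepeatAfterCompletion {

    /** Handle returned to the caller; cancel() plays the role of unsubscribe. */
    public interface Cancellable {
        void cancel();
    }

    /** Polls source, waits delayMillis after each poll finishes, then polls again. */
    public static <T> Cancellable poll(Callable<T> source, long delayMillis,
                                       Consumer<T> onNext, Consumer<Throwable> onError) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        Runnable step = new Runnable() {
            @Override
            public void run() {
                try {
                    onNext.accept(source.call());
                } catch (Exception e) {
                    onError.accept(e); // report the failure, but keep polling
                }
                try {
                    // The delay starts when this poll has finished, not when it began.
                    executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                } catch (RejectedExecutionException ignored) {
                    // cancel() was called while this poll was running; stop quietly.
                }
            }
        };
        executor.execute(step);
        return executor::shutdownNow;
    }

    /** Demo: the second poll fails, polling continues, and we cancel after three events. */
    public static List<String> demo() {
        AtomicInteger calls = new AtomicInteger();
        BlockingQueue<String> events = new LinkedBlockingQueue<>();
        Cancellable handle = poll(() -> {
            int n = calls.incrementAndGet();
            if (n == 2) {
                throw new IllegalStateException("simulated network error");
            }
            return "state-" + n;
        }, 20, value -> events.add(value), error -> events.add("error"));
        List<String> firstThree = new ArrayList<>();
        try {
            while (firstThree.size() < 3) {
                firstThree.add(events.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handle.cancel(); // the conditional stop: no more polls after this
        }
        return firstThree;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [state-1, error, state-3]
    }
}
```

Cancelling shuts the scheduler down, so a pending delayed poll is dropped rather than executed, which matches the expectation that unsubscribing stops the timer.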

akarnokd added the Question label Jul 18, 2016

roman-ku commented Jul 19, 2016

@akarnokd

I did some testing and it looks like it's working as intended. I will have to throw in some conditional check so it doesn't go on forever but it looks like we have a winner!

Thank you so much! I really appreciate your help!

cxzhang2 commented Jan 31, 2017

@akarnokd
I may be doing something wrong, but it looks like unsubscribing from the Observable you suggested won't stop the currently scheduled emission from going out. Is there any way to get around this? Thanks!

Member

akarnokd commented Jan 31, 2017

@cxzhang2 An unsubscribe should stop that timer. Do you have some code that demonstrates what you experience?

cxzhang2 commented Jan 31, 2017

@akarnokd totally my mistake, great solution thank you!

lby1992 commented Jul 18, 2017

I found this post, hope it helps.

Lavaei commented Aug 8, 2017

You can use retryWhen instead of repeatWhen and in your map() operator use Rx.Observable.throw(new Error('My Error Message')) in case you want to retry the request
