New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No InterruptedException with synchronous BlockingObservable #3120

Merged
merged 1 commit into from Jul 30, 2015

Conversation

Projects
None yet
2 participants
@ypresto
Contributor

ypresto commented Jul 29, 2015

I'm using Observable for backport of Java 8's java.util.stream (i.e. no more loops) for Android project.

List<Foo> list2 = Observable.from(list)
        .map(...)
        .filter(...)
        .toList().toBlocking().single()

But it sometimes emits InterruptedException at BlockingObservable.
#1804 (comment)

As the BlocingObservable is placed in map() of another observable with subscribeOn(Schedulers.io()), and it is unsubscribed from main thread, perhaps Future.cancel(true) is called on unsubscribing. (#1914)

This PR allows BlockingObsevable not to be interrupted when source observable emits synchronously, by checking current latch or queue state before awaiting for them.

@akarnokd

This comment has been minimized.

Show comment
Hide comment
@akarnokd

akarnokd Jul 29, 2015

Member

So what happens is that you use some blocking calls inside an observable sequence and it gets interrupted because of the underlying task is cancelled?

I believe it is an indication that the flow can be assembled non-blockingly instead. I've been fiddling with the idea that once RxJava scheduler threads become identifiable, toBlocking() would throw/onError if it is attempted to use it inside such schedulers.

Member

akarnokd commented Jul 29, 2015

So what happens is that you use some blocking calls inside an observable sequence and it gets interrupted because of the underlying task is cancelled?

I believe it is an indication that the flow can be assembled non-blockingly instead. I've been fiddling with the idea that once RxJava scheduler threads become identifiable, toBlocking() would throw/onError if it is attempted to use it inside such schedulers.

@ypresto

This comment has been minimized.

Show comment
Hide comment
@ypresto

ypresto Jul 30, 2015

Contributor

So what happens is that you use some blocking calls inside an observable sequence and it gets interrupted because of the underlying task is cancelled?

Yes. But in my case, blocking call is inside of another (synchronous) method and I don't want to make it non-blocking, for testability and convenience.

For example, in instagram-like app, there are entities to represent photo/movie and comment.
Both entities has entity translation logic and they can be written in for-loop or toList().toBlocking().single().

// MediaFileModel
public MediaFileEntity entityFromDao(MediaFileDao dao, List<CommentDao> commentDaoList) {
    List<CommentEntity> comments = Observable.from(commentDaoList)
            .map(mCommentModel::entityFromDao) // No more loops!
            .filter(not(mCommentModel::isDeleted))
            .toList().toBlocking().single(); // called from where..?
    MediaFileEntity mediaFile = new MediaFileEntity();
    mediaFile.setUuid(dao.getUuid());
    mediaFile.setComments(comments);
    ...
    return mediaFile;
}
// MediaFileStore
public Observable<List<MediaFileEntity>> allEntities() {
    return queryAll() // async operation
            .map(daoList -> {
                    Observable.from(daoList)
                            .map(mMediaFileModel::entityFromDao) // is calling toBlocking() or not...?
                            .toList()
            );
}

I think so too that asynchronous (with blocking wait) operation should be assembled non-blockingly.
But I think it is just convenient if nesting of synchronous (without blocking wait) operation is allowed. :)

Contributor

ypresto commented Jul 30, 2015

So what happens is that you use some blocking calls inside an observable sequence and it gets interrupted because of the underlying task is cancelled?

Yes. But in my case, blocking call is inside of another (synchronous) method and I don't want to make it non-blocking, for testability and convenience.

For example, in instagram-like app, there are entities to represent photo/movie and comment.
Both entities has entity translation logic and they can be written in for-loop or toList().toBlocking().single().

// MediaFileModel
public MediaFileEntity entityFromDao(MediaFileDao dao, List<CommentDao> commentDaoList) {
    List<CommentEntity> comments = Observable.from(commentDaoList)
            .map(mCommentModel::entityFromDao) // No more loops!
            .filter(not(mCommentModel::isDeleted))
            .toList().toBlocking().single(); // called from where..?
    MediaFileEntity mediaFile = new MediaFileEntity();
    mediaFile.setUuid(dao.getUuid());
    mediaFile.setComments(comments);
    ...
    return mediaFile;
}
// MediaFileStore
public Observable<List<MediaFileEntity>> allEntities() {
    return queryAll() // async operation
            .map(daoList -> {
                    Observable.from(daoList)
                            .map(mMediaFileModel::entityFromDao) // is calling toBlocking() or not...?
                            .toList()
            );
}

I think so too that asynchronous (with blocking wait) operation should be assembled non-blockingly.
But I think it is just convenient if nesting of synchronous (without blocking wait) operation is allowed. :)

@akarnokd

This comment has been minimized.

Show comment
Hide comment
@akarnokd

akarnokd Jul 30, 2015

Member

The changes look okay to me.

Member

akarnokd commented Jul 30, 2015

The changes look okay to me.

akarnokd added a commit that referenced this pull request Jul 30, 2015

Merge pull request #3120 from ypresto/no-interrupt-for-sync
No InterruptedException with synchronous BlockingObservable

@akarnokd akarnokd merged commit 072ffad into ReactiveX:1.x Jul 30, 2015

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
@ypresto

This comment has been minimized.

Show comment
Hide comment
@ypresto

ypresto Jul 30, 2015

Contributor

Thanks! 🎉 🎉

Contributor

ypresto commented Jul 30, 2015

Thanks! 🎉 🎉

@ypresto ypresto deleted the ypresto:no-interrupt-for-sync branch Jul 31, 2015

@ypresto

This comment has been minimized.

Show comment
Hide comment
@ypresto

ypresto Apr 14, 2016

Contributor

For your (who found this PR by searching) information:

RxJava is quite slow for synchronous operation.
Especially for zip(), it has significant overhead of waiting for async (and even if it is sync actually) observables.

So it'll better to use Java 8 Stream API or its backport for sync stream operations.
Lightweight-Stream-API has only <500 methods and also suitable for Android.

Contributor

ypresto commented Apr 14, 2016

For your (who found this PR by searching) information:

RxJava is quite slow for synchronous operation.
Especially for zip(), it has significant overhead of waiting for async (and even if it is sync actually) observables.

So it'll better to use Java 8 Stream API or its backport for sync stream operations.
Lightweight-Stream-API has only <500 methods and also suitable for Android.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment