Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented RefCount Operator #407

Merged
merged 11 commits into from
Oct 9, 2013
11 changes: 10 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.subjects.AsyncSubject;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
Expand Down Expand Up @@ -3634,6 +3635,14 @@ public ConnectableObservable<T> publish() {
return OperationMulticast.multicast(this, PublishSubject.<T> create());
}

/**
* Returns a {@link ConnectableObservable} that shares a single subscription that contains the last notification only.
* @return a {@link ConnectableObservable}
*/
public ConnectableObservable<T> publishLast() {
return OperationMulticast.multicast(this, AsyncSubject.<T> create());
}

/**
* Synonymous with <code>reduce()</code>.
* <p>
Expand Down Expand Up @@ -4338,7 +4347,7 @@ public BlockingObservable<T> toBlockingObservable() {
* For why this is being used see https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*
* NOTE: If strong reasons for not depending on package names comes up then the implementation of this method can change to looking for a marker interface.
*
*
* @param o
* @return {@code true} if the given function is an internal implementation, and {@code false} otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.operators.OperationRefCount;
import rx.util.functions.Func1;

/**
* A ConnectableObservable resembles an ordinary {@link Observable}, except that it does not begin
Expand Down Expand Up @@ -46,4 +48,22 @@ protected ConnectableObservable(OnSubscribeFunc<T> onSubscribe) {
*/
public abstract Subscription connect();

/**
* Returns an observable sequence that stays connected to the source as long
* as there is at least one subscription to the observable sequence.
* @return a {@link Observable}
*/
public Observable<T> refCount() {
return refCount(this);
}

/**
* Returns an observable sequence that stays connected to the source as long
* as there is at least one subscription to the observable sequence.
* @return a {@link Observable}
* @param that a {@link ConnectableObservable}
*/
public static <T> Observable<T> refCount(ConnectableObservable<T> that) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a no arg instance method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi George, thanks for taking the time to review the code. Indeed there is a no arg instance method. I also included the static version to follow the pattern using in Observable.java where many operators have static and instance variants that delegate to the statics. It has an added bonus that it means you can test the operator on a mocked instance. I'm happy to remove the static method and test against a concrete instance though. Cheers, John

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, on closer inspection, those operations on Observable that have both static and instance variants like concat, merge and zip could be special cases. I'll remove the static method and rejig the tests.

return Observable.create(OperationRefCount.refCount(that));
}
}
66 changes: 66 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationRefCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

/**
* Returns an observable sequence that stays connected to the source as long
* as there is at least one subscription to the observable sequence.
*/
public final class OperationRefCount<T> {
public static <T> Observable.OnSubscribeFunc<T> refCount(ConnectableObservable<T> connectableObservable) {
return new RefCount<T>(connectableObservable);
}

private static class RefCount<T> implements Observable.OnSubscribeFunc<T> {
private final ConnectableObservable<T> innerConnectableObservable;
private final Object gate = new Object();
private int count = 0;
private Subscription connection = null;

public RefCount(ConnectableObservable<T> innerConnectableObservable) {
this.innerConnectableObservable = innerConnectableObservable;
}

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
final Subscription subscription = innerConnectableObservable.subscribe(observer);
synchronized (gate) {
if (count++ == 0) {
connection = innerConnectableObservable.connect();
}
}
return Subscriptions.create(new Action0() {
@Override
public void call() {
synchronized (gate) {
if (--count == 0) {
connection.unsubscribe();
connection = null;
}
}
subscription.unsubscribe();
}
});
}
}
}
42 changes: 42 additions & 0 deletions rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,48 @@ public void call(String v) {
}
}

@Test
public void testPublishLast() throws InterruptedException {
final AtomicInteger count = new AtomicInteger();
ConnectableObservable<String> connectable = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(final Observer<? super String> observer) {
count.incrementAndGet();
final BooleanSubscription subscription = new BooleanSubscription();
new Thread(new Runnable() {
@Override
public void run() {
observer.onNext("first");
observer.onNext("last");
observer.onCompleted();
}
}).start();
return subscription;
}
}).publishLast();

// subscribe once
final CountDownLatch latch = new CountDownLatch(1);
connectable.subscribe(new Action1<String>() {
@Override
public void call(String value) {
assertEquals("last", value);
latch.countDown();
}
});

// subscribe twice
connectable.subscribe(new Action1<String>() {
@Override
public void call(String _) {}
});

Subscription subscription = connectable.connect();
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
assertEquals(1, count.get());
subscription.unsubscribe();
}

@Test
public void testReplay() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
Expand Down
82 changes: 82 additions & 0 deletions rxjava-core/src/test/java/rx/RefCountTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package rx;

import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;

import static org.mockito.Mockito.*;

public class RefCountTests {

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}

@Test
public void subscriptionToUnderlyingOnFirstSubscription() {
@SuppressWarnings("unchecked")
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
when(connectable.subscribe(any(Observer.class))).thenReturn(Subscriptions.empty());
when(connectable.connect()).thenReturn(Subscriptions.empty());
refCounted.subscribe(observer);
verify(connectable, times(1)).subscribe(any(Observer.class));
verify(connectable, times(1)).connect();
}

@Test
public void noSubscriptionToUnderlyingOnSecondSubscription() {
@SuppressWarnings("unchecked")
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
when(connectable.subscribe(any(Observer.class))).thenReturn(Subscriptions.empty());
when(connectable.connect()).thenReturn(Subscriptions.empty());
refCounted.subscribe(observer);
refCounted.subscribe(observer);
verify(connectable, times(2)).subscribe(any(Observer.class));
verify(connectable, times(1)).connect();
}

@Test
public void unsubscriptionFromUnderlyingOnLastUnsubscription() {
@SuppressWarnings("unchecked")
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
Subscription underlying = mock(Subscription.class);
when(connectable.subscribe(any(Observer.class))).thenReturn(underlying);
Subscription connection = mock(Subscription.class);
when(connectable.connect()).thenReturn(connection);
Subscription first = refCounted.subscribe(observer);
first.unsubscribe();
verify(underlying, times(1)).unsubscribe();
verify(connection, times(1)).unsubscribe();
}

@Test
public void noUnsubscriptionFromUnderlyingOnFirstUnsubscription() {
@SuppressWarnings("unchecked")
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
Subscription underlying = mock(Subscription.class);
when(connectable.subscribe(any(Observer.class))).thenReturn(underlying);
Subscription connection = mock(Subscription.class);
when(connectable.connect()).thenReturn(connection);
Subscription first = refCounted.subscribe(observer);
Subscription second = refCounted.subscribe(observer);
first.unsubscribe();
second.unsubscribe();
verify(underlying, times(2)).unsubscribe();
verify(connection, times(1)).unsubscribe();
}
}