Skip to content

Commit

Permalink
Removed static variant of refCount
Browse files Browse the repository at this point in the history
  • Loading branch information
johnhmarks committed Sep 25, 2013
1 parent d4b04d8 commit 490ef86
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,6 @@ protected ConnectableObservable(OnSubscribeFunc<T> onSubscribe) {
* @return a {@link Observable}
*/
public Observable<T> refCount() {
return refCount(this);
}

/**
* Returns an observable sequence that stays connected to the source as long
* as there is at least one subscription to the observable sequence.
* @return a {@link Observable}
* @param that a {@link ConnectableObservable}
*/
public static <T> Observable<T> refCount(ConnectableObservable<T> that) {
return Observable.create(OperationRefCount.refCount(that));
return Observable.create(OperationRefCount.refCount(this));
}
}
84 changes: 25 additions & 59 deletions rxjava-core/src/test/java/rx/RefCountTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

import static org.mockito.Mockito.*;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

public class RefCountTests {

Expand All @@ -16,67 +19,30 @@ public void setUp() {
}

@Test
public void subscriptionToUnderlyingOnFirstSubscription() {
@SuppressWarnings("unchecked")
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
when(connectable.subscribe(any(Observer.class))).thenReturn(Subscriptions.empty());
when(connectable.connect()).thenReturn(Subscriptions.empty());
refCounted.subscribe(observer);
verify(connectable, times(1)).subscribe(any(Observer.class));
verify(connectable, times(1)).connect();
}

@Test
public void noSubscriptionToUnderlyingOnSecondSubscription() {
@SuppressWarnings("unchecked")
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
when(connectable.subscribe(any(Observer.class))).thenReturn(Subscriptions.empty());
when(connectable.connect()).thenReturn(Subscriptions.empty());
refCounted.subscribe(observer);
refCounted.subscribe(observer);
verify(connectable, times(2)).subscribe(any(Observer.class));
verify(connectable, times(1)).connect();
}

@Test
public void unsubscriptionFromUnderlyingOnLastUnsubscription() {
@SuppressWarnings("unchecked")
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
Subscription underlying = mock(Subscription.class);
when(connectable.subscribe(any(Observer.class))).thenReturn(underlying);
Subscription connection = mock(Subscription.class);
when(connectable.connect()).thenReturn(connection);
Subscription first = refCounted.subscribe(observer);
first.unsubscribe();
verify(underlying, times(1)).unsubscribe();
verify(connection, times(1)).unsubscribe();
}

@Test
public void noUnsubscriptionFromUnderlyingOnFirstUnsubscription() {
@SuppressWarnings("unchecked")
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
@SuppressWarnings("unchecked")
public void onlyFirstShouldSubscribeAndLastUnsubscribe() {
final AtomicInteger subscriptionCount = new AtomicInteger();
final AtomicInteger unsubscriptionCount = new AtomicInteger();
Observable<Integer> observable = Observable.create(new Observable.OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(Observer<? super Integer> observer) {
subscriptionCount.incrementAndGet();
return Subscriptions.create(new Action0() {
@Override
public void call() {
unsubscriptionCount.incrementAndGet();
}
});
}
});
Observable<Integer> refCounted = observable.publish().refCount();
Observer<Integer> observer = mock(Observer.class);
Subscription underlying = mock(Subscription.class);
when(connectable.subscribe(any(Observer.class))).thenReturn(underlying);
Subscription connection = mock(Subscription.class);
when(connectable.connect()).thenReturn(connection);
Subscription first = refCounted.subscribe(observer);
assertEquals(1, subscriptionCount.get());
Subscription second = refCounted.subscribe(observer);
assertEquals(1, subscriptionCount.get());
first.unsubscribe();
assertEquals(0, unsubscriptionCount.get());
second.unsubscribe();
verify(underlying, times(2)).unsubscribe();
verify(connection, times(1)).unsubscribe();
assertEquals(1, unsubscriptionCount.get());
}
}

0 comments on commit 490ef86

Please sign in to comment.