Skip to content

Commit

Permalink
Merge pull request #936 from Applied-Duality/SkipFixed
Browse files Browse the repository at this point in the history
Skip fixed
  • Loading branch information
benjchristensen committed Mar 4, 2014
2 parents cbbf514 + 6b25b23 commit 1540f6c
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 188 deletions.
84 changes: 2 additions & 82 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,87 +49,7 @@
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.observers.SafeSubscriber;
import rx.operators.OnSubscribeFromIterable;
import rx.operators.OnSubscribeRange;
import rx.operators.OperationAll;
import rx.operators.OperationAmb;
import rx.operators.OperationAny;
import rx.operators.OperationAsObservable;
import rx.operators.OperationAverage;
import rx.operators.OperationBuffer;
import rx.operators.OperationCache;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDebounce;
import rx.operators.OperationDefaultIfEmpty;
import rx.operators.OperationDefer;
import rx.operators.OperationDelay;
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinct;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperationElementAt;
import rx.operators.OperationFinally;
import rx.operators.OperationFlatMap;
import rx.operators.OperationGroupByUntil;
import rx.operators.OperationGroupJoin;
import rx.operators.OperationInterval;
import rx.operators.OperationJoin;
import rx.operators.OperationJoinPatterns;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMergeMaxConcurrent;
import rx.operators.OperationMinMax;
import rx.operators.OperationMulticast;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallelMerge;
import rx.operators.OperationReplay;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSingle;
import rx.operators.OperationSkip;
import rx.operators.OperationSkipLast;
import rx.operators.OperationSkipUntil;
import rx.operators.OperationSkipWhile;
import rx.operators.OperationSum;
import rx.operators.OperationSwitch;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeTimed;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationTimeInterval;
import rx.operators.OperationTimer;
import rx.operators.OperationToMap;
import rx.operators.OperationToMultimap;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationUsing;
import rx.operators.OperationWindow;
import rx.operators.OperatorCast;
import rx.operators.OperatorDoOnEach;
import rx.operators.OperatorFilter;
import rx.operators.OperatorGroupBy;
import rx.operators.OperatorMap;
import rx.operators.OperatorMerge;
import rx.operators.OperatorObserveOn;
import rx.operators.OperatorOnErrorResumeNextViaFunction;
import rx.operators.OperatorOnErrorFlatMap;
import rx.operators.OperatorParallel;
import rx.operators.OperatorRepeat;
import rx.operators.OperatorScan;
import rx.operators.OperatorSubscribeOn;
import rx.operators.OperatorTake;
import rx.operators.OperatorTimeout;
import rx.operators.OperatorTimeoutWithSelector;
import rx.operators.OperatorTimestamp;
import rx.operators.OperatorToObservableList;
import rx.operators.OperatorToObservableSortedList;
import rx.operators.OperatorUnsubscribeOn;
import rx.operators.OperatorZip;
import rx.operators.OperatorZipIterable;
import rx.operators.*;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -6274,7 +6194,7 @@ public final Observable<T> singleOrDefault(T defaultValue, Func1<? super T, Bool
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-skip">RxJava Wiki: skip()</a>
*/
public final Observable<T> skip(int num) {
return create(OperationSkip.skip(this, num));
return lift(new OperatorSkip<T>(num));
}

/**
Expand Down
75 changes: 0 additions & 75 deletions rxjava-core/src/main/java/rx/operators/OperationSkip.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,81 +39,6 @@
*/
public final class OperationSkip {

/**
* Skips a specified number of contiguous values from the start of a Observable sequence and then returns the remaining values.
*
* @param items
* @param num
* @return the observable sequence starting after a number of skipped values
*
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229847(v=vs.103).aspx">Observable.Skip(TSource) Method</a>
*/
public static <T> OnSubscribeFunc<T> skip(final Observable<? extends T> items, final int num) {
// wrap in a Observable so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
return new OnSubscribeFunc<T>() {

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return new Skip<T>(items, num).onSubscribe(observer);
}

};
}

/**
* This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads.
* <p>
* It IS thread-safe from within it while receiving onNext events from multiple threads.
*
* @param <T>
*/
private static class Skip<T> implements OnSubscribeFunc<T> {
private final int num;
private final Observable<? extends T> items;

Skip(final Observable<? extends T> items, final int num) {
this.num = num;
this.items = items;
}

public Subscription onSubscribe(Observer<? super T> observer) {
return items.subscribe(new ItemObserver(observer));
}

/**
* Used to subscribe to the 'items' Observable sequence and forward to the actualObserver up to 'num' count.
*/
private class ItemObserver implements Observer<T> {

private AtomicInteger counter = new AtomicInteger();
private final Observer<? super T> observer;

public ItemObserver(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onCompleted() {
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onNext(T args) {
// skip them until we reach the 'num' value
if (counter.incrementAndGet() > num) {
observer.onNext(args);
}
}

}

}

/**
* Skip the items after subscription for the given duration.
*
Expand Down
1 change: 1 addition & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,4 @@ public void onNext(T t) {
}

}

51 changes: 51 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorSkip.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package rx.operators;

import rx.Observable;
import rx.Subscriber;

/**
* Returns an Observable that skips the first <code>num</code> items emitted by the source
* Observable.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/skip.png">
* <p>
* You can ignore the first <code>num</code> items emitted by an Observable and attend only to
* those items that come after, by modifying the Observable with the skip operation.
*/
public final class OperatorSkip<T> implements Observable.Operator<T, T> {

final int n;

public OperatorSkip(int n) {
this.n = n;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
return new Subscriber<T>(child) {

int skipped = 0;

@Override
public void onCompleted() {
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T t) {
if(skipped >= n) {
child.onNext(t);
} else {
skipped += 1;
}
}

};
}

}
31 changes: 0 additions & 31 deletions rxjava-core/src/test/java/rx/operators/OperationSkipTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static rx.operators.OperationSkip.*;

import java.util.concurrent.TimeUnit;

Expand All @@ -31,36 +30,6 @@

public class OperationSkipTest {

@Test
public void testSkip1() {
Observable<String> w = Observable.from("one", "two", "three");
Observable<String> skip = Observable.create(skip(w, 2));

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
skip.subscribe(observer);
verify(observer, never()).onNext("one");
verify(observer, never()).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void testSkip2() {
Observable<String> w = Observable.from("one", "two", "three");
Observable<String> skip = Observable.create(skip(w, 1));

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
skip.subscribe(observer);
verify(observer, never()).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void testSkipTimed() {
TestScheduler scheduler = new TestScheduler();
Expand Down
Loading

0 comments on commit 1540f6c

Please sign in to comment.