Skip to content

Commit

Permalink
Merge pull request #478 from zsxwing/min-max
Browse files Browse the repository at this point in the history
Implemented the "Operator: Min and MinBy" and "Operator: Max and MaxBy"
  • Loading branch information
benjchristensen committed Nov 12, 2013
2 parents 0b160cb + db04d56 commit 7475a15
Show file tree
Hide file tree
Showing 3 changed files with 628 additions and 0 deletions.
122 changes: 122 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -54,6 +55,7 @@
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMinMax;
import rx.operators.OperationMulticast;
import rx.operators.OperationObserveOn;
import rx.operators.OperationOnErrorResumeNextViaFunction;
Expand Down Expand Up @@ -3631,6 +3633,126 @@ public static Observable<Double> averageDoubles(Observable<Double> source) {
return OperationAverage.averageDoubles(source);
}

/**
* Returns the minimum element in an observable sequence.
* If there are more than one minimum elements, returns the last one.
* For an empty source, it causes an {@link IllegalArgumentException}.
*
* @param source
* an observable sequence to determine the minimum element of.
* @return an observable emitting the minimum element.
* @throws IllegalArgumentException
* if the source is empty
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229715(v=vs.103).aspx">MSDN: Observable.Min</a>
*/
public static <T extends Comparable<T>> Observable<T> min(Observable<T> source) {
return OperationMinMax.min(source);
}

/**
* Returns the minimum element in an observable sequence according to the specified comparator.
* If there are more than one minimum elements, returns the last one.
* For an empty source, it causes an {@link IllegalArgumentException}.
*
* @param comparator
* the comparer used to compare elements.
* @return an observable emitting the minimum value according to the specified comparator.
* @throws IllegalArgumentException
* if the source is empty
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229095(v=vs.103).aspx">MSDN: Observable.Min</a>
*/
public Observable<T> min(Comparator<T> comparator) {
return OperationMinMax.min(this, comparator);
}

/**
* Returns the elements in an observable sequence with the minimum key value.
* For an empty source, it returns an observable emitting an empty List.
*
* @param selector
* the key selector function.
* @return an observable emitting a List of the elements with the minimum key value.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228970(v=vs.103).aspx">MSDN: Observable.MinBy</a>
*/
public <R extends Comparable<R>> Observable<List<T>> minBy(Func1<T, R> selector) {
return OperationMinMax.minBy(this, selector);
}

/**
* Returns the elements in an observable sequence with the minimum key value according to the specified comparator.
* For an empty source, it returns an observable emitting an empty List.
*
* @param selector
* the key selector function.
* @param comparator
* the comparator used to compare key values.
* @return an observable emitting a List of the elements with the minimum key value according to the specified comparator.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228970(v=vs.103).aspx">MSDN: Observable.MinBy</a>
*/
public <R> Observable<List<T>> minBy(Func1<T, R> selector, Comparator<R> comparator) {
return OperationMinMax.minBy(this, selector, comparator);
}

/**
* Returns the maximum element in an observable sequence.
* If there are more than one maximum elements, returns the last one.
* For an empty source, it causes an {@link IllegalArgumentException}.
*
* @param source
* an observable sequence to determine the maximum element of.
* @return an observable emitting the maximum element.
* @throws IllegalArgumentException
* if the source is empty.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211837(v=vs.103).aspx">MSDN: Observable.Max</a>
*/
public static <T extends Comparable<T>> Observable<T> max(Observable<T> source) {
return OperationMinMax.max(source);
}

/**
* Returns the maximum element in an observable sequence according to the specified comparator.
* If there are more than one maximum elements, returns the last one.
* For an empty source, it causes an {@link IllegalArgumentException}.
*
* @param comparator
* the comparer used to compare elements.
* @return an observable emitting the maximum value according to the specified comparator.
* @throws IllegalArgumentException
* if the source is empty.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211635(v=vs.103).aspx">MSDN: Observable.Max</a>
*/
public Observable<T> max(Comparator<T> comparator) {
return OperationMinMax.max(this, comparator);
}

/**
* Returns the elements in an observable sequence with the maximum key value.
* For an empty source, it returns an observable emitting an empty List.
*
* @param selector
* the key selector function.
* @return an observable emitting a List of the elements with the maximum key value.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229058(v=vs.103).aspx">MSDN: Observable.MaxBy</a>
*/
public <R extends Comparable<R>> Observable<List<T>> maxBy(Func1<T, R> selector) {
return OperationMinMax.maxBy(this, selector);
}

/**
* Returns the elements in an observable sequence with the maximum key value according to the specified comparator.
* For an empty source, it returns an observable emitting an empty List.
*
* @param selector
* the key selector function.
* @param comparator
* the comparator used to compare key values.
* @return an observable emitting a List of the elements with the maximum key value according to the specified comparator.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244330(v=vs.103).aspx">MSDN: Observable.MaxBy</a>
*/
public <R> Observable<List<T>> maxBy(Func1<T, R> selector, Comparator<R> comparator) {
return OperationMinMax.maxBy(this, selector, comparator);
}

/**
* Returns a {@link ConnectableObservable} that shares a single subscription to the underlying
* Observable that will replay all of its items and notifications to any future {@link Observer}.
Expand Down
147 changes: 147 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationMinMax.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import rx.Observable;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Returns the minimum element in an observable sequence.
*/
public class OperationMinMax {

public static <T extends Comparable<T>> Observable<T> min(
Observable<T> source) {
return minMax(source, -1L);
}

public static <T> Observable<T> min(Observable<T> source,
final Comparator<T> comparator) {
return minMax(source, comparator, -1L);
}

public static <T, R extends Comparable<R>> Observable<List<T>> minBy(
Observable<T> source, final Func1<T, R> selector) {
return minMaxBy(source, selector, -1L);
}

public static <T, R> Observable<List<T>> minBy(Observable<T> source,
final Func1<T, R> selector, final Comparator<R> comparator) {
return minMaxBy(source, selector, comparator, -1L);
}

public static <T extends Comparable<T>> Observable<T> max(
Observable<T> source) {
return minMax(source, 1L);
}

public static <T> Observable<T> max(Observable<T> source,
final Comparator<T> comparator) {
return minMax(source, comparator, 1L);
}

public static <T, R extends Comparable<R>> Observable<List<T>> maxBy(
Observable<T> source, final Func1<T, R> selector) {
return minMaxBy(source, selector, 1L);
}

public static <T, R> Observable<List<T>> maxBy(Observable<T> source,
final Func1<T, R> selector, final Comparator<R> comparator) {
return minMaxBy(source, selector, comparator, 1L);
}

private static <T extends Comparable<T>> Observable<T> minMax(
Observable<T> source, final long flag) {
return source.reduce(new Func2<T, T, T>() {
@Override
public T call(T acc, T value) {
if (flag * acc.compareTo(value) > 0) {
return acc;
}
return value;
}
});
}

private static <T> Observable<T> minMax(Observable<T> source,
final Comparator<T> comparator, final long flag) {
return source.reduce(new Func2<T, T, T>() {
@Override
public T call(T acc, T value) {
if (flag * comparator.compare(acc, value) > 0) {
return acc;
}
return value;
}
});
}

private static <T, R extends Comparable<R>> Observable<List<T>> minMaxBy(
Observable<T> source, final Func1<T, R> selector, final long flag) {
return source.reduce(new ArrayList<T>(),
new Func2<List<T>, T, List<T>>() {

@Override
public List<T> call(List<T> acc, T value) {
if (acc.isEmpty()) {
acc.add(value);
} else {
int compareResult = selector.call(acc.get(0))
.compareTo(selector.call(value));
if (compareResult == 0) {
acc.add(value);
} else if (flag * compareResult < 0) {
acc.clear();
acc.add(value);
}
}
return acc;
}
});
}

private static <T, R> Observable<List<T>> minMaxBy(Observable<T> source,
final Func1<T, R> selector, final Comparator<R> comparator,
final long flag) {
return source.reduce(new ArrayList<T>(),
new Func2<List<T>, T, List<T>>() {

@Override
public List<T> call(List<T> acc, T value) {
if (acc.isEmpty()) {
acc.add(value);
} else {
int compareResult = comparator.compare(
selector.call(acc.get(0)),
selector.call(value));
if (compareResult == 0) {
acc.add(value);
} else if (flag * compareResult < 0) {
acc.clear();
acc.add(value);
}
}
return acc;
}
});
}

}

0 comments on commit 7475a15

Please sign in to comment.