Skip to content

Commit

Permalink
1.x: fix and deprecate evicting groupBy and add new overload (#5917)
Browse files Browse the repository at this point in the history
* 1.x: fix bug and deprecate groupBy overload with evictingMapFactory, add new groupBy evicting overload (#5868)

* groupBy, do a null check on g because cancel(K) could have cleared the map
  • Loading branch information
davidmoten authored and akarnokd committed Mar 19, 2018
1 parent 6e6c514 commit e467798
Show file tree
Hide file tree
Showing 5 changed files with 928 additions and 6 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -26,7 +26,7 @@ dependencies {

testCompile 'junit:junit:4.12'
testCompile 'org.mockito:mockito-core:1.10.19'
testCompile 'com.google.guava:guava:19.0'
testCompile 'com.google.guava:guava:24.0-jre'
testCompile 'com.pushtorefresh.java-private-constructor-checker:checker:1.2.0'

perfCompile 'org.openjdk.jmh:jmh-core:1.11.3'
Expand Down
75 changes: 73 additions & 2 deletions src/main/java/rx/Observable.java
Expand Up @@ -7275,7 +7275,7 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
*/
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
return lift(new OperatorGroupBy<T, K, R>(keySelector, elementSelector));
return lift(new OperatorGroupByEvicting<T, K, R>(keySelector, elementSelector));
}

/**
Expand Down Expand Up @@ -7334,7 +7334,12 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
* if {@code evictingMapFactory} is null
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
* @since 1.3
* @deprecated since 1.3.7, use {@link #groupBy(Func1, Func1, int, boolean, Func1)}
* instead which uses much less memory. Please take note of the
* usage difference involving the evicting action which now expects
* the value from the map instead of the key.
*/
@Deprecated
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector,
final Func1<? super T, ? extends R> elementSelector, final Func1<Action1<K>, Map<K, Object>> evictingMapFactory) {
if (evictingMapFactory == null) {
Expand Down Expand Up @@ -7369,6 +7374,72 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
*
* @param keySelector
* a function that extracts the key for each item
* @param elementSelector
* a function that extracts the return element for each item
* @param bufferSize
* the size of the buffer ({@link RxRingBuffer.SIZE} may be suitable).
* @param delayError
* if and only if false then onError emissions can shortcut onNext emissions (emissions may be buffered)
* @param evictingMapFactory
* a function that given an eviction action returns a {@link Map} instance that will be used to assign
* items to the appropriate {@code GroupedObservable}s. The {@code Map} instance must be thread-safe
* and any eviction must trigger a call to the supplied action (synchronously or asynchronously).
* This can be used to limit the size of the map by evicting entries by map maximum size or access time for
* instance. Here's an example using Guava's {@code CacheBuilder} from v24.0:
* <pre>
* {@code
* Func1<Action1<Object>, Map<K, Object>> mapFactory
* = action -> CacheBuilder.newBuilder()
* .maximumSize(1000)
* .expireAfterAccess(12, TimeUnit.HOURS)
* .removalListener(entry -> action.call(entry.getValue()))
* .<K, Object> build().asMap();
* }
* </pre>
*
* @param <K>
* the key type
* @param <R>
* the element type
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a
* unique key value and each of which emits those items from the source Observable that share that
* key value
* @throws NullPointerException
* if {@code evictingMapFactory} is null
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
* @since 1.3.7
*/
@Experimental
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector,
final Func1<? super T, ? extends R> elementSelector, int bufferSize, boolean delayError,
final Func1<Action1<Object>, Map<K, Object>> evictingMapFactory) {
if (evictingMapFactory == null) {
throw new NullPointerException("evictingMapFactory cannot be null");
}
return lift(new OperatorGroupByEvicting<T, K, R>(
keySelector, elementSelector, bufferSize, delayError, evictingMapFactory));
}

/**
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s. The emitted {@code GroupedObservable} allows only a single
* {@link Subscriber} during its lifetime and if this {@code Subscriber} unsubscribes before the
* source terminates, the next emission by the source having the same key will trigger a new
* {@code GroupedObservable} emission.
* <p>
* <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupBy.png" alt="">
* <p>
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function that extracts the key for each item
* @param <K>
* the key type
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a
Expand All @@ -7377,7 +7448,7 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
*/
public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
return lift(new OperatorGroupBy<T, K, T>(keySelector));
return lift(new OperatorGroupByEvicting<T, K, T>(keySelector));
}

/**
Expand Down
24 changes: 21 additions & 3 deletions src/main/java/rx/internal/operators/OperatorGroupBy.java
Expand Up @@ -42,13 +42,16 @@
* the source and group value type
* @param <V>
* the value type of the groups
* @deprecated
* since 1.3.7, use {@link OperatorGroupByEvicting} instead
*/
@Deprecated
public final class OperatorGroupBy<T, K, V> implements Operator<GroupedObservable<K, V>, T> {
final Func1<? super T, ? extends K> keySelector;
final Func1<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
final Func1<Action1<K>, Map<K, Object>> mapFactory;
final Func1<Action1<K>, Map<K, Object>> mapFactory; //nullable

@SuppressWarnings({ "unchecked", "rawtypes" })
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector) {
Expand Down Expand Up @@ -116,6 +119,10 @@ public static final class GroupBySubscriber<T, K, V>
final int bufferSize;
final boolean delayError;
final Map<Object, GroupedUnicast<K, V>> groups;

// double store the groups to workaround the bug in the
// signature of groupBy with evicting map factory
final Map<Object, GroupedUnicast<K, V>> groupsCopy;
final Queue<GroupedObservable<K, V>> queue;
final GroupByProducer producer;
final Queue<K> evictedKeys;
Expand All @@ -134,7 +141,7 @@ public static final class GroupBySubscriber<T, K, V>
volatile boolean done;

final AtomicInteger wip;

public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Func1<? super T, ? extends K> keySelector,
Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError,
Func1<Action1<K>, Map<K, Object>> mapFactory) {
Expand All @@ -158,6 +165,7 @@ public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Fun
this.evictedKeys = new ConcurrentLinkedQueue<K>();
this.groups = createMap(mapFactory, new EvictionAction<K>(evictedKeys));
}
this.groupsCopy = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
}

static class EvictionAction<K> implements Action1<K> {
Expand Down Expand Up @@ -211,6 +219,9 @@ public void onNext(T t) {
if (!cancelled.get()) {
group = GroupedUnicast.createWith(key, bufferSize, this, delayError);
groups.put(mapKey, group);
if (evictedKeys != null) {
groupsCopy.put(mapKey, group);
}

groupCount.getAndIncrement();

Expand All @@ -234,7 +245,9 @@ public void onNext(T t) {
if (evictedKeys != null) {
K evictedKey;
while ((evictedKey = evictedKeys.poll()) != null) {
GroupedUnicast<K, V> g = groups.get(evictedKey);
GroupedUnicast<K, V> g = groupsCopy.remove(evictedKey);
// do a null check on g because cancel(K) could have cleared
// the map
if (g != null) {
g.onComplete();
}
Expand Down Expand Up @@ -270,6 +283,7 @@ public void onCompleted() {
}
groups.clear();
if (evictedKeys != null) {
groupsCopy.clear();
evictedKeys.clear();
}

Expand Down Expand Up @@ -304,6 +318,9 @@ public void cancel(K key) {
unsubscribe();
}
}
if (evictedKeys != null) {
groupsCopy.remove(mapKey);
}
}

void drain() {
Expand Down Expand Up @@ -364,6 +381,7 @@ void errorAll(Subscriber<? super GroupedObservable<K, V>> a, Queue<?> q, Throwab
List<GroupedUnicast<K, V>> list = new ArrayList<GroupedUnicast<K, V>>(groups.values());
groups.clear();
if (evictedKeys != null) {
groupsCopy.clear();
evictedKeys.clear();
}

Expand Down

0 comments on commit e467798

Please sign in to comment.