Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add index field to list updates #91

Open
wants to merge 3 commits into
base: 2.11.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 27 additions & 4 deletions src/main/java/io/reactivex/rxjavafx/sources/ListChange.java
Expand Up @@ -15,29 +15,52 @@
*/
package io.reactivex.rxjavafx.sources;

import java.util.Objects;

/**
* Holds an ADDED, REMOVED, or UPDATED flag with the associated value
* @param <T>
*/
public final class ListChange<T> {
private final T value;
private final Flag flag;
private final int index;

private ListChange(T value, Flag flag) {
private ListChange(T value, Flag flag, int index) {
this.value = value;
this.flag = flag;
this.index = index;
}
public static <T> ListChange<T> of(T value, Flag flag) {
return new ListChange<>(value, flag);
public static <T> ListChange<T> of(T value, Flag flag, int index) {
return new ListChange<>(value, flag, index);
}
public T getValue() {
return value;
}
public Flag getFlag() {
return flag;
}
public int getIndex() {
return index;
}

@Override
public String toString() {
return flag + " " + value;
return flag + " " + value + " " + index;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ListChange<?> that = (ListChange<?>) o;
return index == that.index &&
value.equals(that.value) &&
flag == that.flag;
}

@Override
public int hashCode() {
return Objects.hash(value, flag, index);
}
}
Expand Up @@ -103,14 +103,20 @@ public static <T> Observable<ListChange<T>> fromObservableListChanges(final Obse
ListChangeListener<T> listener = c -> {
while (c.next()) {
if (c.wasAdded()) {
c.getAddedSubList().forEach(v -> subscriber.onNext(ListChange.of(v,Flag.ADDED)));
for (int i = c.getFrom(); i < c.getTo(); i++) {
subscriber.onNext(ListChange.of(c.getList().get(i), Flag.ADDED, i));
}
}
if (c.wasRemoved()) {
c.getRemoved().forEach(v -> subscriber.onNext(ListChange.of(v,Flag.REMOVED)));
int removedIdx = 0;
for (int i = c.getFrom(); i < c.getTo() + 1; i++) {
subscriber.onNext(ListChange.of(c.getRemoved().get(removedIdx), Flag.REMOVED, i));
removedIdx++;
}
}
if (c.wasUpdated()) {
for (int i = c.getFrom(); i < c.getTo(); i++) {
subscriber.onNext(ListChange.of(c.getList().get(i),Flag.UPDATED));
subscriber.onNext(ListChange.of(c.getList().get(i), Flag.UPDATED, i));
}
}
}
Expand All @@ -132,12 +138,22 @@ public static <T> Observable<ListChange<T>> fromObservableListDistinctChanges(fi

while (c.next()) {
if (c.wasAdded()) {
c.getAddedSubList().stream().filter(v -> dupeCounter.add(v) == 1)
.forEach(v -> subscriber.onNext(ListChange.of(v,Flag.ADDED)));
for (int i = c.getFrom(); i < c.getTo(); i++) {
var item = c.getList().get(i);
if (dupeCounter.add(item) == 1) {
subscriber.onNext(ListChange.of(item, Flag.ADDED, i));
}
}
}
if (c.wasRemoved()) {
c.getRemoved().stream().filter(v -> dupeCounter.remove(v) == 0)
.forEach(v -> subscriber.onNext(ListChange.of(v,Flag.REMOVED)));
int removedIdx = 0;
for (int i = c.getFrom(); i < c.getTo() + 1; i++) {
var item = c.getRemoved().get(removedIdx);
if (dupeCounter.remove(item) == 0) {
subscriber.onNext(ListChange.of(item, Flag.REMOVED, i));
}
removedIdx++;
}
}
}
};
Expand All @@ -157,12 +173,22 @@ public static <T,R> Observable<ListChange<T>> fromObservableListDistinctChanges(

while (c.next()) {
if (c.wasAdded()) {
c.getAddedSubList().stream().filter(v -> dupeCounter.add(mapper.apply(v)) == 1)
.forEach(v -> subscriber.onNext(ListChange.of(v,Flag.ADDED)));
for (int i = c.getFrom(); i < c.getTo(); i++) {
var item = c.getList().get(i);
if (dupeCounter.add(mapper.apply(item)) == 1) {
subscriber.onNext(ListChange.of(item, Flag.ADDED, i));
}
}
}
if (c.wasRemoved()) {
c.getRemoved().stream().filter(v -> dupeCounter.remove(mapper.apply(v)) == 0)
.forEach(v -> subscriber.onNext(ListChange.of(v,Flag.REMOVED)));
int removedIdx = 0;
for (int i = c.getFrom(); i < c.getTo() + 1; i++) {
var item = c.getRemoved().get(removedIdx);
if (dupeCounter.remove(mapper.apply(item)) == 0) {
subscriber.onNext(ListChange.of(item, Flag.REMOVED, i));
}
removedIdx++;
}
}
}
};
Expand All @@ -181,14 +207,22 @@ public static <T,R> Observable<ListChange<R>> fromObservableListDistinctMappings

while (c.next()) {
if (c.wasAdded()) {
c.getAddedSubList().stream().map(mapper)
.filter(v -> dupeCounter.add(v) == 1)
.forEach(v -> subscriber.onNext(ListChange.of(v,Flag.ADDED)));
for (int i = c.getFrom(); i < c.getTo(); i++) {
var item = mapper.apply(c.getList().get(i));
if (dupeCounter.add(item) == 1) {
subscriber.onNext(ListChange.of(item, Flag.ADDED, i));
}
}
}
if (c.wasRemoved()) {
c.getRemoved().stream().map(mapper)
.filter(v -> dupeCounter.remove(v) == 0)
.forEach(v -> subscriber.onNext(ListChange.of(v,Flag.REMOVED)));
int removedIdx = 0;
for (int i = c.getFrom(); i < c.getTo() + 1; i++) {
var item = mapper.apply(c.getRemoved().get(removedIdx));
if (dupeCounter.remove(item) == 0) {
subscriber.onNext(ListChange.of(item, Flag.REMOVED, i));
}
removedIdx++;
}
}
}
};
Expand Down
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertTrue;

public final class JavaFxObservableTest {
Expand Down Expand Up @@ -180,38 +181,36 @@ public void testRxObservableListRemoves() {

@Test
public void testRxObservableListChanges() {
//new JFXPanel()();

ObservableList<String> sourceList = FXCollections.observableArrayList();
Observable<ListChange<String>> emissions = JavaFxObservable.changesOf(sourceList);

CountDownLatch gate = new CountDownLatch(1);

class FlagAndCount {
final Flag flag;
final long count;
FlagAndCount(Flag flag, long count) {
this.flag = flag;
this.count = count;
}

}
var expected = List.of(
ListChange.of("Alpha", Flag.ADDED, 0),
ListChange.of("Beta", Flag.ADDED, 1),
ListChange.of("Alpha", Flag.REMOVED, 0),
ListChange.of("Gamma", Flag.ADDED, 1),
ListChange.of("Gamma", Flag.REMOVED, 1),
ListChange.of("Epsilon", Flag.ADDED, 0)
);
emissions.observeOn(Schedulers.io())
.observeOn(JavaFxScheduler.platform())
.take(5)
.groupBy(ListChange::getFlag)
.flatMapSingle(grp -> grp.count().map(ct -> new FlagAndCount(grp.getKey(),ct)))
.subscribe(l -> {
if (l.flag.equals(Flag.ADDED)) { assertTrue(l.count == 3); }
if (l.flag.equals(Flag.REMOVED)) { assertTrue(l.count == 2); }
},Throwable::printStackTrace,gate::countDown);
.take(6)
.toList()
.toObservable()
.subscribe(l -> assertEquals(expected, l),
Throwable::printStackTrace,
gate::countDown);

Platform.runLater(() -> {
sourceList.add("Alpha");
sourceList.add("Beta");
sourceList.remove("Alpha");
sourceList.add("Gamma");
sourceList.remove("Gamma");
sourceList.add(0, "Epsilon");
});

try {
Expand All @@ -230,24 +229,21 @@ public void testRxObservableListDistinctChangeMappings() {

CountDownLatch gate = new CountDownLatch(1);

class FlagAndCount {
final Flag flag;
final long count;
FlagAndCount(Flag flag, long count) {
this.flag = flag;
this.count = count;
}

}
var expected = List.of(
ListChange.of(5, Flag.ADDED, 0),
ListChange.of(4, Flag.ADDED, 1),
ListChange.of(5, Flag.REMOVED, 1),
ListChange.of(7, Flag.ADDED, 1)
);
emissions.observeOn(Schedulers.io())
.observeOn(JavaFxScheduler.platform())
.take(3)
.groupBy(ListChange::getFlag)
.flatMapSingle(grp -> grp.count().map(ct -> new FlagAndCount(grp.getKey(),ct)))
.subscribe(l -> {
if (l.flag.equals(Flag.ADDED)) { assertTrue(l.count == 2); }
if (l.flag.equals(Flag.REMOVED)) { assertTrue(l.count == 1); }
},Throwable::printStackTrace,gate::countDown);
.take(4)
.toList()
.toObservable()
.subscribe(l -> assertEquals(expected, l),
Throwable::printStackTrace,
gate::countDown
);

Platform.runLater(() -> {
sourceList.add("Alpha");
Expand All @@ -257,6 +253,7 @@ class FlagAndCount {
sourceList.add("Gamma");
sourceList.remove("Gamma");
sourceList.remove("Alpha");
sourceList.add(1, "Epsilon");
});

try {
Expand All @@ -269,31 +266,29 @@ class FlagAndCount {

@Test
public void testRxObservableListDistinctChanges() {
//new JFXPanel()();

ObservableList<String> sourceList = FXCollections.observableArrayList();
Observable<ListChange<String>> emissions = JavaFxObservable.distinctChangesOf(sourceList);

CountDownLatch gate = new CountDownLatch(1);

class FlagAndCount {
final Flag flag;
final long count;
FlagAndCount(Flag flag, long count) {
this.flag = flag;
this.count = count;
}
var expected = List.of(
ListChange.of("Alpha", Flag.ADDED, 0),
ListChange.of("Beta", Flag.ADDED, 1),
ListChange.of("Gamma", Flag.ADDED, 2),
ListChange.of("Gamma", Flag.REMOVED, 2),
ListChange.of("Alpha", Flag.REMOVED, 1),
ListChange.of("Epsilon", Flag.ADDED, 1)
);

}
emissions.observeOn(Schedulers.io())
.observeOn(JavaFxScheduler.platform())
.take(5)
.groupBy(ListChange::getFlag)
.flatMapSingle(grp -> grp.count().map(ct -> new FlagAndCount(grp.getKey(),ct)))
.subscribe(l -> {
if (l.flag.equals(Flag.ADDED)) { assertTrue(l.count == 3); }
if (l.flag.equals(Flag.REMOVED)) { assertTrue(l.count == 2); }
},Throwable::printStackTrace,gate::countDown);
.take(6)
.toList()
.toObservable()
.subscribe(l -> assertEquals(expected, l),
Throwable::printStackTrace,
gate::countDown
);

Platform.runLater(() -> {
sourceList.add("Alpha");
Expand All @@ -303,6 +298,7 @@ class FlagAndCount {
sourceList.add("Gamma");
sourceList.remove("Gamma");
sourceList.remove("Alpha");
sourceList.add(1, "Epsilon");
});

try {
Expand Down