diff --git a/criteria/reactor/pom.xml b/criteria/reactor/pom.xml
index a0690c6f5..851ded654 100644
--- a/criteria/reactor/pom.xml
+++ b/criteria/reactor/pom.xml
@@ -73,6 +73,12 @@
test-jar
test
+
+ org.immutables
+ criteria-inmemory
+ ${project.version}
+ test
+
io.projectreactor
reactor-test
diff --git a/criteria/reactor/src/org/immutables/criteria/reactor/ReactorFetcherDelegate.java b/criteria/reactor/src/org/immutables/criteria/reactor/ReactorFetcherDelegate.java
index 80bc1fd96..db93f5998 100644
--- a/criteria/reactor/src/org/immutables/criteria/reactor/ReactorFetcherDelegate.java
+++ b/criteria/reactor/src/org/immutables/criteria/reactor/ReactorFetcherDelegate.java
@@ -17,14 +17,16 @@
package org.immutables.criteria.reactor;
import org.immutables.criteria.backend.Backend;
+import org.immutables.criteria.expression.ImmutableQuery;
import org.immutables.criteria.expression.Query;
import org.immutables.criteria.repository.reactive.ReactiveFetcher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Objects;
+import java.util.function.UnaryOperator;
-class ReactorFetcherDelegate implements ReactorFetcher {
+class ReactorFetcherDelegate implements ReactorFetcher, ReactorFetcher.DistinctLimitOffset {
private final ReactiveFetcher delegate;
@@ -32,6 +34,26 @@ private ReactorFetcherDelegate(ReactiveFetcher delegate) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
}
+ private ReactorFetcherDelegate changeQuery(UnaryOperator fn) {
+ return new ReactorFetcherDelegate<>(delegate.changeQuery(fn));
+ }
+
+
+ @Override
+ public LimitOffset distinct() {
+ return changeQuery(query -> ImmutableQuery.copyOf(query).withDistinct(true));
+ }
+
+ @Override
+ public Offset limit(long limit) {
+ return changeQuery(query -> ImmutableQuery.copyOf(query).withLimit(limit));
+ }
+
+ @Override
+ public ReactorFetcherDelegate offset(long offset) {
+ return changeQuery(query -> ImmutableQuery.copyOf(query).withOffset(offset));
+ }
+
@Override
public Flux fetch() {
return Flux.from(delegate.fetch());
diff --git a/criteria/reactor/test/org/immutables/criteria/reactor/ReactorTest.java b/criteria/reactor/test/org/immutables/criteria/reactor/ReactorTest.java
index ec69c772d..314648b48 100644
--- a/criteria/reactor/test/org/immutables/criteria/reactor/ReactorTest.java
+++ b/criteria/reactor/test/org/immutables/criteria/reactor/ReactorTest.java
@@ -17,6 +17,7 @@
package org.immutables.criteria.reactor;
import org.immutables.criteria.Criteria;
+import org.immutables.criteria.inmemory.InMemoryBackend;
import org.immutables.criteria.repository.FakeBackend;
import org.immutables.value.Value;
import org.junit.jupiter.api.Test;
@@ -33,10 +34,22 @@ void empty() {
@Test
void single() {
- ReactorModelRepository repo = new ReactorModelRepository(new FakeBackend(Flux.just(ImmutableReactorModel.builder().build())));
+ ReactorModelRepository repo = new ReactorModelRepository(new FakeBackend(Flux.just(ImmutableReactorModel.builder().id("id1").build())));
StepVerifier.create(repo.findAll().fetch()).thenRequest(1).expectNextCount(1).expectComplete().verify();
}
+ /**
+ * Validate the projections work with different types of facets (see {@link org.immutables.criteria.repository.Facet}).
+ */
+ @Test
+ void projection() {
+ // need in-memory backend because of projections. FakeBackend does not support projections.
+ InMemoryBackend backend = new InMemoryBackend();
+ ReactorModelRepository repo = new ReactorModelRepository(backend);
+ repo.insert(ImmutableReactorModel.builder().id("id1").build()).block();
+ StepVerifier.create(repo.findAll().select(ReactorModelCriteria.reactorModel.id).limit(1).offset(0).fetch()).thenRequest(1).expectNext("id1").expectComplete().verify();
+ }
+
@Test
void error() {
ReactorModelRepository repo = new ReactorModelRepository(new FakeBackend(Flux.error(new RuntimeException("boom"))));
@@ -46,5 +59,8 @@ void error() {
@Value.Immutable
@Criteria
@Criteria.Repository(facets = {ReactorReadable.class, ReactorWritable.class, ReactorWatchable.class})
- interface ReactorModel {}
-}
\ No newline at end of file
+ interface ReactorModel {
+ @Criteria.Id
+ String id();
+ }
+}
diff --git a/criteria/rxjava2/src/org/immutables/criteria/repository/rxjava2/RxJavaFetcherDelegate.java b/criteria/rxjava2/src/org/immutables/criteria/repository/rxjava2/RxJavaFetcherDelegate.java
index d65f0b2b1..8ce40932b 100644
--- a/criteria/rxjava2/src/org/immutables/criteria/repository/rxjava2/RxJavaFetcherDelegate.java
+++ b/criteria/rxjava2/src/org/immutables/criteria/repository/rxjava2/RxJavaFetcherDelegate.java
@@ -20,12 +20,14 @@
import io.reactivex.Maybe;
import io.reactivex.Single;
import org.immutables.criteria.backend.Backend;
+import org.immutables.criteria.expression.ImmutableQuery;
import org.immutables.criteria.expression.Query;
import org.immutables.criteria.repository.reactive.ReactiveFetcher;
import java.util.Objects;
+import java.util.function.UnaryOperator;
-class RxJavaFetcherDelegate implements RxJavaFetcher {
+class RxJavaFetcherDelegate implements RxJavaFetcher, RxJavaFetcher.DistinctLimitOffset {
private final ReactiveFetcher delegate;
@@ -65,4 +67,24 @@ static RxJavaFetcherDelegate fromReactive(ReactiveFetcher delegate) {
static RxJavaFetcherDelegate of(Query query, Backend.Session session) {
return fromReactive(ReactiveFetcher.of(query, session));
}
+
+ private RxJavaFetcherDelegate changeQuery(UnaryOperator fn) {
+ return new RxJavaFetcherDelegate<>(delegate.changeQuery(fn));
+ }
+
+
+ @Override
+ public LimitOffset distinct() {
+ return changeQuery(query -> ImmutableQuery.copyOf(query).withDistinct(true));
+ }
+
+ @Override
+ public Offset limit(long limit) {
+ return changeQuery(query -> ImmutableQuery.copyOf(query).withLimit(limit));
+ }
+
+ @Override
+ public RxJavaFetcher offset(long offset) {
+ return changeQuery(query -> ImmutableQuery.copyOf(query).withOffset(offset));
+ }
}
diff --git a/criteria/rxjava2/src/org/immutables/criteria/repository/rxjava2/RxJavaReader.java b/criteria/rxjava2/src/org/immutables/criteria/repository/rxjava2/RxJavaReader.java
index 3819f90f1..65ab17513 100644
--- a/criteria/rxjava2/src/org/immutables/criteria/repository/rxjava2/RxJavaReader.java
+++ b/criteria/rxjava2/src/org/immutables/criteria/repository/rxjava2/RxJavaReader.java
@@ -55,7 +55,7 @@ protected RxJavaReader newReader(Query query) {
return new RxJavaReader<>(query, session);
}
- public RxJavaMapper1 select(Projection proj1) {
+ public RxJavaMapper1.DistinctLimitOffset select(Projection proj1) {
Query newQuery = this.query.addProjections(Matchers.toExpression(proj1));
return new RxJavaMappers.Mapper1(newQuery, session);
}
diff --git a/criteria/rxjava2/test/org/immutables/criteria/repository/rxjava2/RxJavaTest.java b/criteria/rxjava2/test/org/immutables/criteria/repository/rxjava2/RxJavaTest.java
index a13964d50..b24361794 100644
--- a/criteria/rxjava2/test/org/immutables/criteria/repository/rxjava2/RxJavaTest.java
+++ b/criteria/rxjava2/test/org/immutables/criteria/repository/rxjava2/RxJavaTest.java
@@ -34,10 +34,27 @@ void empty() {
@Test
void single() {
- RxJavaModelRepository repo = new RxJavaModelRepository(new FakeBackend(Flowable.just(ImmutableRxJavaModel.builder().build())));
+ RxJavaModelRepository repo = new RxJavaModelRepository(new FakeBackend(Flowable.just(ImmutableRxJavaModel.builder().id("id1").build())));
repo.findAll().fetch().test().awaitDone(1, TimeUnit.SECONDS).assertValueCount(1);
}
+ /**
+ * Validate the projections work with different types of facets (see {@link org.immutables.criteria.repository.Facet}).
+ */
+ @Test
+ void projection() {
+ // TODO: can't use InMemoryBackend because of circular dependency.
+ RxJavaModelRepository repo = new RxJavaModelRepository(new FakeBackend(Flowable.just(ImmutableRxJavaModel.builder().id("id1").build())));
+ repo.findAll()
+ //.select(RxJavaModelCriteria.rxJavaModel.id) TODO: FakeBackend does not support projections
+ .limit(1)
+ .offset(0)
+ .fetch()
+ .test()
+ .awaitDone(1, TimeUnit.SECONDS)
+ .assertValueCount(1);
+ }
+
@Test
void error() {
RxJavaModelRepository repo = new RxJavaModelRepository(new FakeBackend(Flowable.error(new RuntimeException("boom"))));
@@ -47,5 +64,8 @@ void error() {
@Value.Immutable
@Criteria
@Criteria.Repository(facets = {RxJavaReadable.class, RxJavaWritable.class, RxJavaWatchable.class})
- interface RxJavaModel {}
+ interface RxJavaModel {
+ @Criteria.Id
+ String id();
+ }
}