Skip to content

Commit

Permalink
Fix ClassCastException when projections are used with ReactorFacet.
Browse files Browse the repository at this point in the history
`ReactorFetcherDelegate` did not implement certain interfaces (eg. `ReactorFetcher.DistinctLimitOffset`).

Related: #1456
  • Loading branch information
asereda-gs committed Sep 4, 2023
1 parent c98038e commit be599d4
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 8 deletions.
6 changes: 6 additions & 0 deletions criteria/reactor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.immutables</groupId>
<artifactId>criteria-inmemory</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,43 @@
package org.immutables.criteria.reactor;

import org.immutables.criteria.backend.Backend;
import org.immutables.criteria.expression.ImmutableQuery;
import org.immutables.criteria.expression.Query;
import org.immutables.criteria.repository.reactive.ReactiveFetcher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Objects;
import java.util.function.UnaryOperator;

class ReactorFetcherDelegate<T> implements ReactorFetcher<T> {
class ReactorFetcherDelegate<T> implements ReactorFetcher<T>, ReactorFetcher.DistinctLimitOffset<T> {

private final ReactiveFetcher<T> delegate;

private ReactorFetcherDelegate(ReactiveFetcher<T> delegate) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
}

private ReactorFetcherDelegate<T> changeQuery(UnaryOperator<Query> fn) {
return new ReactorFetcherDelegate<>(delegate.changeQuery(fn));
}


@Override
public LimitOffset<T> distinct() {
return changeQuery(query -> ImmutableQuery.copyOf(query).withDistinct(true));
}

@Override
public Offset<T> limit(long limit) {
return changeQuery(query -> ImmutableQuery.copyOf(query).withLimit(limit));
}

@Override
public ReactorFetcherDelegate<T> offset(long offset) {
return changeQuery(query -> ImmutableQuery.copyOf(query).withOffset(offset));
}

@Override
public Flux<T> fetch() {
return Flux.from(delegate.fetch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.immutables.criteria.reactor;

import org.immutables.criteria.Criteria;
import org.immutables.criteria.inmemory.InMemoryBackend;
import org.immutables.criteria.repository.FakeBackend;
import org.immutables.value.Value;
import org.junit.jupiter.api.Test;
Expand All @@ -33,10 +34,22 @@ void empty() {

@Test
void single() {
ReactorModelRepository repo = new ReactorModelRepository(new FakeBackend(Flux.just(ImmutableReactorModel.builder().build())));
ReactorModelRepository repo = new ReactorModelRepository(new FakeBackend(Flux.just(ImmutableReactorModel.builder().id("id1").build())));
StepVerifier.create(repo.findAll().fetch()).thenRequest(1).expectNextCount(1).expectComplete().verify();
}

/**
* Validate the projections work with different types of facets (see {@link org.immutables.criteria.repository.Facet}).
*/
@Test
void projection() {
// need in-memory backend because of projections. FakeBackend does not support projections.
InMemoryBackend backend = new InMemoryBackend();
ReactorModelRepository repo = new ReactorModelRepository(backend);
repo.insert(ImmutableReactorModel.builder().id("id1").build()).block();
StepVerifier.create(repo.findAll().select(ReactorModelCriteria.reactorModel.id).limit(1).offset(0).fetch()).thenRequest(1).expectNext("id1").expectComplete().verify();
}

@Test
void error() {
ReactorModelRepository repo = new ReactorModelRepository(new FakeBackend(Flux.error(new RuntimeException("boom"))));
Expand All @@ -46,5 +59,8 @@ void error() {
@Value.Immutable
@Criteria
@Criteria.Repository(facets = {ReactorReadable.class, ReactorWritable.class, ReactorWatchable.class})
interface ReactorModel {}
}
interface ReactorModel {
@Criteria.Id
String id();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import io.reactivex.Maybe;
import io.reactivex.Single;
import org.immutables.criteria.backend.Backend;
import org.immutables.criteria.expression.ImmutableQuery;
import org.immutables.criteria.expression.Query;
import org.immutables.criteria.repository.reactive.ReactiveFetcher;

import java.util.Objects;
import java.util.function.UnaryOperator;

class RxJavaFetcherDelegate<T> implements RxJavaFetcher<T> {
class RxJavaFetcherDelegate<T> implements RxJavaFetcher<T>, RxJavaFetcher.DistinctLimitOffset<T> {

private final ReactiveFetcher<T> delegate;

Expand Down Expand Up @@ -65,4 +67,24 @@ static <T> RxJavaFetcherDelegate<T> fromReactive(ReactiveFetcher<T> delegate) {
static <T> RxJavaFetcherDelegate<T> of(Query query, Backend.Session session) {
return fromReactive(ReactiveFetcher.of(query, session));
}

private RxJavaFetcherDelegate<T> changeQuery(UnaryOperator<Query> fn) {
return new RxJavaFetcherDelegate<>(delegate.changeQuery(fn));
}


@Override
public LimitOffset<T> distinct() {
return changeQuery(query -> ImmutableQuery.copyOf(query).withDistinct(true));
}

@Override
public Offset<T> limit(long limit) {
return changeQuery(query -> ImmutableQuery.copyOf(query).withLimit(limit));
}

@Override
public RxJavaFetcher<T> offset(long offset) {
return changeQuery(query -> ImmutableQuery.copyOf(query).withOffset(offset));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected RxJavaReader<T> newReader(Query query) {
return new RxJavaReader<>(query, session);
}

public <T1> RxJavaMapper1<T1> select(Projection<T1> proj1) {
public <T1> RxJavaMapper1.DistinctLimitOffset<T1> select(Projection<T1> proj1) {
Query newQuery = this.query.addProjections(Matchers.toExpression(proj1));
return new RxJavaMappers.Mapper1<T1>(newQuery, session);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,27 @@ void empty() {

@Test
void single() {
RxJavaModelRepository repo = new RxJavaModelRepository(new FakeBackend(Flowable.just(ImmutableRxJavaModel.builder().build())));
RxJavaModelRepository repo = new RxJavaModelRepository(new FakeBackend(Flowable.just(ImmutableRxJavaModel.builder().id("id1").build())));
repo.findAll().fetch().test().awaitDone(1, TimeUnit.SECONDS).assertValueCount(1);
}

/**
* Validate the projections work with different types of facets (see {@link org.immutables.criteria.repository.Facet}).
*/
@Test
void projection() {
// TODO: can't use InMemoryBackend because of circular dependency.
RxJavaModelRepository repo = new RxJavaModelRepository(new FakeBackend(Flowable.just(ImmutableRxJavaModel.builder().id("id1").build())));
repo.findAll()
//.select(RxJavaModelCriteria.rxJavaModel.id) TODO: FakeBackend does not support projections
.limit(1)
.offset(0)
.fetch()
.test()
.awaitDone(1, TimeUnit.SECONDS)
.assertValueCount(1);
}

@Test
void error() {
RxJavaModelRepository repo = new RxJavaModelRepository(new FakeBackend(Flowable.error(new RuntimeException("boom"))));
Expand All @@ -47,5 +64,8 @@ void error() {
@Value.Immutable
@Criteria
@Criteria.Repository(facets = {RxJavaReadable.class, RxJavaWritable.class, RxJavaWatchable.class})
interface RxJavaModel {}
interface RxJavaModel {
@Criteria.Id
String id();
}
}

0 comments on commit be599d4

Please sign in to comment.