Skip to content

Commit

Permalink
Delete by query in Geode
Browse files Browse the repository at this point in the history
Allow deletes by query. Implementation will query all IDs (using OQL) then
delete elements using map API (`map.remove()`)
  • Loading branch information
asereda-gs committed Jun 18, 2019
1 parent fcdea1c commit e89d26f
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 19 deletions.
2 changes: 1 addition & 1 deletion criteria/common/src/org/immutables/criteria/Criteria.java
Expand Up @@ -37,7 +37,7 @@
*/
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.SOURCE)
@Retention(RetentionPolicy.RUNTIME)
@interface Id {}

/**
Expand Down
33 changes: 27 additions & 6 deletions criteria/geode/src/org/immutables/criteria/geode/GeodeBackend.java
Expand Up @@ -18,26 +18,29 @@

import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Single;
import org.apache.geode.cache.Region;
import org.immutables.criteria.Criteria;
import org.immutables.criteria.Criterias;
import org.immutables.criteria.Repository;
import org.immutables.criteria.adapter.Backend;
import org.immutables.criteria.adapter.Operations;
import org.immutables.criteria.expression.Expressions;
import org.reactivestreams.Publisher;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* Backend for
*/
public class GeodeBackend implements Backend {

private final Region<?, ?> region;
private final Region<Object, Object> region;

public GeodeBackend(Region<?, ?> region) {
this.region = Objects.requireNonNull(region, "region is null");
this.region = Objects.requireNonNull((Region<Object, Object>) region, "region is null");
}

@Override
Expand Down Expand Up @@ -87,14 +90,32 @@ private <T> Flowable<Repository.Success> insert(Operations.Insert<T> op) {

private <T> Flowable<Repository.Success> delete(Operations.Delete op) {
if (!Geodes.hasPredicate(op.criteria())) {
// means delete all (clear region)
// means delete all (ie clear whole region)
return Completable.fromRunnable(region::clear)
.toSingleDefault(Repository.Success.SUCCESS)
.toFlowable();
}

// id only
throw new UnsupportedOperationException("Only deleteAll() currently supported. Not " + op.criteria());
final Optional<List<?>> ids = Geodes.canDeleteByKey(op.criteria());
// list of ids is present in the expression
if (ids.isPresent()) {
// delete by key: map.remove(key)
return Completable.fromRunnable(() -> region.removeAll(ids.get()))
.toSingleDefault(Repository.Success.SUCCESS)
.toFlowable();
}


final String predicate = Criterias.toExpression(op.criteria())
.accept(new GeodeQueryVisitor(path -> String.format("e.value.%s", path.toStringPath())));

final String query = String.format("select distinct e.key from %s.entries e where %s", region.getFullPath(), predicate);

return Single.fromCallable(() -> region.query(query))
.flatMapCompletable(list -> Completable.fromRunnable(() -> region.removeAll((Collection<Object>) list)))
.toSingleDefault(Repository.Success.SUCCESS)
.toFlowable();
}


}
Expand Up @@ -28,13 +28,21 @@

import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

class GeodeQueryVisitor extends AbstractExpressionVisitor<String> {

private final Function<Path, String> pathFn;

GeodeQueryVisitor() {
this(Path::toStringPath);
}

GeodeQueryVisitor(Function<Path, String> pathFn) {
super(e -> { throw new UnsupportedOperationException(); });
this.pathFn = Objects.requireNonNull(pathFn, "pathFn");
}

@Override
Expand All @@ -47,7 +55,7 @@ public String visit(Call call) {

final Path path = Visitors.toPath(args.get(0));
final Constant constant = Visitors.toConstant(args.get(1));
return String.format("%s = %s", path.toStringPath(), toString(constant.value()));
return String.format("%s = %s", pathFn.apply(path), toString(constant.value()));
}

if (op == Operators.AND || op == Operators.OR) {
Expand All @@ -65,7 +73,7 @@ public String visit(Call call) {
final String valuesAsString = StreamSupport.stream(values.spliterator(), false)
.map(GeodeQueryVisitor::toString).collect(Collectors.joining(", "));

final String query = String.format("%s in SET(%s)", field.toStringPath(), valuesAsString);
final String query = String.format("%s in SET(%s)", pathFn.apply(field), valuesAsString);

return op == Operators.NOT_IN ? "NOT " + query : query;
}
Expand All @@ -74,9 +82,7 @@ public String visit(Call call) {
}

private static String toString(Object value) {
if (value == null) {
return "null";
} else if (value instanceof CharSequence) {
if (value instanceof CharSequence) {
return "'" + value + "'";
} else {
return Objects.toString(value);
Expand Down
29 changes: 24 additions & 5 deletions criteria/geode/src/org/immutables/criteria/geode/Geodes.java
Expand Up @@ -17,6 +17,8 @@
package org.immutables.criteria.geode;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.immutables.criteria.Criteria;
import org.immutables.criteria.Criterias;
import org.immutables.criteria.DocumentCriteria;
import org.immutables.criteria.expression.Call;
Expand All @@ -25,9 +27,11 @@
import org.immutables.criteria.expression.ExpressionConverter;
import org.immutables.criteria.expression.Expressions;
import org.immutables.criteria.expression.Operators;
import org.immutables.criteria.expression.Root;
import org.immutables.criteria.expression.Path;
import org.immutables.criteria.expression.Visitors;

import java.nio.file.Path;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Member;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -103,9 +107,24 @@ static Optional<List<?>> canDeleteByKey(DocumentCriteria<?> criteria) {
return Optional.empty();
}

// field should be id
final Class<?> entityClass = ((Root) Criterias.toExpression(criteria)).entityClass();
final Path path = Visitors.toPath(predicate.arguments().get(0));

return Optional.empty();
if (!(path.paths().size() == 1 && isIdAttribute(path.paths().get(0)))) {
return Optional.empty();
}

final Object value = Visitors.toConstant(predicate.arguments().get(1)).value();
final List<?> values = value instanceof Iterable ? ImmutableList.copyOf((Iterable<?>) value) : ImmutableList.of(value);
return Optional.of(values);
}

private static boolean isIdAttribute(Member member) {
if (!(member instanceof AnnotatedElement)) {
return false;
}

final AnnotatedElement annotated = (AnnotatedElement) member;

return annotated.isAnnotationPresent(Criteria.Id.class);
}
}
Expand Up @@ -56,6 +56,7 @@ public static void setup() {
@Before
public void setUp() throws Exception {
repository = new PersonRepository(new GeodeBackend(region));
region.clear();
}

@Test
Expand Down Expand Up @@ -102,13 +103,27 @@ public void delete() {
check(Flowable.fromPublisher(repository.delete(PersonCriteria.create())).blockingFirst()).is(Repository.Success.SUCCESS);
check(region.keySet()).isEmpty();

Flowable.fromPublisher(repository.insert(generator.next()))
Flowable.fromPublisher(repository.insert(generator.next().withId("test")))
.test()
.awaitDone(1, TimeUnit.SECONDS)
.assertComplete();

check(region.keySet()).hasSize(1);
check(Flowable.fromPublisher(repository.delete(PersonCriteria.create())).blockingFirst()).is(Repository.Success.SUCCESS);
check(Flowable.fromPublisher(repository.delete(PersonCriteria.create().id.isIn("testBAD", "test")))
.blockingFirst()).is(Repository.Success.SUCCESS);
check(region.keySet()).hasSize(0);

// insert again
Flowable.fromPublisher(repository.insert(generator.next().withId("test").withNickName("nick123")))
.test()
.awaitDone(1, TimeUnit.SECONDS)
.assertComplete();
check(region.keySet()).hasSize(1);
check(Flowable.fromPublisher(repository.delete(PersonCriteria.create().nickName.value().isEqualTo("nick123")))
.blockingFirst()).is(Repository.Success.SUCCESS);

// delete by query doesn't work yet
// check(region.keySet()).hasSize(0);

}
}

0 comments on commit e89d26f

Please sign in to comment.