Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -392,23 +392,15 @@ protected List<CreateIndexDDL.Indexer> supportedIndexers()


String annotation;
if (rs.nextBoolean())
{
builder.between(ckSymbol, state.value(rs, low, ckSymbol.type()), state.value(rs, high, ckSymbol.type()));
annotation = "clustering BETWEEN";
}
else if (rs.nextBoolean())
{
builder.where(ckSymbol, Inequality.GREATER_THAN_EQ, low);
builder.where(ckSymbol, Inequality.LESS_THAN_EQ, high);
annotation = "clustering >= AND <=";
}
else
{
builder.where(ckSymbol, Inequality.GREATER_THAN, low);
builder.where(ckSymbol, Inequality.LESS_THAN, high);
annotation = "clustering > AND <";
}
ASTGenerators.RangeType rangeType = rs.pick(ASTGenerators.RangeType.BOUND, ASTGenerators.RangeType.BETWEEN);

ASTGenerators.applyRangeCondition(builder, rangeType, ckSymbol,
state.greaterThanGen.next(rs),
state.lessThanGen.next(rs),
state.value(rs, low, ckSymbol.type()),
state.value(rs, high, ckSymbol.type()));

annotation = "clustering " + rangeType.name();

if (rs.nextBoolean())
builder.orderByColumn(ckSymbol, rs.pick(Select.OrderBy.Ordering.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@
import org.apache.cassandra.cql3.ast.Visitor;
import org.apache.cassandra.db.BufferClustering;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringBound;
import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.db.marshal.CollectionType;
Expand Down Expand Up @@ -621,14 +624,13 @@ nowTs, filter(set, factory.regularColumns),
}

private enum DeleteKind
{PARTITION, ROW, COLUMN}
{PARTITION, ROW, COLUMN, RANGE}
private void update(Mutation.Delete delete)
{
long nowTs = delete.timestampOrDefault(numMutations);
//TODO (coverage): range deletes
var split = splitOnPartition(delete.where.simplify());
List<Clustering<ByteBuffer>> pks = split.left;
List<Clustering<ByteBuffer>> clusterings = split.right.isEmpty() ? Collections.emptyList() : clustering(split.right);
List<Clustering<ByteBuffer>> clusterings = Collections.emptyList();
HashSet<Symbol> columns = delete.columns.isEmpty() ? null : new HashSet<>(delete.columns);
for (Clustering<ByteBuffer> pd : pks)
{
Expand All @@ -638,8 +640,15 @@ private void update(Mutation.Delete delete)
DeleteKind kind = DeleteKind.PARTITION;
if (!delete.columns.isEmpty())
kind = DeleteKind.COLUMN;
else if (!clusterings.isEmpty())
kind = DeleteKind.ROW;
else if (containsRangeConditionOnClustering(split.right))
kind = DeleteKind.RANGE;

if (kind != DeleteKind.RANGE)
{
clusterings = split.right.isEmpty() ? Collections.emptyList() : clustering(split.right);
if (kind == DeleteKind.PARTITION && !clusterings.isEmpty())
kind = DeleteKind.ROW;
}

switch (kind)
{
Expand Down Expand Up @@ -671,6 +680,19 @@ else if (!clusterings.isEmpty())
}
}
break;
case RANGE:
assert clusterings.isEmpty();
LookupContext ctx = new LookupContext(delete);
for (BytesPartitionState.Row value : partition.rows())
{
if (ctx.include(value))
{
partition.deleteRow(value.clustering, nowTs);
if (partition.shouldDelete())
partitions.remove(partition.ref());
}
}
break;
default:
throw new UnsupportedOperationException();
}
Expand Down Expand Up @@ -1016,6 +1038,10 @@ private List<Clustering<ByteBuffer>> clustering(List<Conditional> conditionals)
if (factory.clusteringColumns.isEmpty()) return Collections.singletonList(Clustering.EMPTY);
throw new IllegalArgumentException("No clustering columns defined in the WHERE clause, but clustering columns exist; expected " + factory.clusteringColumns);
}

if (containsRangeConditionOnClustering(conditionals))
Comment thread
maedhroz marked this conversation as resolved.
return extractClusteringsFromSlices(conditionals);

var split = splitOnClustering(conditionals);
var clusterings = split.left;
var remaining = split.right;
Expand Down Expand Up @@ -1088,6 +1114,131 @@ else if (c instanceof Conditional.In)
return Pair.create(partitionKeys, other);
}

private boolean containsRangeConditionOnClustering(List<Conditional> conditionals)
{
for (Conditional cond : conditionals)
{
if (cond instanceof Conditional.Where)
{
Conditional.Where where = (Conditional.Where) cond;
if (factory.clusteringColumns.contains(where.lhs))
{
switch (where.kind)
{
case GREATER_THAN:
case GREATER_THAN_EQ:
case LESS_THAN:
case LESS_THAN_EQ:
return true;
}
}
}
else if (cond instanceof Conditional.Between)
{
Conditional.Between between = (Conditional.Between) cond;
if (between.ref instanceof Symbol && factory.clusteringColumns.contains((Symbol) between.ref))
return true;
}
}
return false;
}

private List<Clustering<ByteBuffer>> extractClusteringsFromSlices(List<Conditional> conditionals)
{
Map<Symbol, List<Conditional.Where>> rangeConditions = new HashMap<>();
// Build slices
Slices.Builder builder = new Slices.Builder(factory.clusteringComparator);

for (Conditional cond : conditionals)
{
if (cond instanceof Conditional.Where)
{
Conditional.Where where = (Conditional.Where) cond;
if (factory.clusteringColumns.contains(where.lhs)
&& (where.kind != Inequality.EQUAL)) // skip equality
{
Symbol col = (Symbol) where.lhs;
rangeConditions.computeIfAbsent(col, __ -> new ArrayList<>()).add(where);
}
}
else if (cond instanceof Conditional.Between)
{
Conditional.Between between = (Conditional.Between) cond;
ByteBuffer start = eval(between.start);
ByteBuffer end = eval(between.end);

ClusteringBound<ByteBuffer> first = ClusteringBound.inclusiveStartOf(Clustering.make(start));
ClusteringBound<ByteBuffer> last = ClusteringBound.inclusiveEndOf(Clustering.make(end));
Slice slice = Slice.make(first, last);
// To avoid adding empty slices
if (!slice.isEmpty(factory.clusteringComparator))
builder.add(slice);
}
}

for (Map.Entry<Symbol, List<Conditional.Where>> entry : rangeConditions.entrySet())
{
ByteBuffer lower = null;
ByteBuffer upper = null;
boolean includeLower = false;
boolean includeUpper = false;

for (Conditional.Where cond : entry.getValue())
{
ByteBuffer val = eval(cond.rhs);
switch (cond.kind)
{
case GREATER_THAN: lower = val; includeLower = false; break;
case GREATER_THAN_EQ: lower = val; includeLower = true; break;
case LESS_THAN: upper = val; includeUpper = false; break;
case LESS_THAN_EQ: upper = val; includeUpper = true; break;
}
}

ClusteringBound<?> start = (lower == null)
? ClusteringBound.BOTTOM
: (includeLower
? ClusteringBound.inclusiveStartOf(Clustering.make(lower))
: ClusteringBound.exclusiveStartOf(Clustering.make(lower)));

ClusteringBound<?> end = (upper == null)
? ClusteringBound.TOP
: (includeUpper
? ClusteringBound.inclusiveEndOf(Clustering.make(upper))
: ClusteringBound.exclusiveEndOf(Clustering.make(upper)));

Slice slice = Slice.make(start, end);
// To avoid adding empty slices
if (!slice.isEmpty(factory.clusteringComparator))
builder.add(slice);
}
List<Clustering<ByteBuffer>> clusterings = new ArrayList<>();
for (Slice slice : builder.build())
{
if (!slice.start().isBottom())
clusterings.add(createClusteringBound(slice.start()));
if (!slice.end().isTop())
clusterings.add(createClusteringBound(slice.end()));
}

return clusterings;
}


private static Clustering<ByteBuffer> createClusteringBound(ClusteringBound<?> bound)
{
if (bound == null)
throw new IllegalArgumentException("ClusteringBound should not be null.");

int size = bound.size();
ByteBuffer[] values = new ByteBuffer[size];

for (int i = 0; i < size; i++)
values[i] = (ByteBuffer) bound.get(i);

return Clustering.make(values);
}

private static ImmutableUniqueList<Clustering<ByteBuffer>> keys(Collection<Symbol> columns, Map<? extends ReferenceExpression, List<ByteBuffer>> columnValues)
{
return keys(columns, columnValues, Function.identity());
Expand Down
Loading