Skip to content

Commit

Permalink
Permit null components in clustering keys
Browse files Browse the repository at this point in the history
This is different from null values in types like bigint and varint,
which are represented by empty buffers. Actual null components can
appear in COMPACT STORAGE tables, and also in indexed partitions
with static content but no rows.
  • Loading branch information
blambov committed May 4, 2022
1 parent e4ea648 commit b47d034
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 20 deletions.
39 changes: 32 additions & 7 deletions src/java/org/apache/cassandra/db/ClusteringComparator.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@

import static org.apache.cassandra.utils.bytecomparable.ByteSource.EXCLUDED;
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT;
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT_EMPTY;
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT_EMPTY_REVERSED;
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT_NULL;
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT_NULL_REVERSED;
import static org.apache.cassandra.utils.bytecomparable.ByteSource.TERMINATOR;

/**
Expand Down Expand Up @@ -309,10 +310,25 @@ public int next()
if (srcnum == sz)
return src.kind().asByteComparableValue(version);

V component = src.get(srcnum);
current = component != null ? subtype(srcnum).asComparableBytes(src.accessor(), component, version) : null;
final V nextComponent = src.get(srcnum);
// We can have a null as the clustering component (this is a relic of COMPACT STORAGE, but also
// can appear in indexed partitions with no rows but static content),
if (nextComponent == null)
{
if (version != Version.LEGACY)
return NEXT_COMPONENT_NULL; // always sorts before non-nulls, including for reversed types
else
{
// legacy version did not permit nulls in clustering keys and treated these as null values
return subtype(srcnum).isReversed() ? NEXT_COMPONENT_EMPTY_REVERSED : NEXT_COMPONENT_EMPTY;
}
}

current = subtype(srcnum).asComparableBytes(src.accessor(), nextComponent, version);
// and also null values for some types (e.g. int, varint but not text) that are encoded as empty
// buffers.
if (current == null)
return subtype(srcnum).isReversed() ? NEXT_COMPONENT_NULL_REVERSED : NEXT_COMPONENT_NULL;
return subtype(srcnum).isReversed() ? NEXT_COMPONENT_EMPTY_REVERSED : NEXT_COMPONENT_EMPTY;

return NEXT_COMPONENT;
}
Expand Down Expand Up @@ -364,7 +380,10 @@ public <V> Clustering<V> clusteringFromByteComparable(ValueAccessor<V> accessor,
switch (sep)
{
case NEXT_COMPONENT_NULL:
case NEXT_COMPONENT_NULL_REVERSED:
components[cc] = null;
break;
case NEXT_COMPONENT_EMPTY:
case NEXT_COMPONENT_EMPTY_REVERSED:
components[cc] = subtype(cc).fromComparableBytes(accessor, null, version);
break;
case NEXT_COMPONENT:
Expand Down Expand Up @@ -416,7 +435,10 @@ public <V> ClusteringBound<V> boundFromByteComparable(ValueAccessor<V> accessor,
switch (sep)
{
case NEXT_COMPONENT_NULL:
case NEXT_COMPONENT_NULL_REVERSED:
components[cc] = null;
break;
case NEXT_COMPONENT_EMPTY:
case NEXT_COMPONENT_EMPTY_REVERSED:
components[cc] = subtype(cc).fromComparableBytes(accessor, null, version);
break;
case NEXT_COMPONENT:
Expand Down Expand Up @@ -470,7 +492,10 @@ public <V> ClusteringBoundary<V> boundaryFromByteComparable(ValueAccessor<V> acc
switch (sep)
{
case NEXT_COMPONENT_NULL:
case NEXT_COMPONENT_NULL_REVERSED:
components[cc] = null;
break;
case NEXT_COMPONENT_EMPTY:
case NEXT_COMPONENT_EMPTY_REVERSED:
components[cc] = subtype(cc).fromComparableBytes(accessor, null, version);
break;
case NEXT_COMPONENT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ public interface ByteSource

// Next component marker.
int NEXT_COMPONENT = 0x40;
int NEXT_COMPONENT_NULL = 0x3F;
int NEXT_COMPONENT_NULL_REVERSED = 0x41;
// Marker used to present null values represented by empty buffers (e.g. by Int32Type)
int NEXT_COMPONENT_EMPTY = 0x3F;
int NEXT_COMPONENT_EMPTY_REVERSED = 0x41;
// Marker for null components in tuples, maps, sets and clustering keys.
int NEXT_COMPONENT_NULL = 0x3E;
// Default terminator byte in sequences. Smaller than NEXT_COMPONENT_NULL, but larger than LT_NEXT_COMPONENT to
// ensure lexicographic compares go in the correct direction
int TERMINATOR = 0x38;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ public static ByteSource.Peekable nextComponentSource(ByteSource.Peekable source

public static boolean nextComponentNull(int separator)
{
return separator == ByteSource.NEXT_COMPONENT_NULL || separator == ByteSource.NEXT_COMPONENT_NULL_REVERSED;
return separator == ByteSource.NEXT_COMPONENT_NULL || separator == ByteSource.NEXT_COMPONENT_EMPTY
|| separator == ByteSource.NEXT_COMPONENT_EMPTY_REVERSED;
}

private static void assertValidByte(int data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,19 +324,64 @@ public void testCombinations()
testCombinationSampling(rand, this::assertClusteringPairComparesSame);
}

@Test
public void testNullsInClustering()
{
ByteBuffer[][] inputs = new ByteBuffer[][]
{
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, "a"),
decomposeAndRandomPad(Int32Type.instance, 0)},
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, "a"),
decomposeAndRandomPad(Int32Type.instance, null)},
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, "a"),
null},
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, ""),
decomposeAndRandomPad(Int32Type.instance, 0)},
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, ""),
decomposeAndRandomPad(Int32Type.instance, null)},
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, ""),
null},
new ByteBuffer[] {null,
decomposeAndRandomPad(Int32Type.instance, 0)},
new ByteBuffer[] {null,
decomposeAndRandomPad(Int32Type.instance, null)},
new ByteBuffer[] {null,
null}
};
for (ByteBuffer[] input1 : inputs)
for (ByteBuffer[] input2 : inputs)
{
assertClusteringPairComparesSame(UTF8Type.instance, Int32Type.instance,
input1[0], input1[1], input2[0], input2[1],
(t, v) -> v,
input1[0] != null && input1[1] != null && input2[0] != null && input2[1] != null);
}
}

void assertClusteringPairComparesSame(AbstractType t1, AbstractType t2, Object o1, Object o2, Object o3, Object o4)
{
assertClusteringPairComparesSame(t1, t2, o1, o2, o3, o4, AbstractType::decompose, true);
}

<V> void assertClusteringPairComparesSame(AbstractType t1, AbstractType t2,
V o1, V o2, V o3, V o4,
BiFunction<AbstractType, V, ByteBuffer> decompose,
boolean testLegacy)
{
for (Version v : Version.values())
for (ClusteringPrefix.Kind k1 : ClusteringPrefix.Kind.values())
for (ClusteringPrefix.Kind k2 : ClusteringPrefix.Kind.values())
{
if (!testLegacy && v == Version.LEGACY)
continue;

ClusteringComparator comp = new ClusteringComparator(t1, t2);
ByteBuffer[] b = new ByteBuffer[2];
ByteBuffer[] d = new ByteBuffer[2];
b[0] = t1.decompose(o1);
b[1] = t2.decompose(o2);
d[0] = t1.decompose(o3);
d[1] = t2.decompose(o4);
b[0] = decompose.apply(t1, o1);
b[1] = decompose.apply(t2, o2);
d[0] = decompose.apply(t1, o3);
d[1] = decompose.apply(t2, o4);
ClusteringPrefix<ByteBuffer> c = makeBound(k1, b);
ClusteringPrefix<ByteBuffer> e = makeBound(k2, d);
final ByteComparable bsc = comp.asByteComparable(c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -286,21 +287,54 @@ public void testCombinations()
testCombinationSampling(rand, this::assertClusteringPairConvertsSame);
}

@Test
public void testNullsInClustering()
{
ByteBuffer[][] inputs = new ByteBuffer[][]
{
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, "a"),
decomposeAndRandomPad(Int32Type.instance, 0)},
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, "a"),
decomposeAndRandomPad(Int32Type.instance, null)},
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, "a"),
null},
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, ""),
decomposeAndRandomPad(Int32Type.instance, 0)},
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, ""),
decomposeAndRandomPad(Int32Type.instance, null)},
new ByteBuffer[] {decomposeAndRandomPad(UTF8Type.instance, ""),
null},
new ByteBuffer[] {null,
decomposeAndRandomPad(Int32Type.instance, 0)},
new ByteBuffer[] {null,
decomposeAndRandomPad(Int32Type.instance, null)},
new ByteBuffer[] {null,
null}
};
for (ByteBuffer[] input : inputs)
{
assertClusteringPairConvertsSame(ByteBufferAccessor.instance, UTF8Type.instance, Int32Type.instance, input[0], input[1], (t, v) -> v);
}
}

void assertClusteringPairConvertsSame(AbstractType t1, AbstractType t2, Object o1, Object o2)
{
for (ValueAccessor<?> accessor : ValueAccessors.ACCESSORS)
assertClusteringPairConvertsSame(accessor, t1, t2, o1, o2);
assertClusteringPairConvertsSame(accessor, t1, t2, o1, o2, AbstractType::decompose);
}

<V> void assertClusteringPairConvertsSame(ValueAccessor<V> accessor, AbstractType t1, AbstractType t2, Object o1, Object o2)
<T, V> void assertClusteringPairConvertsSame(ValueAccessor<V> accessor,
AbstractType t1, AbstractType t2,
T o1, T o2,
BiFunction<AbstractType, T, ByteBuffer> decompose)
{
boolean checkEquals = t1 != DecimalType.instance && t2 != DecimalType.instance;
for (ClusteringPrefix.Kind k1 : ClusteringPrefix.Kind.values())
{
ClusteringComparator comp = new ClusteringComparator(t1, t2);
V[] b = accessor.createArray(2);
b[0] = accessor.valueOf(t1.decompose(o1));
b[1] = accessor.valueOf(t2.decompose(o2));
b[0] = accessor.valueOf(decompose.apply(t1, o1));
b[1] = accessor.valueOf(decompose.apply(t2, o2));
ClusteringPrefix<V> c = ByteSourceComparisonTest.makeBound(accessor.factory(), k1, b);
final ByteComparable bsc = comp.asByteComparable(c);
logger.info("Clustering {} bytesource {}", c.clusteringString(comp.subtypes()), bsc.byteComparableAsString(VERSION));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public void testGetBoundFromPrefixTerminator()
assertEquals(ByteSource.NEXT_COMPONENT, comparableBytes.next());
DECIMAL.fromComparableBytes(comparableBytes, version);
// Expect null-signifying separator here.
assertEquals(ByteSource.NEXT_COMPONENT_NULL, comparableBytes.next());
assertEquals(ByteSource.NEXT_COMPONENT_EMPTY, comparableBytes.next());
// No varint to read
// Expect the last separator (i.e. the terminator) to be the one specified by the prefix kind.
assertEquals(prefixKind.asByteComparableValue(version), comparableBytes.next());
Expand All @@ -504,7 +504,7 @@ public void testGetBoundFromPrefixTerminator()
assertEquals(ByteSource.NEXT_COMPONENT, comparableBytes.next());
DECIMAL.fromComparableBytes(comparableBytes, version);
// Expect reversed null-signifying separator here.
assertEquals(ByteSource.NEXT_COMPONENT_NULL_REVERSED, comparableBytes.next());
assertEquals(ByteSource.NEXT_COMPONENT_EMPTY_REVERSED, comparableBytes.next());
// No varint to read
// Expect the last separator (i.e. the terminator) to be the one specified by the prefix kind.
assertEquals(prefixKind.asByteComparableValue(version), comparableBytes.next());
Expand Down

0 comments on commit b47d034

Please sign in to comment.