Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/java/org/apache/cassandra/db/rows/ArrayCell.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@

import java.nio.ByteBuffer;

import org.apache.cassandra.db.ExpirationDateOverflowHandling;
import org.apache.cassandra.db.marshal.ByteArrayAccessor;
import org.apache.cassandra.db.marshal.ByteType;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.memory.AbstractAllocator;

import static org.apache.cassandra.utils.ByteArrayUtil.EMPTY_BYTE_ARRAY;

public class ArrayCell extends AbstractCell<byte[]>
{
private static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayCell(ColumnMetadata.regularColumn("", "", "", ByteType.instance), 0L, 0, 0, ByteArrayUtil.EMPTY_BYTE_ARRAY, null));
private static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayCell(ColumnMetadata.regularColumn("", "", "", ByteType.instance), 0L, 0, 0, EMPTY_BYTE_ARRAY, null));

private final long timestamp;
private final int ttl;
Expand Down Expand Up @@ -96,6 +96,12 @@ public Cell<?> withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int n
return new ArrayCell(column, newTimestamp, ttl, newLocalDeletionTime, value, path);
}

@Override
public Cell<?> withSkippedValue()
{
return new ArrayCell(column, timestamp, ttl, localDeletionTime, EMPTY_BYTE_ARRAY, path);
}

public Cell<?> copy(AbstractAllocator allocator)
{
if (value.length == 0)
Expand Down
12 changes: 9 additions & 3 deletions src/java/org/apache/cassandra/db/rows/BTreeRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setA
{
Map<ByteBuffer, DroppedColumn> droppedColumns = metadata.droppedColumns;

boolean mayFilterColumns = !filter.fetchesAllColumns(isStatic());
boolean mayFilterColumns = !filter.fetchesAllColumns(isStatic()) || !filter.allFetchedColumnsAreQueried();
// When merging sstable data in Row.Merger#merge(), rowDeletion is removed if it doesn't supersede activeDeletion.
boolean mayHaveShadowed = !activeDeletion.isLive() && !deletion.time().supersedes(activeDeletion);

Expand Down Expand Up @@ -365,8 +365,14 @@ public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setA
// is lower than the row timestamp (see #10657 or SerializationHelper.includes() for details).
boolean isForDropped = dropped != null && cell.timestamp() <= dropped.droppedTime;
boolean isShadowed = mayHaveShadowed && activeDeletion.deletes(cell);
boolean isSkippable = !queriedByUserTester.test(column) && cell.timestamp() < rowLiveness.timestamp();
return isForDropped || isShadowed || isSkippable ? null : cell;
boolean isSkippable = !queriedByUserTester.test(column);

if (isForDropped || isShadowed || (isSkippable && cell.timestamp() < rowLiveness.timestamp()))
return null;

// We should apply the same "optimization" as in Cell.deserialize to avoid discrepances
// between sstables and memtables data, i.e resulting in a digest mismatch.
return isSkippable ? cell.withSkippedValue() : cell;
});
}

Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/db/rows/BufferCell.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public Cell<?> withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int n
return new BufferCell(column, newTimestamp, ttl, newLocalDeletionTime, value, path);
}

public Cell<?> withSkippedValue()
{
return withUpdatedValue(ByteBufferUtil.EMPTY_BYTE_BUFFER);
}

public Cell<?> copy(AbstractAllocator allocator)
{
if (!value.hasRemaining())
Expand Down
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/db/rows/Cell.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ public ByteBuffer buffer()

public abstract Cell<?> withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int newLocalDeletionTime);

/**
* Used to apply the same optimization as in {@link Cell.Serializer#deserialize} when
* the column is not queried but eventhough it's used for digest calculation.
* @return a cell with an empty buffer as value
*/
public abstract Cell<?> withSkippedValue();

public abstract Cell<?> copy(AbstractAllocator allocator);

@Override
Expand Down
15 changes: 11 additions & 4 deletions src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,24 @@ public ComplexColumnData markCounterLocalToBeCleared()
public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, DroppedColumn dropped, LivenessInfo rowLiveness)
{
ColumnFilter.Tester cellTester = filter.newTester(column);
if (cellTester == null && activeDeletion.isLive() && dropped == null)
boolean isQueriedColumn = filter.fetchedColumnIsQueried(column);
if (cellTester == null && activeDeletion.isLive() && dropped == null && isQueriedColumn)
return this;

DeletionTime newDeletion = activeDeletion.supersedes(complexDeletion) ? DeletionTime.LIVE : complexDeletion;
return transformAndFilter(newDeletion, (cell) ->
{
CellPath path = cell.path();
boolean isForDropped = dropped != null && cell.timestamp() <= dropped.droppedTime;
boolean isShadowed = activeDeletion.deletes(cell);
boolean isSkippable = cellTester != null && (!cellTester.fetches(cell.path())
|| (!cellTester.fetchedCellIsQueried(cell.path()) && cell.timestamp() < rowLiveness.timestamp()));
return isForDropped || isShadowed || isSkippable ? null : cell;
boolean isFetchedCell = cellTester == null || cellTester.fetches(path);
boolean isQueriedCell = isQueriedColumn && isFetchedCell && (cellTester == null || cellTester.fetchedCellIsQueried(path));
boolean isSkippableCell = !isFetchedCell || (!isQueriedCell && cell.timestamp() < rowLiveness.timestamp());
if (isForDropped || isShadowed || isSkippableCell)
return null;
// We should apply the same "optimization" as in Cell.deserialize to avoid discrepances
// between sstables and memtables data, i.e resulting in a digest mismatch.
return isQueriedCell ? cell : cell.withSkippedValue();
});
}

Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/db/rows/NativeCell.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.MemoryUtil;
Expand Down Expand Up @@ -160,6 +161,11 @@ public Cell<?> withUpdatedColumn(ColumnMetadata column)
return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), value(), path());
}

public Cell withSkippedValue()
{
return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, path());
}

public long unsharedHeapSizeExcludingData()
{
return EMPTY_SIZE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.function.Function;

import com.google.common.collect.Sets;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;

import static org.junit.Assert.assertEquals;

public class SSTableAndMemTableDigestMatchTest extends CQLTester
{
private final static long writeTime = System.currentTimeMillis() * 1000L;

@Test
public void testSelectAllColumns() throws Throwable
{
testWithFilter(tableMetadata ->
ColumnFilter.all(tableMetadata));
}

@Test
public void testSelectNoColumns() throws Throwable
{
testWithFilter(tableMetadata ->
ColumnFilter.selection(tableMetadata, RegularAndStaticColumns.builder().build()));
}

@Test
public void testSelectEmptyColumn() throws Throwable
{
testWithFilter(tableMetadata ->
ColumnFilter.selection(tableMetadata, RegularAndStaticColumns.of(tableMetadata.getColumn(ColumnIdentifier.getInterned("e", false)))));
}

@Test
public void testSelectNonEmptyColumn() throws Throwable
{
testWithFilter(tableMetadata ->
ColumnFilter.selection(tableMetadata, RegularAndStaticColumns.of(tableMetadata.getColumn(ColumnIdentifier.getInterned("v1", false)))));
}

@Test
public void testSelectEachNonEmptyColumn() throws Throwable
{
testWithFilter(tableMetadata ->
ColumnFilter.selection(tableMetadata,
RegularAndStaticColumns.builder()
.add(tableMetadata.getColumn(ColumnIdentifier.getInterned("v1", false)))
.add(tableMetadata.getColumn(ColumnIdentifier.getInterned("v2", false)))
.build()));
}

@Test
public void testSelectCellsFromEmptyComplexColumn() throws Throwable
{
testWithFilter(tableMetadata -> ColumnFilter.selectionBuilder().select(tableMetadata.getColumn(ColumnIdentifier.getInterned("em", false)),
CellPath.create(Int32Type.instance.decompose(5))).build());
}

@Test
public void testSelectNonEmptyCellsFromComplexColumn() throws Throwable
{
testWithFilter(tableMetadata -> ColumnFilter.selectionBuilder().select(tableMetadata.getColumn(ColumnIdentifier.getInterned("m", false)),
CellPath.create(Int32Type.instance.decompose(1))).build());
}

@Test
public void testSelectEmptyCellsFromNonEmptyComplexColumn() throws Throwable
{
testWithFilter(tableMetadata -> ColumnFilter.selectionBuilder().select(tableMetadata.getColumn(ColumnIdentifier.getInterned("m", false)),
CellPath.create(Int32Type.instance.decompose(5))).build());
}

private void testWithFilter(Function<TableMetadata, ColumnFilter> filterFactory) throws Throwable
{
Map<Integer, Integer> m = new HashMap<>();
m.put(1, 10);
createTable("CREATE TABLE %s (k int PRIMARY KEY, v1 int, v2 int, e text, m map<int, int>, em map<int, int>)");
execute("INSERT INTO %s (k, v1, v2, m) values (?, ?, ?, ?) USING TIMESTAMP ?", 1, 2, 3, m, writeTime);

ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
ColumnFilter filter = filterFactory.apply(cfs.metadata());
String digest1 = getDigest(filter);
flush();
String digest2 = getDigest(filter);

assertEquals(digest1, digest2);
}

private String getDigest(ColumnFilter filter)
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
NavigableSet<Clustering<?>> clusterings = Sets.newTreeSet(new ClusteringComparator());
clusterings.add(Clustering.EMPTY);
BufferDecoratedKey key = new BufferDecoratedKey(DatabaseDescriptor.getPartitioner().getToken(Int32Type.instance.decompose(1)),
Int32Type.instance.decompose(1));
SinglePartitionReadCommand cmd = SinglePartitionReadCommand
.create(cfs.metadata(),
(int) (System.currentTimeMillis() / 1000),
key,
filter,
new ClusteringIndexNamesFilter(clusterings, false)).copyAsDigestQuery();
cmd.setDigestVersion(MessagingService.current_version);
ReadResponse resp;
try (ReadExecutionController ctrl = ReadExecutionController.forCommand(cmd); UnfilteredRowIterator iterator = cmd.queryMemtableAndDisk(cfs, ctrl))
{
resp = ReadResponse.createDataResponse(new SingletonUnfilteredPartitionIterator(iterator), cmd);
logger.info("Response is: {}", resp.toDebugString(cmd, key));
ByteBuffer digest = resp.digest(cmd);
return ByteBufferUtil.bytesToHex(digest);
}
}
}