Skip to content

Commit

Permalink
Ensure that empty SAI column indexes do not fail on validation after …
Browse files Browse the repository at this point in the history
…full-SSTable streaming

patch by Andrés de la Peña; reviewed by Caleb Rackliffe for CASSANDRA-19017

Co-authored-by: Andrés de la Peña <a.penya.garcia@gmail.com>
Co-authored-by: Caleb Rackliffe <calebrackliffe@gmail.com>
  • Loading branch information
adelapena and maedhroz committed Nov 22, 2023
1 parent a2911c7 commit e8fb4b2
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
5.0-beta1
* Ensure that empty SAI column indexes do not fail on validation after full-SSTable streaming (CASSANDRA-19017)
* SAI in-memory index should check max term size (CASSANDRA-18926)
* Set default disk_access_mode to mmap_index_only (CASSANDRA-19021)
* Exclude net.java.dev.jna:jna dependency from dependencies of org.caffinitas.ohc:ohc-core (CASSANDRA-18992)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.index.sai.disk.v1;

import java.io.IOException;

import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.io.IndexOutputWriter;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;

/**
 * Helper for writing and reading the column completion marker, {@link IndexComponent#COLUMN_COMPLETION_MARKER}.
 * <p>
 * The marker file is made of a header, as written by {@link SAICodecUtils#writeHeader(IndexOutput)}, a single
 * payload byte, and a footer, as written by {@link SAICodecUtils#writeFooter(IndexOutput)}. The payload byte records
 * whether the column index is empty or not. If the index is empty the completion marker will be the only per-index
 * component.
 */
public class ColumnCompletionMarkerUtil
{
    private static final byte EMPTY = (byte) 1;
    private static final byte NOT_EMPTY = (byte) 0;

    /**
     * Writes the completion marker of the given column index, recording in it whether the index is empty.
     *
     * @param descriptor the index descriptor
     * @param context the column index context
     * @param isEmpty whether the index is empty
     * @throws IOException if the marker component cannot be opened or written
     */
    public static void create(IndexDescriptor descriptor, IndexContext context, boolean isEmpty) throws IOException
    {
        // the only payload of the file: one byte telling empty and non-empty indexes apart
        final byte flag = isEmpty ? EMPTY : NOT_EMPTY;

        try (IndexOutputWriter writer = descriptor.openPerIndexOutput(IndexComponent.COLUMN_COMPLETION_MARKER, context))
        {
            SAICodecUtils.writeHeader(writer);
            writer.writeByte(flag);
            SAICodecUtils.writeFooter(writer);
        }
    }

    /**
     * Reads the completion marker of the given column index and tells whether the index is empty.
     *
     * @param descriptor the index descriptor
     * @param context the column index context
     * @return {@code true} if the marker flags the index as empty, {@code false} otherwise
     * @throws IOException if the marker component cannot be opened or read
     */
    public static boolean isEmptyIndex(IndexDescriptor descriptor, IndexContext context) throws IOException
    {
        try (IndexInput marker = descriptor.openPerIndexInput(IndexComponent.COLUMN_COMPLETION_MARKER, context))
        {
            // the flag byte follows the header, so the header has to be consumed first
            SAICodecUtils.checkHeader(marker);
            final byte flag = marker.readByte();
            return flag == EMPTY;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public void complete(Stopwatch stopwatch) throws IOException
logger.debug(indexContext.logMessage("No indexed rows to flush from SSTable {}."), indexDescriptor.sstableDescriptor);
// Write a completion marker even though we haven't written anything to the index,
// so we won't try to build the index again for the SSTable
indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER, indexContext);
ColumnCompletionMarkerUtil.create(indexDescriptor, indexContext, true);

return;
}

Expand Down Expand Up @@ -204,7 +205,8 @@ private void flushVectorIndex(long startTime, Stopwatch stopwatch) throws IOExce

private void completeIndexFlush(long cellCount, long startTime, Stopwatch stopwatch) throws IOException
{
indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER, indexContext);
// create a completion marker indicating that the index is complete and not-empty
ColumnCompletionMarkerUtil.create(indexDescriptor, indexContext, false);

indexContext.getIndexMetrics().memtableIndexFlushCount.inc();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ public void complete(Stopwatch stopwatch) throws IOException
}

writeSegmentsMetadata();
indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER, indexContext);

// write column index completion marker, indicating whether the index is empty
ColumnCompletionMarkerUtil.create(indexDescriptor, indexContext, segments.isEmpty());
}
finally
{
Expand Down
77 changes: 46 additions & 31 deletions src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.utils.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -188,51 +189,65 @@ public void validatePerSSTableIndexComponents(IndexDescriptor indexDescriptor, b
{
if (isNotBuildCompletionMarker(indexComponent))
{
try (IndexInput input = indexDescriptor.openPerSSTableInput(indexComponent))
{
if (checksum)
SAICodecUtils.validateChecksum(input);
else
SAICodecUtils.validate(input);
}
catch (Exception e)
{
logger.warn(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}."),
checksum ? "Checksum validation" : "Validation",
indexComponent,
indexDescriptor.sstableDescriptor);
rethrowIOException(e);
}
validateIndexComponent(indexDescriptor, null, indexComponent, checksum);
}
}
}

@Override
public void validatePerColumnIndexComponents(IndexDescriptor indexDescriptor, IndexContext indexContext, boolean checksum)
{
// determine if the index is empty, which would be encoded in the column completion marker
boolean isEmptyIndex = false;
if (indexDescriptor.hasComponent(IndexComponent.COLUMN_COMPLETION_MARKER, indexContext))
{
// first validate the file...
validateIndexComponent(indexDescriptor, indexContext, IndexComponent.COLUMN_COMPLETION_MARKER, checksum);

// ...then read to check if the index is empty
try
{
isEmptyIndex = ColumnCompletionMarkerUtil.isEmptyIndex(indexDescriptor, indexContext);
}
catch (IOException e)
{
rethrowIOException(e);
}
}

for (IndexComponent indexComponent : perColumnIndexComponents(indexContext))
{
if (isNotBuildCompletionMarker(indexComponent))
if (!isEmptyIndex && isNotBuildCompletionMarker(indexComponent))
{
try (IndexInput input = indexDescriptor.openPerIndexInput(indexComponent, indexContext))
{
if (checksum)
SAICodecUtils.validateChecksum(input);
else
SAICodecUtils.validate(input);
}
catch (Exception e)
{
logger.warn(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}"),
checksum ? "Checksum validation" : "Validation",
indexComponent,
indexDescriptor.sstableDescriptor);
rethrowIOException(e);
}
validateIndexComponent(indexDescriptor, indexContext, indexComponent, checksum);
}
}
}

/**
 * Validates a single index component. If anything fails while opening or validating the component, a warning is
 * logged and the exception is handed over to {@code rethrowIOException}.
 *
 * @param indexDescriptor the index descriptor used to open the component
 * @param indexContext the column index context, or {@code null} to open the component as a per-SSTable one
 * @param indexComponent the component to validate
 * @param checksum {@code true} to run the checksum validation, {@code false} to run the plain validation
 */
private static void validateIndexComponent(IndexDescriptor indexDescriptor,
                                           IndexContext indexContext,
                                           IndexComponent indexComponent,
                                           boolean checksum)
{
    // a missing context means the component belongs to the SSTable rather than to a column index
    final boolean perSSTable = indexContext == null;

    try (IndexInput in = perSSTable
                         ? indexDescriptor.openPerSSTableInput(indexComponent)
                         : indexDescriptor.openPerIndexInput(indexComponent, indexContext))
    {
        if (!checksum)
            SAICodecUtils.validate(in);
        else
            SAICodecUtils.validateChecksum(in);
    }
    catch (Exception failure)
    {
        final String validationKind = checksum ? "Checksum validation" : "Validation";
        logger.warn(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}"),
                    validationKind,
                    indexComponent,
                    indexDescriptor.sstableDescriptor);
        rethrowIOException(failure);
    }
}

private static void rethrowIOException(Exception e)
{
if (e instanceof IOException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,10 @@ public void testIndexComponentStreaming() throws IOException

int numSSTableComponents = isWide ? V1OnDiskFormat.WIDE_PER_SSTABLE_COMPONENTS.size() : V1OnDiskFormat.SKINNY_PER_SSTABLE_COMPONENTS.size();
int numIndexComponents = isLiteral ? V1OnDiskFormat.LITERAL_COMPONENTS.size() : V1OnDiskFormat.NUMERIC_COMPONENTS.size();
int numComponents = sstableStreamingComponentsCount() + numSSTableComponents + numIndexComponents;
int numComponents = sstableStreamingComponentsCount() + numSSTableComponents + numIndexComponents + 1;

if (isLiteral)
cluster.schemaChange(withKeyspace(
"CREATE CUSTOM INDEX ON %s.test(literal) USING 'StorageAttachedIndex';"
));
else
cluster.schemaChange(withKeyspace(
"CREATE CUSTOM INDEX ON %s.test(numeric) USING 'StorageAttachedIndex';"
));
cluster.schemaChange(withKeyspace("CREATE INDEX ON %s.test(literal) USING 'sai';"));
cluster.schemaChange(withKeyspace("CREATE INDEX ON %s.test(numeric) USING 'sai';"));

cluster.stream().forEach(i ->
i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success()
Expand All @@ -115,12 +109,19 @@ public void testIndexComponentStreaming() throws IOException
IInvokableInstance second = cluster.get(2);
long sstableCount = 10;
long expectedFiles = isZeroCopyStreaming ? sstableCount * numComponents : sstableCount;

for (int i = 0; i < sstableCount; i++)
{
if (isWide)
first.executeInternal(withKeyspace("insert into %s.test(pk, ck, literal, numeric, b) values (?, ?, ?, ?, ?)"), i, i, "v" + i, i, BLOB);
{
String insertTemplate = "INSERT INTO %s.test(pk, ck, " + (isLiteral ? "literal" : "numeric") + ", b) VALUES (?, ?, ?, ?)";
first.executeInternal(withKeyspace(insertTemplate), i, i, isLiteral ? "v" + i : Integer.valueOf(i), BLOB);
}
else
first.executeInternal(withKeyspace("insert into %s.test(pk, literal, numeric, b) values (?, ?, ?, ?)"), i, "v" + i, i, BLOB);
{
String insertTemplate = "INSERT INTO %s.test(pk, " + (isLiteral ? "literal" : "numeric") + ", b) VALUES (?, ?, ?)";
first.executeInternal(withKeyspace(insertTemplate), i, isLiteral ? "v" + i : Integer.valueOf(i), BLOB);
}
first.flush(KEYSPACE);
}

Expand Down

0 comments on commit e8fb4b2

Please sign in to comment.