Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-18345 Stream all components registered by an SSTable #2420

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -121,7 +121,7 @@ public SSTableMultiWriter read(DataInputPlus in) throws IOException
prettyPrintMemory(bytesRead),
prettyPrintMemory(totalSize));

writer.writeComponent(component.type, in, length);
writer.writeComponent(component, in, length);
session.progress(writer.descriptor.fileFor(component).toString(), ProgressInfo.Direction.IN, length, length, length);
bytesRead += length;

Expand Down
Expand Up @@ -65,7 +65,7 @@ public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref,

this.filename = sstable.getFilename();
this.shouldStreamEntireSSTable = computeShouldStreamEntireSSTables();
ComponentManifest manifest = ComponentManifest.create(sstable.descriptor);
ComponentManifest manifest = ComponentManifest.create(sstable);
this.header = makeHeader(sstable, operation, sections, estimatedKeys, shouldStreamEntireSSTable, manifest);
}

Expand Down Expand Up @@ -154,7 +154,7 @@ public void write(StreamSession session, StreamingDataOutputPlus out, int versio
// redistribution, otherwise file sizes recorded in component manifest will be different from actual
// file sizes.
// Recreate the latest manifest and hard links for mutatable components in case they are modified.
try (ComponentContext context = sstable.runWithLock(ignored -> ComponentContext.create(sstable.descriptor)))
try (ComponentContext context = sstable.runWithLock(ignored -> ComponentContext.create(sstable)))
{
CassandraStreamHeader current = makeHeader(sstable, operation, sections, estimatedKeys, true, context.manifest());
CassandraStreamHeader.serializer.serialize(current, out, version);
Expand Down
Expand Up @@ -28,6 +28,7 @@

import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;

Expand All @@ -44,8 +45,9 @@ private ComponentContext(Map<Component, File> hardLinks, ComponentManifest manif
this.manifest = manifest;
}

public static ComponentContext create(Descriptor descriptor)
public static ComponentContext create(SSTable sstable)
{
Descriptor descriptor = sstable.descriptor;
Map<Component, File> hardLinks = new HashMap<>(1);

for (Component component : descriptor.getFormat().mutableComponents())
Expand All @@ -59,7 +61,7 @@ public static ComponentContext create(Descriptor descriptor)
hardLinks.put(component, hardlink);
}

return new ComponentContext(hardLinks, ComponentManifest.create(descriptor));
return new ComponentContext(hardLinks, ComponentManifest.create(sstable));
}

public ComponentManifest manifest()
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
Expand All @@ -33,7 +34,7 @@
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
Expand All @@ -52,13 +53,14 @@ public ComponentManifest(Map<Component, Long> components)
}

@VisibleForTesting
public static ComponentManifest create(Descriptor descriptor)
public static ComponentManifest create(SSTable sstable)
{
LinkedHashMap<Component, Long> components = new LinkedHashMap<>(descriptor.getFormat().streamingComponents().size());
Set<Component> streamingComponents = sstable.getStreamingComponents();
LinkedHashMap<Component, Long> components = new LinkedHashMap<>(streamingComponents.size());

for (Component component : descriptor.getFormat().streamingComponents())
for (Component component : streamingComponents)
{
File file = descriptor.fileFor(component);
File file = sstable.descriptor.fileFor(component);
if (!file.exists())
continue;

Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.EnumSet;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -54,13 +55,15 @@ public class V1OnDiskFormat implements OnDiskFormat
{
private static final Logger logger = LoggerFactory.getLogger(V1OnDiskFormat.class);

private static final Set<IndexComponent> PER_SSTABLE_COMPONENTS = EnumSet.of(IndexComponent.GROUP_COMPLETION_MARKER,
@VisibleForTesting
public static final Set<IndexComponent> PER_SSTABLE_COMPONENTS = EnumSet.of(IndexComponent.GROUP_COMPLETION_MARKER,
IndexComponent.GROUP_META,
IndexComponent.TOKEN_VALUES,
IndexComponent.PRIMARY_KEY_TRIE,
IndexComponent.PRIMARY_KEY_BLOCKS,
IndexComponent.PRIMARY_KEY_BLOCK_OFFSETS);
private static final Set<IndexComponent> LITERAL_COMPONENTS = EnumSet.of(IndexComponent.COLUMN_COMPLETION_MARKER,
@VisibleForTesting
public static final Set<IndexComponent> LITERAL_COMPONENTS = EnumSet.of(IndexComponent.COLUMN_COMPLETION_MARKER,
IndexComponent.META,
IndexComponent.TERMS_DATA,
IndexComponent.POSTING_LISTS);
Expand Down Expand Up @@ -170,10 +173,11 @@ public boolean validatePerSSTableIndexComponents(IndexDescriptor indexDescriptor
{
if (logger.isDebugEnabled())
{
logger.debug(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}"),
(checksum ? "Checksum validation" : "Validation"),
logger.debug(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}. Error: {}"),
checksum ? "Checksum validation" : "Validation",
indexComponent,
indexDescriptor.sstableDescriptor);
indexDescriptor.sstableDescriptor,
e);
}
return false;
}
Expand Down
26 changes: 15 additions & 11 deletions src/java/org/apache/cassandra/io/sstable/Component.java
Expand Up @@ -52,6 +52,7 @@ public final static class Type
public final int id;
public final String name;
public final String repr;
public final boolean streamable;
private final Component singleton;

@SuppressWarnings("rawtypes")
Expand All @@ -60,31 +61,34 @@ public final static class Type
/**
* Creates a new non-singleton type and registers it a global type registry - see {@link #registerType(Type)}.
*
* @param name type name, must be unique for this and all parent formats
* @param repr the regular expression to be used to recognize a name represents this type
* @param formatClass format class for which this type is defined for
* @param name type name, must be unique for this and all parent formats
* @param repr the regular expression to be used to recognize a name represents this type
* @param isStreamable whether components of this type should be streamed to other nodes
* @param formatClass format class for which this type is defined for
*/
public static Type create(String name, String repr, Class<? extends SSTableFormat<?, ?>> formatClass)
public static Type create(String name, String repr, boolean isStreamable, Class<? extends SSTableFormat<?, ?>> formatClass)
{
return new Type(name, repr, false, formatClass);
return new Type(name, repr, false, isStreamable, formatClass);
pkolaczk marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Creates a new singleton type and registers it in a global type registry - see {@link #registerType(Type)}.
*
* @param name type name, must be unique for this and all parent formats
* @param repr the regular expression to be used to recognize a name represents this type
* @param formatClass format class for which this type is defined for
* @param name type name, must be unique for this and all parent formats
* @param repr the regular expression to be used to recognize a name represents this type
* @param isStreamable whether components of this type should be streamed to other nodes
* @param formatClass format class for which this type is defined for
*/
public static Type createSingleton(String name, String repr, Class<? extends SSTableFormat<?, ?>> formatClass)
public static Type createSingleton(String name, String repr, boolean isStreamable, Class<? extends SSTableFormat<?, ?>> formatClass)
pkolaczk marked this conversation as resolved.
Show resolved Hide resolved
{
return new Type(name, repr, true, formatClass);
return new Type(name, repr, true, isStreamable, formatClass);
}

private Type(String name, String repr, boolean isSingleton, Class<? extends SSTableFormat<?, ?>> formatClass)
private Type(String name, String repr, boolean isSingleton, boolean streamable, Class<? extends SSTableFormat<?, ?>> formatClass)
{
this.name = Objects.requireNonNull(name);
this.repr = repr;
this.streamable = streamable;
this.id = typesCollector.size();
this.formatClass = formatClass == null ? SSTableFormat.class : formatClass;
this.singleton = isSingleton ? new Component(this) : null;
Expand Down
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/io/sstable/SSTable.java
Expand Up @@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -162,6 +163,16 @@ public Set<Component> getComponents()
return ImmutableSet.copyOf(components);
}

/**
* Returns all SSTable components that should be streamed.
*/
public Set<Component> getStreamingComponents()
{
return components.stream()
.filter(c -> c.type.streamable)
.collect(Collectors.toSet());
}

public TableMetadata metadata()
{
return metadata.get();
Expand Down
27 changes: 16 additions & 11 deletions src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
Expand Up @@ -23,10 +23,10 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +36,7 @@
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.SequentialWriter;
Expand All @@ -50,7 +51,7 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
private static final Logger logger = LoggerFactory.getLogger(SSTableZeroCopyWriter.class);

private volatile SSTableReader finalReader;
private final Map<Component.Type, SequentialWriter> componentWriters;
private final Map<String, SequentialWriter> componentWriters; // indexed by component name

public SSTableZeroCopyWriter(Builder<?, ?> builder,
LifecycleNewTracker lifecycleNewTracker,
Expand All @@ -61,12 +62,15 @@ public SSTableZeroCopyWriter(Builder<?, ?> builder,
lifecycleNewTracker.trackNew(this);
this.componentWriters = new HashMap<>();

if (!descriptor.getFormat().streamingComponents().containsAll(components))
throw new AssertionError(format("Unsupported streaming component detected %s",
Sets.difference(ImmutableSet.copyOf(components), descriptor.getFormat().streamingComponents())));
Set<Component> unsupported = components.stream()
.filter(c -> !descriptor.getFormat().streamingComponents().contains(c))
.filter(c-> c.type != SSTableFormat.Components.Types.CUSTOM)
.collect(Collectors.toSet());
if (!unsupported.isEmpty())
throw new AssertionError(format("Unsupported streaming component detected %s", unsupported));
pkolaczk marked this conversation as resolved.
Show resolved Hide resolved

pkolaczk marked this conversation as resolved.
Show resolved Hide resolved
for (Component c : components)
componentWriters.put(c.type, makeWriter(descriptor, c));
componentWriters.put(c.name, makeWriter(descriptor, c));
}

@Override
Expand Down Expand Up @@ -195,14 +199,15 @@ public void close()
writer.close();
}

public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
{
logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
SequentialWriter writer = componentWriters.get(component.name);
pkolaczk marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Writing component {} to {} length {}", component, writer.getPath(), prettyPrintMemory(size));

if (in instanceof AsyncStreamingInputPlus)
write((AsyncStreamingInputPlus) in, size, componentWriters.get(type));
write((AsyncStreamingInputPlus) in, size, writer);
else
write(in, size, componentWriters.get(type));
write(in, size, writer);
}

private void write(AsyncStreamingInputPlus in, long size, SequentialWriter writer) throws ClosedChannelException
Expand Down
32 changes: 22 additions & 10 deletions src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

import org.apache.cassandra.db.ColumnFamilyStore;
Expand Down Expand Up @@ -59,7 +60,18 @@ public interface SSTableFormat<R extends SSTableReader, W extends SSTableWriter>
*/
Set<Component> allComponents();

Set<Component> streamingComponents();
/**
* Returns the components that should be streamed to other nodes on repair / rebuild.
* This includes only the core SSTable components produced by this format.
* Custom components registered by e.g. secondary indexes are not included.
* Use {@link SSTableReader#getStreamingComponents()} for the list of all components including the custom ones.
*/
default Set<Component> streamingComponents()
pkolaczk marked this conversation as resolved.
Show resolved Hide resolved
{
return allComponents().stream()
.filter(c -> c.type.streamable)
.collect(Collectors.toSet());
}

Set<Component> primaryComponents();

Expand Down Expand Up @@ -156,23 +168,23 @@ public static class Types
{
// the base data for an sstable: the remaining components can be regenerated
// based on the data component
public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", null);
public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", true, null);
// file to hold information about uncompressed data length, chunk offsets etc.
public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null);
public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true, null);
// statistical metadata about the content of the sstable
public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", null);
public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", true, null);
// serialized bloom filter for the row keys in the sstable
public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", null);
public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", true, null);
// holds CRC32 checksum of the data file
public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", null);
public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null);
// holds the CRC32 for chunks in an uncompressed file.
public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", null);
public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", true, null);
// table of contents, stores the list of all components for the sstable
public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", null);
public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", false, null);
// built-in secondary index (may exist multiple per sstable)
public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null);
public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", true, null);
pkolaczk marked this conversation as resolved.
Show resolved Hide resolved
// custom component, used by e.g. custom compaction strategy
public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, null);
public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, true, null);
pkolaczk marked this conversation as resolved.
Show resolved Hide resolved
}

// singleton components for types that don't need ids
Expand Down
Expand Up @@ -82,9 +82,9 @@ public static class Components extends SSTableFormat.Components
public static class Types extends SSTableFormat.Components.Types
{
// index of the row keys with pointers to their positions in the data file
public static final Component.Type PRIMARY_INDEX = Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", BigFormat.class);
public static final Component.Type PRIMARY_INDEX = Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", true, BigFormat.class);
pkolaczk marked this conversation as resolved.
Show resolved Hide resolved
// holds SSTable Index Summary (sampling of Index component)
public static final Component.Type SUMMARY = Component.Type.createSingleton("SUMMARY", "Summary.db", BigFormat.class);
public static final Component.Type SUMMARY = Component.Type.createSingleton("SUMMARY", "Summary.db", true, BigFormat.class);
}

public final static Component PRIMARY_INDEX = Types.PRIMARY_INDEX.getSingleton();
Expand Down Expand Up @@ -180,12 +180,6 @@ public Set<Component> allComponents()
return Components.ALL_COMPONENTS;
}

@Override
public Set<Component> streamingComponents()
{
return Components.STREAM_COMPONENTS;
}

@Override
public Set<Component> primaryComponents()
{
Expand Down