diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java index 9c5e048a1295..98e2b6f7ef12 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java @@ -121,7 +121,7 @@ public SSTableMultiWriter read(DataInputPlus in) throws IOException prettyPrintMemory(bytesRead), prettyPrintMemory(totalSize)); - writer.writeComponent(component.type, in, length); + writer.writeComponent(component, in, length); session.progress(writer.descriptor.fileFor(component).toString(), ProgressInfo.Direction.IN, length, length, length); bytesRead += length; diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java index 88ecff8a4464..9569769eba65 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java @@ -65,7 +65,7 @@ public CassandraOutgoingFile(StreamOperation operation, Ref ref, this.filename = sstable.getFilename(); this.shouldStreamEntireSSTable = computeShouldStreamEntireSSTables(); - ComponentManifest manifest = ComponentManifest.create(sstable.descriptor); + ComponentManifest manifest = ComponentManifest.create(sstable); this.header = makeHeader(sstable, operation, sections, estimatedKeys, shouldStreamEntireSSTable, manifest); } @@ -154,7 +154,7 @@ public void write(StreamSession session, StreamingDataOutputPlus out, int versio // redistribution, otherwise file sizes recorded in component manifest will be different from actual // file sizes. // Recreate the latest manifest and hard links for mutatable components in case they are modified. - try (ComponentContext context = sstable.runWithLock(ignored -> ComponentContext.create(sstable.descriptor))) + try (ComponentContext context = sstable.runWithLock(ignored -> ComponentContext.create(sstable))) { CassandraStreamHeader current = makeHeader(sstable, operation, sections, estimatedKeys, true, context.manifest()); CassandraStreamHeader.serializer.serialize(current, out, version); diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentContext.java b/src/java/org/apache/cassandra/db/streaming/ComponentContext.java index 164dd6ba5c22..c03e7b4c3436 100644 --- a/src/java/org/apache/cassandra/db/streaming/ComponentContext.java +++ b/src/java/org/apache/cassandra/db/streaming/ComponentContext.java @@ -28,6 +28,7 @@ import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; @@ -44,8 +45,9 @@ private ComponentContext(Map hardLinks, ComponentManifest manif this.manifest = manifest; } - public static ComponentContext create(Descriptor descriptor) + public static ComponentContext create(SSTable sstable) { + Descriptor descriptor = sstable.descriptor; Map hardLinks = new HashMap<>(1); for (Component component : descriptor.getFormat().mutableComponents()) @@ -59,7 +61,7 @@ public static ComponentContext create(Descriptor descriptor) hardLinks.put(component, hardlink); } - return new ComponentContext(hardLinks, ComponentManifest.create(descriptor)); + return new ComponentContext(hardLinks, ComponentManifest.create(sstable)); } public ComponentManifest manifest() diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java index 5e3cc0c6b6cf..f220be3a48e3 100644 --- a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java +++ b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java @@ -24,6 +24,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; @@ -33,7 +34,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -52,13 +53,14 @@ public ComponentManifest(Map components) } @VisibleForTesting - public static ComponentManifest create(Descriptor descriptor) + public static ComponentManifest create(SSTable sstable) { - LinkedHashMap components = new LinkedHashMap<>(descriptor.getFormat().streamingComponents().size()); + Set streamingComponents = sstable.getStreamingComponents(); + LinkedHashMap components = new LinkedHashMap<>(streamingComponents.size()); - for (Component component : descriptor.getFormat().streamingComponents()) + for (Component component : streamingComponents) { - File file = descriptor.fileFor(component); + File file = sstable.descriptor.fileFor(component); if (!file.exists()) continue; diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java index 670ba82c094b..4d802ccae98a 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java @@ -18,8 +18,15 @@ package org.apache.cassandra.index.sai.disk.format; +import java.util.regex.Pattern; + import org.apache.cassandra.index.sai.disk.v1.postings.PostingsWriter; import org.apache.cassandra.index.sai.disk.v1.trie.TrieTermsDictionaryWriter; +import org.apache.cassandra.io.sstable.Component; + +import static org.apache.cassandra.index.sai.disk.format.Version.SAI_DESCRIPTOR; +import static org.apache.cassandra.index.sai.disk.format.Version.SAI_SEPARATOR; + /** * This is a definitive list of all the on-disk components for all versions @@ -80,9 +87,20 @@ public enum IndexComponent GROUP_COMPLETION_MARKER("GroupComplete"); public final String name; + public final Component.Type type; IndexComponent(String name) { this.name = name; + this.type = componentType(name); + } + + private static Component.Type componentType(String name) + { + String componentName = SAI_DESCRIPTOR + SAI_SEPARATOR + name; + String repr = Pattern.quote(SAI_DESCRIPTOR + SAI_SEPARATOR) + + ".*" + + Pattern.quote(SAI_SEPARATOR + name + ".db"); + return Component.Type.create(componentName, repr, true, null); } } diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java index 16e84d94c708..8dd57581a3b2 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java @@ -29,7 +29,6 @@ import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.SSTableFormat; /** * Format version of indexing component, denoted as [major][minor]. Same forward-compatibility rules apply as to @@ -37,8 +36,8 @@ */ public class Version implements Comparable { - private static final String SAI_DESCRIPTOR = "SAI"; - private static final String SAI_SEPARATOR = "+"; + public static final String SAI_DESCRIPTOR = "SAI"; + public static final String SAI_SEPARATOR = "+"; // Current version public static final Version AA = new Version("aa", V1OnDiskFormat.instance, (c, i) -> defaultFileNameFormat(c, i, "aa")); @@ -113,12 +112,12 @@ public OnDiskFormat onDiskFormat() public Component makePerSSTableComponent(IndexComponent indexComponent) { - return SSTableFormat.Components.Types.CUSTOM.createComponent(fileNameFormatter.format(indexComponent, null)); + return indexComponent.type.createComponent(fileNameFormatter.format(indexComponent, null)); } public Component makePerIndexComponent(IndexComponent indexComponent, IndexContext indexContext) { - return SSTableFormat.Components.Types.CUSTOM.createComponent(fileNameFormatter.format(indexComponent, indexContext)); + return indexComponent.type.createComponent(fileNameFormatter.format(indexComponent, indexContext)); } public FileNameFormatter fileNameFormatter() diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java index c55d4b07ef51..189d9b6a5f55 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java @@ -22,6 +22,7 @@ import java.util.EnumSet; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,13 +55,15 @@ public class V1OnDiskFormat implements OnDiskFormat { private static final Logger logger = LoggerFactory.getLogger(V1OnDiskFormat.class); - private static final Set PER_SSTABLE_COMPONENTS = EnumSet.of(IndexComponent.GROUP_COMPLETION_MARKER, + @VisibleForTesting + public static final Set PER_SSTABLE_COMPONENTS = EnumSet.of(IndexComponent.GROUP_COMPLETION_MARKER, IndexComponent.GROUP_META, IndexComponent.TOKEN_VALUES, IndexComponent.PRIMARY_KEY_TRIE, IndexComponent.PRIMARY_KEY_BLOCKS, IndexComponent.PRIMARY_KEY_BLOCK_OFFSETS); - private static final Set LITERAL_COMPONENTS = EnumSet.of(IndexComponent.COLUMN_COMPLETION_MARKER, + @VisibleForTesting + public static final Set LITERAL_COMPONENTS = EnumSet.of(IndexComponent.COLUMN_COMPLETION_MARKER, IndexComponent.META, IndexComponent.TERMS_DATA, IndexComponent.POSTING_LISTS); @@ -170,10 +173,11 @@ public boolean validatePerSSTableIndexComponents(IndexDescriptor indexDescriptor { if (logger.isDebugEnabled()) { - logger.debug(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}"), - (checksum ? "Checksum validation" : "Validation"), + logger.debug(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}. Error: {}"), + checksum ? "Checksum validation" : "Validation", indexComponent, - indexDescriptor.sstableDescriptor); + indexDescriptor.sstableDescriptor, + e); } return false; } diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java index f2eea992a3de..0d89cf0b927d 100644 --- a/src/java/org/apache/cassandra/io/sstable/Component.java +++ b/src/java/org/apache/cassandra/io/sstable/Component.java @@ -52,6 +52,7 @@ public final static class Type public final int id; public final String name; public final String repr; + public final boolean streamable; private final Component singleton; @SuppressWarnings("rawtypes") @@ -60,31 +61,34 @@ public final static class Type /** * Creates a new non-singleton type and registers it a global type registry - see {@link #registerType(Type)}. * - * @param name type name, must be unique for this and all parent formats - * @param repr the regular expression to be used to recognize a name represents this type - * @param formatClass format class for which this type is defined for + * @param name type name, must be unique for this and all parent formats + * @param repr the regular expression to be used to recognize a name represents this type + * @param streamable whether components of this type should be streamed to other nodes + * @param formatClass format class for which this type is defined for */ - public static Type create(String name, String repr, Class> formatClass) + public static Type create(String name, String repr, boolean streamable, Class> formatClass) { - return new Type(name, repr, false, formatClass); + return new Type(name, repr, false, streamable, formatClass); } /** * Creates a new singleton type and registers it in a global type registry - see {@link #registerType(Type)}. * - * @param name type name, must be unique for this and all parent formats - * @param repr the regular expression to be used to recognize a name represents this type - * @param formatClass format class for which this type is defined for + * @param name type name, must be unique for this and all parent formats + * @param repr the regular expression to be used to recognize a name represents this type + * @param streamable whether components of this type should be streamed to other nodes + * @param formatClass format class for which this type is defined for */ - public static Type createSingleton(String name, String repr, Class> formatClass) + public static Type createSingleton(String name, String repr, boolean streamable, Class> formatClass) { - return new Type(name, repr, true, formatClass); + return new Type(name, repr, true, streamable, formatClass); } - private Type(String name, String repr, boolean isSingleton, Class> formatClass) + private Type(String name, String repr, boolean isSingleton, boolean streamable, Class> formatClass) { this.name = Objects.requireNonNull(name); this.repr = repr; + this.streamable = streamable; this.id = typesCollector.size(); this.formatClass = formatClass == null ? SSTableFormat.class : formatClass; this.singleton = isSingleton ? new Component(this) : null; diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index 7936e2ea53af..475f92beeb49 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.stream.Collectors; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; @@ -162,6 +163,16 @@ public Set getComponents() return ImmutableSet.copyOf(components); } + /** + * Returns all SSTable components that should be streamed. + */ + public Set getStreamingComponents() + { + return components.stream() + .filter(c -> c.type.streamable) + .collect(Collectors.toSet()); + } + public TableMetadata metadata() { return metadata.get(); diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java index 91e6490971cb..5306661b9931 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java @@ -23,10 +23,10 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +50,7 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter private static final Logger logger = LoggerFactory.getLogger(SSTableZeroCopyWriter.class); private volatile SSTableReader finalReader; - private final Map componentWriters; + private final Map componentWriters; // indexed by component name public SSTableZeroCopyWriter(Builder builder, LifecycleNewTracker lifecycleNewTracker, @@ -61,12 +61,14 @@ public SSTableZeroCopyWriter(Builder builder, lifecycleNewTracker.trackNew(this); this.componentWriters = new HashMap<>(); - if (!descriptor.getFormat().streamingComponents().containsAll(components)) - throw new AssertionError(format("Unsupported streaming component detected %s", - Sets.difference(ImmutableSet.copyOf(components), descriptor.getFormat().streamingComponents()))); + Set unsupported = components.stream() + .filter(c -> !c.type.streamable) + .collect(Collectors.toSet()); + if (!unsupported.isEmpty()) + throw new AssertionError(format("Unsupported streaming components detected: %s", unsupported)); for (Component c : components) - componentWriters.put(c.type, makeWriter(descriptor, c)); + componentWriters.put(c.name, makeWriter(descriptor, c)); } @Override @@ -195,14 +197,16 @@ public void close() writer.close(); } - public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException + public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException { - logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size)); + @SuppressWarnings({"resource", "RedundantSuppression"}) // all writers are closed in close() + SequentialWriter writer = componentWriters.get(component.name); + logger.info("Writing component {} to {} length {}", component, writer.getPath(), prettyPrintMemory(size)); if (in instanceof AsyncStreamingInputPlus) - write((AsyncStreamingInputPlus) in, size, componentWriters.get(type)); + write((AsyncStreamingInputPlus) in, size, writer); else - write(in, size, componentWriters.get(type)); + write(in, size, writer); } private void write(AsyncStreamingInputPlus in, long size, SequentialWriter writer) throws ClosedChannelException diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable_API.md b/src/java/org/apache/cassandra/io/sstable/SSTable_API.md index 6a0440655e80..7fb23ca4d8d9 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable_API.md +++ b/src/java/org/apache/cassandra/io/sstable/SSTable_API.md @@ -77,19 +77,19 @@ Apart from the generic components, each sstable format implementation may descri For example, the _big table_ format describes additionally `PRIMARY_INDEX` and `SUMMARY` singleton types and the corresponding singleton components (see [`BigFormat.Components`](format/big/BigFormat.java)). -Custom types can be created with one of the `Component.Type.create(name, repr, formatClass)`, -`Component.Type.createSingleton(name, repr, formatClass)` methods. Each created type is registered in a global types' -registry. Types registry is hierarchical which means that an sstable implementation may use types defined for its -format class and for all parent format classes (for example, the types defined for the `BigFormat` class extend the set -of types defined for the `SSTableFormat` interface). +Custom types can be created with one of the `Component.Type.create(name, repr, streamable, formatClass)`, +`Component.Type.createSingleton(name, repr, streamable, formatClass)` methods. Each created type is registered in +a global types' registry. Types registry is hierarchical which means that an sstable implementation may use types +defined for its format class and for all parent format classes (for example, the types defined for the `BigFormat` class +extend the set of types defined for the `SSTableFormat` interface). For example, types defined for `BigFormat`: ```java public static class Types extends SSTableFormat.Components.Types { - public static final Component.Type PRIMARY_INDEX = Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", BigFormat.class); - public static final Component.Type SUMMARY = Component.Type.createSingleton("SUMMARY", "Summary.db", BigFormat.class); + public static final Component.Type PRIMARY_INDEX = Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", true, BigFormat.class); + public static final Component.Type SUMMARY = Component.Type.createSingleton("SUMMARY", "Summary.db", true, BigFormat.class); } ``` diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java index 1caa87b9cce3..654880c2c175 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java @@ -59,7 +59,6 @@ public interface SSTableFormat */ Set allComponents(); - Set streamingComponents(); Set primaryComponents(); @@ -156,23 +155,23 @@ public static class Types { // the base data for an sstable: the remaining components can be regenerated // based on the data component - public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", null); + public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", true, null); // file to hold information about uncompressed data length, chunk offsets etc. - public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null); + public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true, null); // statistical metadata about the content of the sstable - public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", null); + public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", true, null); // serialized bloom filter for the row keys in the sstable - public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", null); + public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", true, null); // holds CRC32 checksum of the data file - public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", null); + public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null); // holds the CRC32 for chunks in an uncompressed file. - public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", null); + public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", true, null); // table of contents, stores the list of all components for the sstable - public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", null); + public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", false, null); // built-in secondary index (may exist multiple per sstable) - public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null); + public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", false, null); // custom component, used by e.g. custom compaction strategy - public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, null); + public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, true, null); } // singleton components for types that don't need ids diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index 97f244ca6670..bee089a16d03 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -82,9 +82,9 @@ public static class Components extends SSTableFormat.Components public static class Types extends SSTableFormat.Components.Types { // index of the row keys with pointers to their positions in the data file - public static final Component.Type PRIMARY_INDEX = Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", BigFormat.class); + public static final Component.Type PRIMARY_INDEX = Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", true, BigFormat.class); // holds SSTable Index Summary (sampling of Index component) - public static final Component.Type SUMMARY = Component.Type.createSingleton("SUMMARY", "Summary.db", BigFormat.class); + public static final Component.Type SUMMARY = Component.Type.createSingleton("SUMMARY", "Summary.db", true, BigFormat.class); } public final static Component PRIMARY_INDEX = Types.PRIMARY_INDEX.getSingleton(); @@ -109,16 +109,6 @@ public static class Types extends SSTableFormat.Components.Types SUMMARY, COMPRESSION_INFO, STATS); - - private static final Set STREAM_COMPONENTS = ImmutableSet.of(DATA, - PRIMARY_INDEX, - STATS, - COMPRESSION_INFO, - FILTER, - SUMMARY, - DIGEST, - CRC); - private static final Set ALL_COMPONENTS = ImmutableSet.of(DATA, PRIMARY_INDEX, STATS, @@ -180,12 +170,6 @@ public Set allComponents() return Components.ALL_COMPONENTS; } - @Override - public Set streamingComponents() - { - return Components.STREAM_COMPONENTS; - } - @Override public Set primaryComponents() { diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java index 45cba4fcaed9..d60c3369d64a 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java @@ -71,23 +71,14 @@ public static class Components extends SSTableFormat.Components { public static class Types extends AbstractSSTableFormat.Components.Types { - public static final Component.Type PARTITION_INDEX = Component.Type.createSingleton("PARTITION_INDEX", "Partitions.db", BtiFormat.class); - public static final Component.Type ROW_INDEX = Component.Type.createSingleton("ROW_INDEX", "Rows.db", BtiFormat.class); + public static final Component.Type PARTITION_INDEX = Component.Type.createSingleton("PARTITION_INDEX", "Partitions.db", true, BtiFormat.class); + public static final Component.Type ROW_INDEX = Component.Type.createSingleton("ROW_INDEX", "Rows.db", true, BtiFormat.class); } public final static Component PARTITION_INDEX = Types.PARTITION_INDEX.getSingleton(); public final static Component ROW_INDEX = Types.ROW_INDEX.getSingleton(); - private final static Set STREAMING_COMPONENTS = ImmutableSet.of(DATA, - PARTITION_INDEX, - ROW_INDEX, - STATS, - COMPRESSION_INFO, - FILTER, - DIGEST, - CRC); - private final static Set PRIMARY_COMPONENTS = ImmutableSet.of(DATA, PARTITION_INDEX); @@ -159,12 +150,6 @@ public BtiTableReaderFactory getReaderFactory() return readerFactory; } - @Override - public Set streamingComponents() - { - return Components.STREAMING_COMPONENTS; - } - @Override public Set primaryComponents() { diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java new file mode 100644 index 000000000000..3069e807a825 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test.sai; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; + +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.Row; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.distributed.util.QueryResultUtil; +import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat; +import org.assertj.core.api.Assertions; + +import static org.assertj.core.api.Assertions.assertThat; + +public class IndexStreamingTest extends TestBaseImpl +{ + // streaming sends events every 65k, so need to make sure that the files are larger than this to hit + // all cases of the vtable - hence we add a big enough blob column + private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]); + private static final int NUM_COMPONENTS; + + static + { + DatabaseDescriptor.clientInitialization(); + NUM_COMPONENTS = sstableStreamingComponentsCount() + + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size() + + V1OnDiskFormat.LITERAL_COMPONENTS.size(); + } + + private static int sstableStreamingComponentsCount() + { + return (int) DatabaseDescriptor.getSelectedSSTableFormat() + .allComponents() + .stream() + .filter(c -> c.type.streamable) + .count() - 1; // -1 because we don't include the compression component + } + + @Test + public void zeroCopy() throws IOException + { + test(true); + } + + @Test + public void notZeroCopy() throws IOException + { + test(false); + } + + private void test(boolean zeroCopyStreaming) throws IOException + { + try (Cluster cluster = init(Cluster.build(2) + .withConfig(c -> c.with(Feature.values()) + .set("stream_entire_sstables", zeroCopyStreaming) + .set("streaming_slow_events_log_timeout", "0s")) + .start())) + { + cluster.schemaChange(withKeyspace( + "CREATE TABLE %s.test (pk int PRIMARY KEY, v text, b blob) WITH compression = { 'enabled' : false };" + )); + cluster.schemaChange(withKeyspace( + "CREATE CUSTOM INDEX ON %s.test(v) USING 'StorageAttachedIndex';" + )); + cluster.stream().forEach(i -> + i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success() + ); + IInvokableInstance first = cluster.get(1); + IInvokableInstance second = cluster.get(2); + long sstableCount = 10; + long expectedFiles = zeroCopyStreaming ? sstableCount * NUM_COMPONENTS : sstableCount; + for (int i = 0; i < sstableCount; i++) + { + first.executeInternal(withKeyspace("insert into %s.test(pk, v, b) values (?, ?, ?)"), i, "v" + i, BLOB); + first.flush(KEYSPACE); + } + + second.nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success(); + + SimpleQueryResult qr = first.executeInternalWithResult("SELECT * FROM system_views.streaming"); + String txt = QueryResultUtil.expand(qr); + qr.reset(); + assertThat(qr.toObjectArrays().length).describedAs("Found rows\n%s", txt).isEqualTo(1); + assertThat(qr.hasNext()).isTrue(); + Row row = qr.next(); + QueryResultUtil.assertThat(row) + .isEqualTo("peers", Collections.singletonList(second.broadcastAddress().toString())) + .isEqualTo("follower", true) + .isEqualTo("operation", "Rebuild") + .isEqualTo("status", "success") + .isEqualTo("progress_percentage", 100.0F) + .isEqualTo("success_message", null).isEqualTo("failure_cause", null) + .isEqualTo("files_sent", expectedFiles) + .columnsEqualTo("files_sent", "files_to_send") + .columnsEqualTo("bytes_sent", "bytes_to_send") + .isEqualTo("files_received", 0L) + .columnsEqualTo("files_received", "files_to_receive", "bytes_received", "bytes_to_receive"); + long totalBytes = row.getLong("bytes_sent"); + assertThat(totalBytes).isGreaterThan(0); + + qr = second.executeInternalWithResult("SELECT * FROM system_views.streaming"); + txt = QueryResultUtil.expand(qr); + qr.reset(); + assertThat(qr.toObjectArrays().length).describedAs("Found rows\n%s", txt).isEqualTo(1); + assertThat(qr.hasNext()).isTrue(); + + QueryResultUtil.assertThat(qr.next()) + .isEqualTo("peers", Collections.singletonList(first.broadcastAddress().toString())) + .isEqualTo("follower", false) + .isEqualTo("operation", "Rebuild") + .isEqualTo("status", "success") + .isEqualTo("progress_percentage", 100.0F) + .isEqualTo("success_message", null).isEqualTo("failure_cause", null) + .columnsEqualTo("files_to_receive", "files_received").isEqualTo("files_received", expectedFiles) + .columnsEqualTo("bytes_to_receive", "bytes_received").isEqualTo("bytes_received", totalBytes) + .columnsEqualTo("files_sent", "files_to_send", "bytes_sent", "bytes_to_send").isEqualTo("files_sent", 0L); + + // did we trigger slow event log? + cluster.forEach(i -> Assertions.assertThat(i.logs().grep("Handling streaming events took longer than").getResult()) + .describedAs("Unable to find slow log for node%d", i.config().num()) + .isNotEmpty()); + + for (int i = 0; i < sstableCount; i++) + { + Object[][] rs = second.executeInternal(withKeyspace("select pk from %s.test where v = ?"), "v" + i); + assertThat(rs.length).isEqualTo(1); + assertThat(rs[0][0]).isEqualTo(i); + } + } + } +} diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java index 59acfc6ae696..dd522d0805c4 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java +++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java @@ -121,7 +121,7 @@ public void setupBenchmark() throws IOException sstable = store.getLiveSSTables().iterator().next(); session = setupStreamingSessionForTest(); - context = ComponentContext.create(sstable.descriptor); + context = ComponentContext.create(sstable); blockStreamWriter = new CassandraEntireSSTableStreamWriter(sstable, session, context); CapturingNettyChannel blockStreamCaptureChannel = new CapturingNettyChannel(STREAM_SIZE); diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java index 898da7c78290..1b5112076d5a 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java @@ -117,7 +117,7 @@ public void testBlockWriterOverWire() throws IOException EmbeddedChannel channel = new EmbeddedChannel(); try (AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel); - ComponentContext context = ComponentContext.create(descriptor)) + ComponentContext context = ComponentContext.create(sstable)) { CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, context); @@ -140,7 +140,7 @@ public void testBlockReadingAndWritingOverWire() throws Throwable ByteBuf serializedFile = Unpooled.buffer(8192); EmbeddedChannel channel = createMockNettyChannel(serializedFile); try (AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel); - ComponentContext context = ComponentContext.create(descriptor)) + ComponentContext context = ComponentContext.create(sstable)) { CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, context); writer.write(out); diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java index a0e4e510681a..851ac9cb3f77 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java @@ -108,7 +108,7 @@ public void transferedSizeWithZeroCopyStreamingTest() // verify all component on-disk length is used for ZCS CassandraStreamHeader header = header(true, true); long transferedSize = header.size(); - assertEquals(ComponentManifest.create(sstable.descriptor).totalSize(), transferedSize); + assertEquals(ComponentManifest.create(sstable).totalSize(), transferedSize); assertEquals(transferedSize, header.calculateSize()); // verify that computing file chunks doesn't change transferred size for ZCS @@ -142,7 +142,7 @@ private CassandraStreamHeader header(boolean entireSSTable, boolean compressed) TableMetadata metadata = store.metadata(); SerializationHeader.Component serializationHeader = SerializationHeader.makeWithoutStats(metadata).toComponent(); - ComponentManifest componentManifest = entireSSTable ? ComponentManifest.create(sstable.descriptor) : null; + ComponentManifest componentManifest = entireSSTable ? ComponentManifest.create(sstable) : null; DecoratedKey firstKey = entireSSTable ? sstable.first : null; return CassandraStreamHeader.builder() diff --git a/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java b/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java index a9b76303f262..e58238c444ba 100644 --- a/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java @@ -57,18 +57,18 @@ public void testTypes() Function componentFactory = Mockito.mock(Function.class); // do not allow to define a type with the same name or repr as the existing type for this or parent format - assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> Type.createSingleton(Components.Types.TOC.name, Components.Types.TOC.repr + "x", Format1.class)); - assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> Type.createSingleton(Components.Types.TOC.name + "x", Components.Types.TOC.repr, Format2.class)); + assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> Type.createSingleton(Components.Types.TOC.name, Components.Types.TOC.repr + "x", true, Format1.class)); + assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> Type.createSingleton(Components.Types.TOC.name + "x", Components.Types.TOC.repr, true, Format2.class)); // allow to define a format with other name and repr - Type t1 = Type.createSingleton("ONE", "One.db", Format1.class); + Type t1 = Type.createSingleton("ONE", "One.db", true, Format1.class); // allow to define a format with the same name and repr for two different formats - Type t2f1 = Type.createSingleton("TWO", "Two.db", Format1.class); - Type t2f2 = Type.createSingleton("TWO", "Two.db", Format2.class); + Type t2f1 = Type.createSingleton("TWO", "Two.db", true, Format1.class); + Type t2f2 = Type.createSingleton("TWO", "Two.db", true, Format2.class); assertThat(t2f1).isNotEqualTo(t2f2); - assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> Type.createSingleton(null, "-Three.db", Format1.class)); + assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> Type.createSingleton(null, "-Three.db", true, Format1.class)); assertThat(Type.fromRepresentation("should be custom", BigFormat.getInstance())).isSameAs(Components.Types.CUSTOM); assertThat(Type.fromRepresentation(Components.Types.TOC.repr, BigFormat.getInstance())).isSameAs(Components.Types.TOC); @@ -80,10 +80,10 @@ public void testTypes() @Test public void testComponents() { - Type t3f1 = Type.createSingleton("THREE", "Three.db", Format1.class); - Type t3f2 = Type.createSingleton("THREE", "Three.db", Format2.class); - Type t4f1 = Type.create("FOUR", ".*-Four.db", Format1.class); - Type t4f2 = Type.create("FOUR", ".*-Four.db", Format2.class); + Type t3f1 = Type.createSingleton("THREE", "Three.db", true, Format1.class); + Type t3f2 = Type.createSingleton("THREE", "Three.db", true, Format2.class); + Type t4f1 = Type.create("FOUR", ".*-Four.db", true, Format1.class); + Type t4f2 = Type.create("FOUR", ".*-Four.db", true, Format2.class); Component c1 = t3f1.getSingleton(); Component c2 = t3f2.getSingleton(); diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java index 11b92ac4994d..4d64a73602cb 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java @@ -168,7 +168,7 @@ private void writeDataTestCycle(Function bufferMapper try { - btzcw.writeComponent(component.type, pair.left, pair.right); + btzcw.writeComponent(component, pair.left, pair.right); } catch (ClosedChannelException e) { diff --git a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java index 5a8f4a58cfae..a4022cb34604 100644 --- a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java +++ b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java @@ -137,7 +137,7 @@ public void test() throws Exception int totalNumberOfFiles = session.transfers.get(store.metadata.id).getTotalNumberOfFiles(); - assertEquals(ComponentManifest.create(sstable.descriptor).components().size(), totalNumberOfFiles); + assertEquals(ComponentManifest.create(sstable).components().size(), totalNumberOfFiles); assertEquals(streamEventHandler.fileNames.size(), totalNumberOfFiles); }