Skip to content

Commit

Permalink
SSTable format API
Browse files Browse the repository at this point in the history
Summary of the changes:

Format, reader and writer
---------------------------
There are a lot of refactorings around sstable related classes aiming to extract the most generic functionality to the top-level entities and push down implementation-specific stuff to the actual implementation. In Particular, the top-level, implementation agnostic classes/interfaces are SSTableFormat interface, SSTable, SSTableReader, SSTableWriter, IVerifier, and IScrubber. The rest of the codebase has been reviewed for explicit usages of big table format-specific usages of sstable classes and refactored. SSTable, SSTableReader, and SSTableWriter have their builders. Builders make a hierarchy that follows the same inheritance structure as readers and writers.

There are also partial implementations that add support for some features and may or may not be used by the custom implementations. They include:
- AbstractSSTableFormat - adds an implementation of some initialization methods - in practice, all of the format implementations should extend this class
- SSTableReaderWithFilter - add support for Bloom filter to the reader
- SortedTableWriter - generic implementation for a writer which writes partitions in the default order to the data file, supports Bloom filter and some index of partitions
- IndexSummarySupport - interface implemented by the readers using index summaries
- KeyCacheSupport - interface implemented by the readers using row key cache

Descriptor
---------------------------
Refactored the Descriptor class so that:
- All paths are created from the base directory File rather than from a String
- All the methods named *filename* producing full paths were made private; their current implementations are returning file names rather than paths (the naming was inconsistent)
- The usages of the `filenameFor` method were refactored to use the `fileFor` method
- The usages of the `fromFilename` method were refactored to use a  `fromFileWithComponent(..., false).left` expression
In essence, the Descriptor class is no longer working on String-based paths.

Index summaries
---------------------------
Removed the index summary from the generic SSTableReader class and created an interface IndexSummarySupport to be implemented by the readers that need it. Methods in related classes that refer back to the reader were refactored to support just readers of the SSTableReader & IndexSummarySupport type. Therefore, we will no longer need to assume that the generic SSTableReader has anything to do with an index summary.

A new IndexSummaryComponent class encloses data fields from the index summary file (note that aside from the index summary itself, the file includes the first and last partition of the sstable). The class has been extracted to deal with those fields and have that logic in a single place.

Filter
---------------------------
Refactored IFilter and its serialization - in particular, added the `serialize` method to the IFilter interface and moved loading/saving logic to a separate utility class FilterComponent.
Extracted the SSTableReaderWithFilter abstract reader extending the generic SSTableReader with filter support.
Extracted bloom filter metrics into separate entities allowing to plug them in if the implementation uses a filter.

Cache
---------------------------
Refactored CacheService to support different key-cache values. CacheService now supports arbitrary IRowIndexEntry implementation as a key-cache value. A new version of the auto-saving cache was created ("g") because some information about the type of serialized row index entry needs to be known before it is deserialized (or skipped). Therefore, the SSTableFormat type ordinal number is stored, which is sufficient because the IRowIndexEntry serializer is specific to the sstable format type.
Similarly to the IndexSummarySupport, a new KeyCacheSupport interface has to be implemented to mark the reader as supporting key-cache. It contains the default implementation of several methods the rest of the system relies on when the key-cache is supported.

Other changes
---------------------------
- Fixed disabling chunk cache - enable(boolean) method in ChunkCache does not make any sense - it makes a false impression it can disable chunk cache once enabled, while in fact, it only clears it. Added setFileCacheEnabled to DatabaseDescriptor

- Made WrappingUnfilteredRowIterator an interface

- DataInputStreamPlus extends InputStream - this makes it possible for input stream-based inheritors of DataInputPlus to extend DataInputStreamPlus. It simplifies coding because sometimes we want to get DataInputPlus implementation extending InputStream as an argument.

- Table and keyspace metrics were made pluggable - in particular, added the ability for a certain format to register gauges that are specific only to that format and make no sense for others

- Implemented mmapped region extension for compressed data

- Refactored FileHandle so that it is no longer closable

- Implemented WrappingRebufferer

- Introduced the SSTable.Owner interface to make SSTable implementation not reference higher-level entities directly. SSTable accepts passing null as the owner when there is no owner (like sometimes in offline tools) or passing a mock when needed in tests.

Individual commits
---------------------------

[4a87cd3] Fix disabling chunk cache
[c84c75c] Made WrappingUnfilteredRowIterator an interface
[253d2b8] Add getType to SSTableFormat
[3f169dc] Remove getIndexSerializer from SSTableFormat
[05bae18] Pull down rowIndexEntrySerializer field
[da675f2] Moved RowIndexEntry
[673f0c5] Reduce usages of RowIndexEntry
[c72538b] Refactor CacheService to support for different key cache values
[54d33ee] Minor refactoring of ColumnIndex
[93862df] Just moved AbstractSSTableIterator to o.a.c.io.sstable.format
[9e4566a] Refactored AbstractSSTableIterator
[a4e61e8] Extracted IScrubber and IVerifier interfaces
[20f78c7] Push down implementation of SSTableReader.firstKeyBeyond
[f2c24e5] Moved SSTableReader.getSampleIndexesForRanges to IndexSummary
[b6c3a6c] Moved SSTableReader.getKeySamples implementation to IndexSummary
[c4b90eb] Refactor InstanceTidier so that it is more generic
[918d5a9] Refactor dropping page cache
[a52fb4d] Refactor sstable metrics
[f6d10f9] NEW (fix up) - DataInputStreamPlus extends InputStream
[8f6a56d] Getting rid of index summary in SSTableReader
[4a918bf] Removed direct usages of primary index from SSTableReader
[358fa32] Refactor KeyIterator so that it is sstable format agnostic
[14c09d8] Remove explicit usage of Components outside of format specific classes
[feff14e] Move clone methods implementation from SSTableReader to BigTableReader
[64e9787] Move saveIndexSummary and saveBloomFilter to SSTableReaderBuilder
[ae71fe6] Moved indexSummary field to BigTableReader and made it private
[df9fd8c] Moved ifile field to BigTableReader and made it private
[2be6ea9] Moved static open methods for BigTableReader to the reader factory
[bc0e55a] Minor refactoring around IFilter and its serialization
[5b95704] Minor refactorings around IndexSummary
[8781233] Extracted TOCComponent class to deal with TOC file
[fdad092] Extracted CompressionInfoComponent class
[39b47e3] Extracted StatsComponent as a helper for elements of SSTable metadata
[cdb55bf] Fix SSTable.getMinimalKey
[b99c6d5] Refactor FileHandle so that it is no longer closable
[77b7f7a] Implement WrappingRebufferer
[b686891] Add progressPercentage to ProgressInfo
[7fd4956] Moved copy/rename/hardLink methods from SSTableWriter to SSTable
[1ccc6bf] Create generic SSTableBuilder and IOOptions
[da58a81] Refactor SSTableReaderBuilder
[4501ddb] Refactor ColumnIndex
[d4f9e1a] Extracted non-big-table-specific functionality from BigTableWriter to SortedTableWriter
[379525d] Refactor BigTableZeroCopyWriter to SSTableZeroCopyWriter as it is not specific to big format
[8ac37f8] Extract EmptySSTableScanner out from BigTableScanner
[ee6673f] Implement SSTableWriterBuilder
[bb26629] Refactor opening early / final
[a327595] Refactored SSTableWriter factory
[16ffd73] Extract non-big-format-specific logic from scrubber and verifier
[75e02db] Allow to specify the default SSTableFormat via system property
[a7b9d0d] Small fixes around streaming
[407f977] Move guard collection size
[0529e57] Remove explicit references to big format
[6150996] Unclassified minor changes
[da28d1a] Replaced getCreationTimeFor(Component) with getDataCreationTime()
[e99c834] !!! Reformatting
[882b7ba] Rename SSTableReader.maybePresent and fix its redundant usages
[b70c983] Implement mmapped region extension for compressed data
[d7ff397] Introduce SSTable.Owner interface
[e9feb9c] Replaced getCreationTimeFor(Component) with getDataCreationTime()
[ee8082f] Created SSTableFormat.deleteOrphanedComponents
[e62950f] Refactor metrics further
[cefa5b3] Extract key cache support into separate entity
[dd55101] Extracted SSTableReaderWithFilter
[510b651] Implement customizable component types
[2be512d] Pluggable SSTableFormat by making SSTableFormat.Type not an enum
[670836b] Refactor CRC and digest validators
[00c9110] Extract delete method to delete SSTables and purge row cache entries
[0819dc9] Extracted trySkipFileCacheBefore(key) to SSTableReader
[732f841] Added missing overrides in ForwardingSSTableReader
[db62321] Update DatabaseDescriptorRefTest
[c018c46] Cleanup
[eafc836] Add @SuppressWarnings("resource") where needed
[3b7c911] Documentation

patch by Jacek Lewandowski, reviewed by Branimir Lambov for CASSANDRA-17056

Co-authored-by: @jacek-lewandowski
Co-authored-by: @blambov
  • Loading branch information
jacek-lewandowski committed Mar 6, 2023
1 parent 0a0e068 commit b7e1e44
Show file tree
Hide file tree
Showing 299 changed files with 15,596 additions and 9,190 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
4.2
* CEP-17: SSTable API (CASSANDRA-17056)
* Gossip stateMapOrdering does not have correct ordering when both EndpointState are in the bootstrapping set (CASSANDRA-18292)
* Snapshot only sstables containing mismatching ranges on preview repair mismatch (CASSANDRA-17561)
* More accurate skipping of sstables in read path (CASSANDRA-18134)
Expand Down
4 changes: 3 additions & 1 deletion NEWS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ New features
- On virtual tables, it is not strictly necessary to specify `ALLOW FILTERING` for select statements which would
normally require it, except `system_views.system_logs`.
- More accurate skipping of sstables in read path due to better handling of min/max clustering and lower bound;
SSTable format has been bumped to 'nc' because there are new fields in stats metadata
SSTable format has been bumped to 'nc' because there are new fields in stats metadata\
- SSTal

Upgrading
---------
Expand All @@ -140,6 +141,7 @@ Upgrading
upgrades involving 3.x and 4.x nodes. The fix for that issue makes it can now appear during rolling upgrades from
4.1.0 or 4.0.0-4.0.7. If that is your case, please use protocol v4 or higher in your driver. See CASSANDRA-17507
for further details.
- Added API for alternative sstable implementations. For details, see src/java/org/apache/cassandra/io/sstable/SSTable_API.md

Deprecation
-----------
Expand Down
11 changes: 11 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1910,3 +1910,14 @@ drop_compact_storage_enabled: false
# excluded_keyspaces: # comma separated list of keyspaces to exclude from the check
# excluded_tables: # comma separated list of keyspace.table pairs to exclude from the check

# Supported sstable formats
# This is a list of elements consisting of class_name and parameters, where class_name should point to the class
# implementing org.apache.cassandra.io.sstable.format.SSTableFormat. Parameters must include unique 'id' integer
# which is used in some serialization to denote the format type in a compact way (such as local key cache); and 'name'
# which will be used to recognize the format type - in particular that name will be used in sstable file names and in
# stream headers so the name has to be the same for the same format across all the nodes in the cluster.
sstable_formats:
- class_name: org.apache.cassandra.io.sstable.format.big.BigFormat
parameters:
id: 0
name: big
53 changes: 33 additions & 20 deletions src/java/org/apache/cassandra/cache/AutoSavingCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
*/
package org.apache.cassandra.cache;

import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.NoSuchFileException;
import java.util.*;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

Expand All @@ -33,19 +33,28 @@

import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
import org.apache.cassandra.io.util.ChecksummedSequentialWriter;
import org.apache.cassandra.io.util.CorruptFileException;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.SequentialWriterOption;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
Expand All @@ -59,8 +68,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
{
public interface IStreamFactory
{
InputStream getInputStream(File dataPath, File crcPath) throws IOException;
OutputStream getOutputStream(File dataPath, File crcPath);
DataInputStreamPlus getInputStream(File dataPath, File crcPath) throws IOException;

DataOutputStreamPlus getOutputStream(File dataPath, File crcPath);
}

private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
Expand All @@ -85,8 +95,10 @@ public interface IStreamFactory
* "e" introduced with CASSANDRA-11206, omits IndexInfo from key-cache, stores offset into index-file
*
* "f" introduced with CASSANDRA-9425, changes "keyspace.table.index" in cache keys to TableMetadata.id+TableMetadata.indexName
*
* "g" introduced an explicit sstable format type ordinal number so that the entry can be skipped regardless of the actual implementation and used serializer
*/
private static final String CURRENT_VERSION = "f";
private static final String CURRENT_VERSION = "g";

private static volatile IStreamFactory streamFactory = new IStreamFactory()
{
Expand All @@ -95,12 +107,12 @@ public interface IStreamFactory
.trickleFsyncByteInterval(DatabaseDescriptor.getTrickleFsyncIntervalInKiB() * 1024)
.finishOnClose(true).build();

public InputStream getInputStream(File dataPath, File crcPath) throws IOException
public DataInputStreamPlus getInputStream(File dataPath, File crcPath) throws IOException
{
return ChecksummedRandomAccessReader.open(dataPath, crcPath);
}

public OutputStream getOutputStream(File dataPath, File crcPath)
public DataOutputStreamPlus getOutputStream(File dataPath, File crcPath)
{
return new ChecksummedSequentialWriter(dataPath, crcPath, null, writerOption);
}
Expand Down Expand Up @@ -175,6 +187,7 @@ public Future<Integer> loadSavedAsync()
return cacheLoad;
}

@SuppressWarnings("resource")
public int loadSaved()
{
int count = 0;
Expand All @@ -189,15 +202,15 @@ public int loadSaved()
try
{
logger.info("reading saved cache {}", dataPath);
in = new DataInputStreamPlus(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
in = streamFactory.getInputStream(dataPath, crcPath);

//Check the schema has not changed since CFs are looked up by name which is ambiguous
UUID schemaVersion = new UUID(in.readLong(), in.readLong());
if (!schemaVersion.equals(Schema.instance.getVersion()))
throw new RuntimeException("Cache schema version "
+ schemaVersion
+ " does not match current schema version "
+ Schema.instance.getVersion());
+ schemaVersion
+ " does not match current schema version "
+ Schema.instance.getVersion());

ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<>();
long loadByNanos = start + TimeUnit.SECONDS.toNanos(DatabaseDescriptor.getCacheLoadTimeout());
Expand Down Expand Up @@ -268,7 +281,7 @@ public int loadSaved()
}
if (logger.isTraceEnabled())
logger.trace("completed reading ({} ms; {} keys) saved cache {}",
TimeUnit.NANOSECONDS.toMillis(nanoTime() - start), count, dataPath);
TimeUnit.NANOSECONDS.toMillis(nanoTime() - start), count, dataPath);
return count;
}

Expand Down
25 changes: 12 additions & 13 deletions src/java/org/apache/cassandra/cache/ChunkCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,22 @@
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;

import com.github.benmanes.caffeine.cache.*;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.ChunkReader;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.Rebufferer;
import org.apache.cassandra.io.util.RebuffererFactory;
import org.apache.cassandra.metrics.ChunkCacheMetrics;
import org.apache.cassandra.utils.memory.BufferPool;
import org.apache.cassandra.utils.memory.BufferPools;
Expand Down Expand Up @@ -170,7 +177,7 @@ public void close()
cache.invalidateAll();
}

private RebuffererFactory wrap(ChunkReader file)
public RebuffererFactory wrap(ChunkReader file)
{
return new CachingRebufferer(file);
}
Expand All @@ -196,14 +203,6 @@ public void invalidateFile(String fileName)
cache.invalidateAll(Iterables.filter(cache.asMap().keySet(), x -> x.path.equals(fileName)));
}

@VisibleForTesting
public void enable(boolean enabled)
{
ChunkCache.enabled = enabled;
cache.invalidateAll();
metrics.reset();
}

// TODO: Invalidate caches for obsoleted/MOVED_START tables?

/**
Expand Down Expand Up @@ -319,4 +318,4 @@ public long weightedSize()
.map(policy -> policy.weightedSize().orElseGet(cache::estimatedSize))
.orElseGet(cache::estimatedSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ public enum CassandraRelevantProperties
SEED_COUNT_WARN_THRESHOLD("cassandra.seed_count_warn_threshold"),


SSTABLE_FORMAT_DEFAULT("cassandra.sstable.format.default"),


/** When enabled, recursive directory deletion will be executed using a unix command `rm -rf` instead of traversing
* and removing individual files. This is now used only tests, but eventually we will make it true by default.*/
USE_NIX_RECURSIVE_DELETE("cassandra.use_nix_recursive_delete"),
Expand All @@ -356,6 +359,7 @@ public enum CassandraRelevantProperties
* can be also done manually for that particular case: {@code flush(SchemaConstants.SCHEMA_KEYSPACE_NAME);}. */
FLUSH_LOCAL_SCHEMA_CHANGES("cassandra.test.flush_local_schema_changes", "true"),

TOMBSTONE_HISTOGRAM_TTL_ROUND_SECONDS("cassandra.streaminghistogram.roundseconds", "60"),
;

CassandraRelevantProperties(String key, String defaultVal)
Expand Down Expand Up @@ -561,6 +565,21 @@ public <T extends Enum<T>> T getEnum(boolean toUppercase, T defaultValue) {
return Enum.valueOf(defaultValue.getDeclaringClass(), toUppercase ? value.toUpperCase() : value);
}

/**
* Gets the value of a system property as an enum, optionally calling {@link String#toUpperCase()} first.
* If the value is missing, the default value for this property is used
*
* @param toUppercase before converting to enum
* @param enumClass enumeration class
* @param <T> type
* @return enum value
*/
public <T extends Enum<T>> T getEnum(boolean toUppercase, Class<T> enumClass)
{
String value = System.getProperty(key, defaultVal);
return Enum.valueOf(enumClass, toUppercase ? value.toUpperCase() : value);
}

/**
* Sets the value into system properties.
* @param value to set
Expand Down
15 changes: 12 additions & 3 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,27 @@
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.audit.AuditLogOptions;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.fql.FullQueryLoggerOptions;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.service.StartupChecks.StartupCheckType;

/**
Expand Down Expand Up @@ -70,6 +72,9 @@ public static Set<String> splitCommaDelimited(String src)
*/
public static final String PROPERTY_PREFIX = "cassandra.";

public static final String SSTABLE_FORMAT_ID = "id";
public static final String SSTABLE_FORMAT_NAME = "name";

public String cluster_name = "Test Cluster";
public String authenticator;
public String authorizer;
Expand Down Expand Up @@ -352,6 +357,10 @@ public MemtableOptions()

public String[] data_file_directories = new String[0];

public List<ParameterizedClass> sstable_formats = ImmutableList.of(new ParameterizedClass(BigFormat.class.getName(),// "org.apache.cassandra.io.sstable.format.big.BigFormat",
ImmutableMap.of(SSTABLE_FORMAT_ID, "0",
SSTABLE_FORMAT_NAME, "big")));

/**
* The directory to use for storing the system keyspaces data.
* If unspecified the data will be stored in the first of the data_file_directories.
Expand Down

0 comments on commit b7e1e44

Please sign in to comment.