Skip to content

Commit

Permalink
rename new sstables with correct generation at load time
Browse files Browse the repository at this point in the history
patch by jbellis; reviewed by slebresne for CASSANDRA-3967
  • Loading branch information
jbellis committed Mar 16, 2012
1 parent 6423bfe commit 837ee0e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 67 deletions.
93 changes: 27 additions & 66 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Expand Up @@ -449,96 +449,67 @@ public synchronized void loadNewSSTables()
{
logger.info("Loading new SSTables for " + table.name + "/" + columnFamily + "...");

// current view over ColumnFamilyStore
DataTracker.View view = data.getView();
// descriptors of currently registered SSTables
Set<Descriptor> currentDescriptors = new HashSet<Descriptor>();
// going to hold new SSTable view of the CFS containing old and new SSTables
Set<SSTableReader> sstables = new HashSet<SSTableReader>();
// get the max generation number, to prevent generation conflicts
int generation = 0;

for (SSTableReader reader : view.sstables)
{
sstables.add(reader); // first of all, add old SSTables
currentDescriptors.add(reader.descriptor);

if (reader.descriptor.generation > generation)
generation = reader.descriptor.generation;
}

SSTableReader reader;
// set to true if we have at least one new SSTable to load
boolean atLeastOneNew = false;
for (SSTableReader sstable : data.getView().sstables)
currentDescriptors.add(sstable.descriptor);
Set<SSTableReader> newSSTables = new HashSet<SSTableReader>();

Directories.SSTableLister lister = directories.sstableLister().skipCompacted(true).skipTemporary(true);
for (Map.Entry<Descriptor, Set<Component>> rawSSTable : lister.list().entrySet())
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
{
Descriptor descriptor = rawSSTable.getKey();
Descriptor descriptor = entry.getKey();

if (currentDescriptors.contains(descriptor))
continue; // old (initialized) SSTable found, skipping
if (descriptor.temporary) // in the process of being written
continue;

if (!descriptor.isCompatible())
throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
Descriptor.CURRENT_VERSION,
descriptor));

logger.info("Initializing new SSTable {}", rawSSTable);
Descriptor newDescriptor = new Descriptor(descriptor.directory,
descriptor.ksname,
descriptor.cfname,
fileIndexGenerator.incrementAndGet(),
false);
logger.info("Renaming new SSTable {} to {}", descriptor, newDescriptor);
SSTableWriter.rename(descriptor, newDescriptor, entry.getValue());

SSTableReader reader;
try
{
Set<DecoratedKey> savedKeys = CacheService.instance.keyCache.readSaved(descriptor.ksname, descriptor.cfname);
reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, data, metadata, partitioner);
reader = SSTableReader.open(newDescriptor, entry.getValue(), Collections.<DecoratedKey>emptySet(), data, metadata, partitioner);
}
catch (IOException e)
{
SSTableReader.logOpenException(rawSSTable.getKey(), e);
SSTableReader.logOpenException(entry.getKey(), e);
continue;
}

sstables.add(reader);

if (descriptor.generation > generation)
generation = descriptor.generation;

if (!atLeastOneNew) // set flag only once
atLeastOneNew = true;
newSSTables.add(reader);
}

if (!atLeastOneNew)
if (newSSTables.isEmpty())
{
logger.info("No new SSTables where found for " + table.name + "/" + columnFamily);
return;
}

logger.info("Loading new SSTables and building secondary indexes for " + table.name + "/" + columnFamily + ": " + sstables);
SSTableReader.acquireReferences(sstables);
data.addSSTables(sstables);
logger.info("Loading new SSTables and building secondary indexes for " + table.name + "/" + columnFamily + ": " + newSSTables);
SSTableReader.acquireReferences(newSSTables);
data.addSSTables(newSSTables);
try
{
indexManager.maybeBuildSecondaryIndexes(sstables, indexManager.getIndexedColumns());
indexManager.maybeBuildSecondaryIndexes(newSSTables, indexManager.getIndexedColumns());
}
catch (IOException e)
{
throw new IOError(e);
}
finally
{
SSTableReader.releaseReferences(sstables);
}

if (fileIndexGenerator.get() < generation)
{
// we don't bother with CAS here since if the generations used in the new files overlap with
// files that we create during load, we're already screwed
logger.info("Setting up new generation: " + generation);
fileIndexGenerator.set(generation);
}
else
{
logger.warn("Largest generation seen in loaded sstables was {}, which may overlap with native sstable files (generation {}).",
generation, fileIndexGenerator.get());
SSTableReader.releaseReferences(newSSTables);
}

logger.info("Done loading load new SSTables for " + table.name + "/" + columnFamily);
Expand Down Expand Up @@ -1621,22 +1592,12 @@ public Iterable<DecoratedKey<?>> keySamples(Range<Token> range)
}

/**
* For testing. no effort is made to clear historical memtables, nor for
* thread safety
* For testing. No effort is made to clear historical or even the current memtables, nor for
* thread safety. All we do is wipe the sstable containers clean, while leaving the actual
* data files present on disk. (This allows tests to easily call loadNewSSTables on them.)
*/
public void clearUnsafe()
{
// NOTE(review): this listing appears to come from a diff view whose +/- markers were
// stripped; the generator reset and the backup cleanup below may be lines that the
// commit removes -- confirm against the repository before relying on them.

// Reset the counter that supplies sstable generation numbers back to 0.
fileIndexGenerator.set(0); // Avoid unit test failures (see CASSANDRA-3735).

// Clear backups: delete every component file of each sstable returned by a lister
// restricted with onlyBackups(true).
Directories.SSTableLister lister = directories.sstableLister().onlyBackups(true);
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
{
Descriptor desc = entry.getKey();
for (Component comp : entry.getValue())
FileUtils.delete(desc.filenameFor(comp));
}

// Call data.init() on every store yielded by concatWithIndexes() -- presumably this
// store plus its secondary-index stores (TODO confirm). Nothing in this loop deletes files.
for (ColumnFamilyStore cfs : concatWithIndexes())
cfs.data.init();
}
Expand Down
7 changes: 6 additions & 1 deletion src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Expand Up @@ -363,6 +363,12 @@ private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetada
/**
 * Convenience overload that derives the target descriptor itself: the target is
 * {@code tmpdesc.asTemporary(false)}, and the actual work is delegated to
 * {@link #rename(Descriptor, Descriptor, Set)}.
 *
 * @param tmpdesc descriptor to rename from (presumably a temporary sstable -- confirm with callers)
 * @param components components handed unchanged to the three-argument overload
 * @return the descriptor produced by {@code tmpdesc.asTemporary(false)}
 */
static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
{
Descriptor newdesc = tmpdesc.asTemporary(false);
rename(tmpdesc, newdesc, components);
return newdesc;
}

public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
{
try
{
// do -Data last because -Data present should mean the sstable was completely renamed before crash
Expand All @@ -374,7 +380,6 @@ static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
{
throw new IOError(e);
}
return newdesc;
}

public long getFilePointer()
Expand Down

0 comments on commit 837ee0e

Please sign in to comment.