Skip to content

Commit

Permalink
Avoid leaking references during parallel repairs
Browse files Browse the repository at this point in the history
patch by Marcus Olsson; reviewed by Marcus Eriksson for CASSANDRA-11215
  • Loading branch information
emolsson committed Feb 29, 2016
1 parent ace7fe0 commit f10faff
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Expand Up @@ -49,6 +49,7 @@ Merged from 3.0:
* Add dropped_columns to the list of schema table so it gets handled
properly (CASSANDRA-11050)
Merged from 2.2:
* Reference leak with parallel repairs on the same table (CASSANDRA-11215)
* Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
* Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
* Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)
Expand Down
66 changes: 38 additions & 28 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Expand Up @@ -1194,34 +1194,7 @@ private void doValidationCompaction(ColumnFamilyStore cfs, Validator validator)
{
// flush first so everyone is validating data that is as similar as possible
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired()));
Set<SSTableReader> sstablesToValidate = new HashSet<>();

for (SSTableReader sstable : sstableCandidates.sstables)
{
if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(validator.desc.ranges))
{
sstablesToValidate.add(sstable);
}
}

Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);

if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
{
logger.error("Cannot start multiple repair sessions over the same sstables");
throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
}

sstables = Refs.tryRef(sstablesToValidate);
if (sstables == null)
{
logger.error("Could not reference sstables");
throw new RuntimeException("Could not reference sstables");
}
sstableCandidates.release();
prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
sstables = getSSTablesToValidate(cfs, validator);

if (validator.gcBefore > 0)
gcBefore = validator.gcBefore;
Expand Down Expand Up @@ -1286,6 +1259,43 @@ private void doValidationCompaction(ColumnFamilyStore cfs, Validator validator)
}
}

private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
{
Refs<SSTableReader> sstables;

ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
Set<SSTableReader> sstablesToValidate = new HashSet<>();
try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired())))
{
for (SSTableReader sstable : sstableCandidates.sstables)
{
if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(validator.desc.ranges))
{
sstablesToValidate.add(sstable);
}
}

Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);

if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
{
logger.error("Cannot start multiple repair sessions over the same sstables");
throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
}

sstables = Refs.tryRef(sstablesToValidate);
if (sstables == null)
{
logger.error("Could not reference sstables");
throw new RuntimeException("Could not reference sstables");
}
}

prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);

return sstables;
}

/**
* Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
* will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted
Expand Down

0 comments on commit f10faff

Please sign in to comment.