Skip to content

Commit

Permalink
STAR-1257 Port CNDB-2292 - fix memory leak due to lack of call to Bac…
Browse files Browse the repository at this point in the history
…kgroundCompactions.setPending (apache#420)

Adds UnifiedCompactionStrategy.setPendingCompactionAggregates for use by external callers.

Co-authored-by: Stefania Alborghetti <stef1927@users.noreply.github.com>
(cherry picked from commit 76c73d6)
  • Loading branch information
djatnieks authored and jacek-lewandowski committed May 27, 2022
1 parent 4f4a59e commit 741ee33
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 10 deletions.
Expand Up @@ -103,12 +103,14 @@ synchronized void setPending(CompactionStrategy strategy, Collection<? extends C
// Then add all the pending aggregates
for (CompactionAggregate aggregate : pending)
{
CompactionAggregate prev = aggregatesMap.put(aggregate.getKey(), aggregate);
CompactionAggregate prev = aggregatesMap.get(aggregate.getKey());
if (logger.isTraceEnabled())
logger.trace("Adding new pending aggregate: {}", aggregate);
logger.trace("Adding new pending aggregate: prev={}, current={}", prev, aggregate);

if (prev != null)
throw new IllegalArgumentException("Received pending aggregates with non unique keys: " + prev.getKey());
if (prev == null)
aggregatesMap.put(aggregate.getKey(), aggregate);
else
aggregatesMap.put(aggregate.getKey(), prev.withAdditionalCompactions(aggregate.getActive()));
}

// Then add the old aggregates but only if they have ongoing compactions
Expand Down
Expand Up @@ -493,6 +493,8 @@ private Collection<CompactionAggregate> getNextCompactionAggregates(int gcBefore
final CompactionLimits limits = getCurrentLimits(controller.maxConcurrentCompactions());

List<CompactionAggregate.UnifiedAggregate> pending = getPendingCompactionAggregates(limits.spaceAvailable, gcBefore);
setPendingCompactionAggregates(pending);

for (CompactionAggregate.UnifiedAggregate aggregate : pending)
{
// The space overhead limit also applies when a single compaction is above that limit. This should
Expand Down Expand Up @@ -546,6 +548,26 @@ public Collection<CompactionAggregate.UnifiedAggregate> getPendingCompactionAggr
return getPendingCompactionAggregates(controller.maxCompactionSpaceBytes(), gcBefore);
}

/**
* Set the compaction aggregates passed in as pending in {@link BackgroundCompactions}. This ensures
* that the compaction statistics will be accurate.
* <p/>
* This is called by {@link UnifiedCompactionStrategy#getNextCompactionAggregates(int)}
* and externally after calling {@link UnifiedCompactionStrategy#getPendingCompactionAggregates(int)}
* or before submitting tasks.
*
* Also, note that skipping the call to {@link BackgroundCompactions#setPending(CompactionStrategy, Collection)}
* would result in memory leaks: the aggregates added in {@link BackgroundCompactions#setSubmitted(CompactionStrategy, UUID, CompactionAggregate)}
* would never be removed, and the aggregates hold references to the compaction tasks, so they retain a significant
* size of heap memory.
*
* @param pending the aggregates that should be set as pending compactions
*/
public void setPendingCompactionAggregates(Collection<? extends CompactionAggregate> pending)
{
backgroundCompactions.setPending(this, pending);
}

private List<CompactionAggregate.UnifiedAggregate> getPendingCompactionAggregates(long spaceAvailable, int gcBefore)
{
maybeUpdateSelector();
Expand Down Expand Up @@ -578,8 +600,6 @@ private List<CompactionAggregate.UnifiedAggregate> getPendingCompactionAggregate
}
}

// Update the tracked background tasks.
backgroundCompactions.setPending(this, pending);
return pending;
}

Expand Down
Expand Up @@ -39,6 +39,7 @@

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -122,17 +123,33 @@ public void testNullPendingCompactions()
backgroundCompactions.setPending(strategyContainer, null);
}

@Test(expected = IllegalArgumentException.class)
public void testDuplicatePendingCompactions()
@Test
public void testDuplicatePendingCompactionsAreMerged()
{
BackgroundCompactions backgroundCompactions = new BackgroundCompactions(cfs);

List<CompactionAggregate> pending = new ArrayList<>(0);
CompactionAggregate prev = null;
for (int i = 0; i < 5; i++)
pending.add(mockAggregate(1, 1, 0));
{
CompactionAggregate aggregate = mockAggregate(1, 1, 0);
pending.add(aggregate);

if (prev != null)
{
CompactionAggregate combinedAggregate = mockAggregate(1, i + 1, 0);
when(prev.withAdditionalCompactions(anyCollection())).thenReturn(combinedAggregate);
}


// Two compactions with the same key are invalid
prev = aggregate;
}

// Compactions with the same key are merged
backgroundCompactions.setPending(strategyContainer, pending);

assertEquals(pending.size(), backgroundCompactions.getEstimatedRemainingTasks());
assertEquals(pending.size(), backgroundCompactions.getTotalCompactions());
}

@Test
Expand Down

0 comments on commit 741ee33

Please sign in to comment.