Skip to content

Commit

Permalink
Clear schedulers for ttl upon clear of replicated-map otherwise memor…
Browse files Browse the repository at this point in the history
…y leaks
  • Loading branch information
ahmetmircik committed Jul 6, 2018
1 parent 8f24d51 commit a2bcdfd
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 15 deletions.
Expand Up @@ -28,11 +28,16 @@
import com.hazelcast.nio.serialization.PortableFactory;
import com.hazelcast.nio.serialization.PortableReader;
import com.hazelcast.nio.serialization.PortableWriter;
import com.hazelcast.replicatedmap.impl.ReplicatedMapProxy;
import com.hazelcast.replicatedmap.impl.ReplicatedMapService;
import com.hazelcast.replicatedmap.impl.record.AbstractBaseReplicatedRecordStore;
import com.hazelcast.replicatedmap.impl.record.ReplicatedRecordStore;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParametersRunnerFactory;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.util.scheduler.SecondsBasedEntryTaskScheduler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -52,6 +57,7 @@
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -520,6 +526,35 @@ public Portable create(int classId) {
assertEquals(666, samplePortable.a);
}

@Test
public void clear_empties_internal_ttl_schedulers() {
String mapName = "test";
HazelcastInstance node = hazelcastFactory.newHazelcastInstance(config);
HazelcastInstance client = hazelcastFactory.newHazelcastClient();

ReplicatedMap map = client.getReplicatedMap(mapName);

for (int i = 0; i < 1000; i++) {
map.put(i, i, 100, TimeUnit.DAYS);
}

map.clear();

assertAllTtlSchedulersEmpty(node.getReplicatedMap(mapName));
}

private static void assertAllTtlSchedulersEmpty(ReplicatedMap map) {
String mapName = map.getName();
ReplicatedMapProxy replicatedMapProxy = (ReplicatedMapProxy) map;
ReplicatedMapService service = (ReplicatedMapService) replicatedMapProxy.getService();
Collection<ReplicatedRecordStore> stores = service.getAllReplicatedRecordStores(mapName);
for (ReplicatedRecordStore store : stores) {
assertEquals(0,
((SecondsBasedEntryTaskScheduler) ((AbstractBaseReplicatedRecordStore) store)
.getTtlEvictionScheduler()).size());
}
}

@SuppressWarnings("unchecked")
private static SimpleEntry<Integer, Integer>[] buildTestValues() {
Random random = new Random();
Expand Down
Expand Up @@ -78,6 +78,11 @@ public AtomicReference<InternalReplicatedMapStorage<K, V>> getStorageRef() {
return storageRef;
}

// only used for testing purposes
public EntryTaskScheduler getTtlEvictionScheduler() {
return ttlEvictionScheduler;
}

@Override
public int getPartitionId() {
return partitionId;
Expand All @@ -100,6 +105,14 @@ public void destroy() {
}
}

protected InternalReplicatedMapStorage<K, V> clearInternal() {
InternalReplicatedMapStorage<K, V> storage = getStorage();
storage.clear();
getStats().incrementOtherOperations();
ttlEvictionScheduler.cancelAll();
return storage;
}

@Override
public long getVersion() {
return storageRef.get().getVersion();
Expand Down
Expand Up @@ -289,18 +289,12 @@ public int size() {

@Override
public void clear() {
InternalReplicatedMapStorage<K, V> storage = getStorage();
storage.clear();
storage.incrementVersion();
getStats().incrementOtherOperations();
clearInternal().incrementVersion();
}

@Override
public void clearWithVersion(long version) {
InternalReplicatedMapStorage<K, V> storage = getStorage();
storage.clear();
storage.setVersion(version);
getStats().incrementOtherOperations();
clearInternal().setVersion(version);
}

@Override
Expand Down
Expand Up @@ -47,10 +47,16 @@
* @param <K> entry key type
* @param <V> entry value type
*/
final class SecondsBasedEntryTaskScheduler<K, V> implements EntryTaskScheduler<K, V> {
public final class SecondsBasedEntryTaskScheduler<K, V> implements EntryTaskScheduler<K, V> {

/**
* hash-map initial capacity
*/
public static final int INITIAL_CAPACITY = 10;

/**
* @see #ceilToSecond(long)
*/
public static final double FACTOR = 1000d;

private static final long INITIAL_TIME_MILLIS = Clock.currentTimeMillis();
Expand All @@ -67,9 +73,13 @@ public int compare(ScheduledEntry o1, ScheduledEntry o2) {
}
};

/** Map of keys to duration between this class being loaded and the time the key is scheduled */
/**
* Map of keys to duration between this class being loaded and the time the key is scheduled
*/
private final Map<Object, Integer> secondsOfKeys = new HashMap<Object, Integer>(1000);
/** Map from duration (see {@link #findRelativeSecond(long)} to scheduled key to scheduled entry map. */
/**
* Map from duration (see {@link #findRelativeSecond(long)} to scheduled key to scheduled entry map.
*/
private final Map<Integer, Map<Object, ScheduledEntry<K, V>>> scheduledEntries
= new HashMap<Integer, Map<Object, ScheduledEntry<K, V>>>(1000);
private final Map<Integer, ScheduledFuture> scheduledTaskMap = new HashMap<Integer, ScheduledFuture>(1000);
Expand Down Expand Up @@ -240,7 +250,9 @@ private int cancelByCompositeKey(K key, ScheduledEntry<K, V> entryToRemove) {
return cancelled;
}

/** Return all composite keys with the given {@code key} */
/**
* Return all composite keys with the given {@code key}
*/
private Set<CompositeKey> getCompositeKeys(K key) {
Set<CompositeKey> candidateKeys = new HashSet<CompositeKey>();
for (Object keyObj : secondsOfKeys.keySet()) {
Expand All @@ -252,7 +264,9 @@ private Set<CompositeKey> getCompositeKeys(K key) {
return candidateKeys;
}

/** Returns one scheduled entry for the given {@code key} with no guaranteed ordering */
/**
* Returns one scheduled entry for the given {@code key} with no guaranteed ordering
*/
public ScheduledEntry<K, V> getByCompositeKey(K key) {
Set<CompositeKey> candidateKeys = getCompositeKeys(key);
ScheduledEntry<K, V> result = null;
Expand Down Expand Up @@ -372,13 +386,15 @@ public String toString() {
}

// just for testing
int size() {
public int size() {
synchronized (mutex) {
return secondsOfKeys.size();
}
}

/** Returns the duration in seconds between the time this class was loaded and now+{@code delayMillis} */
/**
* Returns the duration in seconds between the time this class was loaded and now+{@code delayMillis}
*/
// package private for testing
static int findRelativeSecond(long delayMillis) {
long now = Clock.currentTimeMillis();
Expand Down
Expand Up @@ -18,18 +18,26 @@

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ReplicatedMap;
import com.hazelcast.replicatedmap.impl.ReplicatedMapProxy;
import com.hazelcast.replicatedmap.impl.ReplicatedMapService;
import com.hazelcast.replicatedmap.impl.record.AbstractBaseReplicatedRecordStore;
import com.hazelcast.replicatedmap.impl.record.ReplicatedRecordStore;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.TestHazelcastInstanceFactory;
import com.hazelcast.test.annotation.SlowTest;
import com.hazelcast.util.scheduler.SecondsBasedEntryTaskScheduler;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

@RunWith(HazelcastSerialClassRunner.class)
@Category(SlowTest.class)
public class ReplicatedMapTtlTest extends ReplicatedMapAbstractTest {
Expand Down Expand Up @@ -105,4 +113,31 @@ public void run() {
}
});
}

@Test
public void clear_empties_internal_ttl_schedulers() {
HazelcastInstance node = createHazelcastInstance();
String mapName = "test";
ReplicatedMap map = node.getReplicatedMap(mapName);

for (int i = 0; i < 1000; i++) {
map.put(i, i, 100, TimeUnit.DAYS);
}

map.clear();

assertAllTtlSchedulersEmpty(map);
}

private static void assertAllTtlSchedulersEmpty(ReplicatedMap map) {
String mapName = map.getName();
ReplicatedMapProxy replicatedMapProxy = (ReplicatedMapProxy) map;
ReplicatedMapService service = (ReplicatedMapService) replicatedMapProxy.getService();
Collection<ReplicatedRecordStore> stores = service.getAllReplicatedRecordStores(mapName);
for (ReplicatedRecordStore store : stores) {
assertEquals(0,
((SecondsBasedEntryTaskScheduler) ((AbstractBaseReplicatedRecordStore) store)
.getTtlEvictionScheduler()).size());
}
}
}

0 comments on commit a2bcdfd

Please sign in to comment.