Merge pull request #13956 from ahmetmircik/fix/3.11/CacheExpirationBouncingMemberTest3

Send queued expired keys on the commit migration step
mmedenjak committed Oct 15, 2018
2 parents 99a59a1 + 1c55a2c commit e6d277b
Showing 8 changed files with 69 additions and 46 deletions.
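In outline, the fix turns migration commit into a flush point for the per-partition queues of expired keys: a primary that is about to hand a partition off first tells the backups which entries it has already expired, so backups are not left holding dead entries. A minimal sketch of that idea in plain Java, with hypothetical names (the real wiring is in ClearExpiredRecordsTask and ExpirationManager below):

import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the fix. Each partition keeps a queue of
// keys the primary has already expired; backups normally learn about them
// lazily, batched onto operation responses.
class ExpiredKeyFlusher {

    private final Queue<String>[] expiredKeysPerPartition;

    @SuppressWarnings("unchecked")
    ExpiredKeyFlusher(int partitionCount) {
        expiredKeysPerPartition = new Queue[partitionCount];
        for (int i = 0; i < partitionCount; i++) {
            expiredKeysPerPartition[i] = new ArrayDeque<String>();
        }
    }

    // Called when a migration commits. If this member was the migration SOURCE
    // and the primary owner, queued expired keys must be pushed out now: after
    // the commit this member no longer drives expiration for the partition.
    void onCommitMigration(boolean isSource, int currentReplicaIndex, int partitionId) {
        if (isSource && currentReplicaIndex == 0) {
            Queue<String> queue = expiredKeysPerPartition[partitionId];
            for (String key; (key = queue.poll()) != null; ) {
                sendExpiryOpToBackups(partitionId, key);
            }
        }
    }

    private void sendExpiryOpToBackups(int partitionId, String key) {
        // Stand-in for the real backup expiry operation.
        System.out.println("expire on backups: partition=" + partitionId + ", key=" + key);
    }
}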
@@ -288,6 +288,8 @@ public void beforeMigration(PartitionMigrationEvent event) {

@Override
public void commitMigration(PartitionMigrationEvent event) {
expirationManager.onCommitMigration(event);

if (event.getMigrationEndpoint() == MigrationEndpoint.SOURCE) {
clearCachesHavingLesserBackupCountThan(event.getPartitionId(), event.getNewReplicaIndex());
}
@@ -107,15 +107,6 @@ public CacheClearExpiredRecordsTask(CachePartitionSegment[] containers, NodeEngi
super(SERVICE_NAME, containers, CLEANUP_OPERATION_COUNT, CLEANUP_PERCENTAGE, TASK_PERIOD_SECONDS, nodeEngine);
}

@Override
public void sendResponse(Operation op, Object response) {
CachePartitionSegment container = containers[op.getPartitionId()];
Iterator<ICacheRecordStore> iterator = container.recordStoreIterator();
while (iterator.hasNext()) {
tryToSendBackupExpiryOp(iterator.next(), false);
}
}

@Override
public void tryToSendBackupExpiryOp(ICacheRecordStore store, boolean checkIfReachedBatch) {
InvalidationQueue<ExpiredKey> expiredKeys = store.getExpiredKeysQueue();
@@ -125,6 +116,11 @@ public void tryToSendBackupExpiryOp(ICacheRecordStore store, boolean checkIfReac
toBackupSender.trySendExpiryOp(store, expiredKeys, totalBackupCount, partitionId, checkIfReachedBatch);
}

@Override
public Iterator<ICacheRecordStore> storeIterator(CachePartitionSegment container) {
return container.recordStoreIterator();
}

@Override
protected Operation newPrimaryExpiryOp(int expirationPercentage, CachePartitionSegment container) {
return new CacheClearExpiredOperation(expirationPercentage)
@@ -29,7 +29,6 @@
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.ServiceNamespace;
import com.hazelcast.util.Clock;

import java.io.IOException;
import java.util.ArrayList;
@@ -67,7 +66,7 @@ public CacheReplicationOperation() {
}

public final void prepare(CachePartitionSegment segment, Collection<ServiceNamespace> namespaces,
int replicaIndex) {

for (ServiceNamespace namespace : namespaces) {
ObjectNamespace ns = (ObjectNamespace) namespace;
@@ -144,7 +143,6 @@ protected void writeInternal(ObjectDataOutput out)
}
int count = data.size();
out.writeInt(count);
long now = Clock.currentTimeMillis();
for (Map.Entry<String, Map<Data, CacheRecord>> entry : data.entrySet()) {
Map<Data, CacheRecord> cacheMap = entry.getValue();
int subCount = cacheMap.size();
@@ -154,9 +152,6 @@
final Data key = e.getKey();
final CacheRecord record = e.getValue();

if (record.isExpiredAt(now)) {
continue;
}
out.writeData(key);
out.writeObject(record);
}
@@ -31,6 +31,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,7 +43,7 @@
import static java.lang.Integer.parseInt;
import static java.lang.Math.min;

@SuppressWarnings("checkstyle:magicnumber")
@SuppressWarnings({"checkstyle:magicnumber", "checkstyle:methodcount"})
@SuppressFBWarnings({"URF_UNREAD_FIELD"})
public abstract class ClearExpiredRecordsTask<T, S> implements Runnable {

@@ -61,9 +62,9 @@ public abstract class ClearExpiredRecordsTask<T, S> implements Runnable {
private final Address thisAddress;
private final InternalOperationService operationService;
private final AtomicBoolean singleRunPermit = new AtomicBoolean(false);
private final AtomicInteger partitionLostCounter = new AtomicInteger();
private final AtomicInteger lostPartitionCounter = new AtomicInteger();

private volatile int lastSeenPartitionLostCount;
private volatile int lastKnownLostPartitionCount;

private int runningCleanupOperationsCount;

@@ -121,15 +122,15 @@ private void runInternal() {
runningCleanupOperationsCount = 0;

long nowInMillis = nowInMillis();
boolean shouldEqualizeBackupSizeWithPrimary = shouldEqualizeBackupSizeWithPrimary();
boolean lostPartitionDetected = lostPartitionDetected();

List<T> containersToProcess = null;
for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
T container = this.containers[partitionId];

IPartition partition = partitionService.getPartition(partitionId, false);
if (partition.isLocal()) {
if (shouldEqualizeBackupSizeWithPrimary) {
if (lostPartitionDetected) {
equalizeBackupSizeWithPrimary(container);
}
} else {
@@ -180,8 +181,8 @@ private boolean canProcessContainer(T container, IPartition partition, long nowI
* remove leftover backup entries. Otherwise, leftover entries can remain on
* backups forever.
*/
public final void partitionLost(PartitionLostEvent event) {
partitionLostCounter.incrementAndGet();
public final void partitionLost(PartitionLostEvent ignored) {
lostPartitionCounter.incrementAndGet();
}

private static long nowInMillis() {
@@ -191,14 +192,14 @@ private static long nowInMillis() {
/**
* see {@link #partitionLost}
*/
private boolean shouldEqualizeBackupSizeWithPrimary() {
boolean equalizeBackupSizeWithPrimary = false;
int currentPartitionLostCount = partitionLostCounter.get();
if (currentPartitionLostCount != lastSeenPartitionLostCount) {
lastSeenPartitionLostCount = currentPartitionLostCount;
equalizeBackupSizeWithPrimary = true;
private boolean lostPartitionDetected() {
int currentLostPartitionCount = lostPartitionCounter.get();
if (currentLostPartitionCount == lastKnownLostPartitionCount) {
return false;
}
return equalizeBackupSizeWithPrimary;

lastKnownLostPartitionCount = currentLostPartitionCount;
return true;
}

private static int calculateCleanupOperationCount(HazelcastProperties properties,
@@ -262,6 +263,17 @@ public Operation apply(S recordStore, Collection<ExpiredKey> expiredKeys) {
};
}

public final void sendResponse(Operation op, Object response) {
sendQueuedExpiredKeys(containers[op.getPartitionId()]);
}

public final void sendQueuedExpiredKeys(T container) {
Iterator<S> storeIterator = storeIterator(container);
while (storeIterator.hasNext()) {
tryToSendBackupExpiryOp(storeIterator.next(), false);
}
}

// only used for testing purposes
int getCleanupPercentage() {
return cleanupPercentage;
Expand Down Expand Up @@ -301,6 +313,8 @@ int getCleanupOperationCount() {

public abstract void tryToSendBackupExpiryOp(S store, boolean checkIfReachedBatch);

public abstract Iterator<S> storeIterator(T container);

/**
* Used when traversing partitions.
* Map needs to traverse both backup and primary partitions due to catch
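The renamed lostPartitionCounter / lastKnownLostPartitionCount pair above is a small lock-free change-detection idiom: listeners on any thread bump a shared counter, and the single periodic cleanup task compares it against the last value it observed, reacting at most once per bump. A self-contained sketch of the pattern (hypothetical class, not the Hazelcast code):

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: detect "a partition was lost since my last run" without
// locking. Writers may be on any thread; the reader is one periodic task.
class LostPartitionDetector {

    private final AtomicInteger lostPartitionCounter = new AtomicInteger();
    // volatile is sufficient: only the single periodic task writes this field.
    private volatile int lastKnownLostPartitionCount;

    // Called from a partition-lost listener, possibly concurrently.
    void partitionLost() {
        lostPartitionCounter.incrementAndGet();
    }

    // Called once per cleanup run. Returns true only when the counter moved,
    // so backup sizes are equalized with the primary only after a real loss.
    boolean lostPartitionDetected() {
        int current = lostPartitionCounter.get();
        if (current == lastKnownLostPartitionCount) {
            return false;
        }
        lastKnownLostPartitionCount = current;
        return true;
    }
}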
@@ -24,7 +24,9 @@
import com.hazelcast.partition.PartitionLostEvent;
import com.hazelcast.partition.PartitionLostListener;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.PartitionMigrationEvent;
import com.hazelcast.spi.TaskScheduler;
import com.hazelcast.spi.partition.MigrationEndpoint;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.util.concurrent.ScheduledFuture;
@@ -131,6 +133,19 @@ public void onClusterStateChange(ClusterState newState) {
}
}

/**
* On the commit migration step, we need to send already queued expired keys if
* the migration source endpoint is a primary replica.
*
* @param event partition migration event
*/
public void onCommitMigration(PartitionMigrationEvent event) {
if (event.getMigrationEndpoint() == MigrationEndpoint.SOURCE
&& event.getCurrentReplicaIndex() == 0) {
task.sendQueuedExpiredKeys(task.containers[event.getPartitionId()]);
}
}

/**
* Called upon shutdown of {@link com.hazelcast.map.impl.MapService}
*/
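The onCommitMigration guard above combines two facts from the migration event: this member must be the migration SOURCE, and it must still own the partition as primary (current replica index 0); only then can its expired-key queue hold entries the backups have not yet seen. A minimal illustration of that condition, using only accessors that appear in this diff (hypothetical helper class):

import com.hazelcast.spi.PartitionMigrationEvent;
import com.hazelcast.spi.partition.MigrationEndpoint;

// Hypothetical helper mirroring the guard in ExpirationManager.onCommitMigration.
final class CommitMigrationGuard {

    private CommitMigrationGuard() {
    }

    // True only for the primary replica on the sending side of the migration;
    // destinations and non-primary replicas have nothing queued to flush.
    static boolean shouldFlushQueuedExpiredKeys(PartitionMigrationEvent event) {
        return event.getMigrationEndpoint() == MigrationEndpoint.SOURCE
                && event.getCurrentReplicaIndex() == 0;
    }
}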
@@ -55,18 +55,19 @@
*/
class MapMigrationAwareService implements FragmentedMigrationAwareService {

protected final PartitionContainer[] containers;
protected final MapServiceContext mapServiceContext;
protected final SerializationService serializationService;

MapMigrationAwareService(MapServiceContext mapServiceContext) {
this.mapServiceContext = mapServiceContext;
this.serializationService = mapServiceContext.getNodeEngine().getSerializationService();
this.containers = mapServiceContext.getPartitionContainers();
}

@Override
public Collection<ServiceNamespace> getAllServiceNamespaces(PartitionReplicationEvent event) {
PartitionContainer container = mapServiceContext.getPartitionContainer(event.getPartitionId());
return container.getAllNamespaces(event.getReplicaIndex());
return containers[event.getPartitionId()].getAllNamespaces(event.getReplicaIndex());
}

@Override
@@ -115,9 +116,8 @@ private void flushAndRemoveQueryCaches(PartitionMigrationEvent event) {
@Override
public Operation prepareReplicationOperation(PartitionReplicationEvent event) {
int partitionId = event.getPartitionId();
PartitionContainer container = mapServiceContext.getPartitionContainer(partitionId);

Operation operation = new MapReplicationOperation(container, partitionId, event.getReplicaIndex());
Operation operation = new MapReplicationOperation(containers[partitionId], partitionId, event.getReplicaIndex());
operation.setService(mapServiceContext.getService());
operation.setNodeEngine(mapServiceContext.getNodeEngine());

@@ -126,13 +126,12 @@ public Operation prepareReplicationOperation(PartitionReplicationEvent event) {

@Override
public Operation prepareReplicationOperation(PartitionReplicationEvent event, Collection<ServiceNamespace> namespaces) {

assert assertAllKnownNamespaces(namespaces);

int partitionId = event.getPartitionId();
PartitionContainer container = mapServiceContext.getPartitionContainer(partitionId);

Operation operation = new MapReplicationOperation(container, namespaces, partitionId, event.getReplicaIndex());
Operation operation = new MapReplicationOperation(containers[partitionId],
namespaces, partitionId, event.getReplicaIndex());
operation.setService(mapServiceContext.getService());
operation.setNodeEngine(mapServiceContext.getNodeEngine());

@@ -148,6 +147,8 @@ private boolean assertAllKnownNamespaces(Collection<ServiceNamespace> namespaces

@Override
public void commitMigration(PartitionMigrationEvent event) {
mapServiceContext.getExpirationManager().onCommitMigration(event);

if (event.getMigrationEndpoint() == DESTINATION) {
populateIndexes(event, TargetIndexes.GLOBAL);
} else {
@@ -31,6 +31,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

@@ -119,25 +120,23 @@ public boolean canPrimaryDriveExpiration() {
}

@Override
public void sendResponse(Operation op, Object response) {
public void tryToSendBackupExpiryOp(RecordStore store, boolean checkIfReachedBatch) {
if (!canPrimaryDriveExpiration()) {
return;
}

for (RecordStore recordStore : containers[op.getPartitionId()].getMaps().values()) {
tryToSendBackupExpiryOp(recordStore, false);
}
}

@Override
public void tryToSendBackupExpiryOp(RecordStore store, boolean checkIfReachedBatch) {
InvalidationQueue expiredKeys = store.getExpiredKeysQueue();
int totalBackupCount = store.getMapContainer().getTotalBackupCount();
int partitionId = store.getPartitionId();

toBackupSender.trySendExpiryOp(store, expiredKeys, totalBackupCount, partitionId, checkIfReachedBatch);
}

@Override
public Iterator<RecordStore> storeIterator(PartitionContainer container) {
return container.getMaps().values().iterator();
}

@Override
protected Operation newPrimaryExpiryOp(int expirationPercentage, PartitionContainer container) {
int partitionId = container.getPartitionId();
@@ -264,5 +263,4 @@ protected long getLastCleanupTime(PartitionContainer container) {
public String toString() {
return MapClearExpiredRecordsTask.class.getName();
}

}
@@ -52,6 +52,8 @@
@Category(SlowTest.class)
public class CacheExpirationBouncingMemberTest extends HazelcastTestSupport {

private static final long FIVE_MINUTES = 5 * 60 * 1000;

private String cacheName = "test";
private int backupCount = 3;
private int keySpace = 1000;
@@ -71,7 +73,7 @@ protected CacheConfig getCacheConfig() {
return cacheConfig;
}

@Test
@Test(timeout = FIVE_MINUTES)
public void backups_should_be_empty_after_expiration() {
Runnable[] methods = new Runnable[2];
HazelcastInstance testDriver = bounceMemberRule.getNextTestDriver();