Skip to content

Commit

Permalink
Refactor MigrationListener API
Browse files Browse the repository at this point in the history
With new listener, an event is published when a new migration process starts
and another event when migration is completed. Those events include stats
about migration process; start time, planned migration count, completed migration count etc.

Additionally, on each replica migration, both for primary and backup replica migrations,
a migration event is published. This event includes partition-id, replica-index and
migration progress stats.
  • Loading branch information
mdogan committed Jun 18, 2019
1 parent 5d9b23b commit a1c15ca
Show file tree
Hide file tree
Showing 38 changed files with 1,279 additions and 571 deletions.
Expand Up @@ -24,10 +24,10 @@
import com.hazelcast.client.spi.EventHandler;
import com.hazelcast.client.spi.impl.ListenerMessageCodec;
import com.hazelcast.cluster.Member;
import com.hazelcast.internal.partition.PartitionLostEventImpl;
import com.hazelcast.nio.Address;
import com.hazelcast.partition.MigrationListener;
import com.hazelcast.partition.Partition;
import com.hazelcast.partition.PartitionLostEvent;
import com.hazelcast.partition.PartitionLostListener;
import com.hazelcast.partition.PartitionService;

Expand Down Expand Up @@ -141,7 +141,7 @@ private static class ClientPartitionLostEventHandler extends ClientAddPartitionL

@Override
public void handlePartitionLostEventV10(int partitionId, int lostBackupCount, Address source) {
listener.partitionLost(new PartitionLostEvent(partitionId, lostBackupCount, source));
listener.partitionLost(new PartitionLostEventImpl(partitionId, lostBackupCount, source));
}

@Override
Expand Down
Expand Up @@ -25,9 +25,9 @@
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.config.CacheConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.partition.PartitionLostEventImpl;
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.impl.eventservice.InternalEventService;
import com.hazelcast.spi.partition.IPartitionLostEvent;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
Expand Down Expand Up @@ -111,7 +111,7 @@ public void test_cachePartitionLostListener_invoked() {

final CacheService cacheService = getNode(instance).getNodeEngine().getService(CacheService.SERVICE_NAME);
final int partitionId = 5;
cacheService.onPartitionLost(new IPartitionLostEvent(partitionId, 0, null));
cacheService.onPartitionLost(new PartitionLostEventImpl(partitionId, 0, null));

assertCachePartitionLostEventEventually(listener, partitionId);
}
Expand Down Expand Up @@ -145,8 +145,8 @@ public void test_cachePartitionLostListener_invoked_fromOtherNode() {
final CacheService cacheService1 = getNode(instance1).getNodeEngine().getService(CacheService.SERVICE_NAME);
final CacheService cacheService2 = getNode(instance2).getNodeEngine().getService(CacheService.SERVICE_NAME);
final int partitionId = 5;
cacheService1.onPartitionLost(new IPartitionLostEvent(partitionId, 0, null));
cacheService2.onPartitionLost(new IPartitionLostEvent(partitionId, 0, null));
cacheService1.onPartitionLost(new PartitionLostEventImpl(partitionId, 0, null));
cacheService2.onPartitionLost(new PartitionLostEventImpl(partitionId, 0, null));

assertCachePartitionLostEventEventually(listener, partitionId);
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.hazelcast.config.Config;
import com.hazelcast.core.DistributedObject;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.partition.PartitionLostEventImpl;
import com.hazelcast.map.MapPartitionLostEvent;
import com.hazelcast.map.TestEventCollectingMapPartitionLostListener;
import com.hazelcast.map.impl.MapPartitionLostEventFilter;
Expand All @@ -31,7 +32,6 @@
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.impl.eventservice.InternalEventService;
import com.hazelcast.spi.impl.proxyservice.InternalProxyService;
import com.hazelcast.spi.partition.IPartitionLostEvent;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
Expand Down Expand Up @@ -106,7 +106,7 @@ public void test_mapPartitionLostListener_invoked() {

MapService mapService = getNode(instance).getNodeEngine().getService(MapService.SERVICE_NAME);
int partitionId = 5;
mapService.onPartitionLost(new IPartitionLostEvent(partitionId, 0, null));
mapService.onPartitionLost(new PartitionLostEventImpl(partitionId, 0, null));

assertMapPartitionLostEventEventually(listener, partitionId);
}
Expand Down Expand Up @@ -139,7 +139,7 @@ public void test_mapPartitionLostListener_invoked_fromOtherNode() {

MapService mapService = getNode(other).getNodeEngine().getService(SERVICE_NAME);
int partitionId = 5;
mapService.onPartitionLost(new IPartitionLostEvent(partitionId, 0, null));
mapService.onPartitionLost(new PartitionLostEventImpl(partitionId, 0, null));

assertMapPartitionLostEventEventually(listener, partitionId);
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.partition.PartitionLostEventImpl;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.ObjectDataInput;
Expand All @@ -32,7 +33,6 @@
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.impl.PortablePartitionLostEvent;
import com.hazelcast.spi.impl.eventservice.InternalEventService;
import com.hazelcast.spi.partition.IPartitionLostEvent;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
Expand Down Expand Up @@ -112,7 +112,7 @@ public void test_partitionLostListener_invoked() {

final InternalPartitionServiceImpl partitionService = getNode(instance).getNodeEngine().getService(SERVICE_NAME);
final int partitionId = 5;
partitionService.onPartitionLost(new IPartitionLostEvent(partitionId, 0, null));
partitionService.onPartitionLost(new PartitionLostEventImpl(partitionId, 0, null));

assertPartitionLostEventEventually(listener, partitionId);
}
Expand Down Expand Up @@ -140,7 +140,7 @@ public void test_partitionLostListener_invoked_fromOtherNode() {

final InternalPartitionServiceImpl partitionService = getNode(other).getNodeEngine().getService(SERVICE_NAME);
final int partitionId = 5;
partitionService.onPartitionLost(new IPartitionLostEvent(partitionId, 0, null));
partitionService.onPartitionLost(new PartitionLostEventImpl(partitionId, 0, null));

assertPartitionLostEventEventually(listener, partitionId);
}
Expand Down
Expand Up @@ -20,6 +20,8 @@
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.partition.MigrationEvent;
import com.hazelcast.partition.MigrationListener;
import com.hazelcast.partition.MigrationPlan;
import com.hazelcast.partition.MigrationProgress;
import com.hazelcast.partition.Partition;
import com.hazelcast.partition.PartitionService;
import com.hazelcast.test.HazelcastParallelClassRunner;
Expand Down Expand Up @@ -94,16 +96,22 @@ public void testGetPartitions() {
}

class DumMigrationListener implements MigrationListener {

@Override
public void migrationStarted(MigrationEvent migrationEvent) {
public void migrationProcessStarted(MigrationPlan plan) {
}

@Override
public void migrationCompleted(MigrationEvent migrationEvent) {
public void migrationProcessFinished(MigrationProgress result) {
}

@Override
public void migrationFailed(MigrationEvent migrationEvent) {
public void migrationCompleted(MigrationEvent event) {
}

@Override
public void migrationFailed(MigrationEvent event) {

}
}
}
Expand Up @@ -31,6 +31,8 @@
import com.hazelcast.nio.ConnectionListener;
import com.hazelcast.partition.MigrationEvent;
import com.hazelcast.partition.MigrationListener;
import com.hazelcast.partition.MigrationPlan;
import com.hazelcast.partition.MigrationProgress;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.properties.HazelcastProperty;
Expand Down Expand Up @@ -213,25 +215,30 @@ private void render(DiagnosticsLogWriter writer, MembershipEvent event) {
writer.endSection();
}


private void render(DiagnosticsLogWriter writer, MigrationEvent event) {
switch (event.getStatus()) {
case STARTED:
writer.startSection("MigrationStarted");
break;
case COMPLETED:
writer.startSection("MigrationCompleted");
break;
case FAILED:
writer.startSection("MigrationFailed");
break;
default:
return;
if (event.isSuccess()) {
writer.startSection("MigrationCompleted");
} else {
writer.startSection("MigrationFailed");
}

Member oldOwner = event.getOldOwner();
writer.writeKeyValueEntry("oldOwner", oldOwner == null ? "null" : oldOwner.getAddress().toString());
writer.writeKeyValueEntry("newOwner", event.getNewOwner().getAddress().toString());
Member source = event.getSource();
writer.writeKeyValueEntry("source", source == null ? "null" : source.getAddress().toString());
writer.writeKeyValueEntry("destination", event.getDestination().getAddress().toString());
writer.writeKeyValueEntry("partitionId", event.getPartitionId());
writer.writeKeyValueEntry("replicaIndex", event.getReplicaIndex());
writer.writeKeyValueEntry("elapsedTime(ms)", event.getReplicaIndex());

MigrationProgress migrationProgress = event.getMigrationProgress();
writer.startSection("MigrationProgress");
writer.writeKeyValueEntryAsDateTime("startTime", migrationProgress.getStartTime());
writer.writeKeyValueEntry("plannedMigrations", migrationProgress.getPlannedMigrations());
writer.writeKeyValueEntry("completedMigrations", migrationProgress.getCompletedMigrations());
writer.writeKeyValueEntry("remainingMigrations", migrationProgress.getRemainingMigrations());
writer.writeKeyValueEntry("totalElapsedTime(ms)", migrationProgress.getTotalElapsedTime());
writer.endSection();

writer.endSection();
}

Expand Down Expand Up @@ -322,8 +329,13 @@ public void memberRemoved(MembershipEvent event) {

private class MigrationListenerImpl implements MigrationListener {
@Override
public void migrationStarted(MigrationEvent event) {
logQueue.add(event);
public void migrationProcessStarted(MigrationPlan plan) {
// ignore
}

@Override
public void migrationProcessFinished(MigrationProgress result) {
// ignore
}

@Override
Expand Down
Expand Up @@ -18,6 +18,8 @@

import com.hazelcast.spi.partition.IPartition;

import java.util.stream.IntStream;

public interface InternalPartition extends IPartition {

int MAX_REPLICA_COUNT = MAX_BACKUP_COUNT + 1;
Expand Down Expand Up @@ -55,4 +57,11 @@ public interface InternalPartition extends IPartition {
* @throws ArrayIndexOutOfBoundsException when replica index is out of bounds
*/
PartitionReplica getReplica(int replicaIndex);

/**
* Returns the integer replica indices of {@code InternalPartition} as a stream.
*/
static IntStream replicaIndices() {
return IntStream.range(0, InternalPartition.MAX_REPLICA_COUNT);
}
}
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.internal.partition;

import com.hazelcast.cluster.Member;
import com.hazelcast.internal.partition.impl.PartitionDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.partition.MigrationEvent;
import com.hazelcast.partition.MigrationListener;
import com.hazelcast.partition.MigrationProgress;
import com.hazelcast.partition.Partition;
import com.hazelcast.partition.PartitionService;

import java.io.IOException;

/**
* An event fired when a partition migration starts, completes or fails.
*
* @see Partition
* @see PartitionService
* @see MigrationListener
*/
public class MigrationEventImpl implements MigrationEvent, IdentifiedDataSerializable {

private MigrationProgress progress;
private int partitionId;
private int replicaIndex;
private Member source;
private Member destination;
private boolean success;
private long elapsedTime;

public MigrationEventImpl() {
}

public MigrationEventImpl(MigrationProgress progress, int partitionId, int replicaIndex, Member source, Member destination,
boolean success, long elapsedTime) {
this.progress = progress;
this.partitionId = partitionId;
this.replicaIndex = replicaIndex;
this.source = source;
this.destination = destination;
this.success = success;
this.elapsedTime = elapsedTime;
}

@Override
public int getPartitionId() {
return partitionId;
}

@Override
public int getReplicaIndex() {
return replicaIndex;
}

@Override
public Member getSource() {
return source;
}

@Override
public Member getDestination() {
return destination;
}

@Override
public MigrationProgress getMigrationProgress() {
return progress;
}

@Override
public boolean isSuccess() {
return success;
}

@Override
public long getElapsedTime() {
return elapsedTime;
}

public void writeData(ObjectDataOutput out) throws IOException {
out.writeObject(progress);
out.writeInt(partitionId);
out.writeInt(replicaIndex);
out.writeObject(source);
out.writeObject(destination);
out.writeBoolean(success);
out.writeLong(elapsedTime);
}

public void readData(ObjectDataInput in) throws IOException {
progress = in.readObject();
partitionId = in.readInt();
replicaIndex = in.readInt();
source = in.readObject();
destination = in.readObject();
success = in.readBoolean();
elapsedTime = in.readLong();
}

@Override
public int getFactoryId() {
return PartitionDataSerializerHook.F_ID;
}

@Override
public int getClassId() {
return PartitionDataSerializerHook.MIGRATION_EVENT;
}

@Override
public String toString() {
return "MigrationEventImpl{" + "progress=" + progress + ", partitionId=" + partitionId + ", replicaIndex=" + replicaIndex
+ ", source=" + source + ", destination=" + destination + ", success=" + success + ", elapsedTime=" + elapsedTime
+ "ms}";
}
}

0 comments on commit a1c15ca

Please sign in to comment.