Skip to content

Commit

Permalink
Refactor MigrationListener API
Browse files Browse the repository at this point in the history
With new listener, an event is published when a new migration process starts
and another event when migration is completed. Those events include stats
about migration process; start time, planned migration count, completed migration count etc.

Additionally, on each replica migration, both for primary and backup replica migrations,
a migration event is published. This event includes partition-id, replica-index and
migration progress stats.
  • Loading branch information
mdogan committed Sep 5, 2019
1 parent 8fdc5d6 commit bacdf04
Show file tree
Hide file tree
Showing 38 changed files with 1,320 additions and 643 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import com.hazelcast.client.impl.spi.EventHandler;
import com.hazelcast.client.impl.spi.impl.ListenerMessageCodec;
import com.hazelcast.cluster.Member;
import com.hazelcast.internal.partition.PartitionLostEventImpl;
import com.hazelcast.nio.Address;
import com.hazelcast.partition.MigrationListener;
import com.hazelcast.partition.Partition;
import com.hazelcast.partition.PartitionLostEvent;
import com.hazelcast.partition.PartitionLostListener;
import com.hazelcast.partition.PartitionService;

Expand Down Expand Up @@ -141,7 +141,7 @@ private static class ClientPartitionLostEventHandler extends ClientAddPartitionL

@Override
public void handlePartitionLostEvent(int partitionId, int lostBackupCount, Address source) {
listener.partitionLost(new PartitionLostEvent(partitionId, lostBackupCount, source));
listener.partitionLost(new PartitionLostEventImpl(partitionId, lostBackupCount, source));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.ConnectionListenable;
import com.hazelcast.nio.ConnectionListener;
import com.hazelcast.partition.MigrationEvent;
import com.hazelcast.partition.MigrationState;
import com.hazelcast.partition.MigrationListener;
import com.hazelcast.partition.ReplicaMigrationEvent;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.properties.HazelcastProperty;
Expand Down Expand Up @@ -157,8 +158,10 @@ public void run(DiagnosticsLogWriter writer) {
render(writer, (LifecycleEvent) item);
} else if (item instanceof MembershipEvent) {
render(writer, (MembershipEvent) item);
} else if (item instanceof MigrationEvent) {
render(writer, (MigrationEvent) item);
} else if (item instanceof MigrationState) {
render(writer, (MigrationState) item);
} else if (item instanceof ReplicaMigrationEvent) {
render(writer, (ReplicaMigrationEvent) item);
} else if (item instanceof ConnectionEvent) {
ConnectionEvent event = (ConnectionEvent) item;
render(writer, event);
Expand Down Expand Up @@ -213,25 +216,31 @@ private void render(DiagnosticsLogWriter writer, MembershipEvent event) {
writer.endSection();
}

private void render(DiagnosticsLogWriter writer, MigrationEvent event) {
switch (event.getStatus()) {
case STARTED:
writer.startSection("MigrationStarted");
break;
case COMPLETED:
writer.startSection("MigrationCompleted");
break;
case FAILED:
writer.startSection("MigrationFailed");
break;
default:
return;
private void render(DiagnosticsLogWriter writer, MigrationState migrationState) {
writer.startSection("MigrationState");
writer.writeKeyValueEntryAsDateTime("startTime", migrationState.getStartTime());
writer.writeKeyValueEntry("plannedMigrations", migrationState.getPlannedMigrations());
writer.writeKeyValueEntry("completedMigrations", migrationState.getCompletedMigrations());
writer.writeKeyValueEntry("remainingMigrations", migrationState.getRemainingMigrations());
writer.writeKeyValueEntry("totalElapsedTime(ms)", migrationState.getTotalElapsedTime());
writer.endSection();
}

private void render(DiagnosticsLogWriter writer, ReplicaMigrationEvent event) {
if (event.isSuccess()) {
writer.startSection("MigrationCompleted");
} else {
writer.startSection("MigrationFailed");
}

Member oldOwner = event.getOldOwner();
writer.writeKeyValueEntry("oldOwner", oldOwner == null ? "null" : oldOwner.getAddress().toString());
writer.writeKeyValueEntry("newOwner", event.getNewOwner().getAddress().toString());
Member source = event.getSource();
writer.writeKeyValueEntry("source", source == null ? "null" : source.getAddress().toString());
writer.writeKeyValueEntry("destination", event.getDestination().getAddress().toString());
writer.writeKeyValueEntry("partitionId", event.getPartitionId());
writer.writeKeyValueEntry("replicaIndex", event.getReplicaIndex());
writer.writeKeyValueEntry("elapsedTime(ms)", event.getReplicaIndex());

render(writer, event.getMigrationState());
writer.endSection();
}

Expand Down Expand Up @@ -322,17 +331,22 @@ public void memberRemoved(MembershipEvent event) {

private class MigrationListenerImpl implements MigrationListener {
@Override
public void migrationStarted(MigrationEvent event) {
logQueue.add(event);
public void migrationStarted(MigrationState state) {
logQueue.add(state);
}

@Override
public void migrationFinished(MigrationState result) {
logQueue.add(result);
}

@Override
public void migrationCompleted(MigrationEvent event) {
public void replicaMigrationCompleted(ReplicaMigrationEvent event) {
logQueue.add(event);
}

@Override
public void migrationFailed(MigrationEvent event) {
public void replicaMigrationFailed(ReplicaMigrationEvent event) {
logQueue.add(event);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.hazelcast.spi.partition.IPartition;

import java.util.stream.IntStream;

public interface InternalPartition extends IPartition {

int MAX_REPLICA_COUNT = MAX_BACKUP_COUNT + 1;
Expand Down Expand Up @@ -55,4 +57,11 @@ public interface InternalPartition extends IPartition {
* @throws ArrayIndexOutOfBoundsException when replica index is out of bounds
*/
PartitionReplica getReplica(int replicaIndex);

/**
* Returns the integer replica indices of {@code InternalPartition} as a stream.
*/
static IntStream replicaIndices() {
return IntStream.range(0, InternalPartition.MAX_REPLICA_COUNT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.internal.partition;

import com.hazelcast.internal.partition.impl.PartitionDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.partition.MigrationState;

import java.io.IOException;

/**
* Summary of the migration state.
* Main implementation of {@link MigrationState}.
*/
public class MigrationStateImpl implements MigrationState, IdentifiedDataSerializable {

private long startTime;
private int plannedMigrations;
private int completedMigrations;
private long totalElapsedTime;

public MigrationStateImpl() {
}

public MigrationStateImpl(long startTime, int plannedMigrations, int completedMigrations, long totalElapsedTime) {
this.startTime = startTime;
this.plannedMigrations = plannedMigrations;
this.completedMigrations = completedMigrations;
this.totalElapsedTime = totalElapsedTime;
}

@Override
public long getStartTime() {
return startTime;
}

@Override
public int getPlannedMigrations() {
return plannedMigrations;
}

@Override
public int getCompletedMigrations() {
return completedMigrations;
}

@Override
public int getRemainingMigrations() {
return plannedMigrations - completedMigrations;
}

@Override
public long getTotalElapsedTime() {
return totalElapsedTime;
}

public MigrationStateImpl onComplete(long elapsed) {
return onComplete(1, elapsed);
}

public MigrationStateImpl onComplete(int migrations, long elapsed) {
return new MigrationStateImpl(startTime, plannedMigrations, completedMigrations + migrations, totalElapsedTime + elapsed);
}

@Override
public void writeData(ObjectDataOutput out) throws IOException {
out.writeLong(startTime);
out.writeLong(totalElapsedTime);
out.writeInt(plannedMigrations);
out.writeInt(completedMigrations);
}

@Override
public void readData(ObjectDataInput in) throws IOException {
startTime = in.readLong();
totalElapsedTime = in.readLong();
plannedMigrations = in.readInt();
completedMigrations = in.readInt();
}

@Override
public int getFactoryId() {
return PartitionDataSerializerHook.F_ID;
}

@Override
public int getClassId() {
return PartitionDataSerializerHook.MIGRATION_EVENT;
}

@Override
public String toString() {
return "MigrationState{startTime=" + startTime + ", plannedMigrations=" + plannedMigrations
+ ", completedMigrations=" + completedMigrations + ", totalElapsedTime=" + totalElapsedTime + "ms}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.internal.partition;

import com.hazelcast.internal.partition.impl.PartitionDataSerializerHook;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.partition.Partition;
import com.hazelcast.partition.PartitionLostEvent;
import com.hazelcast.partition.PartitionLostListener;
import com.hazelcast.partition.PartitionService;
import com.hazelcast.spi.partition.IPartitionLostEvent;

import java.io.IOException;

/**
* The event that is fired when a partition lost its owner and all backups.
*
* @see Partition
* @see PartitionService
* @see PartitionLostListener
*/
public class PartitionLostEventImpl implements PartitionLostEvent, IPartitionLostEvent, IdentifiedDataSerializable {

private int partitionId;

private int lostBackupCount;

private Address eventSource;

public PartitionLostEventImpl() {
}

public PartitionLostEventImpl(int partitionId, int lostBackupCount, Address eventSource) {
this.partitionId = partitionId;
this.lostBackupCount = lostBackupCount;
this.eventSource = eventSource;
}

/**
* Returns the lost partition ID.
*
* @return the lost partition ID.
*/
@Override
public int getPartitionId() {
return partitionId;
}

/**
* Returns the number of lost backups for the partition. 0: the owner, 1: first backup, 2: second backup...
* If all replicas of a partition are lost, {@link InternalPartition#MAX_BACKUP_COUNT} is returned.
*/
@Override
public int getLostBackupCount() {
return lostBackupCount;
}

@Override
public int getLostReplicaIndex() {
return lostBackupCount;
}

/**
* Returns the address of the node that dispatches the event
*
* @return the address of the node that dispatches the event
*/
@Override
public Address getEventSource() {
return eventSource;
}

@Override
public void writeData(ObjectDataOutput out) throws IOException {
out.writeInt(partitionId);
out.writeInt(lostBackupCount);
out.writeObject(eventSource);
}

@Override
public void readData(ObjectDataInput in) throws IOException {
partitionId = in.readInt();
lostBackupCount = in.readInt();
eventSource = in.readObject();
}

@Override
public int getFactoryId() {
return PartitionDataSerializerHook.F_ID;
}

@Override
public int getClassId() {
return PartitionDataSerializerHook.PARTITION_LOST_EVENT;
}

@Override
public String toString() {
return getClass().getName() + "{partitionId=" + partitionId + ", lostBackupCount=" + lostBackupCount + ", eventSource="
+ eventSource + '}';
}
}
Loading

0 comments on commit bacdf04

Please sign in to comment.