Skip to content

Commit

Permalink
Rename ClusterEvent and ClusterEventListener for consistency with Clu…
Browse files Browse the repository at this point in the history
…sterMembershipService.
  • Loading branch information
kuujo committed Apr 26, 2018
1 parent 08cd0ac commit d14dadf
Show file tree
Hide file tree
Showing 13 changed files with 86 additions and 86 deletions.
Expand Up @@ -23,7 +23,7 @@
/** /**
* Describes cluster-related event. * Describes cluster-related event.
*/ */
public class ClusterEvent extends AbstractEvent<ClusterEvent.Type, Member> { public class ClusterMembershipEvent extends AbstractEvent<ClusterMembershipEvent.Type, Member> {


/** /**
* Type of cluster-related events. * Type of cluster-related events.
Expand Down Expand Up @@ -57,7 +57,7 @@ public enum Type {
* @param type cluster event type * @param type cluster event type
* @param instance cluster device subject * @param instance cluster device subject
*/ */
public ClusterEvent(Type type, Member instance) { public ClusterMembershipEvent(Type type, Member instance) {
super(type, instance); super(type, instance);
} }


Expand All @@ -68,7 +68,7 @@ public ClusterEvent(Type type, Member instance) {
* @param instance event device subject * @param instance event device subject
* @param time occurrence time * @param time occurrence time
*/ */
public ClusterEvent(Type type, Member instance, long time) { public ClusterMembershipEvent(Type type, Member instance, long time) {
super(type, instance, time); super(type, instance, time);
} }


Expand All @@ -82,8 +82,8 @@ public boolean equals(Object obj) {
if (this == obj) { if (this == obj) {
return true; return true;
} }
if (obj instanceof ClusterEvent) { if (obj instanceof ClusterMembershipEvent) {
final ClusterEvent other = (ClusterEvent) obj; final ClusterMembershipEvent other = (ClusterMembershipEvent) obj;
return Objects.equals(this.type(), other.type()) && return Objects.equals(this.type(), other.type()) &&
Objects.equals(this.subject(), other.subject()) && Objects.equals(this.subject(), other.subject()) &&
Objects.equals(this.time(), other.time()); Objects.equals(this.time(), other.time());
Expand Down
Expand Up @@ -20,5 +20,5 @@
/** /**
* Entity capable of receiving device cluster-related events. * Entity capable of receiving device cluster-related events.
*/ */
public interface ClusterEventListener extends EventListener<ClusterEvent> { public interface ClusterMembershipEventListener extends EventListener<ClusterMembershipEvent> {
} }
Expand Up @@ -23,7 +23,7 @@
* Service for obtaining information about the individual members within * Service for obtaining information about the individual members within
* the cluster. * the cluster.
*/ */
public interface ClusterMembershipService extends ListenerService<ClusterEvent, ClusterEventListener> { public interface ClusterMembershipService extends ListenerService<ClusterMembershipEvent, ClusterMembershipEventListener> {


/** /**
* Returns the local member. * Returns the local member.
Expand Down
Expand Up @@ -19,8 +19,8 @@
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.atomix.cluster.BootstrapMetadataService; import io.atomix.cluster.BootstrapMetadataService;
import io.atomix.cluster.ClusterEvent; import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterEventListener; import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService; import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.ClusterMetadataEvent; import io.atomix.cluster.ClusterMetadataEvent;
import io.atomix.cluster.ClusterMetadataEventListener; import io.atomix.cluster.ClusterMetadataEventListener;
Expand Down Expand Up @@ -63,7 +63,7 @@
* Default cluster implementation. * Default cluster implementation.
*/ */
public class DefaultClusterMembershipService public class DefaultClusterMembershipService
extends AbstractListenerManager<ClusterEvent, ClusterEventListener> extends AbstractListenerManager<ClusterMembershipEvent, ClusterMembershipEventListener>
implements ManagedClusterMembershipService { implements ManagedClusterMembershipService {


private static final Logger LOGGER = getLogger(DefaultClusterMembershipService.class); private static final Logger LOGGER = getLogger(DefaultClusterMembershipService.class);
Expand Down Expand Up @@ -160,8 +160,8 @@ private void broadcastIdentity() {
private void handleBroadcastMessage(byte[] message) { private void handleBroadcastMessage(byte[] message) {
StatefulMember node = SERIALIZER.decode(message); StatefulMember node = SERIALIZER.decode(message);
if (nodes.putIfAbsent(node.id(), node) == null) { if (nodes.putIfAbsent(node.id(), node) == null) {
post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, node)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_ADDED, node));
post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, node)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_ACTIVATED, node));
sendHeartbeats(); sendHeartbeats();
} }
} }
Expand Down Expand Up @@ -223,8 +223,8 @@ private CompletableFuture<Void> sendHeartbeat(Address address, byte[] payload) {
boolean sendHeartbeats = false; boolean sendHeartbeats = false;
for (StatefulMember node : nodes) { for (StatefulMember node : nodes) {
if (this.nodes.putIfAbsent(node.id(), node) == null) { if (this.nodes.putIfAbsent(node.id(), node) == null) {
post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, node)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_ADDED, node));
post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, node)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_ACTIVATED, node));
sendHeartbeats = true; sendHeartbeats = true;
} }
} }
Expand Down Expand Up @@ -275,8 +275,8 @@ private void activateNode(Member member) {
LOGGER.info("{} - Node activated: {}", localMember.id(), statefulNode); LOGGER.info("{} - Node activated: {}", localMember.id(), statefulNode);
statefulNode.setState(State.ACTIVE); statefulNode.setState(State.ACTIVE);
nodes.put(statefulNode.id(), statefulNode); nodes.put(statefulNode.id(), statefulNode);
post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, statefulNode)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_ADDED, statefulNode));
post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, statefulNode)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_ACTIVATED, statefulNode));
sendHeartbeat(member.address(), SERIALIZER.encode(new ClusterHeartbeat( sendHeartbeat(member.address(), SERIALIZER.encode(new ClusterHeartbeat(
localMember.id(), localMember.id(),
localMember.type(), localMember.type(),
Expand All @@ -287,7 +287,7 @@ private void activateNode(Member member) {
} else if (existingNode.getState() == State.INACTIVE) { } else if (existingNode.getState() == State.INACTIVE) {
LOGGER.info("{} - Node activated: {}", localMember.id(), existingNode); LOGGER.info("{} - Node activated: {}", localMember.id(), existingNode);
existingNode.setState(State.ACTIVE); existingNode.setState(State.ACTIVE);
post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, existingNode)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_ACTIVATED, existingNode));
} }
} }


Expand All @@ -301,11 +301,11 @@ private void deactivateNode(Member member) {
existingNode.setState(State.INACTIVE); existingNode.setState(State.INACTIVE);
switch (existingNode.type()) { switch (existingNode.type()) {
case PERSISTENT: case PERSISTENT:
post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_DEACTIVATED, existingNode));
break; break;
case EPHEMERAL: case EPHEMERAL:
post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_DEACTIVATED, existingNode));
post(new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, existingNode)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_REMOVED, existingNode));
break; break;
default: default:
throw new AssertionError(); throw new AssertionError();
Expand All @@ -332,7 +332,7 @@ private void handleMetadataEvent(ClusterMetadataEvent event) {
node.host(), node.host(),
node.tags()); node.tags());
nodes.put(newNode.id(), newNode); nodes.put(newNode.id(), newNode);
post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, newNode)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_ADDED, newNode));
} }
return node.id(); return node.id();
}).collect(Collectors.toSet()); }).collect(Collectors.toSet());
Expand All @@ -350,7 +350,7 @@ private void handleMetadataEvent(ClusterMetadataEvent event) {
for (MemberId memberId : missingNodes) { for (MemberId memberId : missingNodes) {
StatefulMember existingNode = nodes.remove(memberId); StatefulMember existingNode = nodes.remove(memberId);
if (existingNode != null) { if (existingNode != null) {
post(new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, existingNode)); post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.NODE_REMOVED, existingNode));
} }
} }
} }
Expand Down
40 changes: 20 additions & 20 deletions core/src/test/java/io/atomix/core/AtomixTest.java
Expand Up @@ -15,8 +15,8 @@
*/ */
package io.atomix.core; package io.atomix.core;


import io.atomix.cluster.ClusterEvent; import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterEventListener; import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.Member; import io.atomix.cluster.Member;
import io.atomix.core.profile.Profile; import io.atomix.core.profile.Profile;
import io.atomix.core.profile.Profiles; import io.atomix.core.profile.Profiles;
Expand Down Expand Up @@ -187,58 +187,58 @@ public void testClientJoinLeaveCore() throws Exception {
futures.add(startAtomix(Member.Type.PERSISTENT, 3, Arrays.asList(1, 2, 3), Profiles.CONSENSUS)); futures.add(startAtomix(Member.Type.PERSISTENT, 3, Arrays.asList(1, 2, 3), Profiles.CONSENSUS));
Futures.allOf(futures).join(); Futures.allOf(futures).join();


TestClusterEventListener dataListener = new TestClusterEventListener(); TestClusterMembershipEventListener dataListener = new TestClusterMembershipEventListener();
instances.get(0).membershipService().addListener(dataListener); instances.get(0).membershipService().addListener(dataListener);


Atomix client1 = startAtomix(Member.Type.EPHEMERAL, 4, Arrays.asList(1, 2, 3), Profiles.CLIENT).join(); Atomix client1 = startAtomix(Member.Type.EPHEMERAL, 4, Arrays.asList(1, 2, 3), Profiles.CLIENT).join();
assertEquals(1, client1.partitionService().getPartitionGroups().size()); assertEquals(1, client1.partitionService().getPartitionGroups().size());


// client1 added to data node // client1 added to data node
ClusterEvent event1 = dataListener.event(); ClusterMembershipEvent event1 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_ADDED, event1.type()); assertEquals(ClusterMembershipEvent.Type.NODE_ADDED, event1.type());
event1 = dataListener.event(); event1 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_ACTIVATED, event1.type()); assertEquals(ClusterMembershipEvent.Type.NODE_ACTIVATED, event1.type());


Thread.sleep(1000); Thread.sleep(1000);


TestClusterEventListener clientListener = new TestClusterEventListener(); TestClusterMembershipEventListener clientListener = new TestClusterMembershipEventListener();
client1.membershipService().addListener(clientListener); client1.membershipService().addListener(clientListener);


Atomix client2 = startAtomix(Member.Type.EPHEMERAL, 5, Arrays.asList(1, 2, 3), Profiles.CLIENT).join(); Atomix client2 = startAtomix(Member.Type.EPHEMERAL, 5, Arrays.asList(1, 2, 3), Profiles.CLIENT).join();
assertEquals(1, client2.partitionService().getPartitionGroups().size()); assertEquals(1, client2.partitionService().getPartitionGroups().size());


// client2 added to data node // client2 added to data node
ClusterEvent event2 = dataListener.event(); ClusterMembershipEvent event2 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_ADDED, event2.type()); assertEquals(ClusterMembershipEvent.Type.NODE_ADDED, event2.type());
event2 = dataListener.event(); event2 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_ACTIVATED, event2.type()); assertEquals(ClusterMembershipEvent.Type.NODE_ACTIVATED, event2.type());


// client2 added to client node // client2 added to client node
event1 = clientListener.event(); event1 = clientListener.event();
assertEquals(ClusterEvent.Type.NODE_ADDED, event1.type()); assertEquals(ClusterMembershipEvent.Type.NODE_ADDED, event1.type());
event1 = clientListener.event(); event1 = clientListener.event();
assertEquals(ClusterEvent.Type.NODE_ACTIVATED, event1.type()); assertEquals(ClusterMembershipEvent.Type.NODE_ACTIVATED, event1.type());


client2.stop().join(); client2.stop().join();


// client2 removed from data node // client2 removed from data node
event1 = dataListener.event(); event1 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_DEACTIVATED, event1.type()); assertEquals(ClusterMembershipEvent.Type.NODE_DEACTIVATED, event1.type());
event1 = dataListener.event(); event1 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_REMOVED, event1.type()); assertEquals(ClusterMembershipEvent.Type.NODE_REMOVED, event1.type());


// client2 removed from client node // client2 removed from client node
event1 = clientListener.event(); event1 = clientListener.event();
assertEquals(ClusterEvent.Type.NODE_DEACTIVATED, event1.type()); assertEquals(ClusterMembershipEvent.Type.NODE_DEACTIVATED, event1.type());
event1 = clientListener.event(); event1 = clientListener.event();
assertEquals(ClusterEvent.Type.NODE_REMOVED, event1.type()); assertEquals(ClusterMembershipEvent.Type.NODE_REMOVED, event1.type());
} }


private static class TestClusterEventListener implements ClusterEventListener { private static class TestClusterMembershipEventListener implements ClusterMembershipEventListener {
private final BlockingQueue<ClusterEvent> queue = new LinkedBlockingQueue<>(); private final BlockingQueue<ClusterMembershipEvent> queue = new LinkedBlockingQueue<>();


@Override @Override
public void onEvent(ClusterEvent event) { public void onEvent(ClusterMembershipEvent event) {
try { try {
queue.put(event); queue.put(event);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Expand All @@ -250,7 +250,7 @@ public boolean eventReceived() {
return !queue.isEmpty(); return !queue.isEmpty();
} }


public ClusterEvent event() throws InterruptedException { public ClusterMembershipEvent event() throws InterruptedException {
return queue.take(); return queue.take();
} }
} }
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/ */
package io.atomix.primitive.partition.impl; package io.atomix.primitive.partition.impl;


import io.atomix.cluster.ClusterEventListener; import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService; import io.atomix.cluster.ClusterMembershipService;
import io.atomix.primitive.partition.ManagedMemberGroupService; import io.atomix.primitive.partition.ManagedMemberGroupService;
import io.atomix.primitive.partition.MemberGroup; import io.atomix.primitive.partition.MemberGroup;
Expand All @@ -39,7 +39,7 @@ public class DefaultMemberGroupService
private final AtomicBoolean started = new AtomicBoolean(); private final AtomicBoolean started = new AtomicBoolean();
private final ClusterMembershipService membershipService; private final ClusterMembershipService membershipService;
private final MemberGroupProvider memberGroupProvider; private final MemberGroupProvider memberGroupProvider;
private final ClusterEventListener clusterEventListener = event -> recomputeGroups(); private final ClusterMembershipEventListener membershipEventListener = event -> recomputeGroups();
private volatile Collection<MemberGroup> memberGroups; private volatile Collection<MemberGroup> memberGroups;


public DefaultMemberGroupService(ClusterMembershipService membershipService, MemberGroupProvider memberGroupProvider) { public DefaultMemberGroupService(ClusterMembershipService membershipService, MemberGroupProvider memberGroupProvider) {
Expand All @@ -60,7 +60,7 @@ private void recomputeGroups() {
public CompletableFuture<MemberGroupService> start() { public CompletableFuture<MemberGroupService> start() {
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
memberGroups = memberGroupProvider.getMemberGroups(membershipService.getMembers()); memberGroups = memberGroupProvider.getMemberGroups(membershipService.getMembers());
membershipService.addListener(clusterEventListener); membershipService.addListener(membershipEventListener);
} }
return CompletableFuture.completedFuture(this); return CompletableFuture.completedFuture(this);
} }
Expand All @@ -73,7 +73,7 @@ public boolean isRunning() {
@Override @Override
public CompletableFuture<Void> stop() { public CompletableFuture<Void> stop() {
if (started.compareAndSet(true, false)) { if (started.compareAndSet(true, false)) {
membershipService.removeListener(clusterEventListener); membershipService.removeListener(membershipEventListener);
} }
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
Expand Down
Expand Up @@ -16,7 +16,7 @@
package io.atomix.primitive.partition.impl; package io.atomix.primitive.partition.impl;


import com.google.common.hash.Hashing; import com.google.common.hash.Hashing;
import io.atomix.cluster.ClusterEventListener; import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService; import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member; import io.atomix.cluster.Member;
import io.atomix.primitive.partition.GroupMember; import io.atomix.primitive.partition.GroupMember;
Expand All @@ -43,15 +43,15 @@ public class HashBasedPrimaryElection
private final PartitionId partitionId; private final PartitionId partitionId;
private final ClusterMembershipService clusterMembershipService; private final ClusterMembershipService clusterMembershipService;
private final HashBasedPrimaryElectionService electionService; private final HashBasedPrimaryElectionService electionService;
private final ClusterEventListener clusterEventListener = e -> recomputeTerm(); private final ClusterMembershipEventListener membershipEventListener = e -> recomputeTerm();
private volatile PrimaryTerm currentTerm; private volatile PrimaryTerm currentTerm;


public HashBasedPrimaryElection(PartitionId partitionId, ClusterMembershipService clusterMembershipService, HashBasedPrimaryElectionService electionService) { public HashBasedPrimaryElection(PartitionId partitionId, ClusterMembershipService clusterMembershipService, HashBasedPrimaryElectionService electionService) {
this.partitionId = partitionId; this.partitionId = partitionId;
this.clusterMembershipService = clusterMembershipService; this.clusterMembershipService = clusterMembershipService;
this.electionService = electionService; this.electionService = electionService;
recomputeTerm(); recomputeTerm();
clusterMembershipService.addListener(clusterEventListener); clusterMembershipService.addListener(membershipEventListener);
} }


@Override @Override
Expand Down
Expand Up @@ -17,8 +17,8 @@


import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.atomix.cluster.ClusterEvent; import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterEventListener; import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService; import io.atomix.cluster.ClusterMembershipService;
import io.atomix.primitive.PrimitiveException; import io.atomix.primitive.PrimitiveException;
import io.atomix.primitive.PrimitiveType; import io.atomix.primitive.PrimitiveType;
Expand Down Expand Up @@ -65,7 +65,7 @@ public class PrimaryBackupProxy extends AbstractPrimitiveProxy {
private final Set<Consumer<State>> stateChangeListeners = Sets.newIdentityHashSet(); private final Set<Consumer<State>> stateChangeListeners = Sets.newIdentityHashSet();
private final Set<Consumer<PrimitiveEvent>> eventListeners = Sets.newIdentityHashSet(); private final Set<Consumer<PrimitiveEvent>> eventListeners = Sets.newIdentityHashSet();
private final PrimaryElectionEventListener primaryElectionListener = event -> changeReplicas(event.term()); private final PrimaryElectionEventListener primaryElectionListener = event -> changeReplicas(event.term());
private final ClusterEventListener clusterEventListener = this::handleClusterEvent; private final ClusterMembershipEventListener membershipEventListener = this::handleClusterEvent;
private PrimaryTerm term; private PrimaryTerm term;
private volatile State state = State.CLOSED; private volatile State state = State.CLOSED;


Expand Down Expand Up @@ -212,8 +212,8 @@ private void changeReplicas(PrimaryTerm term) {
/** /**
* Handles a cluster event. * Handles a cluster event.
*/ */
private void handleClusterEvent(ClusterEvent event) { private void handleClusterEvent(ClusterMembershipEvent event) {
if (event.type() == ClusterEvent.Type.NODE_DEACTIVATED && event.subject().id().equals(term.primary().memberId())) { if (event.type() == ClusterMembershipEvent.Type.NODE_DEACTIVATED && event.subject().id().equals(term.primary().memberId())) {
threadContext.execute(() -> { threadContext.execute(() -> {
state = State.SUSPENDED; state = State.SUSPENDED;
stateChangeListeners.forEach(l -> l.accept(state)); stateChangeListeners.forEach(l -> l.accept(state));
Expand Down Expand Up @@ -257,7 +257,7 @@ public CompletableFuture<Void> close() {
protocol.close(term.primary().memberId(), new CloseRequest(descriptor, sessionId.id())) protocol.close(term.primary().memberId(), new CloseRequest(descriptor, sessionId.id()))
.whenCompleteAsync((response, error) -> { .whenCompleteAsync((response, error) -> {
protocol.unregisterEventListener(sessionId); protocol.unregisterEventListener(sessionId);
clusterMembershipService.removeListener(clusterEventListener); clusterMembershipService.removeListener(membershipEventListener);
primaryElection.removeListener(primaryElectionListener); primaryElection.removeListener(primaryElectionListener);
future.complete(null); future.complete(null);
}, threadContext); }, threadContext);
Expand Down
Expand Up @@ -16,8 +16,8 @@


package io.atomix.protocols.backup.service.impl; package io.atomix.protocols.backup.service.impl;


import io.atomix.cluster.ClusterEvent; import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterEventListener; import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService; import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId; import io.atomix.cluster.MemberId;
import io.atomix.primitive.PrimitiveId; import io.atomix.primitive.PrimitiveId;
Expand Down Expand Up @@ -104,7 +104,7 @@ public WallClockTimestamp getTime() {
} }
}; };
private PrimaryBackupRole role; private PrimaryBackupRole role;
private final ClusterEventListener clusterEventListener = this::handleClusterEvent; private final ClusterMembershipEventListener membershipEventListener = this::handleClusterEvent;
private final PrimaryElectionEventListener primaryElectionListener = event -> changeRole(event.term()); private final PrimaryElectionEventListener primaryElectionListener = event -> changeRole(event.term());


public PrimaryBackupServiceContext( public PrimaryBackupServiceContext(
Expand Down Expand Up @@ -133,7 +133,7 @@ public PrimaryBackupServiceContext(
.add("type", descriptor.type()) .add("type", descriptor.type())
.add("name", descriptor.name()) .add("name", descriptor.name())
.build()); .build());
clusterMembershipService.addListener(clusterEventListener); clusterMembershipService.addListener(membershipEventListener);
primaryElection.addListener(primaryElectionListener); primaryElection.addListener(primaryElectionListener);
} }


Expand Down Expand Up @@ -517,8 +517,8 @@ public PrimaryBackupSession getOrCreateSession(long sessionId, MemberId memberId
/** /**
* Handles a cluster event. * Handles a cluster event.
*/ */
private void handleClusterEvent(ClusterEvent event) { private void handleClusterEvent(ClusterMembershipEvent event) {
if (event.type() == ClusterEvent.Type.NODE_DEACTIVATED) { if (event.type() == ClusterMembershipEvent.Type.NODE_DEACTIVATED) {
for (Session session : sessions) { for (Session session : sessions) {
if (session.memberId().equals(event.subject().id())) { if (session.memberId().equals(event.subject().id())) {
role.expire((PrimaryBackupSession) session); role.expire((PrimaryBackupSession) session);
Expand Down Expand Up @@ -575,7 +575,7 @@ private void changeRole(PrimaryTerm term) {
* Closes the service. * Closes the service.
*/ */
public CompletableFuture<Void> close() { public CompletableFuture<Void> close() {
clusterMembershipService.removeListener(clusterEventListener); clusterMembershipService.removeListener(membershipEventListener);
primaryElection.removeListener(primaryElectionListener); primaryElection.removeListener(primaryElectionListener);
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
Expand Down

0 comments on commit d14dadf

Please sign in to comment.