Skip to content

Commit

Permalink
Support scheduling callbacks to be executed on remote nodes.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Sep 25, 2015
1 parent 19736f4 commit f65bbe3
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 124 deletions.
Expand Up @@ -20,8 +20,8 @@
import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners;
import io.atomix.copycat.Resource;
import io.atomix.copycat.coordination.state.GroupCommands;
import io.atomix.copycat.coordination.state.GroupState;
import io.atomix.copycat.coordination.state.MembershipGroupCommands;
import io.atomix.copycat.coordination.state.MembershipGroupState;
import io.atomix.copycat.resource.ResourceContext;

import java.util.Collection;
Expand All @@ -36,25 +36,25 @@
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public class DistributedMembershipGroup extends Resource {
private final Listeners<Member> joinListeners = new Listeners<>();
private final Listeners<Member> leaveListeners = new Listeners<>();
private final Map<Long, Member> members = new ConcurrentHashMap<>();
private final Listeners<GroupMember> joinListeners = new Listeners<>();
private final Listeners<GroupMember> leaveListeners = new Listeners<>();
private final Map<Long, GroupMember> members = new ConcurrentHashMap<>();

@Override
protected void open(ResourceContext context) {
super.open(context);

context.session().<Long>onEvent("join", memberId -> {
Member member = members.computeIfAbsent(memberId, m -> new Member(m, this));
for (Listener<Member> listener : joinListeners) {
GroupMember member = members.computeIfAbsent(memberId, m -> new GroupMember(m, this));
for (Listener<GroupMember> listener : joinListeners) {
listener.accept(member);
}
});

context.session().onEvent("leave", memberId -> {
Member member = members.remove(memberId);
GroupMember member = members.remove(memberId);
if (member != null) {
for (Listener<Member> listener : leaveListeners) {
for (Listener<GroupMember> listener : leaveListeners) {
listener.accept(member);
}
}
Expand All @@ -65,7 +65,7 @@ protected void open(ResourceContext context) {

@Override
protected Class<? extends StateMachine> stateMachine() {
return GroupState.class;
return MembershipGroupState.class;
}

/**
Expand All @@ -74,7 +74,7 @@ protected Class<? extends StateMachine> stateMachine() {
* @param memberId The member ID.
* @return The member.
*/
public Member member(long memberId) {
public GroupMember member(long memberId) {
return members.get(memberId);
}

Expand All @@ -83,7 +83,7 @@ public Member member(long memberId) {
*
* @return The collection of members in the group.
*/
public Collection<Member> members() {
public Collection<GroupMember> members() {
return members.values();
}

Expand All @@ -93,7 +93,7 @@ public Collection<Member> members() {
* @return A completable future to be completed once the member has joined.
*/
public CompletableFuture<Void> join() {
return submit(GroupCommands.Join.builder().build());
return submit(MembershipGroupCommands.Join.builder().build());
}

/**
Expand All @@ -102,7 +102,7 @@ public CompletableFuture<Void> join() {
* @param listener The join listener.
* @return The listener context.
*/
public Listener<Member> onJoin(Consumer<Member> listener) {
public Listener<GroupMember> onJoin(Consumer<GroupMember> listener) {
return joinListeners.add(listener);
}

Expand All @@ -112,7 +112,7 @@ public Listener<Member> onJoin(Consumer<Member> listener) {
* @return A completable future to be completed once the member has left.
*/
public CompletableFuture<Void> leave() {
return submit(GroupCommands.Leave.builder().build());
return submit(MembershipGroupCommands.Leave.builder().build());
}

/**
Expand All @@ -121,7 +121,7 @@ public CompletableFuture<Void> leave() {
* @param listener The leave listener.
* @return The listener context.
*/
public Listener<Member> onLeave(Consumer<Member> listener) {
public Listener<GroupMember> onLeave(Consumer<GroupMember> listener) {
return leaveListeners.add(listener);
}

Expand Down
Expand Up @@ -16,20 +16,22 @@
package io.atomix.copycat.coordination;

import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.coordination.state.GroupCommands;
import io.atomix.copycat.coordination.state.MembershipGroupCommands;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;

/**
* Group member.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public class Member {
public class GroupMember {
private final long memberId;
private final DistributedMembershipGroup group;

Member(long memberId, DistributedMembershipGroup group) {
GroupMember(long memberId, DistributedMembershipGroup group) {
this.memberId = memberId;
this.group = Assert.notNull(group, "group");
}
Expand All @@ -43,14 +45,41 @@ public long id() {
return memberId;
}

/**
* Schedules a callback to run at the given instant.
*
* @param instant The instant at which to run the callback.
* @param callback The callback to run.
* @return A completable future to be completed once the callback has been scheduled.
*/
public CompletableFuture<Void> schedule(Instant instant, Runnable callback) {
return schedule(instant.minusMillis(System.currentTimeMillis()), callback);
}

/**
* Schedules a callback to run after the given delay on the member.
*
* @param delay The delay after which to run the callback.
* @param callback The callback to run.
* @return A completable future to be completed once the callback has been scheduled.
*/
public CompletableFuture<Void> schedule(Duration delay, Runnable callback) {
return group.submit(MembershipGroupCommands.Schedule.builder()
.withMember(memberId)
.withDelay(delay.toMillis())
.withCallback(callback)
.build());
}

/**
* Executes a callback on the group member.
*
* @param callback The callback to execute.
* @return A completable future to be completed once the callback has completed.
*/
public CompletableFuture<Void> execute(Runnable callback) {
return group.submit(GroupCommands.Execute.builder()
return group.submit(MembershipGroupCommands.Execute.builder()
.withMember(memberId)
.withCallback(callback)
.build());
}
Expand Down

This file was deleted.

0 comments on commit f65bbe3

Please sign in to comment.