Skip to content

Commit

Permalink
Add strategies for routing and failover in group task queues.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Mar 31, 2016
1 parent 9d37859 commit 84ba414
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 16 deletions.
30 changes: 29 additions & 1 deletion groups/src/main/java/io/atomix/group/internal/GroupCommands.java
Expand Up @@ -26,6 +26,8 @@
import io.atomix.copycat.Operation; import io.atomix.copycat.Operation;
import io.atomix.copycat.Query; import io.atomix.copycat.Query;
import io.atomix.group.messaging.internal.GroupMessage; import io.atomix.group.messaging.internal.GroupMessage;
import io.atomix.group.task.FailoverStrategy;
import io.atomix.group.task.RoutingStrategy;
import io.atomix.group.task.internal.GroupTask; import io.atomix.group.task.internal.GroupTask;


import java.util.Set; import java.util.Set;
Expand Down Expand Up @@ -263,15 +265,19 @@ public static class Submit extends MemberCommand<Void> {
private long id; private long id;
private String type; private String type;
private Object task; private Object task;
private RoutingStrategy routingStrategy;
private FailoverStrategy failoverStrategy;


public Submit() { public Submit() {
} }


public Submit(String member, String type, long id, Object task) { public Submit(String member, String type, long id, Object task, RoutingStrategy routingStrategy, FailoverStrategy failoverStrategy) {
super(member); super(member);
this.type = type; this.type = type;
this.id = id; this.id = id;
this.task = task; this.task = task;
this.routingStrategy = routingStrategy;
this.failoverStrategy = failoverStrategy;
} }


/** /**
Expand Down Expand Up @@ -301,11 +307,31 @@ public Object task() {
return task; return task;
} }


/**
* Returns the task routing strategy.
*
* @return The task routing strategy.
*/
public RoutingStrategy routingStrategy() {
return routingStrategy;
}

/**
* Returns the task failover strategy.
*
* @return The task failover strategy.
*/
public FailoverStrategy failoverStrategy() {
return failoverStrategy;
}

@Override @Override
public void writeObject(BufferOutput buffer, Serializer serializer) { public void writeObject(BufferOutput buffer, Serializer serializer) {
super.writeObject(buffer, serializer); super.writeObject(buffer, serializer);
buffer.writeString(type); buffer.writeString(type);
buffer.writeLong(id); buffer.writeLong(id);
buffer.writeByte(routingStrategy.ordinal());
buffer.writeByte(failoverStrategy.ordinal());
serializer.writeObject(task, buffer); serializer.writeObject(task, buffer);
} }


Expand All @@ -314,6 +340,8 @@ public void readObject(BufferInput buffer, Serializer serializer) {
super.readObject(buffer, serializer); super.readObject(buffer, serializer);
type = buffer.readString(); type = buffer.readString();
id = buffer.readLong(); id = buffer.readLong();
routingStrategy = RoutingStrategy.values()[buffer.readByte()];
failoverStrategy = FailoverStrategy.values()[buffer.readByte()];
task = serializer.readObject(buffer); task = serializer.readObject(buffer);
} }
} }
Expand Down
60 changes: 51 additions & 9 deletions groups/src/main/java/io/atomix/group/internal/GroupState.java
Expand Up @@ -19,6 +19,8 @@
import io.atomix.copycat.server.Commit; import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.session.ServerSession; import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener; import io.atomix.copycat.server.session.SessionListener;
import io.atomix.group.task.FailoverStrategy;
import io.atomix.group.task.RoutingStrategy;
import io.atomix.group.task.internal.GroupTask; import io.atomix.group.task.internal.GroupTask;
import io.atomix.resource.ResourceStateMachine; import io.atomix.resource.ResourceStateMachine;


Expand All @@ -31,9 +33,11 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class GroupState extends ResourceStateMachine implements SessionListener { public class GroupState extends ResourceStateMachine implements SessionListener {
private final Random random = new Random(141650939l);
private final Duration expiration; private final Duration expiration;
private final Map<Long, GroupSession> sessions = new HashMap<>(); private final Map<Long, GroupSession> sessions = new HashMap<>();
private final Map<String, Member> members = new HashMap<>(); private final Map<String, Member> members = new HashMap<>();
private final List<Member> membersList = new ArrayList<>();
private final List<Member> candidates = new ArrayList<>(); private final List<Member> candidates = new ArrayList<>();
private Member leader; private Member leader;
private long term; private long term;
Expand All @@ -59,6 +63,7 @@ public void close(ServerSession session) {
// If the member is not persistent, remove the member from the membership group. // If the member is not persistent, remove the member from the membership group.
if (!member.persistent()) { if (!member.persistent()) {
iterator.remove(); iterator.remove();
membersList.remove(member);
candidates.remove(member); candidates.remove(member);
left.put(member.index(), member); left.put(member.index(), member);
} else { } else {
Expand Down Expand Up @@ -155,6 +160,7 @@ public GroupMemberInfo join(Commit<GroupCommands.Join> commit) {


// Store the member ID and join commit mappings and add the member as a candidate. // Store the member ID and join commit mappings and add the member as a candidate.
members.put(member.id(), member); members.put(member.id(), member);
membersList.add(member);
candidates.add(member); candidates.add(member);


// Iterate through available sessions and publish a join event to each session. // Iterate through available sessions and publish a join event to each session.
Expand Down Expand Up @@ -216,6 +222,7 @@ public void leave(Commit<GroupCommands.Leave> commit) {
Member member = members.remove(commit.operation().member()); Member member = members.remove(commit.operation().member());
if (member != null) { if (member != null) {
// Remove the member from the candidates list. // Remove the member from the candidates list.
membersList.remove(member);
candidates.remove(member); candidates.remove(member);


// If the leaving member was the leader, increment the term and elect a new leader. // If the leaving member was the leader, increment the term and elect a new leader.
Expand Down Expand Up @@ -261,17 +268,32 @@ public Set<GroupMemberInfo> listen(Commit<GroupCommands.Listen> commit) {
public void submit(Commit<GroupCommands.Submit> commit) { public void submit(Commit<GroupCommands.Submit> commit) {
try { try {
if (commit.operation().member() != null) { if (commit.operation().member() != null) {
// Create a task instance.
Task task = new Task(commit);

// Ensure that the member is a member of the group. // Ensure that the member is a member of the group.
Member member = members.get(commit.operation().member()); Member member = members.get(commit.operation().member());
if (member == null) { if (member == null) {
throw new IllegalArgumentException("unknown member: " + commit.operation().member()); task.fail();
task.close();
} else {
// Add the task to the member's task queue.
member.submit(task);
} }

} else if (commit.operation().routingStrategy() == RoutingStrategy.DIRECT) {
// Create a task instance. // Create a task instance.
Task task = new Task(commit); Task task = new Task(commit);


// Add the task to the member's task queue. // If the members list is empty, fail the task submission.
member.submit(task); if (members.isEmpty()) {
task.fail();
task.close();
} else {
Member member = membersList.get(random.nextInt(membersList.size()));

// Add the task to the member's task queue.
member.submit(task);
}
} else { } else {
// Create a task instance. // Create a task instance.
Task task = new Task(commit); Task task = new Task(commit);
Expand Down Expand Up @@ -387,7 +409,7 @@ public boolean equals(Object object) {
/** /**
* Represents a member of the group. * Represents a member of the group.
*/ */
private static class Member implements AutoCloseable { private class Member implements AutoCloseable {
private final Commit<GroupCommands.Join> commit; private final Commit<GroupCommands.Join> commit;
private final long index; private final long index;
private final String memberId; private final String memberId;
Expand Down Expand Up @@ -522,13 +544,33 @@ public void close() {
Task task = this.task; Task task = this.task;
this.task = null; this.task = null;
if (task != null) { if (task != null) {
task.fail(); if (task.commit.operation().routingStrategy() == RoutingStrategy.DIRECT && task.commit.operation().failoverStrategy() == FailoverStrategy.RESUBMIT) {
task.close(); if (!members.isEmpty()) {
Member member = membersList.get(random.nextInt(membersList.size()));
member.submit(task);
} else {
task.fail();
task.close();
}
} else {
task.fail();
task.close();
}
} }


tasks.forEach(t -> { tasks.forEach(t -> {
t.fail(); if (t.commit.operation().routingStrategy() == RoutingStrategy.DIRECT && task.commit.operation().failoverStrategy() == FailoverStrategy.RESUBMIT) {
t.close(); if (!members.isEmpty()) {
Member member = membersList.get(random.nextInt(membersList.size()));
member.submit(t);
} else {
t.fail();
t.close();
}
} else {
t.fail();
t.close();
}
}); });
tasks.clear(); tasks.clear();


Expand Down
Expand Up @@ -54,7 +54,7 @@ public <T> MessageProducer<T> producer(String name, MessageProducer.Options opti
synchronized (producers) { synchronized (producers) {
producer = producers.get(name); producer = producers.get(name);
if (producer == null) { if (producer == null) {
producer = createProducer(name, options); producer = createProducer(name, options != null ? options : new MessageProducer.Options());
producers.put(name, producer); producers.put(name, producer);
} }
} }
Expand Down
Expand Up @@ -44,7 +44,7 @@ public <T> MessageConsumer<T> consumer(String name, MessageConsumer.Options opti
synchronized (consumers) { synchronized (consumers) {
consumer = consumers.get(name); consumer = consumers.get(name);
if (consumer == null) { if (consumer == null) {
consumer = createConsumer(name, options); consumer = createConsumer(name, options != null ? options : new MessageConsumer.Options());
consumers.put(name, consumer); consumers.put(name, consumer);
} }
} }
Expand Down
29 changes: 29 additions & 0 deletions groups/src/main/java/io/atomix/group/task/FailoverStrategy.java
@@ -0,0 +1,29 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.group.task;

/**
* Task failover strategy.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public enum FailoverStrategy {

FAIL,

RESUBMIT

}
35 changes: 35 additions & 0 deletions groups/src/main/java/io/atomix/group/task/RoutingStrategy.java
@@ -0,0 +1,35 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.group.task;

/**
* Task routing strategies.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public enum RoutingStrategy {

/**
* Sends a task to a single member.
*/
DIRECT,

/**
* Sends a task to all members.
*/
ALL

}
44 changes: 44 additions & 0 deletions groups/src/main/java/io/atomix/group/task/TaskProducer.java
Expand Up @@ -15,6 +15,8 @@
*/ */
package io.atomix.group.task; package io.atomix.group.task;


import io.atomix.catalyst.util.Assert;

import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


/** /**
Expand All @@ -26,6 +28,48 @@ public interface TaskProducer<T> extends AutoCloseable {
* Task producer options. * Task producer options.
*/ */
class Options { class Options {
private RoutingStrategy routingStrategy = RoutingStrategy.ALL;
private FailoverStrategy failoverStrategy = FailoverStrategy.FAIL;

/**
* Sets the producer routing strategy.
*
* @param routingStrategy The routing strategy.
* @return The producer options.
*/
public Options withRoutingStrategy(RoutingStrategy routingStrategy) {
this.routingStrategy = Assert.notNull(routingStrategy, "routingStrategy");
return this;
}

/**
* Returns the routing strategy.
*
* @return The routing strategy.
*/
public RoutingStrategy getRoutingStrategy() {
return routingStrategy;
}

/**
* Sets the producer failover strategy.
*
* @param failoverStrategy The producer failover strategy.
* @return The producer options.
*/
public Options withFailoverStrategy(FailoverStrategy failoverStrategy) {
this.failoverStrategy = Assert.notNull(failoverStrategy, "failoverStrategy");
return this;
}

/**
* Returns the failover strategy.
*
* @return The failover strategy.
*/
public FailoverStrategy getFailoverStrategy() {
return failoverStrategy;
}
} }


CompletableFuture<Void> submit(T task); CompletableFuture<Void> submit(T task);
Expand Down
Expand Up @@ -70,7 +70,7 @@ public <T> AbstractTaskProducer<T> producer(String name, TaskProducer.Options op
synchronized (producers) { synchronized (producers) {
producer = producers.get(name); producer = producers.get(name);
if (producer == null) { if (producer == null) {
producer = createProducer(name, options); producer = createProducer(name, options != null ? options : new TaskProducer.Options());
producers.put(name, producer); producers.put(name, producer);
} }
} }
Expand Down
Expand Up @@ -88,7 +88,7 @@ protected CompletableFuture<Void> submit(String member, T task) {
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
final long taskId = ++this.taskId; final long taskId = ++this.taskId;
taskFutures.put(taskId, future); taskFutures.put(taskId, future);
client.submitter().submit(new GroupCommands.Submit(member, name, taskId, task)).whenComplete((result, error) -> { client.submitter().submit(new GroupCommands.Submit(member, name, taskId, task, options.getRoutingStrategy(), options.getFailoverStrategy())).whenComplete((result, error) -> {
if (error != null) { if (error != null) {
CompletableFuture<Void> taskFuture = taskFutures.remove(taskId); CompletableFuture<Void> taskFuture = taskFutures.remove(taskId);
if (taskFuture != null) { if (taskFuture != null) {
Expand Down
Expand Up @@ -59,7 +59,7 @@ public <T> AbstractTaskConsumer<T> consumer(String name, TaskConsumer.Options op
synchronized (consumers) { synchronized (consumers) {
consumer = consumers.get(name); consumer = consumers.get(name);
if (consumer == null) { if (consumer == null) {
consumer = createConsumer(name, options); consumer = createConsumer(name, options != null ? options : new TaskConsumer.Options());
consumers.put(name, consumer); consumers.put(name, consumer);
} }
} }
Expand Down

0 comments on commit 84ba414

Please sign in to comment.