Skip to content

Commit

Permalink
Improve DGroup documentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Mar 10, 2016
1 parent 6c65d57 commit 72cdc45
Show file tree
Hide file tree
Showing 10 changed files with 419 additions and 59 deletions.
15 changes: 13 additions & 2 deletions group/src/main/java/io/atomix/group/ConsistentHashGroup.java
Expand Up @@ -26,7 +26,12 @@
import java.util.function.Consumer;

/**
* {@link DistributedGroup} that consistently maps keys to members on a ring.
* {@link DistributedGroup} that consistently maps values to members on a ring.
* <p>
* Consistent hash groups place members of the parent {@link DistributedGroup} on a ring. Each member is hashed to
* a point on the ring, and {@code n} virtual nodes for each concrete node are created to reduce hotspotting. When
* accessing a member {@link #member(Object) by value}, the value is hashed to a point on the ring and the first
* member following that point is returned.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
Expand Down Expand Up @@ -54,14 +59,20 @@ static int hashCode(int parent, Hasher hasher, int virtualNodes) {
for (GroupMember member : members) {
this.members.put(member.id(), member);
hashRing.addMember(member);
election.onJoin(member);
}
}

/**
* Returns the member associated with the given value.
* <p>
* The given value's {@link Object#hashCode() hashCode} will be hashed to a point on the consistent hash ring.
* The first {@link GroupMember} following that point on the ring will be returned. If there are no members in
* the parent {@link DistributedGroup} then {@code null} will be returned.
*
* @param value The value for which to return the associated member.
* @return The associated group member.
* @return The associated group member or {@code null} if the parent {@link DistributedGroup} is empty.
* @throws NullPointerException if the value is {@code null}
*/
public synchronized GroupMember member(Object value) {
return hashRing.member(intToByteArray(value.hashCode()));
Expand Down
334 changes: 300 additions & 34 deletions group/src/main/java/io/atomix/group/DistributedGroup.java

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions group/src/main/java/io/atomix/group/GroupElection.java
Expand Up @@ -41,8 +41,8 @@
* {@code
* DistributedGroup group = atomix.getGroup("election-group").get();
*
* group.onElection(leader -> {
* leader.connection().send("hi!");
* group.onElection(term -> {
* term.leader().connection().send("hi!");
* });
* }
* </pre>
Expand Down
1 change: 1 addition & 0 deletions group/src/main/java/io/atomix/group/GroupPartition.java
Expand Up @@ -53,6 +53,7 @@ static int hashCode(int parent, int partition) {
this.sortedMembers = members;
for (GroupMember member : members) {
this.members.put(member.id(), member);
election.onJoin(member);
}
this.partition = partition;
}
Expand Down
4 changes: 2 additions & 2 deletions group/src/main/java/io/atomix/group/GroupPartitions.java
Expand Up @@ -41,7 +41,7 @@ public class GroupPartitions implements Iterable<GroupPartition> {
* @return The group partition.
* @throws IndexOutOfBoundsException if the given {@code partitionId} is greater than the range of partitions in the group
*/
public GroupPartition partition(int partitionId) {
public GroupPartition get(int partitionId) {
return partitions.get(partitionId);
}

Expand All @@ -51,7 +51,7 @@ public GroupPartition partition(int partitionId) {
* @param value The value for which to return a partition.
* @return The partition for the given value or {@code null} if the value was not mapped to any partition.
*/
public GroupPartition partition(Object value) {
public GroupPartition get(Object value) {
int partitionId = partitioner.partition(value, partitions.size());
return partitionId != -1 ? partitions.get(partitionId) : null;
}
Expand Down
43 changes: 37 additions & 6 deletions group/src/main/java/io/atomix/group/GroupTask.java
Expand Up @@ -23,7 +23,24 @@
import java.util.concurrent.CompletableFuture;

/**
* Group task.
* Represents a reliable task received by a member to be processed and acknowledged.
* <p>
* Tasks are {@link GroupTaskQueue#submit(Object) submitted} by {@link DistributedGroup} users to any member of a group.
* Tasks are replicated and persisted within the Atomix cluster before being pushed to clients on a queue. Once a task
* is received by a task listener, the task may be processed asynchronously and either {@link #ack() acknowledged} or
* {@link #fail() failed} once processing is complete.
* <pre>
* {@code
* DistributedGroup group = atomix.getGroup("task-group").get();
* group.join().thenAccept(member -> {
* member.tasks().onTask(task -> {
* processTask(task).thenRun(() -> {
* task.ack();
* });
* });
* });
* }
* </pre>
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
Expand All @@ -48,9 +65,12 @@ GroupTask<T> setFuture(CompletableFuture<Boolean> future) {
}

/**
* Returns the monotonically increasing task ID.
* Returns the task ID.
* <p>
* The task ID is guaranteed to be unique and monotonically increasing within a given task queue. Tasks received
* across members are not associated with one another.
*
* @return The unique task ID.
* @return The monotonically increasing task ID.
*/
public long id() {
return id;
Expand All @@ -66,23 +86,34 @@ String member() {
}

/**
* Returns the value of the task.
* Returns the task value.
* <p>
* This is the value that was {@link GroupTaskQueue#submit(Object) submitted} by the sending process.
*
* @return The value of the task.
* @return The task value.
*/
public T value() {
public T task() {
return value;
}

/**
* Acknowledges completion of the task.
* <p>
* Once a task is acknowledged, an ack will be sent back to the process that submitted the task. Acknowledging
* completion of a task does not guarantee that the sender will learn of the acknowledgement. The acknowledgement
* itself may fail to reach the cluster or the sender may crash before the acknowledgement can be received.
* Acks serve only as positive acknowledgement, but the lack of an ack does not indicate failure.
*/
public void ack() {
// Completing the future with 'true' is what propagates the positive acknowledgement
// back toward the submitting process (see fail(), which completes with 'false').
future.complete(true);
}

/**
* Fails processing of the task.
* <p>
* Once a task is failed, a failure message will be sent back to the process that submitted the task for processing.
* Failing a task does not guarantee that the sender will learn of the failure. The process that submitted the task
* may itself fail.
*/
public void fail() {
future.complete(false);
Expand Down
18 changes: 17 additions & 1 deletion group/src/main/java/io/atomix/group/MembershipGroup.java
Expand Up @@ -36,7 +36,23 @@
import java.util.function.Consumer;

/**
* Distributed membership group.
* Base {@link DistributedGroup} implementation which manages a membership set for the group
* and all {@link SubGroup}s.
* <p>
* The membership group is the base {@link DistributedGroup} type which is created when a new group
* is created via the Atomix API.
* <pre>
* {@code
* DistributedGroup group = atomix.getGroup("foo").get();
* }
* </pre>
* The membership group controls the set of members available within the group and all {@link SubGroup}s.
* When a membership change occurs within the group, the membership group will update its state and
* the state of all subgroups.
* <p>
* Subgroups created by the membership group via either {@link #hash()} or {@link #partition(int)} will
* inherit the membership group's {@link GroupProperties properties} and members. However, subgroups
* may filter members according to their requirements.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
Expand Down
9 changes: 5 additions & 4 deletions group/src/main/java/io/atomix/group/PartitionGroup.java
Expand Up @@ -54,6 +54,7 @@ static int hashCode(int parent, int partitions, int replicationFactor, GroupPart
this.hashRing = new GroupHashRing(new Murmur2Hasher(), 100, replicationFactor);
for (GroupMember member : members) {
hashRing.addMember(member);
election.onJoin(member);
}

List<GroupPartition> partitions = new ArrayList<>(numPartitions);
Expand Down Expand Up @@ -217,7 +218,7 @@ private void migratePartitionMembers(List<List<GroupMember>> oldPartitions, List
if (!migratedMembers.contains(oldMember)) {
for (GroupMember newMember : newPartitionMembers) {
if (!migratedMembers.contains(newMember)) {
migrations.add(new GroupPartitionMigration(oldMember, newMember, partitions.partition(i)));
migrations.add(new GroupPartitionMigration(oldMember, newMember, partitions.get(i)));
migratedMembers.add(oldMember);
migratedMembers.add(newMember);
}
Expand All @@ -228,7 +229,7 @@ private void migratePartitionMembers(List<List<GroupMember>> oldPartitions, List
// Determine the members present in old partition members but not in new.
for (GroupMember oldMember : oldPartitionMembers) {
if (!migratedMembers.contains(oldMember)) {
migrations.add(new GroupPartitionMigration(oldMember, null, partitions.partition(i)));
migrations.add(new GroupPartitionMigration(oldMember, null, partitions.get(i)));
migratedMembers.add(oldMember);
}
}
Expand All @@ -237,13 +238,13 @@ private void migratePartitionMembers(List<List<GroupMember>> oldPartitions, List
for (GroupMember newMember : newPartitionMembers) {
if (!migratedMembers.contains(newMember) && !migratedMembers.contains(newMember)) {
migratedMembers.add(newMember);
migrations.add(new GroupPartitionMigration(null, newMember, partitions.partition(i)));
migrations.add(new GroupPartitionMigration(null, newMember, partitions.get(i)));
}
}
}

// Update the partition members and trigger migration callbacks.
partitions.partition(i).handleRepartition(newPartitions.get(i));
partitions.get(i).handleRepartition(newPartitions.get(i));
for (GroupPartitionMigration migration : migrations) {
migrationListeners.accept(migration);
migration.partition().handleMigration(migration);
Expand Down
15 changes: 14 additions & 1 deletion group/src/main/java/io/atomix/group/SubGroup.java
Expand Up @@ -31,7 +31,20 @@
import java.util.function.Consumer;

/**
* Abstract distributed group.
* Base class for subgroups of {@link DistributedGroup}.
* <p>
* {@link DistributedGroup} can be partitioned into subgroups that can be nested to any depth. This allows groups
* to be partitioned multiple times to facilitate replication algorithms. Subgroups are guaranteed to be consistent
* across all nodes in a cluster. For example, in a {@link MembershipGroup} partitioned into a {@link PartitionGroup}
* with {@code 3} partitions, each {@link GroupPartition} will represent the same members on all nodes in the cluster.
* Changes to groups and subgroups are guaranteed to occur in the same order on all nodes.
* <p>
* Subgroups inherit a number of attributes of their parent group. When a group is {@link #partition(int) partitioned}
* into a subgroup, the subgroup will inherit the {@link #members() membership} list of the parent group but may
* represent only a subset of those members. Changes in the set of members in a parent group will be immediately
* reflected in all subgroups. Subgroups inherit the {@link GroupProperties properties}, {@link GroupTaskQueue tasks},
* {@link io.atomix.group.DistributedGroup.Config configuration}, and {@link io.atomix.group.DistributedGroup.Options options}
* of the base {@link MembershipGroup}.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
Expand Down
35 changes: 28 additions & 7 deletions group/src/test/java/io/atomix/group/DistributedGroupTest.java
Expand Up @@ -212,6 +212,27 @@ public void testElectClose() throws Throwable {
await(10000, 2);
}

/**
* Tests electing a leader in a subgroup.
* <p>
* Verifies that the first member to join a group becomes the group leader, and that a
* leader is also elected within a partitioned subgroup's election.
*/
public void testSubGroupElect() throws Throwable {
// Stand up a three-node cluster to back the group resource.
createServers(3);

// Two client-side handles on group resources.
DistributedGroup group1 = createResource();
DistributedGroup group2 = createResource();

// The first (and only) joiner should be elected leader of the group.
LocalGroupMember localMember2 = group2.join().get();
assertEquals(group2.members().size(), 1);
assertEquals(group2.election().term().leader(), localMember2);

// Join via the second handle, then assert a leader exists in partition 0 of a
// 3-partition subgroup.
// NOTE(review): this asserts the SECOND joiner (localMember1) is the partition
// leader even though localMember2 joined first — presumably the partitioner
// places localMember1 in partition 0; confirm against the partitioning logic.
LocalGroupMember localMember1 = group1.join().get();
assertEquals(group1.partition(3).partitions().get(0).election().term().leader(), localMember1);

// Close one member's handle; resume() unblocks the await below once close completes.
group2.close().thenRun(this::resume);

await(10000);
}

/**
* Tests setting and getting member properties.
*/
Expand Down Expand Up @@ -345,7 +366,7 @@ public void testPartitionMessage() throws Throwable {
});

PartitionGroup partitions = group1.partition(3);
partitions.partitions().partition("Hello world!").members().iterator().next().connection().send("test", "Hello world!").thenAccept(reply -> {
partitions.partitions().get("Hello world!").members().iterator().next().connection().send("test", "Hello world!").thenAccept(reply -> {
threadAssertEquals(reply, "Hello world back!");
resume();
});
Expand All @@ -364,7 +385,7 @@ public void testPartitionMigration() throws Throwable {
LocalGroupMember member1 = group1.join().get(10, TimeUnit.SECONDS);

PartitionGroup partitions = group2.partition(3, 3);
partitions.partitions().partition(0).onMigration(migration -> {
partitions.partitions().get(0).onMigration(migration -> {
threadAssertEquals(migration.source().id(), member1.id());
threadAssertNotNull(migration.target());
resume();
Expand Down Expand Up @@ -430,7 +451,7 @@ public void testDirectTask() throws Throwable {
assertEquals(group2.members().size(), 1);

member.tasks().onTask(task -> {
threadAssertEquals(task.value(), "Hello world!");
threadAssertEquals(task.task(), "Hello world!");
task.ack();
resume();
});
Expand All @@ -453,7 +474,7 @@ public void testDirectTaskFail() throws Throwable {
assertEquals(group2.members().size(), 1);

member.tasks().onTask(task -> {
threadAssertEquals(task.value(), "Hello world!");
threadAssertEquals(task.task(), "Hello world!");
task.fail();
resume();
});
Expand Down Expand Up @@ -507,17 +528,17 @@ public void testAllTask() throws Throwable {
assertEquals(group2.members().size(), 3);

member1.tasks().onTask(task -> {
threadAssertEquals(task.value(), "Hello world!");
threadAssertEquals(task.task(), "Hello world!");
task.ack();
resume();
});
member2.tasks().onTask(task -> {
threadAssertEquals(task.value(), "Hello world!");
threadAssertEquals(task.task(), "Hello world!");
task.ack();
resume();
});
member3.tasks().onTask(task -> {
threadAssertEquals(task.value(), "Hello world!");
threadAssertEquals(task.task(), "Hello world!");
task.ack();
resume();
});
Expand Down

0 comments on commit 72cdc45

Please sign in to comment.