Skip to content

Commit

Permalink
Refactor DistributedGroup packaging.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Mar 23, 2016
1 parent 61306dd commit 46cac6e
Show file tree
Hide file tree
Showing 39 changed files with 862 additions and 358 deletions.
4 changes: 2 additions & 2 deletions core/src/test/java/io/atomix/AtomixGroupTest.java
Expand Up @@ -16,7 +16,7 @@
package io.atomix;

import io.atomix.group.DistributedGroup;
import io.atomix.group.LocalGroupMember;
import io.atomix.group.LocalMember;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -54,7 +54,7 @@ private void testGroup(Atomix client1, Atomix client2, Function<Atomix, Distribu
DistributedGroup group1 = factory.apply(client1);
DistributedGroup group2 = factory.apply(client2);

LocalGroupMember localMember = group2.join().get(5, TimeUnit.SECONDS);
LocalMember localMember = group2.join().get(5, TimeUnit.SECONDS);
assertEquals(group2.members().size(), 1);

group1.join().thenRun(() -> {
Expand Down
Expand Up @@ -21,7 +21,7 @@
import io.atomix.catalyst.transport.NettyTransport;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.group.DistributedGroup;
import io.atomix.group.LocalGroupMember;
import io.atomix.group.LocalMember;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -65,7 +65,7 @@ public static void main(String[] args) throws Exception {
DistributedGroup group = atomix.getGroup("group").get();

// Join the group.
LocalGroupMember member = group.join().get();
LocalMember member = group.join().get();

// Register a callback to be called when the local member is elected the leader.
group.election().onElection(leader -> {
Expand Down
102 changes: 56 additions & 46 deletions group/src/main/java/io/atomix/group/DistributedGroup.java

Large diffs are not rendered by default.

46 changes: 46 additions & 0 deletions group/src/main/java/io/atomix/group/GroupController.java
@@ -0,0 +1,46 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.group;

/**
* Controls events for an object.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public interface GroupController {

/**
* Returns the underlying group.
*
* @return The underlying group.
*/
DistributedGroup group();

/**
* Called when a member joins the group.
*
* @param member The member that joined the group.
*/
void onJoin(GroupMember member);

/**
* Called when a member leaves the group.
*
* @param member The member that left the group.
*/
void onLeave(GroupMember member);

}
58 changes: 31 additions & 27 deletions group/src/main/java/io/atomix/group/GroupMember.java
Expand Up @@ -17,6 +17,10 @@

import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.util.Assert;
import io.atomix.group.connection.Connection;
import io.atomix.group.tasks.MemberTaskQueue;
import io.atomix.group.tasks.TaskQueueController;
import io.atomix.group.util.Submitter;

/**
* A {@link DistributedGroup} member representing a member of the group controlled by a local
Expand All @@ -30,37 +34,17 @@ public class GroupMember {
protected final Address address;
protected final MembershipGroup group;
private final GroupProperties properties;
private final MemberTaskQueue tasks;
private final MemberConnection connection;
final TaskQueueController tasks;
private final Connection connection;

GroupMember(GroupMemberInfo info, MembershipGroup group) {
GroupMember(GroupMemberInfo info, MembershipGroup group, Submitter submitter) {
this.index = info.index();
this.memberId = info.memberId();
this.address = info.address();
this.group = Assert.notNull(group, "group");
this.properties = new GroupProperties(memberId, group);
this.tasks = new MemberTaskQueue(memberId, group);
this.connection = new MemberConnection(memberId, address, group.connections);
}

/**
* Returns the member index.
*
* @return The member index.
*/
long index() {
return index;
}

/**
* Updates the member index.
*
* @param index The updated member index.
* @return The group member.
*/
GroupMember setIndex(long index) {
this.index = index;
return this;
this.tasks = new TaskQueueController(new MemberTaskQueue(memberId, group, submitter));
this.connection = new Connection(memberId, address, group.connections);
}

/**
Expand All @@ -84,6 +68,26 @@ public Address address() {
return address;
}

/**
* Returns the member version.
*
* @return The member version.
*/
public long version() {
return index;
}

/**
* Updates the member index.
*
* @param index The updated member index.
* @return The group member.
*/
GroupMember setIndex(long index) {
this.index = index;
return this;
}

/**
* Returns the member properties.
*
Expand All @@ -98,7 +102,7 @@ public GroupProperties properties() {
*
* @return A direct connection to the member.
*/
public MemberConnection connection() {
public Connection connection() {
return connection;
}

Expand All @@ -108,7 +112,7 @@ public MemberConnection connection() {
* @return The member's task queue.
*/
public MemberTaskQueue tasks() {
return tasks;
return (MemberTaskQueue) tasks.queue();
}

@Override
Expand Down
Expand Up @@ -15,7 +15,12 @@
*/
package io.atomix.group;

import io.atomix.group.connection.ConnectionController;
import io.atomix.group.connection.LocalConnection;
import io.atomix.group.state.GroupCommands;
import io.atomix.group.tasks.LocalTaskQueue;
import io.atomix.group.tasks.TaskQueueController;
import io.atomix.group.util.Submitter;

import java.util.concurrent.CompletableFuture;

Expand All @@ -39,24 +44,24 @@
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public class LocalGroupMember extends GroupMember {
private final LocalMemberTaskQueue tasks;
private final LocalMemberConnection connection;
public class LocalMember extends GroupMember {
final TaskQueueController tasks;
final ConnectionController connection;

LocalGroupMember(GroupMemberInfo info, MembershipGroup group) {
super(info, group);
this.tasks = new LocalMemberTaskQueue(info.memberId(), group);
this.connection = new LocalMemberConnection(info.memberId(), info.address(), group.connections);
LocalMember(GroupMemberInfo info, MembershipGroup group, Submitter submitter) {
super(info, group, submitter);
this.tasks = new TaskQueueController(new LocalTaskQueue(info.memberId(), group, submitter));
this.connection = new ConnectionController(new LocalConnection(info.memberId(), info.address(), group.connections));
}

@Override
public LocalMemberTaskQueue tasks() {
return tasks;
public LocalTaskQueue tasks() {
return (LocalTaskQueue) tasks.queue();
}

@Override
public LocalMemberConnection connection() {
return connection;
public LocalConnection connection() {
return connection.connection();
}

/**
Expand Down

0 comments on commit 46cac6e

Please sign in to comment.