Skip to content

Commit

Permalink
Add multimap configuration for configuring map value order.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Feb 19, 2016
1 parent f4aad7f commit 51a7718
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 13 deletions.
Expand Up @@ -52,6 +52,54 @@ public static Config config() {
return new Config();
}

/**
* Multimap configuration.
*/
public static class Config extends Resource.Config {

/**
* Sets the map value order.
*
* @param order The map value order.
* @return The map configuration.
*/
public Config withValueOrder(Order order) {
setProperty("order", order.name().toLowerCase());
return this;
}

/**
* Returns the map value order.
*
* @return The map value order.
*/
public Order getValueOrder() {
return Order.valueOf(getProperty("order", Order.INSERT.name().toLowerCase()).toUpperCase());
}
}

/**
* Map value order.
*/
public enum Order {

/**
* Indicates that values should be stored in natural order.
*/
NATURAL,

/**
* Indicates that values should be stored in insertion order.
*/
INSERT,

/**
* Indicates that no order is required for values.
*/
NONE

}

public DistributedMultiMap(CopycatClient client, Resource.Options options) {
super(client, options);
}
Expand Down
Expand Up @@ -32,11 +32,53 @@
public class MultiMapState extends ResourceStateMachine {
private final Map<Object, Map<Object, Commit<? extends MultiMapCommands.TtlCommand>>> map = new HashMap<>();
private final Map<Long, Scheduled> timers = new HashMap<>();
private DistributedMultiMap.Order order = DistributedMultiMap.Order.INSERT;

public MultiMapState() {
super(new ResourceType(DistributedMultiMap.class));
}

@Override
public void configure(Properties config) {
this.order = DistributedMultiMap.Order.valueOf(config.getProperty("order", DistributedMultiMap.Order.INSERT.name().toLowerCase()).toUpperCase());
Set<Object> keys = new HashSet<>(map.keySet());
for (Object key : keys) {
map.put(key, createValueMap(map.get(key)));
}
}

/**
* Creates a new value map.
*/
private Map<Object, Commit<? extends MultiMapCommands.TtlCommand>> createValueMap() {
switch (order) {
case NONE:
return new HashMap<>();
case NATURAL:
return new TreeMap<>();
case INSERT:
return new LinkedHashMap<>();
default:
return new HashMap<>();
}
}

/**
* Creates a new value map.
*/
private Map<Object, Commit<? extends MultiMapCommands.TtlCommand>> createValueMap(Map<Object, Commit<? extends MultiMapCommands.TtlCommand>> map) {
switch (order) {
case NONE:
return new HashMap<>(map);
case NATURAL:
return new TreeMap<>(map);
case INSERT:
return new LinkedHashMap<>(map);
default:
return new HashMap<>(map);
}
}

/**
* Handles a contains key commit.
*/
Expand Down Expand Up @@ -75,7 +117,7 @@ public boolean put(Commit<MultiMapCommands.Put> commit) {
try {
Map<Object, Commit<? extends MultiMapCommands.TtlCommand>> values = map.get(commit.operation().key());
if (values == null) {
values = new LinkedHashMap<>();
values = createValueMap();
map.put(commit.operation().key(), values);
}

Expand Down Expand Up @@ -125,7 +167,7 @@ public Object remove(Commit<MultiMapCommands.Remove> commit) {
} else {
Map<Object, Commit<? extends MultiMapCommands.TtlCommand>> values = map.remove(commit.operation().key());
if (values != null) {
Collection<Object> results = new ArrayList<>();
Collection<Object> results = new ArrayList<>(values.size());
for (Commit<? extends MultiMapCommands.TtlCommand> value : values.values()) {
Scheduled timer = timers.remove(value.index());
if (timer != null)
Expand Down
Expand Up @@ -15,10 +15,11 @@
*/
package io.atomix.collections;

import io.atomix.resource.ResourceType;
import io.atomix.testing.AbstractCopycatTest;
import org.testng.annotations.Test;

import java.util.Iterator;

/**
* Distributed multi map test.
*
Expand Down Expand Up @@ -74,7 +75,7 @@ public void testMapPutGetRemove() throws Throwable {
public void testMultiMapClear() throws Throwable {
createServers(3);

DistributedMultiMap<String, String> map = createResource();
DistributedMultiMap<String, String> map = createResource(DistributedMultiMap.config().withValueOrder(DistributedMultiMap.Order.NATURAL));

map.put("foo", "Hello world!").thenRun(this::resume);
map.put("foo", "Hello world again!").thenRun(this::resume);
Expand All @@ -101,4 +102,27 @@ public void testMultiMapClear() throws Throwable {
await(10000);
}

/**
* Tests operating on a map with naturally ordered values.
*/
public void testNaturalOrder() throws Throwable {
createServers(3);

DistributedMultiMap.Config config = DistributedMultiMap.config()
.withValueOrder(DistributedMultiMap.Order.NATURAL);
DistributedMultiMap<String, String> map = createResource(config);

map.put("foo", "foo").thenRun(this::resume);
map.put("foo", "bar").thenRun(this::resume);
await(10000, 2);

map.get("foo").thenAccept(results -> {
Iterator<String> iterator = results.iterator();
threadAssertEquals(iterator.next(), "bar");
threadAssertEquals(iterator.next(), "foo");
resume();
});
await(10000);
}

}
Expand Up @@ -27,15 +27,16 @@
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.SessionListener;

import java.util.Properties;

/**
* Base resource state machine.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public abstract class ResourceStateMachine<T extends Resource.Config> extends StateMachine implements SessionListener {
public abstract class ResourceStateMachine extends StateMachine implements SessionListener {
private final ResourceType type;
private Commit<ConfigureCommand> configureCommit;
protected T config;

protected ResourceStateMachine(ResourceType type) {
this.type = Assert.notNull(type, "type");
Expand Down Expand Up @@ -67,16 +68,15 @@ private void configure(Commit<ConfigureCommand> commit) {
if (configureCommit != null)
configureCommit.close();
configureCommit = commit;
configure((T) configureCommit.operation().config());
configure(configureCommit.operation().config());
}

/**
* Configures the resource.
*
* @param config The resource configuration.
*/
public void configure(T config) {
this.config = config;
public void configure(Properties config) {
}

@Override
Expand Down Expand Up @@ -116,9 +116,12 @@ public void delete() {
* Resource configure command.
*/
public static class ConfigureCommand implements Command<Void>, CatalystSerializable {
private Resource.Config config;
private Properties config;

public ConfigureCommand() {
}

public ConfigureCommand(Resource.Config config) {
public ConfigureCommand(Properties config) {
this.config = Assert.notNull(config, "config");
}

Expand All @@ -127,7 +130,7 @@ public ConfigureCommand(Resource.Config config) {
*
* @return The resource configuration.
*/
public Resource.Config config() {
public Properties config() {
return config;
}

Expand Down
18 changes: 17 additions & 1 deletion testing/src/main/java/io/atomix/testing/AbstractCopycatTest.java
Expand Up @@ -96,11 +96,25 @@ protected T createResource() throws Throwable {
return createResource(new Resource.Options());
}

/**
* Creates a new resource instance.
*/
protected T createResource(Resource.Config config) throws Throwable {
return createResource(config, new Resource.Options());
}

/**
* Creates a new resource instance.
*/
protected T createResource(Resource.Options options) throws Throwable {
return createResource(new Resource.Config(), options);
}

/**
* Creates a new resource instance.
*/
@SuppressWarnings("unchecked")
protected T createResource(Resource.Options options) throws Throwable {
protected T createResource(Resource.Config config, Resource.Options options) throws Throwable {
CopycatClient client = CopycatClient.builder(members)
.withTransport(new LocalTransport(registry))
.withServerSelectionStrategy(ServerSelectionStrategies.ANY)
Expand All @@ -113,6 +127,8 @@ protected T createResource(Resource.Options options) throws Throwable {
resource.open().thenRun(this::resume);
resources.add(resource);
await(10000);
resource.configure(config).thenRun(this::resume);
await(10000);
return resource;
}

Expand Down

0 comments on commit 51a7718

Please sign in to comment.