Skip to content

Commit

Permalink
Update state log resource to support partitioning.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 17, 2015
1 parent 6db86b4 commit 7dded8a
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 138 deletions.
154 changes: 18 additions & 136 deletions resources/state-log/src/main/java/net/kuujo/copycat/state/StateLog.java
@@ -1,5 +1,5 @@
/* /*
* Copyright 2014 the original author or authors. * Copyright 2015 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
Expand All @@ -15,72 +15,35 @@
*/ */
package net.kuujo.copycat.state; package net.kuujo.copycat.state;


import net.kuujo.copycat.CopycatException;
import net.kuujo.copycat.cluster.ClusterConfig; import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.io.Buffer;
import net.kuujo.copycat.io.HeapBufferPool;
import net.kuujo.copycat.io.util.HashFunctions;
import net.kuujo.copycat.io.util.ReferencePool;
import net.kuujo.copycat.raft.Consistency; import net.kuujo.copycat.raft.Consistency;
import net.kuujo.copycat.resource.ResourceContext; import net.kuujo.copycat.resource.PartitionContext;
import net.kuujo.copycat.resource.internal.AbstractResource; import net.kuujo.copycat.resource.internal.AbstractPartitionedResource;
import net.kuujo.copycat.util.concurrent.Futures;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;


/** /**
* Copycat state log. * Partitioned state log.
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class StateLog<K, V> extends AbstractResource<StateLog<K, V>> { public class StateLog<K, V> extends AbstractPartitionedResource<StateLog<K, V>, StateLogPartition<K, V>> {

/**
* Creates a new state log with the given cluster and state log configurations.
*
* @param config The state log configuration.
* @param cluster The cluster configuration.
* @return A new state log instance.
*/
public static <K, V> StateLog<K, V> create(StateLogConfig config, ClusterConfig cluster) {
return new StateLog<>(config, cluster);
}

/**
* Creates a new state log with the given cluster and state log configurations.
*
* @param config The state log configuration.
* @param cluster The cluster configuration.
* @param executor An executor on which to execute state log callbacks.
* @return A new state log instance.
*/
public static <K, V> StateLog<K, V> create(StateLogConfig config, ClusterConfig cluster, Executor executor) {
return new StateLog<>(config, cluster, executor);
}

private static final Logger LOGGER = LoggerFactory.getLogger(StateLog.class); private static final Logger LOGGER = LoggerFactory.getLogger(StateLog.class);
private final ReferencePool<Buffer> operationPool = new HeapBufferPool();
private final Map<Long, OperationInfo> operations = new ConcurrentHashMap<>(128);
private final Map<String, Long> hashMap = new ConcurrentHashMap<>(128);
private final Consistency defaultConsistency;


public StateLog(StateLogConfig config, ClusterConfig cluster) { public StateLog(StateLogConfig config, ClusterConfig cluster) {
this(new ResourceContext(config, cluster)); super(config, cluster);
} }


public StateLog(StateLogConfig config, ClusterConfig cluster, Executor executor) { public StateLog(StateLogConfig config, ClusterConfig cluster, Executor executor) {
this(new ResourceContext(config, cluster, executor)); super(config, cluster, executor);
} }


public StateLog(ResourceContext context) { @Override
super(context); protected StateLogPartition<K, V> createPartition(PartitionContext context) {
defaultConsistency = context.<StateLogConfig>config().getDefaultConsistency(); return new StateLogPartition<>(context);
context.commitHandler(this::commit);
} }


/** /**
Expand All @@ -95,12 +58,8 @@ public StateLog(ResourceContext context) {
public StateLog<K, V> register(String name, Command.Type type, Command<? extends K, ? extends V, ?> command) { public StateLog<K, V> register(String name, Command.Type type, Command<? extends K, ? extends V, ?> command) {
if (!isClosed()) if (!isClosed())
throw new IllegalStateException("cannot register command on open state log"); throw new IllegalStateException("cannot register command on open state log");
if (type == Command.Type.READ) { partitions.forEach(p -> p.register(name, type, command));
operations.put(HashFunctions.CITYHASH.hash64(name.getBytes()), new OperationInfo((Command) command, type, defaultConsistency)); LOGGER.debug("{} - Registered state log command {}", this.name, name);
} else {
operations.put(HashFunctions.CITYHASH.hash64(name.getBytes()), new OperationInfo((Command) command, type));
}
LOGGER.debug("{} - Registered state log command {}", context.name(), name);
return this; return this;
} }


Expand All @@ -117,14 +76,8 @@ public StateLog<K, V> register(String name, Command.Type type, Command<? extends
public StateLog<K, V> register(String name, Command.Type type, Command<? extends K, ? extends V, ?> command, Consistency consistency) { public StateLog<K, V> register(String name, Command.Type type, Command<? extends K, ? extends V, ?> command, Consistency consistency) {
if (!isClosed()) if (!isClosed())
throw new IllegalStateException("cannot register command on open state log"); throw new IllegalStateException("cannot register command on open state log");
if (type == Command.Type.READ) { partitions.forEach(p -> p.register(name, type, command, consistency));
operations.put(HashFunctions.CITYHASH.hash64(name.getBytes()), new OperationInfo((Command) command, type, consistency)); LOGGER.debug("{} - Registered state log command {}", this.name, name);
} else {
if (consistency != null && consistency != Consistency.STRONG)
throw new IllegalArgumentException("consistency level STRONG is required for write and delete commands");
operations.put(HashFunctions.CITYHASH.hash64(name.getBytes()), new OperationInfo((Command) command, type));
}
LOGGER.debug("{} - Registered state log command {}", context.name(), name);
return this; return this;
} }


Expand All @@ -137,10 +90,8 @@ public StateLog<K, V> register(String name, Command.Type type, Command<? extends
public StateLog<K, V> unregister(String name) { public StateLog<K, V> unregister(String name) {
if (!isClosed()) if (!isClosed())
throw new IllegalStateException("cannot unregister command on open state log"); throw new IllegalStateException("cannot unregister command on open state log");
OperationInfo info = operations.remove(HashFunctions.CITYHASH.hash64(name.getBytes())); partitions.forEach(p -> p.unregister(name));
if (info != null) { LOGGER.debug("{} - Unregistered state log command {}", this.name, name);
LOGGER.debug("{} - Unregistered state log command {}", context.name(), name);
}
return this; return this;
} }


Expand Down Expand Up @@ -169,81 +120,12 @@ public <U> CompletableFuture<U> submit(String command, V entry) {
public <U> CompletableFuture<U> submit(String command, K key, V entry) { public <U> CompletableFuture<U> submit(String command, K key, V entry) {
if (!isOpen()) if (!isOpen())
throw new IllegalStateException("state log not open"); throw new IllegalStateException("state log not open");
OperationInfo<K, V, U> operationInfo = operations.get(HashFunctions.CITYHASH.hash64(command.getBytes())); return partition(key).submit(command, key, entry);
if (operationInfo == null) {
return Futures.exceptionalFutureAsync(new CopycatException(String.format("Invalid state log command %s", command)), context.executor());
}

// If this is a read-only command, check if the command is consistent. For consistent operations,
// queries are forwarded to the current cluster leader for evaluation. Otherwise, it's safe to
// read stale data from the local node.
Buffer keyBuffer = null;
if (key != null) {
keyBuffer = operationPool.acquire();
serializer.writeObject(key, keyBuffer);
keyBuffer.flip();
}

Buffer entryBuffer = operationPool.acquire().writeLong(hashMap.computeIfAbsent(command, c -> HashFunctions.CITYHASH.hash64(c.getBytes())));
serializer.writeObject(entry, entryBuffer);
entryBuffer.flip();

if (operationInfo.type == Command.Type.READ) {
LOGGER.debug("{} - Submitting read command {} with entry {}", context.name(), command, entry);
return context.read(keyBuffer, entryBuffer, operationInfo.consistency)
.thenApplyAsync(serializer::readObject, context.executor());
} else {
LOGGER.debug("{} - Submitting write command {} with entry {}", context.name(), command, entry);
return context.write(keyBuffer, entryBuffer)
.thenApplyAsync(serializer::readObject, context.executor());
}
}

/**
* Consumes a log entry.
*
* @param term The entry term.
* @param index The entry index.
* @param key The entry key.
* @param entry The log entry.
* @return The entry output.
*/
@SuppressWarnings({"unchecked"})
private Buffer commit(long term, long index, Buffer key, Buffer entry) {
long commandCode = entry.readLong();
OperationInfo operationInfo = operations.get(commandCode);
if (operationInfo != null) {
return serializer.writeObject(operationInfo.execute(term, index, serializer.readObject(key), serializer.readObject(entry.slice())));
}
throw new IllegalStateException("Invalid state log operation");
} }


@Override @Override
public String toString() { public String toString() {
return String.format("%s[name=%s]", getClass().getSimpleName(), context.name()); return String.format("%s[name=%s]", getClass().getSimpleName(), name);
}

/**
* State command info.
*/
private class OperationInfo<K, V, U> {
private final Command<K, V, U> command;
private final Command.Type type;
private final Consistency consistency;

private OperationInfo(Command<K, V, U> command, Command.Type type) {
this(command, type, Consistency.DEFAULT);
}

private OperationInfo(Command<K, V, U> command, Command.Type type, Consistency consistency) {
this.command = command;
this.type = type;
this.consistency = consistency;
}

private U execute(long term, long index, K key, V entry) {
return command.apply(key, entry);
}
} }


} }
Expand Up @@ -16,21 +16,31 @@
package net.kuujo.copycat.state; package net.kuujo.copycat.state;


import net.kuujo.copycat.raft.Consistency; import net.kuujo.copycat.raft.Consistency;
import net.kuujo.copycat.resource.PartitionedResourceConfig;
import net.kuujo.copycat.resource.ResourceConfig; import net.kuujo.copycat.resource.ResourceConfig;


/** /**
* State log configuration. * State log configuration.
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class StateLogConfig extends ResourceConfig<StateLogConfig> { public class StateLogConfig extends PartitionedResourceConfig<StateLogConfig> {
private Consistency defaultConsistency = Consistency.DEFAULT; private Consistency defaultConsistency = Consistency.DEFAULT;


public StateLogConfig() { public StateLogConfig() {
} }


public StateLogConfig(ResourceConfig<?> config) {
super(config);
}

public StateLogConfig(PartitionedResourceConfig<?> config) {
super(config);
}

protected StateLogConfig(StateLogConfig config) { protected StateLogConfig(StateLogConfig config) {
super(config); super(config);
this.defaultConsistency = config.defaultConsistency;
} }


@Override @Override
Expand Down

0 comments on commit 7dded8a

Please sign in to comment.