Skip to content

Commit

Permalink
Remove internal handling of resource partitions. Partitions can be re…
Browse files Browse the repository at this point in the history
…plicated by simply creating multiple instances of the same resource.
  • Loading branch information
kuujo committed Jan 9, 2015
1 parent 2eef02e commit 8b16c75
Show file tree
Hide file tree
Showing 50 changed files with 783 additions and 1,938 deletions.
179 changes: 94 additions & 85 deletions README.md
Expand Up @@ -49,6 +49,10 @@ taking place on Copycat**
1. [Resources](#resources)
* [Resource lifecycle](#resource-lifecycle)
* [Configuring resources](#configuring-resources)
* [Resource replicas](#resource-replicas)
* [Log configuration](#log-configuration)
* [Serialization](#serialization)
* [Configuration maps](#configuration-maps)
* [Creating resources](#creating-resources)
* [Resource clusters](#resource-clusters)
1. [State machines](#state-machines)
Expand All @@ -64,14 +68,12 @@ taking place on Copycat**
* [Asynchronous proxies](#asynchronous-proxies)
1. [Event logs](#event-logs)
* [Creating an event log](#creating-an-event-log)
* [Configuring the event log](#configuring-the-event-log)
* [Writing events to the event log](#writing-events-to-the-event-log)
* [Consuming events from the event log](#consuming-events-from-the-event-log)
* [Working with event log partitions](#working-with-event-log-partitions)
* [Event log clusters](#event-log-clusters)
1. [State logs](#state-logs)
* [Creating a state log](#creating-a-state-log)
* [Configuring the state log](#configuring-the-state-log)
* [State commands](#state-commands)
* [State queries](#state-queries)
* [Submitting operations to the state log](#submitting-operations-to-the-state-log)
Expand Down Expand Up @@ -329,6 +331,87 @@ config.addEventLogConfig("event-log", new EventLogConfig()
.withRetentionPolicy(new SizeBasedRetentionPolicy(1024 * 1024)));
```
#### Resource replicas
Each of the resources in Copycat's cluster do not have to be replicated on all [active members](#active-members) on
the core cluster. Instead, Copycat allows replicas to be configured on a per-resource basis. This allows users to
configure replication for a specific resource on a subset of the core cluster of active members.

```java
CopycatConfig config = new CopycatConfig()
.withClusterConfig(new ClusterConfig()
.withMembers(
"tcp://123.456.789.0:5000",
"tcp://123.456.789.1:5000",
"tcp://123.456.789.2:5000",
"tcp://123.456.789.3:5000",
"tcp://123.456.789.4:5000"))
.addResource("event-log", new EventLogConfig()
.withReplicas(
"tcp://123.456.789.1:5000",
"tcp://123.456.789.2:5000",
"tcp://123.456.789.3:5000"));
```

#### Log configuration

Each replica of each resource in the Copycat cluster writes to a separate, configurable log. Many resource configuration
options involve configuring the log itself. Users can configure the performance and reliability of all Copycat logs by
tuning the underlying file log configuration options, for instance, by configuring the frequency with which Copycat
flushes logs to disk.

```java
EventLogConfig config = new EventLogConfig()
.withLog(new FileLog()
.withFlushInterval(60, TimeUnit.SECONDS));
```

Additionally, all Copycat file logs support configurable retention policies. Retention policies dictate the amount of
time for which a *segment* of the log is held on disk. For instance, the `FullRetentionPolicy` keeps logs forever,
while the `TimeBasedRetentionPolicy` allows segments of the log to be deleted after a certain amount of time has passed
since the segment was created.

```java
EventLogConfig config = new EventLogConfig()
.withLog(new FileLog()
.withSegmentSize(1024 * 1024 * 32)
.withRetentionPolicy(new SizeBasedRetentionPolicy(1024 * 1024 * 128));
```

Copycat provides the following log retention policies:
* `FullRetentionPolicy` - keeps all log segments forever
* `ZeroRetentionPolicy` - deletes segments immediately after the log has been rotated
* `TimeBasedRetentionPolicy` - deletes segments after a period of time has passed since they were *created*
* `SizeBasedRetentionPolicy` - deletes segments once the complete log grows to a certain size

#### Serialization

By default, entries to Copycat's logs are serialized using the default [Kryo](https://github.com/EsotericSoftware/kryo)
based serializer, so users can implement custom serializer for log entries by implementing `KryoSerializable`:
```java
public class MyEntry implements KryoSerializable {
public void write (Kryo kryo, Output output) {
// ...
}
public void read (Kryo kryo, Input input) {
// ...
}
}
```
Alternatively, users can provide a custom serializer for logs via the log configuration:
```java
EventLogConfig config = new EventLogConfig()
.withSerializer(new MySerializer());
```
#### Configuration maps
The Copycat configuration API is designed to support arbitrary `Map` based configurations as well. Simply pass a map
with the proper configuration options for the given configuration type to the configuration object constructor:
Expand Down Expand Up @@ -361,8 +444,8 @@ EventLogConfig config = new EventLogConfig(configMap);
With resources configured and the `Copycat` instance created, resources can be easily retrieved by calling any
of the resource-specific methods on the `Copycat` instance:
* `<T, U> EventLog<T, U> eventLog(String name)`
* `<T, U> StateLog<T, U> stateLog(String name)`
* `<T> EventLog<T> eventLog(String name)`
* `<T> StateLog<T> stateLog(String name)`
* `<T> StateMachine<T> stateMachine(String name)`
* `LeaderElection leaderElection(String name)`
* `<K, V> AsyncMap<K, V> map(String name)`
Expand Down Expand Up @@ -728,8 +811,6 @@ map.put("foo", "Hello world!").thenRun(() -> {
## Event logs

Event logs are eventually consistent Raft replicated logs that are designed to be compacted based on time or size.
Event logs are inherently partitioned - allowing high-throughput concurrent writes to leaders of multiple partitions -
and entries are consumed as they are received.

### Creating an event log

Expand All @@ -742,10 +823,9 @@ ClusterConfig cluster = new ClusterConfig()

CopycatConfig config = new CopycatConfig()
.withClusterConfig(cluster)
.addEventLogConfig("event-log", new EventLogConfig())
.withPartitions(3);
.addEventLogConfig("event-log", new EventLogConfig());

Copycat copycat = Copycat.create("tcp://123.456.789.0", config);
Copycat copycat = Copycat.create("tcp://123.456.789.0:5000", config);

copycat.open().thenRun(() -> {
copycat.<String, String>eventLog("event-log").open().thenAccept(eventLog -> {
Expand All @@ -761,52 +841,16 @@ static interface method:
```java
ClusterConfig cluster = new ClusterConfig()
.withProtocol(new NettyTcpProtocol())
.withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2");

EventLogConfig config = new EventLogConfig())
.withPartitions(3);

EventLog<String, String> eventLog = EventLog.create("tcp:/123.456.789.0", cluster, config);
```

Note that the event log API requires two generic types. The first generic - `T` - is the partition key type. This is
the data type of the keys used to select partitions to which to commit entries. The second generic type - `U` - is the
log entry type.

### Configuring the event log

Copycat event logs are partitioned and replicated. Thus, the `EventLogConfig` API provides various methods for
configuring the partitioning and replication of each event log.

To set the number of event log partitions, use the `setPartitions` method or `withPartitions` fluent method.
.withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000");

```java
EventLogConfig config = new EventLogConfig()
.withPartitions(3);
```

The number of partitions indicated will dictate the number of separate replicated log instances that are created for
the event log. For each log partition, a separate leader will be elected and separate instance of the Raft consensus
algorithm will be run.
.withLog(new FileLog());

In order to control the amount of cluster resources each event log partition consumes, Copycat provides a *replication
factor* option which configures the number of nodes on which each partition is synchronously replicated. For each
level of replication factor, two additional nodes are required for replication for each partition. In other words:
* A replication factor of `0` indicates that each partition resides only on a single node
* A replication factor of `1` indicates that each partition resides on three nodes, and writes require data to be
written to the leader and one other node
* A replication factor of `2` indicates that each partition resides on five nodes, and writes require data to be written
to the leader and two other nodes

To configure the replication factor, use the `setReplicationFactor` method or `withReplicationFactor` fluent method.

```java
EventLogConfig config = new EventLogConfig()
.withPartitions(3)
.withReplicationFactor(1);
EventLog<String, String> eventLog = EventLog.create("tcp:/123.456.789.0", cluster, config);
```

By default the replication factor is `-1`, indicating that each partition should be replicated to the entire cluster.
Note that the event log API requires two generic types. The generic type - `T` - is the log entry type. Copycat logs
support arbitrary entry types.

### Writing events to the event log

Expand Down Expand Up @@ -916,41 +960,6 @@ Note that the state log API requires two generic types. The first generic - `T`
the data type of the keys used to select partitions to which to commit entries. The second generic type - `U` - is the
log entry type.
### Configuring the state log

Copycat state logs are partitioned and replicated. Thus, the `StateLogConfig` API provides various methods for
configuring the partitioning and replication of each state log.

To set the number of state log partitions, use the `setPartitions` method or `withPartitions` fluent method.

```java
StateLogConfig config = new StateLogConfig()
.withPartitions(3);
```

The number of partitions indicated will dictate the number of separate replicated log instances that are created for
the state log. For each log partition, a separate leader will be elected and separate instance of the Raft consensus
algorithm will be run.

In order to control the amount of cluster resources each state log partition consumes, Copycat provides a *replication
factor* option which configures the number of nodes on which each partition is synchronously replicated. For each
level of replication factor, two additional nodes are required for replication for each partition. In other words:
* A replication factor of `0` indicates that each partition resides only on a single node
* A replication factor of `1` indicates that each partition resides on three nodes, and writes require data to be
written to the leader and one other node
* A replication factor of `2` indicates that each partition resides on five nodes, and writes require data to be written
to the leader and two other nodes

To configure the replication factor, use the `setReplicationFactor` method or `withReplicationFactor` fluent method.

```java
StateLogConfig config = new StateLogConfig()
.withPartitions(3)
.withReplicationFactor(1);
```

By default the replication factor is `-1`, indicating that each partition should be replicated to the entire cluster.

### State commands
State logs work to alter state by applying log entries to registered state commands. Commands are operations which
Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/net/kuujo/copycat/Copycat.java
Expand Up @@ -70,7 +70,7 @@ static Copycat create(String uri, CopycatConfig config) {
* @param <T> the event log entry type.
* @return A completable future to be completed once the event log has been created.
*/
<T, U> EventLog<T, U> eventLog(String name);
<T> EventLog<T> eventLog(String name);

/**
* Creates a new state log.
Expand All @@ -79,7 +79,7 @@ static Copycat create(String uri, CopycatConfig config) {
* @param <T> The state log entry type.
* @return A completable future to be completed once the state log has been created.
*/
<T, U> StateLog<T, U> stateLog(String name);
<T> StateLog<T> stateLog(String name);

/**
* Creates a new replicated state machine.
Expand Down
Expand Up @@ -48,12 +48,12 @@ public CopycatConfig config() {
}

@Override
public <T, U> EventLog<T, U> eventLog(String name) {
public <T> EventLog<T> eventLog(String name) {
return coordinator.getResource(name);
}

@Override
public <T, U> StateLog<T, U> stateLog(String name) {
public <T> StateLog<T> stateLog(String name) {
return null;
}

Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package net.kuujo.copycat.collections;

import net.kuujo.copycat.DiscreteResource;
import net.kuujo.copycat.Resource;

/**
* Asynchronous collection.
Expand All @@ -24,5 +24,5 @@
*
* @param <T> The collection data type.
*/
public interface AsyncCollection<T extends AsyncCollection<T, U>, U> extends AsyncCollectionProxy<U>, DiscreteResource<T> {
public interface AsyncCollection<T extends AsyncCollection<T, U>, U> extends AsyncCollectionProxy<U>, Resource<T> {
}
Expand Up @@ -18,7 +18,6 @@
import net.kuujo.copycat.StateLogConfig;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.coordinator.CoordinatedResourceConfig;
import net.kuujo.copycat.cluster.coordinator.CoordinatedResourcePartitionConfig;
import net.kuujo.copycat.collections.internal.collection.DefaultAsyncList;

import java.util.Map;
Expand Down Expand Up @@ -56,10 +55,7 @@ public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
.withHeartbeatInterval(getHeartbeatInterval())
.withLog(getLog())
.withSerializer(getSerializer())
.withPartitions(new CoordinatedResourcePartitionConfig()
.withPartition(1)
.withReplicas(getReplicas().isEmpty() ? cluster.getMembers() : getReplicas())
.withResourceConfig(config));
.withReplicas(getReplicas().isEmpty() ? cluster.getMembers() : getReplicas());
}

}
Expand Up @@ -14,11 +14,10 @@
*/
package net.kuujo.copycat.collections;

import net.kuujo.copycat.DiscreteResource;
import net.kuujo.copycat.Resource;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.coordinator.ClusterCoordinator;
import net.kuujo.copycat.cluster.coordinator.CoordinatorConfig;
import net.kuujo.copycat.internal.AbstractResource;
import net.kuujo.copycat.internal.cluster.coordinator.DefaultClusterCoordinator;

import java.util.concurrent.CompletableFuture;
Expand All @@ -28,7 +27,7 @@
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public interface AsyncLock extends DiscreteResource<AsyncLock> {
public interface AsyncLock extends Resource<AsyncLock> {

/**
* Creates a new asynchronous lock.
Expand All @@ -54,10 +53,9 @@ static AsyncLock create(String name, String uri, ClusterConfig cluster) {
@SuppressWarnings({"unchecked", "rawtypes"})
static AsyncLock create(String name, String uri, ClusterConfig cluster, AsyncLockConfig config) {
ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withClusterConfig(cluster).addResourceConfig(name, config.resolve(cluster)));
AsyncLock lock = coordinator.getResource(name);
((AbstractResource) lock).withStartupTask(() -> coordinator.open().thenApply(v -> null));
((AbstractResource) lock).withShutdownTask(coordinator::close);
return lock;
return coordinator.<AsyncLock>getResource(name)
.withStartupTask(() -> coordinator.open().thenApply(v -> null))
.withShutdownTask(coordinator::close);
}

/**
Expand Down
Expand Up @@ -19,7 +19,6 @@
import net.kuujo.copycat.StateLogConfig;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.coordinator.CoordinatedResourceConfig;
import net.kuujo.copycat.cluster.coordinator.CoordinatedResourcePartitionConfig;
import net.kuujo.copycat.collections.internal.lock.DefaultAsyncLock;
import net.kuujo.copycat.log.BufferedLog;
import net.kuujo.copycat.log.Log;
Expand Down Expand Up @@ -65,10 +64,7 @@ public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
.withHeartbeatInterval(getHeartbeatInterval())
.withLog(getLog())
.withSerializer(getSerializer())
.withPartitions(new CoordinatedResourcePartitionConfig()
.withPartition(1)
.withReplicas(getReplicas().isEmpty() ? cluster.getMembers() : getReplicas())
.withResourceConfig(config));
.withReplicas(getReplicas().isEmpty() ? cluster.getMembers() : getReplicas());
}

}

0 comments on commit 8b16c75

Please sign in to comment.