From 338b2baa8dbc17ad73e3346c091f8564a65560eb Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Mon, 17 Aug 2015 02:42:40 -0700 Subject: [PATCH] Refactor storage module to support per-entry persistence levels. --- .../atomic/DistributedAtomicValue.java | 14 +- .../atomic/state/ReferenceCommands.java | 10 +- .../copycat/atomic/state/ReferenceState.java | 4 +- .../atomic/DistributedAtomicValueTest.java | 11 +- .../copycat/collections/DistributedMap.java | 8 +- .../copycat/collections/DistributedSet.java | 6 +- .../collections/state/MapCommands.java | 10 +- .../copycat/collections/state/MapState.java | 4 +- .../collections/state/SetCommands.java | 8 +- .../copycat/collections/state/SetState.java | 4 +- .../collections/DistributedMapTest.java | 3 +- .../collections/DistributedSetTest.java | 3 +- .../DistributedLeaderElectionTest.java | 3 +- .../coordination/DistributedLockTest.java | 3 +- .../java/net/kuujo/copycat/Configurable.java | 32 ++++ .../main/java/net/kuujo/copycat/Copycat.java | 42 ++++- .../src/main/java/net/kuujo/copycat/Node.java | 22 +-- .../main/java/net/kuujo/copycat/Options.java | 24 +++ ...istenceLevel.java => PersistenceMode.java} | 4 +- .../kuujo/copycat/manager/CreateResource.java | 10 +- .../kuujo/copycat/raft/protocol/Command.java | 20 ++- .../kuujo/copycat/raft/protocol/Query.java | 2 +- .../kuujo/copycat/raft/state/LeaderState.java | 15 +- .../net/kuujo/copycat/io/storage/Entry.java | 22 +++ .../kuujo/copycat/io/storage/OffsetIndex.java | 35 ++-- ...torageLevel.java => PersistenceLevel.java} | 2 +- .../net/kuujo/copycat/io/storage/Segment.java | 170 +++++++++++------- .../copycat/io/storage/SegmentManager.java | 71 ++------ .../net/kuujo/copycat/io/storage/Storage.java | 37 +--- .../copycat/io/storage/TypedEntryPool.java | 2 +- .../kuujo/copycat/io/storage/CleanerTest.java | 1 - .../net/kuujo/copycat/io/storage/LogTest.java | 19 +- 32 files changed, 374 insertions(+), 247 deletions(-) create mode 100644 core/src/main/java/net/kuujo/copycat/Configurable.java create mode 100644 core/src/main/java/net/kuujo/copycat/Options.java rename core/src/main/java/net/kuujo/copycat/{PersistenceLevel.java => PersistenceMode.java} (93%) rename storage/src/main/java/net/kuujo/copycat/io/storage/{StorageLevel.java => PersistenceLevel.java} (97%) diff --git a/atomic/src/main/java/net/kuujo/copycat/atomic/DistributedAtomicValue.java b/atomic/src/main/java/net/kuujo/copycat/atomic/DistributedAtomicValue.java index 498c4ef4ce..5b49255ec6 100644 --- a/atomic/src/main/java/net/kuujo/copycat/atomic/DistributedAtomicValue.java +++ b/atomic/src/main/java/net/kuujo/copycat/atomic/DistributedAtomicValue.java @@ -15,7 +15,7 @@ */ package net.kuujo.copycat.atomic; -import net.kuujo.copycat.PersistenceLevel; +import net.kuujo.copycat.PersistenceMode; import net.kuujo.copycat.Resource; import net.kuujo.copycat.atomic.state.ReferenceCommands; import net.kuujo.copycat.atomic.state.ReferenceState; @@ -141,7 +141,7 @@ public CompletableFuture set(T value, Duration ttl) { * @param persistence The write persistence. * @return A completable future to be completed once the value has been set. */ - public CompletableFuture set(T value, PersistenceLevel persistence) { + public CompletableFuture set(T value, PersistenceMode persistence) { return submit(ReferenceCommands.Set.builder() .withValue(value) .withPersistence(persistence) @@ -156,7 +156,7 @@ public CompletableFuture set(T value, PersistenceLevel persistence) { * @param persistence The write persistence. * @return A completable future to be completed once the value has been set. */ - public CompletableFuture set(T value, Duration ttl, PersistenceLevel persistence) { + public CompletableFuture set(T value, Duration ttl, PersistenceMode persistence) { return submit(ReferenceCommands.Set.builder() .withValue(value) .withTtl(ttl.toMillis()) @@ -197,7 +197,7 @@ public CompletableFuture getAndSet(T value, Duration ttl) { * @param persistence The write persistence. * @return A completable future to be completed with the previous value. */ - public CompletableFuture getAndSet(T value, PersistenceLevel persistence) { + public CompletableFuture getAndSet(T value, PersistenceMode persistence) { return submit(ReferenceCommands.GetAndSet.builder() .withValue(value) .withPersistence(persistence) @@ -212,7 +212,7 @@ public CompletableFuture getAndSet(T value, PersistenceLevel persistence) { * @param persistence The write persistence. * @return A completable future to be completed with the previous value. */ - public CompletableFuture getAndSet(T value, Duration ttl, PersistenceLevel persistence) { + public CompletableFuture getAndSet(T value, Duration ttl, PersistenceMode persistence) { return submit(ReferenceCommands.GetAndSet.builder() .withValue(value) .withTtl(ttl.toMillis()) @@ -258,7 +258,7 @@ public CompletableFuture compareAndSet(T expect, T update, Duration ttl * @param persistence The write persistence. * @return A completable future to be completed with a boolean value indicating whether the value was updated. */ - public CompletableFuture compareAndSet(T expect, T update, PersistenceLevel persistence) { + public CompletableFuture compareAndSet(T expect, T update, PersistenceMode persistence) { return submit(ReferenceCommands.CompareAndSet.builder() .withExpect(expect) .withUpdate(update) @@ -275,7 +275,7 @@ public CompletableFuture compareAndSet(T expect, T update, PersistenceL * @param persistence The write persistence. * @return A completable future to be completed with a boolean value indicating whether the value was updated. */ - public CompletableFuture compareAndSet(T expect, T update, Duration ttl, PersistenceLevel persistence) { + public CompletableFuture compareAndSet(T expect, T update, Duration ttl, PersistenceMode persistence) { return submit(ReferenceCommands.CompareAndSet.builder() .withExpect(expect) .withUpdate(update) diff --git a/atomic/src/main/java/net/kuujo/copycat/atomic/state/ReferenceCommands.java b/atomic/src/main/java/net/kuujo/copycat/atomic/state/ReferenceCommands.java index ab12c78d4d..a8fe0a4ea0 100644 --- a/atomic/src/main/java/net/kuujo/copycat/atomic/state/ReferenceCommands.java +++ b/atomic/src/main/java/net/kuujo/copycat/atomic/state/ReferenceCommands.java @@ -15,7 +15,7 @@ */ package net.kuujo.copycat.atomic.state; -import net.kuujo.copycat.PersistenceLevel; +import net.kuujo.copycat.PersistenceMode; import net.kuujo.copycat.io.BufferInput; import net.kuujo.copycat.io.BufferOutput; import net.kuujo.copycat.io.serializer.CopycatSerializable; @@ -41,7 +41,7 @@ private ReferenceCommands() { * Abstract reference command. */ public static abstract class ReferenceCommand implements Command, CopycatSerializable { - protected PersistenceLevel mode = PersistenceLevel.PERSISTENT; + protected PersistenceMode mode = PersistenceMode.PERSISTENT; protected long ttl; /** @@ -49,7 +49,7 @@ public static abstract class ReferenceCommand implements Command, CopycatS * * @return The persistence mode. */ - public PersistenceLevel mode() { + public PersistenceMode mode() { return mode; } @@ -70,7 +70,7 @@ public void writeObject(BufferOutput buffer, Serializer serializer) { @Override public void readObject(BufferInput buffer, Serializer serializer) { - mode = PersistenceLevel.values()[buffer.readByte()]; + mode = PersistenceMode.values()[buffer.readByte()]; ttl = buffer.readLong(); } @@ -89,7 +89,7 @@ protected Builder(BuilderPool pool) { * @return The command builder. */ @SuppressWarnings("unchecked") - public T withPersistence(PersistenceLevel mode) { + public T withPersistence(PersistenceMode mode) { if (mode == null) throw new NullPointerException("mode cannot be null"); command.mode = mode; diff --git a/atomic/src/main/java/net/kuujo/copycat/atomic/state/ReferenceState.java b/atomic/src/main/java/net/kuujo/copycat/atomic/state/ReferenceState.java index 2f59479b84..9f949e7d6c 100644 --- a/atomic/src/main/java/net/kuujo/copycat/atomic/state/ReferenceState.java +++ b/atomic/src/main/java/net/kuujo/copycat/atomic/state/ReferenceState.java @@ -15,7 +15,7 @@ */ package net.kuujo.copycat.atomic.state; -import net.kuujo.copycat.PersistenceLevel; +import net.kuujo.copycat.PersistenceMode; import net.kuujo.copycat.raft.session.Session; import net.kuujo.copycat.raft.Commit; import net.kuujo.copycat.raft.StateMachine; @@ -79,7 +79,7 @@ public void close(Session session) { private boolean isActive(Commit commit, Instant time) { if (commit == null) { return false; - } else if (commit.operation().mode() == PersistenceLevel.EPHEMERAL && !sessions.contains(commit.session().id())) { + } else if (commit.operation().mode() == PersistenceMode.EPHEMERAL && !sessions.contains(commit.session().id())) { return false; } else if (commit.operation().ttl() != 0 && commit.operation().ttl() < time.toEpochMilli() - commit.time().toEpochMilli()) { return false; diff --git a/atomic/src/test/java/net/kuujo/copycat/atomic/DistributedAtomicValueTest.java b/atomic/src/test/java/net/kuujo/copycat/atomic/DistributedAtomicValueTest.java index b5d1a7042e..e602d95a53 100644 --- a/atomic/src/test/java/net/kuujo/copycat/atomic/DistributedAtomicValueTest.java +++ b/atomic/src/test/java/net/kuujo/copycat/atomic/DistributedAtomicValueTest.java @@ -20,7 +20,6 @@ import net.kuujo.copycat.CopycatServer; import net.kuujo.copycat.Node; import net.kuujo.copycat.io.storage.Storage; -import net.kuujo.copycat.io.storage.StorageLevel; import net.kuujo.copycat.io.transport.LocalServerRegistry; import net.kuujo.copycat.io.transport.LocalTransport; import net.kuujo.copycat.raft.Member; @@ -125,7 +124,7 @@ public void testMembershipChange() throws Throwable { .withRegistry(registry) .build()) .withStorage(Storage.builder() - .withStorageLevel(StorageLevel.MEMORY) + .withDirectory("test-logs") .build()) .build(); Copycat copycat2 = CopycatServer.builder() @@ -135,7 +134,7 @@ public void testMembershipChange() throws Throwable { .withRegistry(registry) .build()) .withStorage(Storage.builder() - .withStorageLevel(StorageLevel.MEMORY) + .withDirectory("test-logs") .build()) .build(); Copycat copycat3 = CopycatServer.builder() @@ -145,7 +144,7 @@ public void testMembershipChange() throws Throwable { .withRegistry(registry) .build()) .withStorage(Storage.builder() - .withStorageLevel(StorageLevel.MEMORY) + .withDirectory("test-logs") .build()) .build(); @@ -185,7 +184,7 @@ public void testMembershipChange() throws Throwable { .withRegistry(registry) .build()) .withStorage(Storage.builder() - .withStorageLevel(StorageLevel.MEMORY) + .withDirectory("test-logs") .build()) .build(); @@ -268,7 +267,7 @@ private List createCopycats(int nodes) throws Throwable { .withRegistry(registry) .build()) .withStorage(Storage.builder() - .withStorageLevel(StorageLevel.MEMORY) + .withDirectory("test-logs") .build()) .build(); diff --git a/collections/src/main/java/net/kuujo/copycat/collections/DistributedMap.java b/collections/src/main/java/net/kuujo/copycat/collections/DistributedMap.java index 81f01da40d..a4aa14c3a6 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/DistributedMap.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/DistributedMap.java @@ -15,7 +15,7 @@ */ package net.kuujo.copycat.collections; -import net.kuujo.copycat.PersistenceLevel; +import net.kuujo.copycat.PersistenceMode; import net.kuujo.copycat.Resource; import net.kuujo.copycat.collections.state.MapCommands; import net.kuujo.copycat.collections.state.MapState; @@ -158,7 +158,7 @@ public CompletableFuture put(K key, V value) { * @return A completable future to be completed with the result once complete. */ @SuppressWarnings("unchecked") - public CompletableFuture put(K key, V value, PersistenceLevel persistence) { + public CompletableFuture put(K key, V value, PersistenceMode persistence) { return submit(MapCommands.Put.builder() .withKey(key) .withValue(value) @@ -195,7 +195,7 @@ public CompletableFuture put(K key, V value, Duration ttl) { * @return A completable future to be completed with the result once complete. */ @SuppressWarnings("unchecked") - public CompletableFuture put(K key, V value, Duration ttl, PersistenceLevel persistence) { + public CompletableFuture put(K key, V value, Duration ttl, PersistenceMode persistence) { return submit(MapCommands.Put.builder() .withKey(key) .withValue(value) @@ -297,7 +297,7 @@ public CompletableFuture putIfAbsent(K key, V value, Duration ttl) { * @return A completable future to be completed with the result once complete. */ @SuppressWarnings("unchecked") - public CompletableFuture putIfAbsent(K key, V value, Duration ttl, PersistenceLevel persistence) { + public CompletableFuture putIfAbsent(K key, V value, Duration ttl, PersistenceMode persistence) { return submit(MapCommands.PutIfAbsent.builder() .withKey(key) .withValue(value) diff --git a/collections/src/main/java/net/kuujo/copycat/collections/DistributedSet.java b/collections/src/main/java/net/kuujo/copycat/collections/DistributedSet.java index 194a98ef0f..f5607c2c80 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/DistributedSet.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/DistributedSet.java @@ -15,7 +15,7 @@ */ package net.kuujo.copycat.collections; -import net.kuujo.copycat.PersistenceLevel; +import net.kuujo.copycat.PersistenceMode; import net.kuujo.copycat.Resource; import net.kuujo.copycat.collections.state.SetCommands; import net.kuujo.copycat.collections.state.SetState; @@ -58,7 +58,7 @@ public CompletableFuture add(T value) { * @return A completable future to be completed with the result once complete. */ @SuppressWarnings("unchecked") - public CompletableFuture add(T value, PersistenceLevel persistence) { + public CompletableFuture add(T value, PersistenceMode persistence) { return submit(SetCommands.Add.builder() .withValue(value.hashCode()) .withPersistence(persistence) @@ -89,7 +89,7 @@ public CompletableFuture add(T value, Duration ttl) { * @return A completable future to be completed with the result once complete. */ @SuppressWarnings("unchecked") - public CompletableFuture add(T value, Duration ttl, PersistenceLevel persistence) { + public CompletableFuture add(T value, Duration ttl, PersistenceMode persistence) { return submit(SetCommands.Add.builder() .withValue(value.hashCode()) .withTtl(ttl.toMillis()) diff --git a/collections/src/main/java/net/kuujo/copycat/collections/state/MapCommands.java b/collections/src/main/java/net/kuujo/copycat/collections/state/MapCommands.java index 213c0a5fbf..cc7c93892a 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/state/MapCommands.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/state/MapCommands.java @@ -15,7 +15,7 @@ */ package net.kuujo.copycat.collections.state; -import net.kuujo.copycat.PersistenceLevel; +import net.kuujo.copycat.PersistenceMode; import net.kuujo.copycat.io.BufferInput; import net.kuujo.copycat.io.BufferOutput; import net.kuujo.copycat.io.serializer.CopycatSerializable; @@ -268,7 +268,7 @@ public T withValue(Object value) { * TTL command. */ public static abstract class TtlCommand extends KeyValueCommand { - protected PersistenceLevel mode = PersistenceLevel.PERSISTENT; + protected PersistenceMode mode = PersistenceMode.PERSISTENT; protected long ttl; /** @@ -276,7 +276,7 @@ public static abstract class TtlCommand extends KeyValueCommand { * * @return The persistence mode. */ - public PersistenceLevel mode() { + public PersistenceMode mode() { return mode; } @@ -298,7 +298,7 @@ public void writeObject(BufferOutput buffer, Serializer serializer) { @Override public void readObject(BufferInput buffer, Serializer serializer) { super.readObject(buffer, serializer); - mode = PersistenceLevel.values()[buffer.readByte()]; + mode = PersistenceMode.values()[buffer.readByte()]; ttl = buffer.readLong(); } @@ -316,7 +316,7 @@ protected Builder(BuilderPool pool) { * @param mode The persistence mode. * @return The command builder. */ - public Builder withPersistence(PersistenceLevel mode) { + public Builder withPersistence(PersistenceMode mode) { command.mode = mode; return this; } diff --git a/collections/src/main/java/net/kuujo/copycat/collections/state/MapState.java b/collections/src/main/java/net/kuujo/copycat/collections/state/MapState.java index cd67dee2cc..70508e92c9 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/state/MapState.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/state/MapState.java @@ -15,7 +15,7 @@ */ package net.kuujo.copycat.collections.state; -import net.kuujo.copycat.PersistenceLevel; +import net.kuujo.copycat.PersistenceMode; import net.kuujo.copycat.raft.session.Session; import net.kuujo.copycat.raft.Commit; import net.kuujo.copycat.raft.StateMachine; @@ -70,7 +70,7 @@ public void close(Session session) { private boolean isActive(Commit commit, Instant instant) { if (commit == null) { return false; - } else if (commit.operation().mode() == PersistenceLevel.EPHEMERAL && !sessions.contains(commit.session().id())) { + } else if (commit.operation().mode() == PersistenceMode.EPHEMERAL && !sessions.contains(commit.session().id())) { return false; } else if (commit.operation().ttl() != 0 && commit.operation().ttl() < instant.toEpochMilli() - commit.time().toEpochMilli()) { return false; diff --git a/collections/src/main/java/net/kuujo/copycat/collections/state/SetCommands.java b/collections/src/main/java/net/kuujo/copycat/collections/state/SetCommands.java index 63197fac2f..8ca7dbad30 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/state/SetCommands.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/state/SetCommands.java @@ -222,7 +222,7 @@ protected Contains create() { */ public static abstract class TtlCommand extends ValueCommand { protected long ttl; - protected PersistenceLevel mode = PersistenceLevel.PERSISTENT; + protected PersistenceMode mode = PersistenceMode.PERSISTENT; /** * Returns the time to live in milliseconds. @@ -238,7 +238,7 @@ public long ttl() { * * @return The persistence mode. */ - public PersistenceLevel mode() { + public PersistenceMode mode() { return mode; } @@ -251,7 +251,7 @@ public void writeObject(BufferOutput buffer, Serializer serializer) { @Override public void readObject(BufferInput buffer, Serializer serializer) { super.readObject(buffer, serializer); - mode = PersistenceLevel.values()[buffer.readByte()]; + mode = PersistenceMode.values()[buffer.readByte()]; ttl = buffer.readLong(); } @@ -292,7 +292,7 @@ public Builder withTtl(long ttl, TimeUnit unit) { * @param mode The persistence mode. * @return The command builder. */ - public Builder withPersistence(PersistenceLevel mode) { + public Builder withPersistence(PersistenceMode mode) { command.mode = mode; return this; } diff --git a/collections/src/main/java/net/kuujo/copycat/collections/state/SetState.java b/collections/src/main/java/net/kuujo/copycat/collections/state/SetState.java index 06766660e6..3761359e20 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/state/SetState.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/state/SetState.java @@ -15,7 +15,7 @@ */ package net.kuujo.copycat.collections.state; -import net.kuujo.copycat.PersistenceLevel; +import net.kuujo.copycat.PersistenceMode; import net.kuujo.copycat.raft.session.Session; import net.kuujo.copycat.raft.Commit; import net.kuujo.copycat.raft.StateMachine; @@ -67,7 +67,7 @@ public void close(Session session) { private boolean isActive(Commit commit, Instant instant) { if (commit == null) { return false; - } else if (commit.operation().mode() == PersistenceLevel.EPHEMERAL && !sessions.contains(commit.session().id())) { + } else if (commit.operation().mode() == PersistenceMode.EPHEMERAL && !sessions.contains(commit.session().id())) { return false; } else if (commit.operation().ttl() != 0 && commit.operation().ttl() < instant.toEpochMilli() - commit.time().toEpochMilli()) { return false; diff --git a/collections/src/test/java/net/kuujo/copycat/collections/DistributedMapTest.java b/collections/src/test/java/net/kuujo/copycat/collections/DistributedMapTest.java index e37d801b08..4a46667b61 100644 --- a/collections/src/test/java/net/kuujo/copycat/collections/DistributedMapTest.java +++ b/collections/src/test/java/net/kuujo/copycat/collections/DistributedMapTest.java @@ -20,7 +20,6 @@ import net.kuujo.copycat.CopycatServer; import net.kuujo.copycat.Node; import net.kuujo.copycat.io.storage.Storage; -import net.kuujo.copycat.io.storage.StorageLevel; import net.kuujo.copycat.io.transport.LocalServerRegistry; import net.kuujo.copycat.io.transport.LocalTransport; import net.kuujo.copycat.raft.Member; @@ -192,7 +191,7 @@ private List createCopycats(int nodes) throws Throwable { .withRegistry(registry) .build()) .withStorage(Storage.builder() - .withStorageLevel(StorageLevel.MEMORY) + .withDirectory("test-logs") .build()) .build(); diff --git a/collections/src/test/java/net/kuujo/copycat/collections/DistributedSetTest.java b/collections/src/test/java/net/kuujo/copycat/collections/DistributedSetTest.java index 0971e7218f..6ff05fedb9 100644 --- a/collections/src/test/java/net/kuujo/copycat/collections/DistributedSetTest.java +++ b/collections/src/test/java/net/kuujo/copycat/collections/DistributedSetTest.java @@ -20,7 +20,6 @@ import net.kuujo.copycat.CopycatServer; import net.kuujo.copycat.Node; import net.kuujo.copycat.io.storage.Storage; -import net.kuujo.copycat.io.storage.StorageLevel; import net.kuujo.copycat.io.transport.LocalServerRegistry; import net.kuujo.copycat.io.transport.LocalTransport; import net.kuujo.copycat.raft.Member; @@ -96,7 +95,7 @@ private List createCopycats(int nodes) throws Throwable { .withRegistry(registry) .build()) .withStorage(Storage.builder() - .withStorageLevel(StorageLevel.MEMORY) + .withDirectory("test-logs") .build()) .build(); diff --git a/coordination/src/test/java/net/kuujo/copycat/coordination/DistributedLeaderElectionTest.java b/coordination/src/test/java/net/kuujo/copycat/coordination/DistributedLeaderElectionTest.java index 7147b25a52..fdd6f2bbc3 100644 --- a/coordination/src/test/java/net/kuujo/copycat/coordination/DistributedLeaderElectionTest.java +++ b/coordination/src/test/java/net/kuujo/copycat/coordination/DistributedLeaderElectionTest.java @@ -20,7 +20,6 @@ import net.kuujo.copycat.CopycatServer; import net.kuujo.copycat.Node; import net.kuujo.copycat.io.storage.Storage; -import net.kuujo.copycat.io.storage.StorageLevel; import net.kuujo.copycat.io.transport.LocalServerRegistry; import net.kuujo.copycat.io.transport.LocalTransport; import net.kuujo.copycat.raft.Member; @@ -85,7 +84,7 @@ private List createCopycats(int nodes) throws Throwable { .withRegistry(registry) .build()) .withStorage(Storage.builder() - .withStorageLevel(StorageLevel.MEMORY) + .withDirectory("test-logs") .build()) .build(); diff --git a/coordination/src/test/java/net/kuujo/copycat/coordination/DistributedLockTest.java b/coordination/src/test/java/net/kuujo/copycat/coordination/DistributedLockTest.java index 0fe29c9638..0929151c66 100644 --- a/coordination/src/test/java/net/kuujo/copycat/coordination/DistributedLockTest.java +++ b/coordination/src/test/java/net/kuujo/copycat/coordination/DistributedLockTest.java @@ -20,7 +20,6 @@ import net.kuujo.copycat.CopycatServer; import net.kuujo.copycat.Node; import net.kuujo.copycat.io.storage.Storage; -import net.kuujo.copycat.io.storage.StorageLevel; import net.kuujo.copycat.io.transport.LocalServerRegistry; import net.kuujo.copycat.io.transport.LocalTransport; import net.kuujo.copycat.raft.Member; @@ -89,7 +88,7 @@ private List createCopycats(int nodes) throws Throwable { .withRegistry(registry) .build()) .withStorage(Storage.builder() - .withStorageLevel(StorageLevel.MEMORY) + .withDirectory("test-logs") .build()) .build(); diff --git a/core/src/main/java/net/kuujo/copycat/Configurable.java b/core/src/main/java/net/kuujo/copycat/Configurable.java new file mode 100644 index 0000000000..732a7fe4da --- /dev/null +++ b/core/src/main/java/net/kuujo/copycat/Configurable.java @@ -0,0 +1,32 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.kuujo.copycat; + +/** + * Configurable resource. + * + * @author , U extends Options> CompletableFuture create(String path, Class type, U options) { + try { + T resource = (T) type.newInstance(); + return client.submit(CreateResource.builder() + .withPath(path) + .withType(resource.stateMachine()) + .build()) + .thenApply(id -> { + resource.open(resources.computeIfAbsent(id, i -> { + ResourceContext context = new ResourceContext(id, client); + resource.configure(options); + return context; + })); + return resource; + }); + } catch (InstantiationException | IllegalAccessException e) { + throw new ResourceException("failed to instantiate resource: " + type, e); + } + } + /** * Deletes a node at the given path. *

@@ -155,25 +191,21 @@ public CompletableFuture delete(String path) { } @Override - @SuppressWarnings("unchecked") public CompletableFuture open() { return client.open().thenApply(v -> this); } @Override - @SuppressWarnings("unchecked") public boolean isOpen() { return client.isOpen(); } @Override - @SuppressWarnings("unchecked") public CompletableFuture close() { return client.close(); } @Override - @SuppressWarnings("unchecked") public boolean isClosed() { return client.isClosed(); } diff --git a/core/src/main/java/net/kuujo/copycat/Node.java b/core/src/main/java/net/kuujo/copycat/Node.java index 77d48485ac..81d7d6d27d 100644 --- a/core/src/main/java/net/kuujo/copycat/Node.java +++ b/core/src/main/java/net/kuujo/copycat/Node.java @@ -16,7 +16,6 @@ package net.kuujo.copycat; import net.kuujo.copycat.manager.PathChildren; -import net.kuujo.copycat.raft.StateMachine; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -79,7 +78,7 @@ public CompletableFuture> children() { return copycat.client.submit(PathChildren.builder() .withPath(path) .build()) - .thenApply(children -> children.stream() + .thenApply(children -> children.stream() .map(copycat::node) .collect(Collectors.toList())); } @@ -134,11 +133,6 @@ public CompletableFuture create(String path) { /** * Creates a resource at this node. - *

- * The provided {@link net.kuujo.copycat.Resource} class must be annotated with {@link net.kuujo.copycat.Stateful} - * indicating the {@link StateMachine} to create on the server side. The state machine - * class will be submitted to the cluster and created on each Raft server before the returned - * {@link java.util.concurrent.CompletableFuture} is completed. * * @param type The resource type. * @param The resource type. @@ -148,11 +142,19 @@ public CompletableFuture create(Class type) { return copycat.create(path, type); } + /** + * Creates a configurable resource at this node. + * + * @param type The resource type. + * @param The resource type. + * @return A completable future to be completed with the resource instance. + */ + public , U extends Options> CompletableFuture create(Class type, U options) { + return copycat.create(path, type, options); + } + /** * Gets a resource at this node. - *

- * The provided {@link net.kuujo.copycat.Resource} class must be annotated with {@link net.kuujo.copycat.Stateful} - * indicating the {@link StateMachine} type on the server side. * * @param type The resource type. * @param The resource type. diff --git a/core/src/main/java/net/kuujo/copycat/Options.java b/core/src/main/java/net/kuujo/copycat/Options.java new file mode 100644 index 0000000000..c801e139e2 --- /dev/null +++ b/core/src/main/java/net/kuujo/copycat/Options.java @@ -0,0 +1,24 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.kuujo.copycat; + +/** + * Resource options. + * + * @author Jordan Halterman */ -public enum PersistenceLevel { +public enum PersistenceMode { /** * Ephemeral mode. diff --git a/core/src/main/java/net/kuujo/copycat/manager/CreateResource.java b/core/src/main/java/net/kuujo/copycat/manager/CreateResource.java index 9658e3c01a..3f1ee3d446 100644 --- a/core/src/main/java/net/kuujo/copycat/manager/CreateResource.java +++ b/core/src/main/java/net/kuujo/copycat/manager/CreateResource.java @@ -53,9 +53,9 @@ public CreateResource(String path, Class type) { } /** - * Returns the resource class. + * Returns the resource state machine class. * - * @return The resource class. + * @return The resource state machine class. */ public Class type() { return type; @@ -95,10 +95,10 @@ protected CreateResource create() { } /** - * Sets the command type. + * Sets the resource state machine type. * - * @param type The command type. - * @return The command builder. + * @param type The resource state machine type. + * @return The create resource command builder. */ public Builder withType(Class type) { operation.type = type; diff --git a/protocol/src/main/java/net/kuujo/copycat/raft/protocol/Command.java b/protocol/src/main/java/net/kuujo/copycat/raft/protocol/Command.java index d30a81c005..051eece785 100644 --- a/protocol/src/main/java/net/kuujo/copycat/raft/protocol/Command.java +++ b/protocol/src/main/java/net/kuujo/copycat/raft/protocol/Command.java @@ -15,6 +15,7 @@ */ package net.kuujo.copycat.raft.protocol; +import net.kuujo.copycat.io.storage.PersistenceLevel; import net.kuujo.copycat.util.BuilderPool; /** @@ -29,10 +30,27 @@ */ public interface Command extends Operation { + /** + * Returns the command storage level. + *

+ * The storage level specifies how the command should be persisted in Raft replicated logs. Copycat's log + * supports persisting commands to disk or holding them in memory depending on the provided storage level. + * If the storage level is {@link PersistenceLevel#DISK} then the command will persist across failures, otherwise + * a failure will result in {@link PersistenceLevel#MEMORY} commands being lost. + *

+ * It's important to note, though, that all commands will be replicated to a majority of the cluster regardless + * of their storage level. + * + * @return The command storage level. + */ + default PersistenceLevel storage() { + return PersistenceLevel.DISK; + } + /** * Base builder for commands. */ - static abstract class Builder, U extends Command, V> extends Operation.Builder { + abstract class Builder, U extends Command, V> extends Operation.Builder { protected U command; protected Builder(BuilderPool pool) { diff --git a/protocol/src/main/java/net/kuujo/copycat/raft/protocol/Query.java b/protocol/src/main/java/net/kuujo/copycat/raft/protocol/Query.java index 77c917307d..30cb6a5bb7 100644 --- a/protocol/src/main/java/net/kuujo/copycat/raft/protocol/Query.java +++ b/protocol/src/main/java/net/kuujo/copycat/raft/protocol/Query.java @@ -57,7 +57,7 @@ default ConsistencyLevel consistency() { /** * Base builder for queries. */ - static abstract class Builder, U extends Query, V> extends Operation.Builder { + abstract class Builder, U extends Query, V> extends Operation.Builder { protected U query; protected Builder(BuilderPool pool) { diff --git a/server/src/main/java/net/kuujo/copycat/raft/state/LeaderState.java b/server/src/main/java/net/kuujo/copycat/raft/state/LeaderState.java index 665f5083bb..fb9b02f601 100644 --- a/server/src/main/java/net/kuujo/copycat/raft/state/LeaderState.java +++ b/server/src/main/java/net/kuujo/copycat/raft/state/LeaderState.java @@ -16,6 +16,7 @@ package net.kuujo.copycat.raft.state; import net.kuujo.copycat.io.storage.Entry; +import net.kuujo.copycat.io.storage.PersistenceLevel; import net.kuujo.copycat.raft.Members; import net.kuujo.copycat.raft.RaftServer; import net.kuujo.copycat.raft.protocol.Command; @@ -166,7 +167,8 @@ public CompletableFuture join(final JoinRequest request) { .build(); try (ConfigurationEntry entry = context.getLog().createEntry(ConfigurationEntry.class)) { - entry.setTerm(term) + entry.setPersistenceLevel(PersistenceLevel.DISK) + .setTerm(term) .setActive(activeMembers) .setPassive(passiveMembers); index = context.getLog().appendEntry(entry); @@ -220,7 +222,8 @@ public CompletableFuture leave(final LeaveRequest request) { .build(); try (ConfigurationEntry entry = context.getLog().createEntry(ConfigurationEntry.class)) { - entry.setTerm(term) + entry.setPersistenceLevel(PersistenceLevel.DISK) + .setTerm(term) .setActive(activeMembers) .setPassive(passiveMembers); index = context.getLog().appendEntry(entry); @@ -304,7 +307,8 @@ protected CompletableFuture command(final CommandRequest reques final long index; try (CommandEntry entry = context.getLog().createEntry(CommandEntry.class)) { - entry.setTerm(term) + entry.setPersistenceLevel(request.command().storage()) + .setTerm(term) .setTimestamp(timestamp) .setSession(request.session()) .setSequence(request.commandSequence()) @@ -470,6 +474,7 @@ protected CompletableFuture register(RegisterRequest request) final long index; try (RegisterEntry entry = context.getLog().createEntry(RegisterEntry.class)) { + entry.setPersistenceLevel(PersistenceLevel.DISK); entry.setTerm(context.getTerm()); entry.setTimestamp(timestamp); entry.setConnection(request.connection()); @@ -525,6 +530,7 @@ protected CompletableFuture keepAlive(KeepAliveRequest reques final long index; try (KeepAliveEntry entry = context.getLog().createEntry(KeepAliveEntry.class)) { + entry.setPersistenceLevel(PersistenceLevel.MEMORY); entry.setTerm(context.getTerm()); entry.setSession(request.session()); entry.setTimestamp(timestamp); @@ -915,7 +921,8 @@ private void updateConfiguration(MemberState member) { .build(); try (ConfigurationEntry entry = context.getLog().createEntry(ConfigurationEntry.class)) { - entry.setTerm(context.getTerm()) + entry.setPersistenceLevel(PersistenceLevel.DISK) + .setTerm(context.getTerm()) .setActive(activeMembers) .setPassive(passiveMembers); long index = context.getLog().appendEntry(entry); diff --git a/storage/src/main/java/net/kuujo/copycat/io/storage/Entry.java b/storage/src/main/java/net/kuujo/copycat/io/storage/Entry.java index 45e9c270c3..ffa29d7757 100644 --- a/storage/src/main/java/net/kuujo/copycat/io/storage/Entry.java +++ b/storage/src/main/java/net/kuujo/copycat/io/storage/Entry.java @@ -38,6 +38,7 @@ public abstract class Entry> implements ReferenceCounted> referenceManager; private final AtomicInteger references = new AtomicInteger(); private long index; + private PersistenceLevel persistenceLevel; protected Entry() { referenceManager = null; @@ -68,6 +69,27 @@ public T setIndex(long index) { return (T) this; } + /** + * Returns the entry storage level. + * + * @return The entry storage level. + */ + public PersistenceLevel getPersistenceLevel() { + return persistenceLevel; + } + + /** + * Sets the entry storage level. + * + * @param level The entry storage level. + * @return The entry. + */ + @SuppressWarnings("unchecked") + public T setPersistenceLevel(PersistenceLevel level) { + this.persistenceLevel = level; + return (T) this; + } + @Override public Entry acquire() { references.incrementAndGet(); diff --git a/storage/src/main/java/net/kuujo/copycat/io/storage/OffsetIndex.java b/storage/src/main/java/net/kuujo/copycat/io/storage/OffsetIndex.java index 9f440e2a45..5c9bca4ba2 100644 --- a/storage/src/main/java/net/kuujo/copycat/io/storage/OffsetIndex.java +++ b/storage/src/main/java/net/kuujo/copycat/io/storage/OffsetIndex.java @@ -246,9 +246,9 @@ public int length(int offset) { } /** - * Performs a binary search to find the given offset in the buffer. + * Returns the relative offset for the given offset. */ - private int search(int offset) { + public int relativeOffset(int offset) { if (size == 0) { return -1; } @@ -260,10 +260,11 @@ private int search(int offset) { int mid = lo + (hi - lo) / 2; int i = buffer.readInt(mid * ENTRY_SIZE + HEADER_SIZE); if (i == offset) { - return mid * ENTRY_SIZE + HEADER_SIZE; + return mid; } else if (lo == mid) { - if (buffer.readInt(hi * ENTRY_SIZE + HEADER_SIZE) == offset) { - return hi * ENTRY_SIZE + HEADER_SIZE; + i = buffer.readInt(hi * ENTRY_SIZE + HEADER_SIZE); + if (i == offset) { + return hi; } return -1; } else if (i < offset) { @@ -274,11 +275,19 @@ private int search(int offset) { } if (buffer.readInt(hi * ENTRY_SIZE + HEADER_SIZE) == offset) { - return hi * ENTRY_SIZE + HEADER_SIZE; + return hi; } return -1; } + /** + * Performs a binary search to find the given offset in the buffer. + */ + private int search(int offset) { + int relativeOffset = relativeOffset(offset); + return relativeOffset != -1 ? relativeOffset * ENTRY_SIZE + HEADER_SIZE : -1; + } + /** * Truncates the index up to the given offset. *

@@ -323,12 +332,17 @@ public void truncate(int offset) { * @param offset The offset to delete. */ public boolean delete(int offset) { - if (deletes.size() <= offset) { - while (deletes.size() <= offset) { + int relativeOffset = relativeOffset(offset); + if (relativeOffset == -1) { + return false; + } + + if (deletes.size() <= relativeOffset) { + while (deletes.size() <= relativeOffset) { deletes.resize(deletes.size() * 2); } } - return deletes.set(offset); + return deletes.set(relativeOffset); } /** @@ -338,7 +352,8 @@ public boolean delete(int offset) { * @return Whether the offset has been marked for deletion. */ public boolean deleted(int offset) { - return deletes.size() > offset && deletes.get(offset); + int relativeOffset = relativeOffset(offset); + return relativeOffset == -1 || (deletes.size() > relativeOffset && deletes.get(relativeOffset)); } /** diff --git a/storage/src/main/java/net/kuujo/copycat/io/storage/StorageLevel.java b/storage/src/main/java/net/kuujo/copycat/io/storage/PersistenceLevel.java similarity index 97% rename from storage/src/main/java/net/kuujo/copycat/io/storage/StorageLevel.java rename to storage/src/main/java/net/kuujo/copycat/io/storage/PersistenceLevel.java index 8c2f42a13f..56f2bc60e1 100644 --- a/storage/src/main/java/net/kuujo/copycat/io/storage/StorageLevel.java +++ b/storage/src/main/java/net/kuujo/copycat/io/storage/PersistenceLevel.java @@ -22,7 +22,7 @@ * * @author Jordan Halterman */ -public enum StorageLevel { +public enum PersistenceLevel { /** * Memory storage level. diff --git a/storage/src/main/java/net/kuujo/copycat/io/storage/Segment.java b/storage/src/main/java/net/kuujo/copycat/io/storage/Segment.java index 02ea08c689..7241304769 100644 --- a/storage/src/main/java/net/kuujo/copycat/io/storage/Segment.java +++ b/storage/src/main/java/net/kuujo/copycat/io/storage/Segment.java @@ -16,6 +16,7 @@ package net.kuujo.copycat.io.storage; import net.kuujo.copycat.io.Buffer; +import net.kuujo.copycat.io.FileBuffer; import net.kuujo.copycat.io.serializer.Serializer; /** @@ -27,44 +28,42 @@ class Segment implements AutoCloseable { /** * Opens a new segment. - * - * @param buffer The segment buffer. - * @param descriptor The segment descriptor. - * @param index The segment index. - * @param serializer The segment entry serializer. - * @return The opened segment. */ - static Segment open(Buffer buffer, SegmentDescriptor descriptor, OffsetIndex index, Serializer serializer) { - return new Segment(buffer, descriptor, index, serializer); + static Segment open(Buffer diskBuffer, Buffer memoryBuffer, SegmentDescriptor descriptor, OffsetIndex diskIndex, OffsetIndex memoryIndex, Serializer serializer) { + return new Segment(diskBuffer, memoryBuffer, descriptor, diskIndex, memoryIndex, serializer); } private final SegmentDescriptor descriptor; private final Serializer serializer; - private final Buffer source; - private final Buffer writeBuffer; - private final Buffer readBuffer; - private final OffsetIndex offsetIndex; + private final Buffer diskBuffer; + private final Buffer memoryBuffer; + private final OffsetIndex diskIndex; + private final OffsetIndex memoryIndex; private int skip = 0; private boolean open = true; - Segment(Buffer buffer, SegmentDescriptor descriptor, OffsetIndex offsetIndex, Serializer serializer) { - if (buffer == null) - throw new NullPointerException("buffer cannot be null"); + Segment(Buffer diskBuffer, Buffer memoryBuffer, SegmentDescriptor descriptor, OffsetIndex diskIndex, OffsetIndex memoryIndex, Serializer serializer) { + if (diskBuffer == null) + throw new NullPointerException("diskBuffer cannot be null"); + if (memoryBuffer == null) + throw new NullPointerException("memoryBuffer cannot be null"); if (descriptor == null) throw new NullPointerException("descriptor cannot be null"); - if (offsetIndex == null) - throw new NullPointerException("index cannot be null"); + if (diskIndex == null) + throw new NullPointerException("diskIndex cannot be null"); + if (memoryIndex == null) + throw new NullPointerException("memoryIndex cannot be null"); if (serializer == null) throw new NullPointerException("serializer cannot be null"); - this.source = buffer; this.serializer = serializer; - this.writeBuffer = buffer.slice(); - this.readBuffer = writeBuffer.asReadOnlyBuffer(); + this.diskBuffer = diskBuffer; + this.memoryBuffer = memoryBuffer; this.descriptor = descriptor; - this.offsetIndex = offsetIndex; - if (offsetIndex.size() > 0) { - writeBuffer.position(offsetIndex.position(offsetIndex.lastOffset()) + offsetIndex.length(offsetIndex.lastOffset())); + this.diskIndex = diskIndex; + this.memoryIndex = memoryIndex; + if (diskIndex.size() > 0) { + diskBuffer.position(diskIndex.position(diskIndex.lastOffset()) + diskIndex.length(diskIndex.lastOffset())); } } @@ -92,7 +91,14 @@ public boolean isOpen() { * @return Indicates whether the segment is empty. */ public boolean isEmpty() { - return offsetIndex.size() > 0 ? offsetIndex.lastOffset() - offsetIndex.offset() + 1 + skip == 0 : skip == 0; + return isEmpty(memoryIndex) && isEmpty(diskIndex); + } + + /** + * Returns a boolean value indicating whether the given index is empty. + */ + private boolean isEmpty(OffsetIndex index) { + return index.size() > 0 ? index.lastOffset() - index.offset() + 1 + skip == 0 : skip == 0; } /** @@ -101,7 +107,9 @@ public boolean isEmpty() { * @return Indicates whether the segment is full. */ public boolean isFull() { - return size() >= descriptor.maxSegmentSize() || offsetIndex.lastOffset() >= descriptor.maxEntries() - 1 || offsetIndex.lastOffset() + skip + 1 == Integer.MAX_VALUE; + return size() >= descriptor.maxSegmentSize() + || memoryIndex.size() + diskIndex.size() >= descriptor.maxEntries() + || Math.max(memoryIndex.lastOffset(), diskIndex.lastOffset()) + skip + 1 == Integer.MAX_VALUE; } /** @@ -110,7 +118,7 @@ public boolean isFull() { * @return The count of the segment in bytes. */ public long size() { - return writeBuffer.offset() + writeBuffer.position(); + return diskBuffer.offset() + diskBuffer.position() + memoryBuffer.offset() + memoryBuffer.position(); } /** @@ -119,7 +127,7 @@ public long size() { * @return The current range of the segment. */ public int length() { - return !isEmpty() ? offsetIndex.lastOffset() - offsetIndex.offset() + 1 + skip : 0; + return !isEmpty() ? Math.max(memoryIndex.lastOffset(), diskIndex.lastOffset()) - Math.min(memoryIndex.offset(), diskIndex.offset()) + 1 + skip : 0; } /** @@ -128,7 +136,7 @@ public int length() { * @return The count of entries in the segment. */ public int count() { - return offsetIndex.lastOffset() + 1 - offsetIndex.deletes(); + return Math.max(memoryIndex.lastOffset(), diskIndex.lastOffset()) + 1 - (memoryIndex.deletes() + diskIndex.deletes()); } /** @@ -137,7 +145,7 @@ public int count() { * @return The index of the segment. */ long index() { - return descriptor.index() + offsetIndex.offset(); + return descriptor.index() + Math.min(memoryIndex.offset(), diskIndex.offset()); } /** @@ -148,7 +156,7 @@ long index() { public long firstIndex() { if (!isOpen()) throw new IllegalStateException("segment not open"); - return !isEmpty() ? descriptor.index() + offsetIndex.offset() : 0; + return !isEmpty() ? descriptor.index() + Math.max(0, Math.min(diskIndex.offset(), memoryIndex.offset())) : 0; } /** @@ -159,7 +167,7 @@ public long firstIndex() { public long lastIndex() { if (!isOpen()) throw new IllegalStateException("segment not open"); - return !isEmpty() ? offsetIndex.lastOffset() + descriptor.index() + skip : descriptor.index() - 1; + return !isEmpty() ? Math.max(diskIndex.lastOffset(), memoryIndex.lastOffset()) + descriptor.index() + skip : descriptor.index() - 1; } /** @@ -179,7 +187,9 @@ public long nextIndex() { */ public Segment compact(long firstIndex) { if (!isEmpty()) { - offsetIndex.resetOffset(offset(firstIndex)); + int offset = offset(firstIndex); + diskIndex.resetOffset(offset); + memoryIndex.resetOffset(offset); } return this; } @@ -207,10 +217,20 @@ private void checkRange(long index) { * Commits an entry to the segment. */ public long appendEntry(Entry entry) { - if (isFull()) { + if (isFull()) throw new IllegalStateException("segment is full"); + + if (entry.getPersistenceLevel() == PersistenceLevel.DISK) { + return appendEntry(entry, diskBuffer, diskIndex); + } else { + return appendEntry(entry, memoryBuffer, memoryIndex); } + } + /** + * Appends an entry to the segment. + */ + private long appendEntry(Entry entry, Buffer buffer, OffsetIndex offsetIndex) { long index = nextIndex(); if (entry.getIndex() != index) { @@ -221,13 +241,13 @@ public long appendEntry(Entry entry) { int offset = offset(index); // Record the starting position of the new entry. - long position = writeBuffer.position(); + long position = buffer.position(); // Serialize the object into the segment buffer. - serializer.writeObject(entry, writeBuffer.limit(-1)); + serializer.writeObject(entry, buffer.limit(-1)); // Calculate the length of the serialized bytes based on the resulting buffer position and the starting position. - int length = (int) (writeBuffer.position() - position); + int length = (int) (buffer.position() - position); // Index the offset, position, and length. offsetIndex.index(offset, position, length); @@ -253,31 +273,37 @@ public synchronized T getEntry(long index) { int offset = offset(index); // Return null if the offset has been deleted from the segment. - if (offsetIndex.deleted(offset)) { + if (memoryIndex.deleted(offset) && diskIndex.deleted(offset)) { return null; } - // Get the start position of the offset from the offset index. - long position = offsetIndex.position(offset); + // Get the start position of the entry from the memory index. + long position = memoryIndex.position(offset); - // If the position is -1 then that indicates no start position was found. The offset may have been removed from - // the index via deduplication or compaction. + // If the memory index contained the entry, read the entry from the memory buffer. if (position != -1) { - - // Get the length of the offset entry from the offset index. This will be calculated by getting the start - // position of the next offset in the index and subtracting this position from the next position. - int length = offsetIndex.length(offset); - - // Deserialize the entry from a slice of the underlying buffer. - try (Buffer value = readBuffer.slice(position, length)) { - T entry = serializer.readObject(value); - entry.setIndex(index); - return entry; + return getEntry(index, memoryBuffer, position, memoryIndex.length(offset), PersistenceLevel.MEMORY); + } else { + // If the memory index did not contain the entry, attempt to read it from disk. + position = diskIndex.position(offset); + if (position != -1) { + return getEntry(index, diskBuffer, position, diskIndex.length(offset), PersistenceLevel.DISK); } } return null; } + /** + * Reads an entry from the segment. + */ + private synchronized T getEntry(long index, Buffer buffer, long position, int length, PersistenceLevel persistenceLevel) { + try (Buffer value = buffer.slice(position, length)) { + T entry = serializer.readObject(value); + entry.setIndex(index).setPersistenceLevel(persistenceLevel); + return entry; + } + } + /** * Returns a boolean value indicating whether the given index is within the range of the segment. * @@ -299,7 +325,14 @@ public boolean containsIndex(long index) { public boolean containsEntry(long index) { if (!isOpen()) throw new IllegalStateException("segment not open"); - return containsIndex(index) && offsetIndex.contains(offset(index)); + + if (!containsIndex(index)) { + return false; + } + + // Check the memory index first for performance reasons. + int offset = offset(index); + return memoryIndex.contains(offset) || diskIndex.contains(offset); } /** @@ -311,7 +344,9 @@ public boolean containsEntry(long index) { public Segment cleanEntry(long index) { if (!isOpen()) throw new IllegalStateException("segment not open"); - offsetIndex.delete(offset(index)); + int offset = offset(index); + diskIndex.delete(offset); + memoryIndex.delete(offset); return this; } @@ -337,12 +372,16 @@ public Segment skip(long entries) { public Segment truncate(long index) { if (!isOpen()) throw new IllegalStateException("segment not open"); + int offset = offset(index); - if (offset < offsetIndex.lastOffset()) { - int diff = offsetIndex.lastOffset() - offset; + int lastOffset = Math.max(diskIndex.lastOffset(), memoryIndex.lastOffset()); + + if (offset < lastOffset) { + int diff = lastOffset - offset; skip = Math.max(skip - diff, 0); - offsetIndex.truncate(offset); - offsetIndex.flush(); + diskIndex.truncate(offset); + memoryIndex.truncate(offset); + diskIndex.flush(); } return this; } @@ -353,17 +392,17 @@ public Segment truncate(long index) { * @return The segment. */ public Segment flush() { - writeBuffer.flush(); - offsetIndex.flush(); + diskBuffer.flush(); + diskIndex.flush(); return this; } @Override public void close() { - readBuffer.close(); - writeBuffer.close(); - source.close(); - offsetIndex.close(); + diskBuffer.close(); + diskIndex.close(); + memoryBuffer.close(); + memoryIndex.close(); open = false; } @@ -371,7 +410,10 @@ public void close() { * Deletes the segment. */ public void delete() { - offsetIndex.delete(); + if (diskBuffer instanceof FileBuffer) { + ((FileBuffer) diskBuffer).delete(); + } + diskIndex.delete(); descriptor.delete(); } diff --git a/storage/src/main/java/net/kuujo/copycat/io/storage/SegmentManager.java b/storage/src/main/java/net/kuujo/copycat/io/storage/SegmentManager.java index e776b187b3..6c8ff529a2 100644 --- a/storage/src/main/java/net/kuujo/copycat/io/storage/SegmentManager.java +++ b/storage/src/main/java/net/kuujo/copycat/io/storage/SegmentManager.java @@ -16,10 +16,10 @@ package net.kuujo.copycat.io.storage; import net.kuujo.copycat.io.Buffer; +import net.kuujo.copycat.io.DirectBuffer; import net.kuujo.copycat.io.FileBuffer; import net.kuujo.copycat.io.HeapBuffer; import net.kuujo.copycat.io.serializer.Serializer; -import net.kuujo.copycat.util.ConfigurationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,6 +82,8 @@ private void open() { .withMaxEntries(storage.maxEntriesPerSegment()) .build()) { + descriptor.lock(); + currentSegment = createSegment(descriptor); currentSegment.descriptor().update(System.currentTimeMillis()); currentSegment.descriptor().lock(); @@ -123,6 +125,7 @@ private void resetCurrentSegment() { .withMaxSegmentSize(storage.maxSegmentSize()) .withMaxEntries(storage.maxEntriesPerSegment()) .build()) { + descriptor.lock(); currentSegment = createSegment(descriptor); } segments.put(1L, currentSegment); @@ -163,6 +166,7 @@ public Segment nextSegment() { .withMaxSegmentSize(storage.maxSegmentSize()) .withMaxEntries(storage.maxEntriesPerSegment()) .build()) { + descriptor.lock(); currentSegment = createSegment(descriptor); segments.put(descriptor.index(), currentSegment); } @@ -229,38 +233,13 @@ synchronized void moveSegment(long index, Segment segment) { } /** - * Creates a new segment. + * Create a new segment. */ public Segment createSegment(SegmentDescriptor descriptor) { - switch (storage.level()) { - case DISK: - return createDiskSegment(descriptor); - case MEMORY: - return createMemorySegment(descriptor); - default: - throw new ConfigurationException("unknown storage level: " + storage.level()); - } - } - - /** - * Create an on-disk segment. - */ - private Segment createDiskSegment(SegmentDescriptor descriptor) { File segmentFile = SegmentFile.createSegmentFile(storage.directory(), descriptor.id(), descriptor.version()); Buffer buffer = FileBuffer.allocate(segmentFile, 1024 * 1024, descriptor.maxSegmentSize() + SegmentDescriptor.BYTES); - Segment segment = Segment.open(buffer.position(SegmentDescriptor.BYTES).slice(), descriptor, createIndex(descriptor), storage.serializer().clone()); - LOGGER.debug("Created persistent segment: {}", segment); - return segment; - } - - /** - * Creates an in memory segment. - */ - private Segment createMemorySegment(SegmentDescriptor descriptor) { - Buffer buffer = HeapBuffer.allocate(Math.min(1024 * 1024, storage.maxSegmentSize() + storage.maxEntrySize() + SegmentDescriptor.BYTES), - storage.maxSegmentSize() + storage.maxEntrySize() + SegmentDescriptor.BYTES); - Segment segment = Segment.open(buffer.position(SegmentDescriptor.BYTES).slice(), descriptor, createIndex(descriptor), storage.serializer().clone()); - LOGGER.debug("Created ephemeral segment: {}", segment); + Segment segment = Segment.open(buffer.position(SegmentDescriptor.BYTES).slice(), DirectBuffer.allocate(1024 * 1024, descriptor.maxSegmentSize()), descriptor, createDiskIndex(descriptor), createMemoryIndex(descriptor), storage.serializer().clone()); + LOGGER.debug("Created segment: {}", segment); return segment; } @@ -268,26 +247,12 @@ private Segment createMemorySegment(SegmentDescriptor descriptor) { * Loads a segment. */ public Segment loadSegment(long segmentId, long segmentVersion) { - switch (storage.level()) { - case DISK: - return loadDiskSegment(segmentId, segmentVersion); - case MEMORY: - return loadMemorySegment(segmentId, segmentVersion); - default: - throw new ConfigurationException("unknown storage level: " + storage.level()); - } - } - - /** - * Loads a segment from disk. - */ - private Segment loadDiskSegment(long segmentId, long segmentVersion) { File file = SegmentFile.createSegmentFile(storage.directory(), segmentId, segmentVersion); try (SegmentDescriptor descriptor = new SegmentDescriptor(FileBuffer.allocate(file, SegmentDescriptor.BYTES))) { Buffer buffer = FileBuffer.allocate(file, Math.min(1024 * 1024, storage.maxSegmentSize() + storage.maxEntrySize() + SegmentDescriptor.BYTES), storage.maxSegmentSize() + storage.maxEntrySize() + SegmentDescriptor.BYTES); buffer = buffer.position(SegmentDescriptor.BYTES).slice(); - Segment segment = Segment.open(buffer, descriptor, createIndex(descriptor), storage.serializer().clone()); + Segment segment = Segment.open(buffer, DirectBuffer.allocate(1024 * 1024, storage.maxSegmentSize()), descriptor, createDiskIndex(descriptor), createMemoryIndex(descriptor), storage.serializer().clone()); LOGGER.debug("Loaded segment: {} ({})", descriptor.id(), file.getName()); return segment; } @@ -300,26 +265,12 @@ private Segment loadMemorySegment(long segmentId, long segmentVersion) { throw new IllegalStateException("cannot load memory segment"); } - /** - * Creates a segment index. - */ - private OffsetIndex createIndex(SegmentDescriptor descriptor) { - switch (storage.level()) { - case DISK: - return createDiskIndex(descriptor); - case MEMORY: - return createMemoryIndex(descriptor); - default: - throw new ConfigurationException("unknown storage level: " + storage.level()); - } - } - /** * Creates an on disk segment index. */ private OffsetIndex createDiskIndex(SegmentDescriptor descriptor) { File file = SegmentFile.createIndexFile(storage.directory(), descriptor.id(), descriptor.version()); - return new OffsetIndex(FileBuffer.allocate(file, Math.min(1024 * 1024, descriptor.maxEntries()), OffsetIndex.size(descriptor.maxEntries()))); + return new OffsetIndex(FileBuffer.allocate(file, Math.min(1024 * 1024, descriptor.maxEntries() * 8), OffsetIndex.size(descriptor.maxEntries()))); } /** @@ -424,7 +375,7 @@ public void delete() { @Override public String toString() { - return String.format("%s[directory=%s, level=%s, segments=%d]", getClass().getSimpleName(), storage.directory(), storage.level(), segments.size()); + return String.format("%s[directory=%s, level=%s, segments=%d]", getClass().getSimpleName(), storage.directory(), segments.size()); } } diff --git a/storage/src/main/java/net/kuujo/copycat/io/storage/Storage.java b/storage/src/main/java/net/kuujo/copycat/io/storage/Storage.java index a901670f47..a5b9050fe5 100644 --- a/storage/src/main/java/net/kuujo/copycat/io/storage/Storage.java +++ b/storage/src/main/java/net/kuujo/copycat/io/storage/Storage.java @@ -29,12 +29,12 @@ * {@code * Storage storage = Storage.builder() * .withDirectory(new File("logs")) - * .withStorageLevel(StorageLevel.DISK) + * .withPersistenceLevel(PersistenceLevel.DISK) * .build(); * } * - * Copycat's storage facility supports two modes - {@link StorageLevel#DISK} and {@link StorageLevel#MEMORY}. - * By default, the storage module uses {@link StorageLevel#DISK} and {@link #directory()} defaults + * Copycat's storage facility supports two modes - {@link PersistenceLevel#DISK} and {@link PersistenceLevel#MEMORY}. + * By default, the storage module uses {@link PersistenceLevel#DISK} and {@link #directory()} defaults * to {@code System.getProperty("user.dir")}. *

* Users can also configure a number of options related to how {@link Log logs} are constructed and managed. @@ -55,7 +55,6 @@ public class Storage { private Serializer serializer = new Serializer(); private File directory = new File(DEFAULT_DIRECTORY); - private StorageLevel level = StorageLevel.DISK; private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE; private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE; private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT; @@ -82,19 +81,10 @@ public Serializer serializer() { return serializer; } - /** - * Returns the storage level. - * - * @return The storage level. - */ - public StorageLevel level() { - return level; - } - /** * Returns the storage directory. * - * @return The storage directory or {@code null} if the {@link #level()} is {@link StorageLevel#MEMORY} + * @return The storage directory. */ public File directory() { return directory; @@ -147,7 +137,7 @@ public Log open() { @Override public String toString() { - return String.format("%s[directory=%s, level=%s]", getClass().getSimpleName(), directory, level); + return String.format("%s[directory=%s]", getClass().getSimpleName(), directory); } /** @@ -206,23 +196,6 @@ public Builder withDirectory(File directory) { return this; } - /** - * Sets the log storage level. - *

- * The storage level dictates how entries in the log are persisted. By default, the {@link StorageLevel#DISK} level - * is used to persist entries to disk. - * - * @param level The storage level. - * @return The log builder. - * @throws java.lang.NullPointerException If the {@code level} is {@code null} - */ - public Builder withStorageLevel(StorageLevel level) { - if (level == null) - throw new NullPointerException("level cannot be null"); - storage.level = level; - return this; - } - /** * Sets the maximum entry count, returning the builder for method chaining. *

diff --git a/storage/src/main/java/net/kuujo/copycat/io/storage/TypedEntryPool.java b/storage/src/main/java/net/kuujo/copycat/io/storage/TypedEntryPool.java index 1fbf0f7b14..d4e8dd67da 100644 --- a/storage/src/main/java/net/kuujo/copycat/io/storage/TypedEntryPool.java +++ b/storage/src/main/java/net/kuujo/copycat/io/storage/TypedEntryPool.java @@ -56,7 +56,7 @@ public > T acquire(Class type, long index) { } T entry = pool.acquire(); - entry.setIndex(index); + entry.setIndex(index).setPersistenceLevel(PersistenceLevel.DISK); return entry; } diff --git a/storage/src/test/java/net/kuujo/copycat/io/storage/CleanerTest.java b/storage/src/test/java/net/kuujo/copycat/io/storage/CleanerTest.java index 51d351e7a5..4d66572f61 100644 --- a/storage/src/test/java/net/kuujo/copycat/io/storage/CleanerTest.java +++ b/storage/src/test/java/net/kuujo/copycat/io/storage/CleanerTest.java @@ -35,7 +35,6 @@ public class CleanerTest extends ConcurrentTestCase { */ public void testCompact() throws Throwable { Storage storage = Storage.builder() - .withStorageLevel(StorageLevel.MEMORY) .withMaxEntriesPerSegment(10) .withSerializer(new Serializer(new ServiceLoaderTypeResolver())) .build(); diff --git a/storage/src/test/java/net/kuujo/copycat/io/storage/LogTest.java b/storage/src/test/java/net/kuujo/copycat/io/storage/LogTest.java index fa35ac3d46..4d3091def9 100644 --- a/storage/src/test/java/net/kuujo/copycat/io/storage/LogTest.java +++ b/storage/src/test/java/net/kuujo/copycat/io/storage/LogTest.java @@ -18,6 +18,8 @@ import net.kuujo.copycat.io.serializer.Serializer; import net.kuujo.copycat.io.serializer.ServiceLoaderTypeResolver; import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; /** @@ -27,6 +29,12 @@ */ @Test public class LogTest { + private Log log; + + @BeforeMethod + public void resetLog() { + log = null; + } /** * Tests writing and reading an entry. @@ -218,13 +226,13 @@ public void testSkipOnRollOver() { */ private Log createLog() { Storage storage = Storage.builder() - .withStorageLevel(StorageLevel.MEMORY) .withMaxEntrySize(1024) .withMaxSegmentSize(1024 * 1024) .withMaxEntriesPerSegment(1024) .withSerializer(new Serializer(new ServiceLoaderTypeResolver())) .build(); - return storage.open(); + log = storage.open(); + return log; } /** @@ -240,4 +248,11 @@ private void appendEntries(Log log, int entries) { } } + @AfterMethod + public void deleteLog() { + if (log != null) { + log.delete(); + } + } + }