Skip to content

Commit

Permalink
Refactor Commit to facilitate cleaning entries via the Commit API.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Aug 4, 2015
1 parent fb5ac49 commit 35bd65f
Show file tree
Hide file tree
Showing 12 changed files with 460 additions and 271 deletions.
83 changes: 83 additions & 0 deletions core/src/main/java/net/kuujo/copycat/manager/ResourceCommit.java
@@ -0,0 +1,83 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.manager;

import net.kuujo.copycat.raft.Operation;
import net.kuujo.copycat.raft.Session;
import net.kuujo.copycat.raft.server.Commit;

/**
* Resource commit.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
class ResourceCommit implements Commit {
private final ResourceCommitPool pool;
private Commit commit;
private Session session;

public ResourceCommit(ResourceCommitPool pool) {
this.pool = pool;
}

/**
* Resets the resource commit.
*
* @param commit The parent commit.
* @param session The resource session.
*/
void reset(Commit commit, Session session) {
this.commit = commit;
this.session = session;
}

@Override
public long index() {
return commit.index();
}

@Override
public Session session() {
return session;
}

@Override
public long timestamp() {
return commit.timestamp();
}

@Override
public Class type() {
return commit.type();
}

@Override
public Operation operation() {
return commit.operation();
}

@Override
public void clean() {
commit.clean();
pool.release(this);
}

@Override
public String toString() {
return String.format("%s[index=%d, session=%s, timestamp=%d, operation=%s]", getClass().getSimpleName(), index(), session(), timestamp(), operation());
}

}
@@ -0,0 +1,57 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.manager;

import net.kuujo.copycat.raft.Session;
import net.kuujo.copycat.raft.server.Commit;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* Resource commit pool.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
class ResourceCommitPool {
private final Queue<ResourceCommit> pool = new ConcurrentLinkedQueue<>();

/**
* Acquires a commit from the pool.
*
* @param commit The commit to acquire.
* @param session The resource session.
* @return The acquired resource commit.
*/
public ResourceCommit acquire(Commit commit, Session session) {
ResourceCommit resourceCommit = pool.poll();
if (resourceCommit == null) {
resourceCommit = new ResourceCommit(this);
resourceCommit.reset(commit, session);
}
return resourceCommit;
}

/**
* Releases a commit to the pool.
*
* @param commit The commit to release.
*/
public void release(ResourceCommit commit) {
pool.add(commit);
}

}
Expand Up @@ -41,6 +41,7 @@ public class ResourceManager extends StateMachine {
private NodeHolder node;
private final Map<Long, NodeHolder> nodes = new HashMap<>();
private final Map<Long, ResourceHolder> resources = new HashMap<>();
private final ResourceCommitPool commits = new ResourceCommitPool();

public ResourceManager(ScheduledExecutorService executor) {
if (executor == null)
Expand All @@ -67,8 +68,9 @@ protected CompletableFuture<Object> commandResource(Commit<? extends ResourceOpe
if (resource != null) {
CompletableFuture<Object> future = new ComposableFuture<>();
resource.context.execute(() -> {
CompletableFuture<Object> resultFuture = resource.stateMachine.apply(new Commit(commit.index(), resource.sessions.computeIfAbsent(commit.session().id(), id ->
new ManagedResourceSession(commit.operation().resource(), commit.session())), commit.timestamp(), commit.operation().operation()));
CompletableFuture<Object> resultFuture = resource.stateMachine.apply(
commits.acquire(commit, resource.sessions.computeIfAbsent(commit.session().id(),
id -> new ManagedResourceSession(commit.operation().resource(), commit.session()))));
resultFuture.whenComplete((result, error) -> {
if (error == null) {
future.complete(result);
Expand Down
48 changes: 13 additions & 35 deletions server/src/main/java/net/kuujo/copycat/raft/server/Commit.java
Expand Up @@ -31,18 +31,7 @@
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class Commit<T extends Operation> {
private final long index;
private final long timestamp;
private final Session session;
private final T operation;

public Commit(long index, Session session, long timestamp, T operation) {
this.index = index;
this.session = session;
this.timestamp = timestamp;
this.operation = operation;
}
public interface Commit<T extends Operation> {

/**
* Returns the commit index.
Expand All @@ -57,9 +46,7 @@ public Commit(long index, Session session, long timestamp, T operation) {
*
* @return The commit index.
*/
public long index() {
return index;
}
long index();

/**
* Returns the session that submitted the operation.
Expand All @@ -75,9 +62,7 @@ public long index() {
*
* @return The session that created the commit.
*/
public Session session() {
return session;
}
Session session();

/**
* Returns the commit timestamp.
Expand All @@ -91,9 +76,7 @@ public Session session() {
*
* @return the commit timestamp.
*/
public long timestamp() {
return timestamp;
}
long timestamp();

/**
* Returns the commit type.
Expand All @@ -102,26 +85,21 @@ public long timestamp() {
*
* @return The commit type.
*/
@SuppressWarnings("unchecked")
public Class<T> type() {
return (Class<T>) operation.getClass();
}
Class<T> type();

/**
* Returns the operation submitted by the user.
* <p>
* The returned {@link net.kuujo.copycat.raft.Operation} is the operation submitted by the user via
* {@link net.kuujo.copycat.raft.client.RaftClient#submit(net.kuujo.copycat.raft.Operation)}.
*
* @return The operation submitted by the user.
*/
public T operation() {
return operation;
}
T operation();

@Override
public String toString() {
return String.format("Commit[index=%d, timestamp=%d, session=%s, operation=%s]", index, timestamp, session, operation);
}
/**
* Cleans the commit.
* <p>
* When the commit is cleaned, it will be removed from the log and may be removed permanently from disk at some
* arbitrary point in the future.
*/
void clean();

}
@@ -0,0 +1,87 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.server.state;

import net.kuujo.copycat.raft.Operation;
import net.kuujo.copycat.raft.Session;
import net.kuujo.copycat.raft.server.Commit;
import net.kuujo.copycat.raft.server.storage.OperationEntry;

/**
* Server commit.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
class ServerCommit implements Commit {
private final ServerCommitPool pool;
private final ServerCommitCleaner cleaner;
private final SessionManager sessions;
private OperationEntry entry;
private Session session;

public ServerCommit(ServerCommitPool pool, ServerCommitCleaner cleaner, SessionManager sessions) {
this.pool = pool;
this.cleaner = cleaner;
this.sessions = sessions;
}

/**
* Resets the commit.
*
* @param entry The entry.
*/
void reset(OperationEntry entry) {
this.entry = entry;
this.session = sessions.getSession(entry.getSession());
}

@Override
public long index() {
return entry.getIndex();
}

@Override
public Session session() {
return session;
}

@Override
public long timestamp() {
return entry.getTimestamp();
}

@Override
public Class type() {
return entry.getOperation().getClass();
}

@Override
public Operation operation() {
return entry.getOperation();
}

@Override
public void clean() {
cleaner.clean(entry);
pool.release(this);
}

@Override
public String toString() {
return String.format("%s[index=%d, session=%s, timestamp=%d, operation=%s]", getClass().getSimpleName(), index(), session(), timestamp(), operation());
}

}
@@ -0,0 +1,43 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.server.state;

import net.kuujo.copycat.io.storage.Entry;
import net.kuujo.copycat.io.storage.Log;

/**
* Server commit cleaner.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
class ServerCommitCleaner {
private final Log log;

ServerCommitCleaner(Log log) {
this.log = log;
}

/**
* Cleans the given index from the log.
*
* @param entry The entry to clean.
*/
void clean(Entry entry) {
log.cleanEntry(entry.getIndex());
entry.close();
}

}

0 comments on commit 35bd65f

Please sign in to comment.