Skip to content

Commit

Permalink
Fix thread safety issues when pooling and reusing commits.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Aug 5, 2015
1 parent 6e0a7f3 commit d437833
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 44 deletions.
Expand Up @@ -16,6 +16,7 @@
package net.kuujo.copycat.collections.state; package net.kuujo.copycat.collections.state;


import net.kuujo.copycat.PersistenceLevel; import net.kuujo.copycat.PersistenceLevel;
import net.kuujo.copycat.raft.Operation;
import net.kuujo.copycat.raft.Session; import net.kuujo.copycat.raft.Session;
import net.kuujo.copycat.raft.server.Apply; import net.kuujo.copycat.raft.server.Apply;
import net.kuujo.copycat.raft.server.Commit; import net.kuujo.copycat.raft.server.Commit;
Expand All @@ -25,6 +26,7 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;


/** /**
* Map state machine. * Map state machine.
Expand All @@ -35,6 +37,11 @@ public class MapState extends StateMachine {
private Map<Object, Commit<? extends MapCommands.TtlCommand>> map; private Map<Object, Commit<? extends MapCommands.TtlCommand>> map;
private final Set<Long> sessions = new HashSet<>(); private final Set<Long> sessions = new HashSet<>();


@Override
public CompletableFuture<Object> apply(Commit<? extends Operation> commit) {
return super.apply(commit);
}

@Override @Override
public void register(Session session) { public void register(Session session) {
sessions.add(session.id()); sessions.add(session.id());
Expand Down Expand Up @@ -210,7 +217,7 @@ protected Object remove(Commit<MapCommands.Remove> commit) {
} }
return false; return false;
} else { } else {
Commit<? extends MapCommands.TtlCommand> previous = map.remove(commit.operation().key()); Commit<? extends MapCommands.TtlCommand> previous = map.remove(commit.operation().key());
if (previous == null) { if (previous == null) {
commit.clean(); commit.clean();
return true; return true;
Expand Down
Expand Up @@ -29,6 +29,7 @@ class ResourceCommit implements Commit {
private final ResourceCommitPool pool; private final ResourceCommitPool pool;
private Commit<ResourceOperation> commit; private Commit<ResourceOperation> commit;
private Session session; private Session session;
private volatile boolean open;


public ResourceCommit(ResourceCommitPool pool) { public ResourceCommit(ResourceCommitPool pool) {
this.pool = pool; this.pool = pool;
Expand All @@ -43,6 +44,7 @@ public ResourceCommit(ResourceCommitPool pool) {
void reset(Commit<ResourceOperation> commit, Session session) { void reset(Commit<ResourceOperation> commit, Session session) {
this.commit = commit; this.commit = commit;
this.session = session; this.session = session;
open = true;
} }


@Override @Override
Expand Down Expand Up @@ -72,13 +74,19 @@ public Operation operation() {


@Override @Override
public void clean() { public void clean() {
if (!open)
throw new IllegalStateException("commit closed");
commit.clean(); commit.clean();
close(); close();
} }


@Override @Override
public void close() { public void close() {
pool.release(this); if (open) {
commit.close();
pool.release(this);
open = false;
}
} }


@Override @Override
Expand Down
Expand Up @@ -40,8 +40,8 @@ public ResourceCommit acquire(Commit commit, Session session) {
ResourceCommit resourceCommit = pool.poll(); ResourceCommit resourceCommit = pool.poll();
if (resourceCommit == null) { if (resourceCommit == null) {
resourceCommit = new ResourceCommit(this); resourceCommit = new ResourceCommit(this);
resourceCommit.reset(commit, session);
} }
resourceCommit.reset(commit, session);
return resourceCommit; return resourceCommit;
} }


Expand Down
Expand Up @@ -64,13 +64,14 @@ private void init(Commit commit) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Apply({ResourceCommand.class, ResourceQuery.class}) @Apply({ResourceCommand.class, ResourceQuery.class})
protected CompletableFuture<Object> commandResource(Commit<? extends ResourceOperation> commit) { protected CompletableFuture<Object> commandResource(Commit<? extends ResourceOperation> commit) {
ResourceHolder resource = resources.get(commit.operation().resource()); final ResourceHolder resource = resources.get(commit.operation().resource());
if (resource != null) { if (resource != null) {
CompletableFuture<Object> future = new ComposableFuture<>(); CompletableFuture<Object> future = new ComposableFuture<>();
resource.context.execute(() -> { resource.context.execute(() -> {
CompletableFuture<Object> resultFuture = resource.stateMachine.apply( StateMachine resourceStateMachine = resource.stateMachine;
commits.acquire(commit, resource.sessions.computeIfAbsent(commit.session().id(), ResourceCommit resourceCommit = commits.acquire(commit, resource.sessions.computeIfAbsent(commit.session().id(),
id -> new ManagedResourceSession(commit.operation().resource(), commit.session())))); id -> new ManagedResourceSession(commit.operation().resource(), commit.session())));
CompletableFuture<Object> resultFuture = resourceStateMachine.apply(resourceCommit);
resultFuture.whenComplete((result, error) -> { resultFuture.whenComplete((result, error) -> {
if (error == null) { if (error == null) {
future.complete(result); future.complete(result);
Expand Down
Expand Up @@ -125,26 +125,6 @@ private void declareOperations(Method method) {
} }
} }


/**
* Wraps an operation method.
*/
@SuppressWarnings("unchecked")
private Function<Commit<?>, CompletableFuture<Object>> wrapOperation(Method method) {
if (method.getParameterCount() < 1) {
throw new IllegalStateException("invalid operation method: not enough arguments");
} else if (method.getParameterCount() > 1) {
throw new IllegalStateException("invalid operation method: too many arguments");
} else {
return commit -> {
try {
return (CompletableFuture<Object>) method.invoke(this, commit);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new ApplicationException("failed to invoke operation", e);
}
};
}
}

/** /**
* Finds the operation method for the given operation. * Finds the operation method for the given operation.
*/ */
Expand Down
Expand Up @@ -104,7 +104,7 @@ protected AppendResponse doCheckPreviousEntry(AppendRequest request) {
// If the previous entry term doesn't match the local previous term then reject the request. // If the previous entry term doesn't match the local previous term then reject the request.
RaftEntry entry = context.getLog().getEntry(request.logIndex()); RaftEntry entry = context.getLog().getEntry(request.logIndex());
if (entry == null || entry.getTerm() != request.logTerm()) { if (entry == null || entry.getTerm() != request.logTerm()) {
LOGGER.warn("{} - Rejected {}: Request log term does not match local log term {} for the same entry", context.getMember().id(), request, entry.getTerm()); LOGGER.warn("{} - Rejected {}: Request log term does not match local log term {} for the same entry", context.getMember().id(), request, entry != null ? entry.getTerm() : "unknown");
return AppendResponse.builder() return AppendResponse.builder()
.withStatus(Response.Status.OK) .withStatus(Response.Status.OK)
.withTerm(context.getTerm()) .withTerm(context.getTerm())
Expand Down
Expand Up @@ -29,8 +29,11 @@ class ServerCommit implements Commit {
private final ServerCommitPool pool; private final ServerCommitPool pool;
private final ServerCommitCleaner cleaner; private final ServerCommitCleaner cleaner;
private final SessionManager sessions; private final SessionManager sessions;
private OperationEntry entry; private long index;
private Session session; private Session session;
private long timestamp;
private Operation operation;
private volatile boolean open;


public ServerCommit(ServerCommitPool pool, ServerCommitCleaner cleaner, SessionManager sessions) { public ServerCommit(ServerCommitPool pool, ServerCommitCleaner cleaner, SessionManager sessions) {
this.pool = pool; this.pool = pool;
Expand All @@ -44,13 +47,16 @@ public ServerCommit(ServerCommitPool pool, ServerCommitCleaner cleaner, SessionM
* @param entry The entry. * @param entry The entry.
*/ */
void reset(OperationEntry entry) { void reset(OperationEntry entry) {
this.entry = entry; this.index = entry.getIndex();
this.session = sessions.getSession(entry.getSession()); this.session = sessions.getSession(entry.getSession());
this.timestamp = entry.getTimestamp();
this.operation = entry.getOperation();
open = true;
} }


@Override @Override
public long index() { public long index() {
return entry.getIndex(); return index;
} }


@Override @Override
Expand All @@ -60,28 +66,33 @@ public Session session() {


@Override @Override
public long timestamp() { public long timestamp() {
return entry.getTimestamp(); return timestamp;
} }


@Override @Override
public Class type() { public Class type() {
return entry.getOperation().getClass(); return operation.getClass();
} }


@Override @Override
public Operation operation() { public Operation operation() {
return entry.getOperation(); return operation;
} }


@Override @Override
public void clean() { public void clean() {
cleaner.clean(entry); if (!open)
throw new IllegalStateException("commit closed");
cleaner.clean(index);
close(); close();
} }


@Override @Override
public void close() { public void close() {
pool.release(this); if (open) {
pool.release(this);
open = false;
}
} }


@Override @Override
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package net.kuujo.copycat.raft.server.state; package net.kuujo.copycat.raft.server.state;


import net.kuujo.copycat.io.storage.Entry;
import net.kuujo.copycat.io.storage.Log; import net.kuujo.copycat.io.storage.Log;


/** /**
Expand All @@ -32,12 +31,9 @@ class ServerCommitCleaner {


/** /**
* Cleans the given index from the log. * Cleans the given index from the log.
*
* @param entry The entry to clean.
*/ */
void clean(Entry entry) { void clean(long index) {
log.cleanEntry(entry.getIndex()); log.cleanEntry(index);
entry.close();
} }


} }
Expand Up @@ -45,8 +45,8 @@ public ServerCommit acquire(OperationEntry entry) {
ServerCommit commit = pool.poll(); ServerCommit commit = pool.poll();
if (commit == null) { if (commit == null) {
commit = new ServerCommit(this, cleaner, sessions); commit = new ServerCommit(this, cleaner, sessions);
commit.reset(entry);
} }
commit.reset(entry);
return commit; return commit;
} }


Expand Down
Expand Up @@ -582,7 +582,8 @@ CompletableFuture<Object> apply(CommandEntry entry) {
if (session.hasResponse(entry.getSequence())) { if (session.hasResponse(entry.getSequence())) {
future = CompletableFuture.completedFuture(session.getResponse(entry.getSequence())); future = CompletableFuture.completedFuture(session.getResponse(entry.getSequence()));
} else { } else {
future = execute(() -> stateMachine.apply(commits.acquire(entry))) ServerCommit commit = commits.acquire(entry);
future = execute(() -> stateMachine.apply(commit))
.thenApply(result -> { .thenApply(result -> {
// Store the command result in the session. // Store the command result in the session.
session.registerResponse(entry.getSequence(), result); session.registerResponse(entry.getSequence(), result);
Expand Down

0 comments on commit d437833

Please sign in to comment.