Skip to content

Commit

Permalink
Fixes #55: Support multiple resource instances per node.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Sep 25, 2015
1 parent 7e1341c commit fb132f7
Showing 1 changed file with 119 additions and 67 deletions.
186 changes: 119 additions & 67 deletions core/src/main/java/io/atomix/copycat/manager/ResourceManager.java
Expand Up @@ -20,16 +20,15 @@
import io.atomix.catalog.server.StateMachine; import io.atomix.catalog.server.StateMachine;
import io.atomix.catalog.server.StateMachineExecutor; import io.atomix.catalog.server.StateMachineExecutor;
import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.concurrent.ComposableFuture;
import io.atomix.catalyst.util.concurrent.ThreadContext; import io.atomix.catalyst.util.concurrent.ThreadContext;
import io.atomix.catalyst.util.concurrent.ThreadPoolContext; import io.atomix.catalyst.util.concurrent.ThreadPoolContext;
import io.atomix.copycat.resource.ResourceOperation; import io.atomix.copycat.resource.ResourceOperation;


import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Function; import java.util.function.Function;


/** /**
Expand All @@ -42,6 +41,7 @@ public class ResourceManager extends StateMachine {
private StateMachineExecutor executor; private StateMachineExecutor executor;
private final Map<String, Long> paths = new HashMap<>(); private final Map<String, Long> paths = new HashMap<>();
private final Map<Long, ResourceHolder> resources = new HashMap<>(); private final Map<Long, ResourceHolder> resources = new HashMap<>();
private final Map<Long, SessionHolder> sessions = new HashMap<>();
private final ResourceCommitPool commits = new ResourceCommitPool(); private final ResourceCommitPool commits = new ResourceCommitPool();


/** /**
Expand All @@ -62,25 +62,34 @@ public void configure(StateMachineExecutor executor) {
} }


/** /**
* Applies resource commands. * Performs an operation on a resource.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected CompletableFuture<Object> operateResource(Commit<? extends ResourceOperation> commit) { private CompletableFuture<Object> operateResource(Commit<ResourceOperation> commit) {
final ResourceHolder resource = resources.get(commit.operation().resource()); ResourceHolder resource;
if (resource != null) { ManagedResourceSession session = null;
CompletableFuture<Object> future = new ComposableFuture<>();
ResourceCommit resourceCommit = commits.acquire(commit, resource.sessions.computeIfAbsent(commit.session().id(), SessionHolder client = sessions.get(commit.operation().resource());
id -> new ManagedResourceSession(commit.operation().resource(), commit.session()))); if (client != null) {
resource.executor.execute(resourceCommit).whenComplete((BiConsumer<Object, Throwable>) (result, error) -> { resource = resources.get(client.resource);
if (error == null) { session = client.session;
future.complete(result); } else {
} else { resource = resources.get(commit.operation().resource());
future.completeExceptionally(error); }
}
}); if (resource == null) {
return future; throw new ResourceManagerException("unknown resource: " + commit.operation().resource());
}

if (session == null) {
session = resource.sessions.get(commit.session().id());
if (session == null) {
throw new ResourceManagerException("unknown resource session: " + commit.session().id());
}
} }
throw new IllegalArgumentException("unknown resource: " + commit.operation().resource());
ResourceCommit resourceCommit = commits.acquire(commit, session);
return resource.executor.execute(resourceCommit);
} }


/** /**
Expand All @@ -89,58 +98,68 @@ protected CompletableFuture<Object> operateResource(Commit<? extends ResourceOpe
protected long getResource(Commit<GetResource> commit) { protected long getResource(Commit<GetResource> commit) {
String path = commit.operation().path(); String path = commit.operation().path();


Long id = paths.get(path); Long resourceId = paths.get(path);
if (id == null) {
throw new ResourceManagerException("unknown path: " + path);
}


ResourceHolder resource = resources.get(id); if (resourceId == null) {
if (resource == null) { resourceId = commit.index();
throw new ResourceManagerException("unknown resource: " + path); paths.put(path, commit.index());
}


if (resource.stateMachine.getClass() != commit.operation().type()) { try {
throw new ResourceManagerException("inconsistent resource type: " + commit.operation().type()); StateMachine stateMachine = commit.operation().type().newInstance();
ThreadContext context = new ThreadPoolContext(scheduler, ThreadContext.currentContext().serializer().clone());
ResourceStateMachineExecutor executor = new ResourceStateMachineExecutor(commit.index(), this.executor, context);
ResourceHolder resource = new ResourceHolder(path, stateMachine, executor);
resources.put(resourceId, resource);
stateMachine.init(executor);
resource.sessions.put(commit.session().id(), new ManagedResourceSession(resourceId, commit.session()));
} catch (InstantiationException | IllegalAccessException e) {
throw new ResourceManagerException("failed to instantiate state machine", e);
}
} else {
ResourceHolder resource = resources.get(resourceId);
if (resource == null || resource.stateMachine.getClass() != commit.operation().type()) {
throw new ResourceManagerException("inconsistent resource type: " + commit.operation().type());
}
} }

return resourceId;
return id;
} }


/** /**
* Applies a create resource commit. * Applies a create resource commit.
*/ */
protected long createResource(Commit<CreateResource> commit) { private long createResource(Commit<CreateResource> commit) {
String path = commit.operation().path(); String path = commit.operation().path();


// Check for an existing instance of a resource at this path. Long resourceId = paths.get(path);
Long id = paths.get(path);


// If a resource already exists, verify that it is of the same type. long id;
if (id != null) { ResourceHolder resource;
ResourceHolder resource = resources.get(id); if (resourceId == null) {
if (resource != null) { resourceId = commit.index();
if (resource.stateMachine.getClass() != commit.operation().type()) { paths.put(path, commit.index());
throw new ResourceManagerException("inconsistent resource type: " + commit.operation().type());
} try {
return id; StateMachine stateMachine = commit.operation().type().newInstance();
ThreadContext context = new ThreadPoolContext(scheduler, ThreadContext.currentContext().serializer().clone());
ResourceStateMachineExecutor executor = new ResourceStateMachineExecutor(commit.index(), this.executor, context);
resource = new ResourceHolder(path, stateMachine, executor);
resources.put(resourceId, resource);
stateMachine.init(executor);
} catch (InstantiationException | IllegalAccessException e) {
throw new ResourceManagerException("failed to instantiate state machine", e);
}
} else {
resource = resources.get(resourceId);
if (resource == null || resource.stateMachine.getClass() != commit.operation().type()) {
throw new ResourceManagerException("inconsistent resource type: " + commit.operation().type());
} }
} }


id = commit.index(); id = commit.index();

ManagedResourceSession session = new ManagedResourceSession(id, commit.session());
try { sessions.put(id, new SessionHolder(resourceId, session));
StateMachine resource = commit.operation().type().newInstance(); resource.executor.execute(() -> resource.stateMachine.register(session));
ThreadContext context = new ThreadPoolContext(scheduler, ThreadContext.currentContext().serializer().clone()); return id;
ResourceStateMachineExecutor executor = new ResourceStateMachineExecutor(id, this.executor, context);

paths.put(path, id);
resources.put(id, new ResourceHolder(path, resource, executor));

resource.init(executor);
return id;
} catch (InstantiationException | IllegalAccessException e) {
throw new ResourceManagerException("failed to instantiate state machine", e);
}
} }


/** /**
Expand All @@ -161,33 +180,53 @@ protected boolean deleteResource(Commit<DeleteResource> commit) {


resource.executor.close(); resource.executor.close();
paths.remove(resource.path); paths.remove(resource.path);
return true;
}


@Override Iterator<Map.Entry<Long, SessionHolder>> iterator = sessions.entrySet().iterator();
public void register(Session session) { while (iterator.hasNext()) {
for (Map.Entry<Long, ResourceHolder> entry : resources.entrySet()) { if (iterator.next().getValue().resource == commit.operation().resource()) {
ResourceHolder resource = entry.getValue(); iterator.remove();
resource.executor.execute(() -> resource.stateMachine.register(resource.sessions.computeIfAbsent(session.id(), id -> new ManagedResourceSession(entry.getKey(), session)))); }
} }
return true;
} }


@Override @Override
public void close(Session session) { public void expire(Session session) {
for (ResourceHolder resource : resources.values()) { for (ResourceHolder resource : resources.values()) {
ManagedResourceSession resourceSession = resource.sessions.get(session.id()); ManagedResourceSession resourceSession = resource.sessions.get(session.id());
if (resourceSession != null) { if (resourceSession != null) {
resource.executor.execute(() -> resource.stateMachine.close(resourceSession)); resource.executor.execute(() -> resource.stateMachine.expire(resourceSession));
}
}

for (SessionHolder sessionHolder : sessions.values()) {
if (sessionHolder.session.id() == session.id()) {
ResourceHolder resource = resources.get(sessionHolder.resource);
if (resource != null) {
resource.executor.execute(() -> resource.stateMachine.expire(sessionHolder.session));
}
} }
} }
} }


@Override @Override
public void expire(Session session) { public void close(Session session) {
for (ResourceHolder resource : resources.values()) { for (ResourceHolder resource : resources.values()) {
ManagedResourceSession resourceSession = resource.sessions.get(session.id()); ManagedResourceSession resourceSession = resource.sessions.get(session.id());
if (resourceSession != null) { if (resourceSession != null) {
resource.executor.execute(() -> resource.stateMachine.expire(resourceSession)); resource.executor.execute(() -> resource.stateMachine.close(resourceSession));
}
}

Iterator<Map.Entry<Long, SessionHolder>> iterator = sessions.entrySet().iterator();
while (iterator.hasNext()) {
SessionHolder sessionHolder = iterator.next().getValue();
if (sessionHolder.session.id() == session.id()) {
ResourceHolder resource = resources.get(sessionHolder.resource);
if (resource != null) {
resource.executor.execute(() -> resource.stateMachine.close(sessionHolder.session));
}
iterator.remove();
} }
} }
} }
Expand All @@ -202,9 +241,9 @@ public void close() {
*/ */
private static class ResourceHolder { private static class ResourceHolder {
private final String path; private final String path;
private final Map<Long, ManagedResourceSession> sessions = new HashMap<>();
private final StateMachine stateMachine; private final StateMachine stateMachine;
private final ResourceStateMachineExecutor executor; private final ResourceStateMachineExecutor executor;
private final Map<Long, ManagedResourceSession> sessions = new HashMap<>();


private ResourceHolder(String path, StateMachine stateMachine, ResourceStateMachineExecutor executor) { private ResourceHolder(String path, StateMachine stateMachine, ResourceStateMachineExecutor executor) {
this.path = path; this.path = path;
Expand All @@ -213,4 +252,17 @@ private ResourceHolder(String path, StateMachine stateMachine, ResourceStateMach
} }
} }


/**
* Session holder.
*/
private static class SessionHolder {
private final long resource;
private final ManagedResourceSession session;

private SessionHolder(long resource, ManagedResourceSession session) {
this.resource = resource;
this.session = session;
}
}

} }

0 comments on commit fb132f7

Please sign in to comment.