Skip to content

Commit

Permalink
Clean up resource manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Sep 26, 2015
1 parent 93f5c02 commit ce56990
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
Expand Up @@ -46,6 +46,11 @@ public static Builder builder() {
public DeleteResource() {
}

@Override
public PersistenceLevel persistence() {
return PersistenceLevel.PERSISTENT;
}

/**
* Returns the resource ID.
*
Expand Down
38 changes: 20 additions & 18 deletions core/src/main/java/io/atomix/copycat/manager/ResourceManager.java
Expand Up @@ -67,12 +67,9 @@ public void configure(StateMachineExecutor executor) {
@SuppressWarnings("unchecked")
private CompletableFuture<Object> operateResource(Commit<ResourceOperation> commit) {
ResourceHolder resource;
ManagedResourceSession session = null;

SessionHolder client = sessions.get(commit.operation().resource());
if (client != null) {
resource = resources.get(client.resource);
session = client.session;
SessionHolder session = sessions.get(commit.operation().resource());
if (session != null) {
resource = resources.get(session.resource);
} else {
resource = resources.get(commit.operation().resource());
}
Expand All @@ -88,9 +85,7 @@ private CompletableFuture<Object> operateResource(Commit<ResourceOperation> comm
}
}

CompletableFuture<Object> future = new CompletableFuture<>();

ResourceCommit resourceCommit = commits.acquire(commit, session);
ResourceCommit resourceCommit = commits.acquire(commit, session.session);
return resource.executor.execute(resourceCommit);
}

Expand All @@ -113,7 +108,7 @@ protected long getResource(Commit<GetResource> commit) {
ResourceHolder resource = new ResourceHolder(path, stateMachine, executor);
resources.put(resourceId, resource);
stateMachine.init(executor);
resource.sessions.put(commit.session().id(), new ManagedResourceSession(resourceId, commit.session()));
resource.sessions.put(commit.session().id(), new SessionHolder(resourceId, commit, new ManagedResourceSession(resourceId, commit.session())));
} catch (InstantiationException | IllegalAccessException e) {
throw new ResourceManagerException("failed to instantiate state machine", e);
}
Expand All @@ -122,6 +117,7 @@ protected long getResource(Commit<GetResource> commit) {
if (resource == null || resource.stateMachine.getClass() != commit.operation().type()) {
throw new ResourceManagerException("inconsistent resource type: " + commit.operation().type());
}
commit.clean();
}
return resourceId;
}
Expand Down Expand Up @@ -159,7 +155,7 @@ private long createResource(Commit<CreateResource> commit) {

id = commit.index();
ManagedResourceSession session = new ManagedResourceSession(id, commit.session());
sessions.put(id, new SessionHolder(resourceId, session));
sessions.put(id, new SessionHolder(resourceId, commit, session));
resource.executor.execute(() -> resource.stateMachine.register(session));
return id;
}
Expand All @@ -185,8 +181,10 @@ protected boolean deleteResource(Commit<DeleteResource> commit) {

Iterator<Map.Entry<Long, SessionHolder>> iterator = sessions.entrySet().iterator();
while (iterator.hasNext()) {
if (iterator.next().getValue().resource == commit.operation().resource()) {
SessionHolder session = iterator.next().getValue();
if (session.resource == commit.operation().resource()) {
iterator.remove();
session.commit.clean();
}
}
return true;
Expand All @@ -195,9 +193,9 @@ protected boolean deleteResource(Commit<DeleteResource> commit) {
@Override
public void expire(Session session) {
for (ResourceHolder resource : resources.values()) {
ManagedResourceSession resourceSession = resource.sessions.get(session.id());
SessionHolder resourceSession = resource.sessions.get(session.id());
if (resourceSession != null) {
resource.executor.execute(() -> resource.stateMachine.expire(resourceSession));
resource.executor.execute(() -> resource.stateMachine.expire(resourceSession.session));
}
}

Expand All @@ -214,9 +212,10 @@ public void expire(Session session) {
@Override
public void close(Session session) {
for (ResourceHolder resource : resources.values()) {
ManagedResourceSession resourceSession = resource.sessions.get(session.id());
SessionHolder resourceSession = resource.sessions.remove(session.id());
if (resourceSession != null) {
resource.executor.execute(() -> resource.stateMachine.close(resourceSession));
resource.executor.execute(() -> resource.stateMachine.close(resourceSession.session));
resourceSession.commit.clean();
}
}

Expand All @@ -228,6 +227,7 @@ public void close(Session session) {
if (resource != null) {
resource.executor.execute(() -> resource.stateMachine.close(sessionHolder.session));
}
sessionHolder.commit.clean();
iterator.remove();
}
}
Expand All @@ -245,7 +245,7 @@ private static class ResourceHolder {
private final String path;
private final StateMachine stateMachine;
private final ResourceStateMachineExecutor executor;
private final Map<Long, ManagedResourceSession> sessions = new HashMap<>();
private final Map<Long, SessionHolder> sessions = new HashMap<>();

private ResourceHolder(String path, StateMachine stateMachine, ResourceStateMachineExecutor executor) {
this.path = path;
Expand All @@ -259,10 +259,12 @@ private ResourceHolder(String path, StateMachine stateMachine, ResourceStateMach
*/
private static class SessionHolder {
private final long resource;
private final Commit commit;
private final ManagedResourceSession session;

private SessionHolder(long resource, ManagedResourceSession session) {
private SessionHolder(long resource, Commit commit, ManagedResourceSession session) {
this.resource = resource;
this.commit = commit;
this.session = session;
}
}
Expand Down

0 comments on commit ce56990

Please sign in to comment.