Skip to content

Commit

Permalink
Merge pull request #188 from atomix/state-machine-context
Browse files Browse the repository at this point in the history
Wrap StateMachineContext and Sessions to expose ManagedResourceSession to ResourceStateMachines
  • Loading branch information
kuujo committed Jun 11, 2016
2 parents 5497bce + cb15261 commit 5e402f6
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 92 deletions.
Expand Up @@ -16,6 +16,7 @@
package io.atomix.manager.internal;

import io.atomix.catalyst.concurrent.Listener;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.session.Session;
import io.atomix.manager.resource.internal.InstanceEvent;
Expand All @@ -29,10 +30,12 @@
*/
final class ManagedResourceSession implements ServerSession {
private final long resource;
final Commit commit;
private final ServerSession parent;

public ManagedResourceSession(long resource, ServerSession parent) {
public ManagedResourceSession(long resource, Commit commit, ServerSession parent) {
this.resource = resource;
this.commit = commit;
this.parent = parent;
}

Expand Down
@@ -0,0 +1,111 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.manager.internal;

import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.session.Sessions;

import java.util.*;

/**
* Resource manager sessions.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public class ResourceManagerSessions implements Sessions, AutoCloseable {
private ManagedResourceSession first;
private final Map<Long, ManagedResourceSession> sessions = new HashMap<>();
private final Set<SessionListener> listeners = new HashSet<>();

@Override
public ManagedResourceSession session(long sessionId) {
return sessions.get(sessionId);
}

void register(ManagedResourceSession session) {
// If this is the first registered session, store it so it can be removed once all sessions are removed.
if (first == null)
first = session;

// If a session was already registered for the session ID, release the new commit.
if (sessions.containsKey(session.id())) {
session.commit.close();
} else {
sessions.put(session.id(), session);
for (SessionListener listener : listeners) {
listener.register(session);
}
}
}

void unregister(long sessionId) {
ManagedResourceSession session = sessions.get(sessionId);
if (session != null) {
for (SessionListener listener : listeners) {
listener.unregister(session);
}
}
}

void expire(long sessionId) {
ManagedResourceSession session = sessions.get(sessionId);
if (session != null) {
for (SessionListener listener : listeners) {
listener.expire(session);
}
}
}

void close(long sessionId) {
ManagedResourceSession session = sessions.remove(sessionId);
if (session != null) {
for (SessionListener listener : listeners) {
listener.close(session);
}
if (session.commit != first.commit) {
session.commit.close();
}
}
}

@Override
public Sessions addListener(SessionListener listener) {
listeners.add(Assert.notNull(listener, "listener"));
return this;
}

@Override
public Sessions removeListener(SessionListener listener) {
listeners.remove(Assert.notNull(listener, "listener"));
return this;
}

@Override
@SuppressWarnings("unchecked")
public Iterator<ServerSession> iterator() {
return (Iterator) sessions.values().iterator();
}

@Override
public void close() {
for (ManagedResourceSession session : sessions.values()) {
session.commit.close();
}
}

}
Expand Up @@ -90,20 +90,16 @@ private Object operateResource(Commit<InstanceOperation> commit) {
throw new ResourceManagerException("unknown resource: " + resourceId);
}

ServerSession session;

// If the session exists for the resource, use the existing session.
SessionHolder sessionHolder = resource.sessions.get(commit.session().id());
if (sessionHolder != null) {
session = sessionHolder.session;
}
// If the commit session is not open for this resource, add the commit session to the resource.
else {
session = new ManagedResourceSession(resourceId, commit.session());
ManagedResourceSession resourceSession = resource.executor.context.sessions.session(commit.session().id());

// If the session is not open for this resource, add the commit session to the resource.
if (resourceSession == null) {
resourceSession = new ManagedResourceSession(resourceId, commit, commit.session());
}

// Execute the operation.
return resource.executor.execute(commits.acquire(commit, session));
return resource.executor.execute(commits.acquire(commit, resourceSession));
}

/**
Expand All @@ -129,19 +125,15 @@ protected long getResource(Commit<? extends GetResource> commit) {
ResourceManagerStateMachineExecutor executor = new ResourceManagerStateMachineExecutor(resourceId, this.executor);

// Store the resource to be referenced by its resource ID.
ResourceHolder resourceHolder = new ResourceHolder(resourceId, key, type, commit, stateMachine, executor);
resources.put(resourceId, resourceHolder);
ResourceHolder resource = new ResourceHolder(resourceId, key, type, commit, stateMachine, executor);
resources.put(resourceId, resource);

// Initialize the resource state machine.
stateMachine.init(executor);

// Create a resource session for the client resource instance.
ManagedResourceSession session = new ManagedResourceSession(resourceId, commit.session());
SessionHolder holder = new SessionHolder(commit, session);
resourceHolder.sessions.put(commit.session().id(), holder);

// Register the newly created session with the resource state machine.
stateMachine.register(session);
ManagedResourceSession resourceSession = new ManagedResourceSession(resourceId, commit, commit.session());
resource.executor.context.sessions.register(resourceSession);

// Returns the session ID for the resource client session.
return resourceId;
Expand All @@ -150,29 +142,16 @@ protected long getResource(Commit<? extends GetResource> commit) {
}
} else {
// If a resource was found, validate that the resource type matches.
ResourceHolder resourceHolder = resources.get(resourceId);
if (resourceHolder == null || !resourceHolder.type.equals(type)) {
ResourceHolder resource = resources.get(resourceId);
if (resource == null || !resource.type.equals(type)) {
throw new ResourceManagerException("inconsistent resource type: " + commit.operation().type());
}

// If the session is not already associated with the resource, attach the session to the resource.
// Otherwise, if the session already has a multiton instance of the resource open, clean the commit.
SessionHolder holder = resourceHolder.sessions.get(commit.session().id());
if (holder == null) {
// Crete a resource session for the client resource instance.
ManagedResourceSession session = new ManagedResourceSession(resourceId, commit.session());
holder = new SessionHolder(commit, session);
resourceHolder.sessions.put(commit.session().id(), holder);
// Create a resource session for the client resource instance.
ManagedResourceSession resourceSession = new ManagedResourceSession(resourceId, commit, commit.session());
resource.executor.context.sessions.register(resourceSession);

// Register the newly created session with the resource state machine.
resourceHolder.stateMachine.register(session);

return resourceId;
} else {
// Return the resource client session ID and clean the commit since no new resource or session was created.
commit.close();
return holder.session.id();
}
return resourceId;
}
}

Expand Down Expand Up @@ -208,23 +187,13 @@ protected boolean resourceExists(Commit<ResourceExists> commit) {
protected void closeResource(Commit<CloseResource> commit) {
try {
long resourceId = commit.operation().resource();
ResourceHolder resourceHolder = resources.get(resourceId);
if (resourceHolder == null) {
ResourceHolder resource = resources.get(resourceId);
if (resource == null) {
throw new ResourceManagerException("unknown resource: " + resourceId);
}

SessionHolder sessionHolder = resourceHolder.sessions.remove(commit.session().id());
if (sessionHolder != null) {
try {
resourceHolder.stateMachine.unregister(sessionHolder.session);
resourceHolder.stateMachine.close(sessionHolder.session);
} finally {
// Ensure that the commit that created the resource is not closed even if the client closed its reference to it.
if (sessionHolder.commit != resourceHolder.commit) {
sessionHolder.commit.close();
}
}
}
resource.executor.context.sessions.unregister(commit.session().id());
resource.executor.context.sessions.close(commit.session().id());
} finally {
commit.close();
}
Expand All @@ -235,28 +204,16 @@ protected void closeResource(Commit<CloseResource> commit) {
*/
protected boolean deleteResource(Commit<DeleteResource> commit) {
try {
ResourceHolder resourceHolder = resources.remove(commit.operation().resource());
if (resourceHolder == null) {
ResourceHolder resource = resources.remove(commit.operation().resource());
if (resource == null) {
throw new ResourceManagerException("unknown resource: " + commit.operation().resource());
}

// Delete the resource state machine and close the resource state machine executor.
try {
resourceHolder.stateMachine.delete();
resourceHolder.executor.close();
} finally {
resourceHolder.commit.close();
}

// Close all commits that opened sessions to the resource.
// Don't close the commit that created the resource since that was closed above.
for (SessionHolder sessionHolder : resourceHolder.sessions.values()) {
if (sessionHolder.commit != resourceHolder.commit) {
sessionHolder.commit.close();
}
}
resource.stateMachine.delete();
resource.executor.close();

keys.remove(resourceHolder.key);
keys.remove(resource.key);
return true;
} finally {
commit.close();
Expand Down Expand Up @@ -289,35 +246,21 @@ public void register(ServerSession session) {
@Override
public void expire(ServerSession session) {
for (ResourceHolder resource : resources.values()) {
SessionHolder sessionHolder = resource.sessions.get(session.id());
if (sessionHolder != null) {
resource.stateMachine.expire(sessionHolder.session);
}
resource.executor.context.sessions.expire(session.id());
}
}

@Override
public void unregister(ServerSession session) {
for (ResourceHolder resource : resources.values()) {
SessionHolder sessionHolder = resource.sessions.get(session.id());
if (sessionHolder != null) {
resource.stateMachine.unregister(sessionHolder.session);
}
resource.executor.context.sessions.unregister(session.id());
}
}

@Override
public void close(ServerSession session) {
for (ResourceHolder resource : resources.values()) {
SessionHolder sessionHolder = resource.sessions.remove(session.id());
if (sessionHolder != null) {
resource.stateMachine.close(sessionHolder.session);

// Ensure that the commit that created the resource is not closed when the session that created it is closed.
if (resource.commit != sessionHolder.commit) {
sessionHolder.commit.close();
}
}
resource.executor.context.sessions.close(session.id());
}
}

Expand All @@ -331,7 +274,6 @@ private static class ResourceHolder {
private final Commit<? extends GetResource> commit;
private final ResourceStateMachine stateMachine;
private final ResourceManagerStateMachineExecutor executor;
private final Map<Long, SessionHolder> sessions = new HashMap<>();

private ResourceHolder(long id, String key, ResourceType type, Commit<? extends GetResource> commit, ResourceStateMachine stateMachine, ResourceManagerStateMachineExecutor executor) {
this.id = id;
Expand Down
@@ -0,0 +1,56 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.manager.internal;

import io.atomix.copycat.server.StateMachineContext;
import io.atomix.copycat.server.session.Sessions;

import java.time.Clock;

/**
* Resource state machine context.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
class ResourceManagerStateMachineContext implements StateMachineContext, AutoCloseable {
private final StateMachineContext parent;
final ResourceManagerSessions sessions = new ResourceManagerSessions();

ResourceManagerStateMachineContext(StateMachineContext parent) {
this.parent = parent;
}

@Override
public long index() {
return parent.index();
}

@Override
public Clock clock() {
return parent.clock();
}

@Override
public Sessions sessions() {
return sessions;
}

@Override
public void close() {
sessions.close();
}

}

0 comments on commit 5e402f6

Please sign in to comment.