diff --git a/manager/src/main/java/io/atomix/manager/internal/ManagedResourceSession.java b/manager/src/main/java/io/atomix/manager/internal/ManagedResourceSession.java index 2f4f7c56f7..a1c522ba83 100644 --- a/manager/src/main/java/io/atomix/manager/internal/ManagedResourceSession.java +++ b/manager/src/main/java/io/atomix/manager/internal/ManagedResourceSession.java @@ -16,6 +16,7 @@ package io.atomix.manager.internal; import io.atomix.catalyst.concurrent.Listener; +import io.atomix.copycat.server.Commit; import io.atomix.copycat.server.session.ServerSession; import io.atomix.copycat.session.Session; import io.atomix.manager.resource.internal.InstanceEvent; @@ -29,10 +30,12 @@ */ final class ManagedResourceSession implements ServerSession { private final long resource; + final Commit commit; private final ServerSession parent; - public ManagedResourceSession(long resource, ServerSession parent) { + public ManagedResourceSession(long resource, Commit commit, ServerSession parent) { this.resource = resource; + this.commit = commit; this.parent = parent; } diff --git a/manager/src/main/java/io/atomix/manager/internal/ResourceManagerSessions.java b/manager/src/main/java/io/atomix/manager/internal/ResourceManagerSessions.java new file mode 100644 index 0000000000..6456647f2a --- /dev/null +++ b/manager/src/main/java/io/atomix/manager/internal/ResourceManagerSessions.java @@ -0,0 +1,111 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ +package io.atomix.manager.internal; + +import io.atomix.catalyst.util.Assert; +import io.atomix.copycat.server.session.ServerSession; +import io.atomix.copycat.server.session.SessionListener; +import io.atomix.copycat.server.session.Sessions; + +import java.util.*; + +/** + * Resource manager sessions. + * + * @author iterator() { + return (Iterator) sessions.values().iterator(); + } + + @Override + public void close() { + for (ManagedResourceSession session : sessions.values()) { + session.commit.close(); + } + } + +} diff --git a/manager/src/main/java/io/atomix/manager/internal/ResourceManagerState.java b/manager/src/main/java/io/atomix/manager/internal/ResourceManagerState.java index cc507b8665..d7eda113ac 100644 --- a/manager/src/main/java/io/atomix/manager/internal/ResourceManagerState.java +++ b/manager/src/main/java/io/atomix/manager/internal/ResourceManagerState.java @@ -90,20 +90,16 @@ private Object operateResource(Commit commit) { throw new ResourceManagerException("unknown resource: " + resourceId); } - ServerSession session; - // If the session exists for the resource, use the existing session. - SessionHolder sessionHolder = resource.sessions.get(commit.session().id()); - if (sessionHolder != null) { - session = sessionHolder.session; - } - // If the commit session is not open for this resource, add the commit session to the resource. - else { - session = new ManagedResourceSession(resourceId, commit.session()); + ManagedResourceSession resourceSession = resource.executor.context.sessions.session(commit.session().id()); + + // If the session is not open for this resource, add the commit session to the resource. + if (resourceSession == null) { + resourceSession = new ManagedResourceSession(resourceId, commit, commit.session()); } // Execute the operation. - return resource.executor.execute(commits.acquire(commit, session)); + return resource.executor.execute(commits.acquire(commit, resourceSession)); } /** @@ -129,19 +125,15 @@ protected long getResource(Commit commit) { ResourceManagerStateMachineExecutor executor = new ResourceManagerStateMachineExecutor(resourceId, this.executor); // Store the resource to be referenced by its resource ID. - ResourceHolder resourceHolder = new ResourceHolder(resourceId, key, type, commit, stateMachine, executor); - resources.put(resourceId, resourceHolder); + ResourceHolder resource = new ResourceHolder(resourceId, key, type, commit, stateMachine, executor); + resources.put(resourceId, resource); // Initialize the resource state machine. stateMachine.init(executor); // Create a resource session for the client resource instance. - ManagedResourceSession session = new ManagedResourceSession(resourceId, commit.session()); - SessionHolder holder = new SessionHolder(commit, session); - resourceHolder.sessions.put(commit.session().id(), holder); - - // Register the newly created session with the resource state machine. - stateMachine.register(session); + ManagedResourceSession resourceSession = new ManagedResourceSession(resourceId, commit, commit.session()); + resource.executor.context.sessions.register(resourceSession); // Returns the session ID for the resource client session. return resourceId; @@ -150,29 +142,16 @@ protected long getResource(Commit commit) { } } else { // If a resource was found, validate that the resource type matches. - ResourceHolder resourceHolder = resources.get(resourceId); - if (resourceHolder == null || !resourceHolder.type.equals(type)) { + ResourceHolder resource = resources.get(resourceId); + if (resource == null || !resource.type.equals(type)) { throw new ResourceManagerException("inconsistent resource type: " + commit.operation().type()); } - // If the session is not already associated with the resource, attach the session to the resource. - // Otherwise, if the session already has a multiton instance of the resource open, clean the commit. - SessionHolder holder = resourceHolder.sessions.get(commit.session().id()); - if (holder == null) { - // Crete a resource session for the client resource instance. - ManagedResourceSession session = new ManagedResourceSession(resourceId, commit.session()); - holder = new SessionHolder(commit, session); - resourceHolder.sessions.put(commit.session().id(), holder); + // Create a resource session for the client resource instance. + ManagedResourceSession resourceSession = new ManagedResourceSession(resourceId, commit, commit.session()); + resource.executor.context.sessions.register(resourceSession); - // Register the newly created session with the resource state machine. - resourceHolder.stateMachine.register(session); - - return resourceId; - } else { - // Return the resource client session ID and clean the commit since no new resource or session was created. - commit.close(); - return holder.session.id(); - } + return resourceId; } } @@ -208,23 +187,13 @@ protected boolean resourceExists(Commit commit) { protected void closeResource(Commit commit) { try { long resourceId = commit.operation().resource(); - ResourceHolder resourceHolder = resources.get(resourceId); - if (resourceHolder == null) { + ResourceHolder resource = resources.get(resourceId); + if (resource == null) { throw new ResourceManagerException("unknown resource: " + resourceId); } - SessionHolder sessionHolder = resourceHolder.sessions.remove(commit.session().id()); - if (sessionHolder != null) { - try { - resourceHolder.stateMachine.unregister(sessionHolder.session); - resourceHolder.stateMachine.close(sessionHolder.session); - } finally { - // Ensure that the commit that created the resource is not closed even if the client closed its reference to it. - if (sessionHolder.commit != resourceHolder.commit) { - sessionHolder.commit.close(); - } - } - } + resource.executor.context.sessions.unregister(commit.session().id()); + resource.executor.context.sessions.close(commit.session().id()); } finally { commit.close(); } @@ -235,28 +204,16 @@ protected void closeResource(Commit commit) { */ protected boolean deleteResource(Commit commit) { try { - ResourceHolder resourceHolder = resources.remove(commit.operation().resource()); - if (resourceHolder == null) { + ResourceHolder resource = resources.remove(commit.operation().resource()); + if (resource == null) { throw new ResourceManagerException("unknown resource: " + commit.operation().resource()); } // Delete the resource state machine and close the resource state machine executor. - try { - resourceHolder.stateMachine.delete(); - resourceHolder.executor.close(); - } finally { - resourceHolder.commit.close(); - } - - // Close all commits that opened sessions to the resource. - // Don't close the commit that created the resource since that was closed above. - for (SessionHolder sessionHolder : resourceHolder.sessions.values()) { - if (sessionHolder.commit != resourceHolder.commit) { - sessionHolder.commit.close(); - } - } + resource.stateMachine.delete(); + resource.executor.close(); - keys.remove(resourceHolder.key); + keys.remove(resource.key); return true; } finally { commit.close(); @@ -289,35 +246,21 @@ public void register(ServerSession session) { @Override public void expire(ServerSession session) { for (ResourceHolder resource : resources.values()) { - SessionHolder sessionHolder = resource.sessions.get(session.id()); - if (sessionHolder != null) { - resource.stateMachine.expire(sessionHolder.session); - } + resource.executor.context.sessions.expire(session.id()); } } @Override public void unregister(ServerSession session) { for (ResourceHolder resource : resources.values()) { - SessionHolder sessionHolder = resource.sessions.get(session.id()); - if (sessionHolder != null) { - resource.stateMachine.unregister(sessionHolder.session); - } + resource.executor.context.sessions.unregister(session.id()); } } @Override public void close(ServerSession session) { for (ResourceHolder resource : resources.values()) { - SessionHolder sessionHolder = resource.sessions.remove(session.id()); - if (sessionHolder != null) { - resource.stateMachine.close(sessionHolder.session); - - // Ensure that the commit that created the resource is not closed when the session that created it is closed. - if (resource.commit != sessionHolder.commit) { - sessionHolder.commit.close(); - } - } + resource.executor.context.sessions.close(session.id()); } } @@ -331,7 +274,6 @@ private static class ResourceHolder { private final Commit commit; private final ResourceStateMachine stateMachine; private final ResourceManagerStateMachineExecutor executor; - private final Map sessions = new HashMap<>(); private ResourceHolder(long id, String key, ResourceType type, Commit commit, ResourceStateMachine stateMachine, ResourceManagerStateMachineExecutor executor) { this.id = id; diff --git a/manager/src/main/java/io/atomix/manager/internal/ResourceManagerStateMachineContext.java b/manager/src/main/java/io/atomix/manager/internal/ResourceManagerStateMachineContext.java new file mode 100644 index 0000000000..c445c30ad6 --- /dev/null +++ b/manager/src/main/java/io/atomix/manager/internal/ResourceManagerStateMachineContext.java @@ -0,0 +1,56 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ +package io.atomix.manager.internal; + +import io.atomix.copycat.server.StateMachineContext; +import io.atomix.copycat.server.session.Sessions; + +import java.time.Clock; + +/** + * Resource state machine context. + * + * @author Jordan Halterman */ class ResourceManagerStateMachineExecutor implements StateMachineExecutor { - private final StateMachineExecutor parent; + final StateMachineExecutor parent; + final ResourceManagerStateMachineContext context; private final Logger logger; private final Map operations = new HashMap<>(); private final Set tasks = new HashSet<>(); ResourceManagerStateMachineExecutor(long resource, StateMachineExecutor parent) { this.parent = parent; + this.context = new ResourceManagerStateMachineContext(parent.context()); this.logger = LoggerFactory.getLogger(String.format("%s-%d", getClass().getName(), resource)); } @Override - public StateMachineContext context() { - return parent.context(); + public ResourceManagerStateMachineContext context() { + return context; } @Override @@ -137,6 +138,7 @@ public , U> StateMachineExecutor register(Class type, @Override public void close() { tasks.forEach(Scheduled::cancel); + context.close(); } } diff --git a/resource/src/main/java/io/atomix/resource/ResourceStateMachine.java b/resource/src/main/java/io/atomix/resource/ResourceStateMachine.java index 10feab05f5..c66947f813 100644 --- a/resource/src/main/java/io/atomix/resource/ResourceStateMachine.java +++ b/resource/src/main/java/io/atomix/resource/ResourceStateMachine.java @@ -122,6 +122,8 @@ public final void init(StateMachineExecutor executor) { executor.serializer().register(ResourceQuery.Config.class, -52); executor.serializer().register(ResourceCommand.Delete.class, -53); + executor.context().sessions().addListener(this); + ResourceStateMachineExecutor wrappedExecutor = new ResourceStateMachineExecutor(executor); wrappedExecutor.register(ResourceQuery.Config.class, this::config); wrappedExecutor.register(ResourceCommand.Delete.class, this::delete);