Skip to content
Browse files

Initial Comment that contains a mostly completed version of this

little tool.
  • Loading branch information...
0 parents commit 24e097acd97731bbfb6ba607e517387a24616034 @ccorsi committed Jan 15, 2012
49 pom.xml
@@ -0,0 +1,49 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.valhalla.tools.net</groupId>
+ <artifactId>net-proc-comm-tool</artifactId>
+ <version>1.0.0</version>
+ <packaging>jar</packaging>
+
+ <name>net-ipc-tools</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.2</version>
+ <scope>test</scope>
+ <optional>false</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.6.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.valhalla.tools.process</groupId>
+ <artifactId>process-ipc-tools</artifactId>
+ <version>1.0.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.0.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.0</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
45 src/main/java/org/valhalla/tools/net/CommandExecutor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.valhalla.tools.net;
+
+import java.io.Serializable;
+
+/**
+ * This is the main interface that all users of this library will be required to
+ * implement. This interface implementation will have intimate knowledge of what
+ * actions need to be performed on the passed instances. The current library
+ * has no claims on this and leaves it to the user of this library.
+ *
+ * @author Claudio Corsi
+ *
+ */
+public interface CommandExecutor<S, R> extends Serializable {
+
+ /**
+ * This method will apply the given changes to the passed instance.
+ * It will return a reply to be sent to the remote process. The
+ * returned instance can be null.
+ *
+ * @param destination The instance that this instance will perform
+ * the given request/reply
+ *
+ * @return An instance of a CommandExecutor that will be sent to
+ * the requesting/replying process. This can be null
+ */
+ CommandExecutor<R, S> apply(S destination);
+
+}
139 src/main/java/org/valhalla/tools/net/ServiceClient.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.valhalla.tools.net;
+
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This ServiceClient class is used to communicate with the ServiceServer instance and
+ * implements all of the required calls to be able to receive and send CommandExecutor
+ * instances between this process and the server process.
+ *
+ * @author Claudio Corsi
+ *
+ */
+public class ServiceClient<C, S> {
+
+ private static final Logger logger = LoggerFactory.getLogger(ServiceClient.class);
+
+ private CountDownLatch shutdown = new CountDownLatch(1);
+ private ServiceManager<C, S> manager = null;
+ private String hostname;
+ private AtomicBoolean started;
+
+ /**
+ * This default constructor will assume that the service server instance is located
+ * on the same host as this process. This is similar to passing "localhost" to the
+ * constructor that expects a hostname.
+ */
+ public ServiceClient() {
+ this("localhost");
+ }
+
+ /**
+ * This method will create an instance of the service client that will be used to
+ * communicate to the service server instance that is located within the passed
+ * host name.
+ *
+ * @param hostname the name of the host that the service server instance is located
+ *
+ */
+ public ServiceClient(String hostname) {
+ this.hostname = hostname;
+ }
+
+ /**
+ * This method is used to start the process of communicating with the service
+ * server instance for the passed host name. This method will setup the
+ * socket communication and instantiate the ServiceManager instance that is
+ * used to communicate with the service manager instance of the service
+ * server. </br>
+ *
+ * This method can only be called once per instance. Any subsequent calls to
+ * this method will generate and IllegalArgumentException.
+ *
+ * @param client the client instance that received CommandExecutor instances
+ * are applied to.
+ *
+ * @throws Exception If any exceptions are raised when trying to setup the
+ * socket communication between this service client and the service server.
+ *
+ */
+ public void execute(final C client) throws Exception {
+ if (started.compareAndSet(false, true) == false) {
+ logger.error("The execute method was called multiple times which is not allowed.");
+ throw new IllegalStateException("This method can only be called once");
+ }
+
+ // TODO: Should we be creating a thread that will then perform a callback
+ // action to inform the calling process that the ServiceManager was
+ // successfully created and is ready to perform CommandExecutor
+ // processing.
+ // It might not be necessary since the ServiceManager instance already
+ // spawns a thread to process the CommandExecutor actions.
+
+ // This is the main point of entry that we will be using to
+ // test the new feature...
+ int port = Integer.getInteger(ServiceConstants.SERVICE_MANAGER_PORT_NAME, 0);
+ if (port <= 0) {
+ logger.info("No service manager port number was defined");
+ throw new IllegalArgumentException("The service manager port is invalid: " + port);
+ }
+ Socket socket = new Socket(hostname, port);
+ try {
+ logger.info("Creating Service Manager instance");
+ manager = new ServiceManager<C, S>(socket, client);
+ // Now that we started the manager we just wait until we are told to
+ // finish....
+ logger.info("Waiting to be told that I am done");
+ shutdown.await();
+ } finally {
+ logger.info("Exiting client application");
+ if (manager != null) {
+ // Closing the manager causes the socket to be closed also...
+ manager.stop();
+ } else {
+ // Close the opened socket...
+ socket.close();
+ }
+ }
+ }
+
+ /**
+ * This method is called whenever you want to shutdown the communication between
+ * this client service manager and the server service manager.
+ *
+ */
+ public void shutdown() {
+ this.shutdown.countDown();
+ }
+
+ /**
+ * This method will return this instance service manager instance.
+ *
+ * @return the service manager instance
+ */
+ public ServiceManager<C, S> getManager() {
+ return manager;
+ }
+
+}
27 src/main/java/org/valhalla/tools/net/ServiceConstants.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.valhalla.tools.net;
+
+/**
+ * @author Claudio Corsi
+ *
+ */
+public interface ServiceConstants {
+
+ String SERVICE_MANAGER_PORT_NAME = "service.manager.port";
+
+}
219 src/main/java/org/valhalla/tools/net/ServiceManager.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.valhalla.tools.net;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.Socket;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to process the individual commands that will perform actions on the
+ * passed destination instance passed to the constructor. </br>
+ *
+ * This implementation does not take into account the possibility that the passed Socket
+ * instance is using non-blocking instead of blocking communication. This does not mean
+ * that the non-blocking socket will not work in this case but that it is possible that
+ * it might not. This needs to be tested but then it is possible that this will require
+ * changes to the interface to provide this ability. </br>
+ *
+ * The other aspect that has not been tested is the condition that we would require the
+ * use of a secure socket implementation. Again, this is something that might work but
+ * again needs to be tested. In the end, this requirement might not even be required.
+ *
+ * @author Claudio Corsi
+ *
+ */
+public class ServiceManager<S, R> {
+
+ private static final Logger logger = LoggerFactory.getLogger(ServiceManager.class);
+
+ private Socket socket;
+ private Thread readCommands;
+ private Thread writeCommands;
+ private Thread executeCommands;
+
+ private BlockingQueue<CommandExecutor<S, R>> incomingCommands = new LinkedBlockingQueue<CommandExecutor<S, R>>();
+ private BlockingQueue<CommandExecutor<R, S>> outgoingCommands = new LinkedBlockingQueue<CommandExecutor<R, S>>();
+
+ private S destination;
+
+ private boolean stopping;
+
+ public ServiceManager(Socket socket, S destination) throws IOException {
+ this.socket = socket;
+ this.destination = destination;
+
+ logger.debug("Starting the different threads");
+ this.readCommands = new Thread() {
+ {
+ {
+ logger.debug("Setting up the read commands thread");
+ setName("ServiceManagerReadCommands-" + getName());
+ setDaemon(true);
+ start();
+ logger.debug("Called the start method for the read commands thread");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void run() {
+ CommandExecutor<S, R> command = null;
+ try {
+ logger.debug("Getting a reference to the input stream");
+ ObjectInputStream ois = new ObjectInputStream(ServiceManager.this.socket.getInputStream());
+ logger.debug("Entering the while loop for the read commands thread");
+ while(!ServiceManager.this.stopping) {
+ command = (CommandExecutor<S, R>) ois.readObject();
+ // If the returned command is null then break out of the while loop...
+ // TODO: This might be a problem when using non-blocking sockets.
+ if (command == null) break;
+ logger.debug("Received an incoming command: {}", command);
+ // Add command to a queue so that another thread can apply the
+ // changes...We do not want to process this because we might need
+ // to read the next command over the wire and we do not want it to
+ // wait...Still the use of the offer call is not a guarantee that the
+ // received command will be included as part of the incomingCommands
+ // container...maybe create a temporary container in the run method
+ // and add those that have not be accepted by the incomingCommands
+ // container....
+ if (ServiceManager.this.incomingCommands.offer(command) == false) {
+ logger.error("Unable to add the given command [{}] to the incoming queue.",
+ command);
+ }
+ }
+ } catch (Throwable e) {
+ if (!ServiceManager.this.stopping) {
+ logger.error(
+ "An exception was generated when processing a Command",
+ e);
+ }
+ }
+ logger.info("Exiting the read command thread");
+ }
+ };
+
+ this.writeCommands = new Thread() {
+ {
+ {
+ logger.debug("Setting up the write commands thread");
+ setName("ServiceManagerWriteCommands-" + getName());
+ setDaemon(true);
+ start();
+ logger.debug("Called the start method of the write commands thread");
+ }
+ }
+
+ public void run() {
+ try {
+ logger.debug("Getting a reference to the output stream");
+ ObjectOutputStream oos = new ObjectOutputStream(ServiceManager.this.socket.getOutputStream());
+ logger.debug("Entering the while loop of the write commands thread");
+ while (!ServiceManager.this.stopping) {
+ logger.debug("Calling take command for outgoing commands");
+ // Get the next command from the blocking queue....
+ CommandExecutor<R, S> command = ServiceManager.this.outgoingCommands
+ .take();
+ logger.debug("Sending on out going command: {}", command);
+ // Send the command to the other JVM...
+ oos.writeObject(command);
+ logger.debug("Out going command has been sent");
+ }
+ } catch (Exception e) {
+ if (!ServiceManager.this.stopping) {
+ logger.error("An exception was raised while sending commands", e);
+ }
+ }
+ logger.info("Exiting the write commands thread");
+ }
+ };
+
+ this.executeCommands = new Thread() {
+ {
+ {
+ logger.debug("Setting up the execute commands thread");
+ setName("ServiceManagerExecuteCommands-" + getName());
+ setDaemon(true);
+ start();
+ logger.debug("Called the start method of the execute command thread");
+ }
+ }
+
+ public void run() {
+ try {
+ logger.debug("Entering the execute commands thread while loop");
+ while (!ServiceManager.this.stopping) {
+ logger.debug("Calling take for incoming commands");
+ CommandExecutor<S, R> command = ServiceManager.this.incomingCommands
+ .take();
+ logger.debug("Applying command to destination");
+ CommandExecutor<R, S> result = command
+ .apply(ServiceManager.this.destination);
+ logger.debug("Command has been applied");
+ if (result != null) {
+ logger.debug("The applied command resulted in an out going reply");
+ // Shouldn't I be using add instead of put???
+ ServiceManager.this.outgoingCommands.put(result);
+ }
+ }
+ } catch (InterruptedException e) {
+ if (!ServiceManager.this.stopping) {
+ logger.error("An exception was raised when processing incoming commands", e);
+ }
+ }
+ logger.info("Exiting the execute commands thread");
+ }
+ };
+ }
+
+ public void enqueue(CommandExecutor<R, S> command) {
+ logger.debug("Adding command to queue");
+ this.outgoingCommands.add(command);
+ }
+
+ public void stop() {
+ this.stopping = true;
+ // Yield to allow the other threads to update the stopping attribute
+ Thread.yield();
+ logger.debug("Stopping the different threads");
+ // Interrupt the threads....
+ this.readCommands.interrupt();
+ logger.info("Interrupting write command thread");
+ this.writeCommands.interrupt();
+ logger.info("interrupting execute command thread");
+ this.executeCommands.interrupt();
+ logger.info("closing socket");
+ try {
+ this.socket.close();
+ } catch (IOException e) {
+ logger.warn("An exception was raised while closing the socket.", e);
+ }
+ if (this.writeCommands.isAlive()) {
+ StackTraceElement stackTrace[] = this.writeCommands.getStackTrace();
+ Throwable t = new Throwable();
+ t.setStackTrace(stackTrace);
+ logger.info("Thread not stopped", t);
+ this.writeCommands.interrupt();
+ }
+ logger.debug("Completed the process of stopping all of the threads");
+ }
+}
253 src/main/java/org/valhalla/tools/net/ServiceServer.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.valhalla.tools.net;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This abstract class represents the common functionality that would need to be
+ * implemented by implementors with the ServiceManager instance. </br>
+ *
+ * The implementor will only need to implement the serviceStarted and serviceStopped
+ * methods. </br>
+ *
+ * @author Claudio Corsi
+ *
+ */
+public abstract class ServiceServer<S, C> {
+
+ private static final Logger logger = LoggerFactory.getLogger(ServiceServer.class);
+
+ private enum State {
+ CREATED, ACCEPTING, STARTING, STARTED, STOPPING, STOPPED
+ };
+
+ private ServerSocket serverSocket;
+ private int port;
+ private ServiceManager<S, C> manager = null;
+ private Thread thread;
+ private S server;
+ private State state = State.CREATED;
+ private AtomicBoolean threadStarted;
+
+ /**
+ * This constructor will create a socket server instance that will be used
+ * to wait for a connection request from the client socket.
+ * It will throw an IOException if it was unable to create a socket server.
+ *
+ * @throws IOException
+ */
+ public ServiceServer() throws IOException {
+ serverSocket = new ServerSocket(0);
+ port = serverSocket.getLocalPort();
+ }
+
+ /**
+ * This method is used to setup the service manager property that will be
+ * sent to the client process that is used to determine what port is used
+ * to communicate between this process and the client process.
+ *
+ * @param properties the properties instance that the property will be set
+ *
+ */
+ public final void setServiceManagerPort(Properties properties) {
+ properties.setProperty(ServiceConstants.SERVICE_MANAGER_PORT_NAME, String.valueOf(port));
+ }
+
+ /**
+ * This method will return a Map.Entry instance that contains the key and value
+ * required by the ServiceClient to be able to retrieve and connect to this
+ * ServiceServer instance.
+ *
+ * @return A Map entry that contains the key/value pair stored in a properties
+ * instance.
+ */
+ public Map.Entry<String, String> getServiceManagerPortEntry() {
+ return new Map.Entry<String, String>() {
+
+ private String value = String.valueOf(port);
+
+ @Override
+ public String getKey() {
+ return ServiceConstants.SERVICE_MANAGER_PORT_NAME;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String setValue(String value) {
+ throw new UnsupportedOperationException("The setValue operation is not supported.");
+ }
+
+ };
+ }
+ /**
+ * This method will return the property name used to communicate the port
+ * number of this instance server socket.
+ *
+ * @return the name of the service manager port name
+ *
+ */
+ public String getServiceManagerPortPropertyName() {
+ return ServiceConstants.SERVICE_MANAGER_PORT_NAME;
+ }
+
+ /**
+ * This method will return the port of the server socket that is expecting a
+ * client connection that will be used to communicate between this process
+ * and the other java process.
+ *
+ * @return the port number used by this instance server socket
+ */
+ public final int getPort() {
+ return port;
+ }
+
+ /**
+ * This method is used to enqueue a command that will be transfered over the
+ * wire and be applied to the client java process.
+ *
+ * @param command the command that will be executed on the other java process
+ */
+ public final void enqueue(CommandExecutor<C, S> command) {
+ this.manager.enqueue(command);
+ }
+
+ /**
+ * This method is used to execute the process of communicating with the
+ * client process and this process. The passed server instance will have
+ * the returned CommandExecutor instance that is received from the client
+ * process. </br>
+ *
+ * TODO: This method needs to check that this method can only be called once
+ * and any time that this method is called again should probably generate an
+ * exception stating that this method was already called.
+ *
+ * @param server The instance that the received CommandExecutor instance from
+ * the client process.
+ */
+ public final void execute(S server) {
+ if (server == null) {
+ logger.error("The passed server instance can not be null");
+ throw new IllegalArgumentException("Can not pass a null instance of the server");
+ }
+ if (threadStarted.compareAndSet(false,true) == false) {
+ logger.error("This method can only be called once per instance");
+ throw new IllegalStateException("This method can be called only once per instance");
+ }
+ this.server = server;
+ thread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ logger.debug("Started ServerCallback thread: {}",
+ ServiceServer.this.thread.getName());
+ ServiceServer.this.state = State.ACCEPTING;
+ Socket socket = ServiceServer.this.serverSocket.accept();
+ ServiceServer.this.state = State.STARTING;
+ // Create a ServiceManager and pass the given Socket
+ // that will be used to communicate between processes.
+ ServiceServer.this.createServiceManager(socket);
+ logger.debug("Calling serviceStarted method");
+ ServiceServer.this.serviceStarted();
+ ServiceServer.this.state = State.STARTED;
+ logger.debug("Service manager has been started");
+ } catch (IOException e) {
+ // Something happened but let us not worry about this just
+ // yet. Still we will warn the calling method of this exception.
+ logger.warn(
+ "An exception was raised during the process of setting up a service manager instance",
+ e);
+ } finally {
+ ServiceServer.this.state = State.STOPPING;
+ logger.debug("Calling the serviceStopped method");
+ ServiceServer.this.serviceStopped();
+ ServiceServer.this.state = State.STOPPED;
+ logger.debug("Called the serviceStopped method and exiting Server Callback thread");
+ }
+ }
+ }) {
+ {
+ setName("ServerCallback-" + getName());
+ setDaemon(true);
+ start();
+ }
+ };
+ }
+
+ /**
+ * This method is used to terminate the thread that was started by this instance.
+ * It will interrupt the started thread if it is not being stopped or stopped.
+ * All other states will cause this method to interrupt the thread.
+ */
+ public final void stop() {
+ switch (this.state) {
+ case CREATED:
+ case STOPPED:
+ case STOPPING:
+ break;
+ default:
+ logger.info("Interrupting thread: {}", thread);
+ // Interrupt the waiting thread and return....
+ this.thread.interrupt();
+ break;
+ }
+ }
+
+ /**
+ * This method is used to create an instance of the createServiceManager instance
+ * for the given socket. The passed socket is the client process network communication
+ * used to communicate the CommandExecutor instances between processes.
+ *
+ * @param socket the connection instance to the client process
+ *
+ * @throws IOException Whenever the instantiating ServiceManager is unable to
+ * use the passed socket to setup the communication process between this
+ * process and the client process.
+ *
+ */
+ protected final void createServiceManager(Socket socket) throws IOException {
+ logger.debug("Creating ServiceManager instance");
+ manager = new ServiceManager<S, C>(socket, server);
+ }
+
+ /**
+ * This method will be called by this class as soon as the ServiceManager
+ * instance has been initialized. This is to allow the system to react as
+ * soon as this is ready and remove the need to guess when the
+ * ServiceManager has been initialized and started.
+ */
+ protected abstract void serviceStarted();
+
+ /**
+ * This method will be called when the ServiceManager instance has been
+ * stopped. Not sure if we would need a preStopping method callback but it
+ * might not be necessary or possible.
+ */
+ protected abstract void serviceStopped();
+
+}
36 src/test/java/org/valhalla/tools/net/Callback.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.valhalla.tools.net;
+
+/**
+ * @author Claudio Corsi
+ *
+ */
+public interface Callback {
+
+ /**
+ * This is called when the Server spawned thread received a Socket instance
+ * from the remote connection.
+ */
+ void initialized();
+
+ /**
+ * This is called when the Server spawned thread is exiting.
+ */
+ void done();
+
+}
76 src/test/java/org/valhalla/tools/net/Client.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.valhalla.tools.net;
+
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Claudio Corsi
+ *
+ */
+public class Client {
+
+ private static final Logger logger = LoggerFactory.getLogger(Client.class);
+
+ private CountDownLatch shutdown = new CountDownLatch(1);
+
+ private ServiceManager<Client, Server> manager = null;
+
+ public void main() throws Exception {
+ // This is the main point of entry that we will be using to
+ // test the new feature...
+ int port = Integer.getInteger("service.manager.port", 0);
+ if (port <= 0) {
+ logger.info("No service manager port number was defined");
+ System.exit(1);
+ }
+ Socket socket = new Socket("localhost", port);
+ try {
+ logger.info("Creating Service Manager instance");
+ manager = new ServiceManager<Client, Server>(socket, this);
+ // Now that we started the manager we just wait until we are told to
+ // finish....
+ logger.info("Waiting to be told that I am done");
+ shutdown.await();
+ } finally {
+ logger.info("Exiting client application");
+ if (manager != null) {
+ // Closing the manager causes the socket to be closed also...
+ manager.stop();
+ } else {
+ // Close the opened socket...
+ socket.close();
+ }
+ }
+ }
+
+ public void shutdown() {
+ this.shutdown.countDown();
+ }
+
+ /**
+ * @return the manager
+ */
+ public ServiceManager<Client, Server> getManager() {
+ return manager;
+ }
+
+}
106 src/test/java/org/valhalla/tools/net/Server.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.valhalla.tools.net;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Claudio Corsi
+ *
+ */
+public class Server {
+
+ private static final Logger logger = LoggerFactory.getLogger(Server.class);
+
+ private ServerSocket server;
+ private int port;
+ private ServiceManager<Server, Client> manager = null;
+ private Thread thread;
+
+ public Server() throws IOException {
+ server = new ServerSocket(0);
+ port = server.getLocalPort();
+ logger.info("Created a server socket instance for port: {}", port);
+ }
+
+ public void execute(final Callback callback) {
+ thread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ logger.info("Waiting for a client to connect back to me");
+ Socket socket = Server.this.server.accept();
+ // Create a ServiceManager and pass the given Socket
+ // that will be used to communicate between processes.
+ logger.info("Creating a instance of a ServiceManager");
+ Server.this.manager = new ServiceManager<Server, Client>(socket, Server.this);
+ logger.info("Calling the passed callback initialized method");
+ callback.initialized();
+ } catch (IOException e) {
+ // Something happened but let us not worry about this just
+ // yet.
+ logger.info("An exception was raised during the process of setting up a service manager instance", e);
+ } finally {
+ callback.done();
+ }
+ logger.info("Exiting the callback thread");
+ }
+ })
+ {
+ {
+ setName("ServerCallback-" + getName());
+ setDaemon(true);
+ start();
+ }
+ };
+ }
+
+ /**
+ * @return the port
+ */
+ public int getPort() {
+ return port;
+ }
+
+ public void enqueue(CommandExecutor<Client, Server> command) {
+ logger.debug("Adding Command: {} to the out going queue.", command );
+ this.manager.enqueue(command);
+ }
+
+ public void shutdown() throws IOException {
+ logger.info("Shutting down the Server system");
+ if (thread != null && thread.isAlive()) {
+ // Interrupt the thread only if it is still possibly active....
+ thread.interrupt();
+ }
+ // Close the server socket connection so that this will sever the
+ // connection between the two processes.
+ this.server.close();
+ }
+
+ /**
+ * @return the manager
+ */
+ public ServiceManager<Server, Client> getManager() {
+ return manager;
+ }
+
+}
270 src/test/java/org/valhalla/tools/net/ServiceManagerTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.valhalla.tools.net;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.valhalla.tools.process.Spawner;
+
+/**
+ * Unit test for simple ServiceManager.
+ */
+public class ServiceManagerTest implements Callback
+{
+ private static final Logger logger = LoggerFactory.getLogger(ServiceManagerTest.class);
+
+ private Server server;
+ private CountDownLatch latch = new CountDownLatch(1);
+ private boolean initialized;
+
+ @Before public void createServer() throws IOException {
+ // Create a new instance of Server....
+ server = new Server();
+ // Start it so that it is ready to receive a request...
+ server.execute(this);
+ }
+
+ @After public void destroyServer() throws IOException {
+ // Shutdown Server...
+ server.shutdown();
+ }
+
+ @Test public void SimpleNetTest() throws Exception {
+ Spawner spawner = createAndStartClientProcess("SimpleNetTestSpawner");
+ // Send and receive messages between the two systems....
+ // but in this case we are just going to send a shutdown command...
+ // keeping it simple for this test but obviously making the next
+ // more complicated....
+ server.enqueue(createClientCommandExecutor(1));
+ shutdownClientProcess(spawner);
+ }
+
+ @Test public void MultipleCommandsNetTest() throws Exception {
+ Spawner spawner = createAndStartClientProcess("MultipleCommandsNetTest");
+ server.enqueue(createClientCommandExecutor(1001));
+ shutdownClientProcess(spawner);
+ }
+
+ @Test public void MultipleParallelCommandsNetTest() throws Exception {
+ Spawner spawner = createAndStartClientProcess("MultipleParallelCommandsNetTest");
+ server.enqueue(createClientCommandExecutor(3, true));
+ shutdownClientProcess(spawner);
+ }
+
+ @Test public void MultipleParallelCommandsNetTestWithBigCount() throws Exception {
+ Spawner spawner = createAndStartClientProcess("MultipleParallelCommandsNetTest");
+ server.enqueue(createClientCommandExecutor(20, true));
+ shutdownClientProcess(spawner, 300);
+ }
+
+ /**
+ * @param name
+ * @return
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private Spawner createAndStartClientProcess(String name) throws IOException,
+ InterruptedException {
+ Spawner spawner = new Spawner(Client.class.getName(), "main");
+ List<String> jvmArgs = new LinkedList<String>();
+ jvmArgs.add("-Dservice.manager.port=" + String.valueOf(server.getPort()));
+ spawner.setJVMArgs(jvmArgs);
+ spawner.setIdentifier(name);
+ logger.info("Starting the spawned process");
+ // Start a new jvm that will execute the Client class....
+ spawner.spawnProcess();
+ logger.info("Waiting to be notified that the spawned process has started succesfully");
+ // Wait until the new java process connects to this jvm...
+ this.latch.await();
+ Assert.assertTrue("The ServiceManager was not initialized", initialized);
+ return spawner;
+ }
+
+ /**
+ * @param spawner
+ * @throws InterruptedException
+ */
+ private void shutdownClientProcess(Spawner spawner)
+ throws InterruptedException {
+ shutdownClientProcess(spawner, 5);
+ }
+
+ /**
+ * @param spawner
+ * @param maxCount
+ * @throws InterruptedException
+ */
+ private void shutdownClientProcess(Spawner spawner, int maxCount)
+ throws InterruptedException {
+ boolean processExited = spawner.isProcessExited();
+ int count = 0;
+ while (!processExited) {
+ Thread.sleep(1000);
+ count++;
+ if (count >= maxCount)
+ break;
+ processExited = spawner.isProcessExited();
+ }
+ spawner.stopProcess();
+ Assert.assertTrue(
+ "The Client side did not shutdown after receiving the shutdown command",
+ processExited);
+ }
+
+ @Override
+ public void initialized() {
+ this.initialized = true;
+ this.latch.countDown();
+ }
+
+ @Override
+ public void done() {
+ this.latch.countDown();
+ }
+
+ /**
+ * @param count
+ * @return
+ */
+ public static CommandExecutor<Client, Server> createClientCommandExecutor(final int count) {
+ return ServiceManagerTest.createClientCommandExecutor(count, false);
+ }
+
+ /**
+ * @param count
+ * @param create
+ * @return
+ */
+ public static CommandExecutor<Client, Server> createClientCommandExecutor(final int count, final boolean create) {
+ return new CommandExecutor<Client, Server>() {
+
+ private static final long serialVersionUID = -4734135412198165740L;
+
+ private int clientCount = count;
+
+ @Override
+ public CommandExecutor<Server, Client> apply(Client client) {
+ clientCount--;
+ logger.debug("Current client count: {}", clientCount);
+ CommandExecutor<Server, Client> serverCommand = null;
+ if (clientCount <= 0) {
+ // Shutdown and return nothing...
+ client.shutdown();
+ } else {
+ serverCommand = createServerCommandExecutor(clientCount);
+ if (create) {
+ ServiceManager<Client, Server> manager = client.getManager();
+ if (manager != null) {
+ manager.enqueue(ServiceManagerTest.createServerCommandExecutorWithoutShutdown(clientCount-1));
+ } else {
+ logger.error("What the manager instance is NULL!!!!");
+ logger.info("We would be at client count {}", clientCount-1);
+ }
+ }
+ }
+ return serverCommand;
+ }
+ };
+ }
+
+
+ /**
+ * @return
+ */
+ private static CommandExecutor<Server, Client> createServerCommandExecutor(final int count) {
+ return new CommandExecutor<Server, Client>() {
+
+ private static final long serialVersionUID = -313055602491132604L;
+
+ private int serverCount = count;
+
+ @Override
+ public CommandExecutor<Client, Server> apply(
+ Server destination) {
+ serverCount--;
+ logger.debug("Current server count: {}", serverCount);
+ CommandExecutor<Client, Server> command = null;
+ if (serverCount >= 0) {
+ command = createClientCommandExecutor(serverCount);
+ }
+ return command;
+ }
+
+ };
+ }
+
+ /**
+ * @param count
+ * @return
+ */
+ public static CommandExecutor<Client, Server> createClientCommandExecutorWithoutShutdown(final int count) {
+ return new CommandExecutor<Client, Server>() {
+
+ private static final long serialVersionUID = -4734135412198165740L;
+
+ private int clientCount = count;
+
+ @Override
+ public CommandExecutor<Server, Client> apply(Client client) {
+ clientCount--;
+ logger.debug("Current no shutdown client count: {}", clientCount);
+ CommandExecutor<Server, Client> serverCommand = null;
+ if (clientCount > 0) {
+ serverCommand = createServerCommandExecutorWithoutShutdown(clientCount--);
+ client.getManager().enqueue(createServerCommandExecutorWithoutShutdown(clientCount));
+ }
+ return serverCommand;
+ }
+ };
+ }
+
+
+ /**
+ * @return
+ */
+ private static CommandExecutor<Server, Client> createServerCommandExecutorWithoutShutdown(final int count) {
+ return new CommandExecutor<Server, Client>() {
+
+ private static final long serialVersionUID = -313055602491132604L;
+
+ private int serverCount = count;
+
+ @Override
+ public CommandExecutor<Client, Server> apply(
+ Server server) {
+ serverCount--;
+ logger.debug("Current no shutdown server count: {}", serverCount);
+ CommandExecutor<Client, Server> command = null;
+ if (serverCount > 0) {
+ command = createClientCommandExecutorWithoutShutdown(serverCount--);
+ server.getManager().enqueue(createClientCommandExecutorWithoutShutdown(serverCount));
+ }
+ return command;
+ }
+
+ };
+ }
+}

0 comments on commit 24e097a

Please sign in to comment.
Something went wrong with that request. Please try again.