Skip to content

Commit

Permalink
0003430: Service wrapper should verify and kill abandoned processes
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Feb 17, 2018
1 parent b76a12c commit 3202c09
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public enum Status {
public static final String ENV_SYM_HOME = "SYM_HOME";

public static final String SYSPROP_TMPDIR = "java.io.tmpdir";

public static final String JAR_NAME = "symmetric-wrapper.jar";

public static final int RC_BAD_USAGE = 1;
public static final int RC_INVALID_ARGUMENT = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@
package org.jumpmind.symmetric.wrapper;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

import org.codehaus.mojo.animal_sniffer.IgnoreJRERequirement;
import org.jumpmind.symmetric.wrapper.jna.CLibrary;
Expand Down Expand Up @@ -138,9 +142,51 @@ public boolean isInstalled() {

@Override
protected boolean isPidRunning(int pid) {
File procFile = new File("/proc/" + pid + "/cmdline");
if (procFile.canRead()) {
try {
List<String> args = readProcFile(procFile);
String appName = config.getApplicationParameters().get(0);
boolean isJava = false;
boolean isMe = false;

for (String arg : args) {
if (arg.contains(config.getJavaCommand())) {
isJava = true;
}
if (arg.contains(appName) || arg.contains(Constants.JAR_NAME)) {
isMe = true;
}
}
return isJava && isMe;
} catch (IOException e) {
}
}
return pid != 0 && CLibrary.INSTANCE.kill(pid, 0) == 0;
}

private List<String> readProcFile(File procFile) throws IOException {
FileInputStream in = new FileInputStream(procFile);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
List<String> args = new ArrayList<String>();
byte buffer[] = new byte[512];
int len = 0;

while ((len = in.read(buffer)) != -1) {
for (int i = 0; i < len; i++) {
if (buffer[i] == (byte) 0x0) {
bout.flush();
args.add(new String(bout.toString()));
bout.reset();
} else {
bout.write(buffer[i]);
}
}
}
in.close();
return args;
}

@Override
protected int getCurrentPid() {
return CLibrary.INSTANCE.getpid();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
*/
package org.jumpmind.symmetric.wrapper;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -87,11 +90,14 @@ public void init() {

@Override
public void start() {
if (isRunning()) {
throw new WrapperException(Constants.RC_SERVER_ALREADY_RUNNING, 0, "Server is already running");
}

if (!isInstalled()) {
super.start();
} else if (isRunning()) {
throw new WrapperException(Constants.RC_SERVER_ALREADY_RUNNING, 0, "Server is already running");
} else {
stopProcesses(true);
Advapi32Ex advapi = Advapi32Ex.INSTANCE;
SC_HANDLE manager = openServiceManager();
SC_HANDLE service = advapi.OpenService(manager, config.getName(), Winsvc.SERVICE_ALL_ACCESS);
Expand Down Expand Up @@ -164,7 +170,7 @@ public boolean isRunning() {
}
closeServiceHandle(service);
closeServiceHandle(manager);
return (status.dwCurrentState == Winsvc.SERVICE_RUNNING);
return (status.dwCurrentState == Winsvc.SERVICE_RUNNING) && super.isRunning();
}
closeServiceHandle(manager);
}
Expand All @@ -175,12 +181,37 @@ public boolean isRunning() {
protected boolean isPidRunning(int pid) {
boolean isRunning = false;
if (pid != 0) {
Kernel32 kernel = Kernel32.INSTANCE;
HANDLE process = kernel.OpenProcess(Kernel32.SYNCHRONIZE, false, pid);
if (process != null) {
int rc = kernel.WaitForSingleObject(process, 0);
kernel.CloseHandle(process);
isRunning = (rc == Kernel32.WAIT_TIMEOUT);
boolean foundProcess = false;
String[] path = config.getJavaCommand().split("/|\\\\");
String javaExe = path[path.length - 1].toLowerCase();
try {
ProcessBuilder pb = new ProcessBuilder("query", "process", String.valueOf(pid));
Process proc = pb.start();
pb.redirectErrorStream(true);
BufferedReader stdout = new BufferedReader(new InputStreamReader(proc.getInputStream()));
String line = stdout.readLine();
line = stdout.readLine();
stdout.close();

if (line != null) {
String[] array = line.split("\\s+");
if (array.length > 0) {
foundProcess = true;
String exeName = array[array.length - 1];
isRunning = exeName.toLowerCase().contains(javaExe);
}
}

} catch (IOException e) {
}
if (!foundProcess) {
Kernel32Ex kernel = Kernel32Ex.INSTANCE;
HANDLE process = kernel.OpenProcess(Kernel32.SYNCHRONIZE, false, pid);
if (process != null) {
int rc = kernel.WaitForSingleObject(process, 0);
kernel.CloseHandle(process);
isRunning = (rc == Kernel32.WAIT_TIMEOUT);
}
}
}
return isRunning;
Expand Down Expand Up @@ -490,6 +521,13 @@ public void serviceMain(int argc, Pointer argv) {
}

if (!isRunning) {
try {
stopProcesses(true);
} catch (Throwable e) {
logEvent(WinNT.EVENTLOG_ERROR_TYPE, "Failed to stop abandoned processes.", e);
updateStatus(Winsvc.SERVICE_STOPPED, 0);
System.exit(Constants.RC_FAIL_STOP_SERVER);
}
try {
execJava(false);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static void main(String[] args) throws Exception {
}
}
String configFile = dir + File.separator + "conf" + File.separator + "sym_service.conf";
String jarFile = dir + File.separator + "lib" + File.separator + "symmetric-wrapper.jar";
String jarFile = dir + File.separator + "lib" + File.separator + Constants.JAR_NAME;

WrapperHelper.run(args, dir, configFile, jarFile);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public String getJavaCommand() {
public List<String> getOptions() {
return prop.get("wrapper.java.additional");
}

public List<String> getApplicationParameters() {
return getListProperty(prop, "wrapper.app.parameter");
}

public ArrayList<String> getCommand(boolean isConsole) {
ArrayList<String> cmdList = new ArrayList<String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void start() {
throw new WrapperException(Constants.RC_SERVER_ALREADY_RUNNING, 0, "Server is already running");
}

stopProcesses(true);
System.out.println("Waiting for server to start");
ArrayList<String> cmdLine = getWrapperCommand("exec");
Process process = null;
Expand Down Expand Up @@ -187,7 +188,7 @@ protected void execJava(boolean isConsole) {
logger.log(Level.SEVERE, "Stopping server because its output matches a failure condition");
child.destroy();
childReader.close();
stopProcess(serverPid, "symmetricds");
stopProcess(serverPid, "server");
break;
}
if (line.equalsIgnoreCase("Restarting")) {
Expand Down Expand Up @@ -229,18 +230,43 @@ protected void execJava(boolean isConsole) {
}

public void stop() {
int symPid = readPidFromFile(config.getServerPidFile());
stopProcesses(false);
deletePidFile(config.getServerPidFile());
deletePidFile(config.getWrapperPidFile());
System.out.println("Stopped");
}

protected void stopProcesses(boolean isStopAbandoned) {
int serverPid = readPidFromFile(config.getServerPidFile());
int wrapperPid = readPidFromFile(config.getWrapperPidFile());
if (!isPidRunning(symPid) && !isPidRunning(wrapperPid)) {
throw new WrapperException(Constants.RC_SERVER_NOT_RUNNING, 0, "Server is not running");
boolean isServerRunning = isPidRunning(serverPid);
boolean isWrapperRunning = isPidRunning(wrapperPid);

if (!isStopAbandoned) {
if (!isServerRunning && !isWrapperRunning) {
throw new WrapperException(Constants.RC_SERVER_NOT_RUNNING, 0, "Server is not running");
}

System.out.println("Waiting for server to stop");
}

if (isWrapperRunning) {
if (isStopAbandoned) {
System.out.println("Stopping abandoned wrapper PID " + wrapperPid);
}
isWrapperRunning = !stopProcess(wrapperPid, "wrapper");
}
if (isServerRunning) {
if (isStopAbandoned) {
System.out.println("Stopping abandoned server PID " + serverPid);
}
isServerRunning = !stopProcess(serverPid, "server");
}
System.out.println("Waiting for server to stop");
if (!(stopProcess(wrapperPid, "wrapper") && stopProcess(symPid, "symmetricds"))) {
if (isWrapperRunning || isServerRunning) {
throw new WrapperException(Constants.RC_FAIL_STOP_SERVER, 0, "Server did not stop");
}
System.out.println("Stopped");
}

protected boolean stopProcess(int pid, String name) {
killProcess(pid, false);
if (waitForPid(pid)) {
Expand Down Expand Up @@ -287,17 +313,20 @@ public void relaunchAsPrivileged(String cmd, String args) {

public void status() {
boolean isRunning = isRunning();
int wrapperPid = readPidFromFile(config.getWrapperPidFile());
int serverPid = readPidFromFile(config.getServerPidFile());

System.out.println("Installed: " + isInstalled());
System.out.println("Running: " + isRunning);
if (isRunning) {
System.out.println("Wrapper PID: " + readPidFromFile(config.getWrapperPidFile()));
System.out.println("Server PID: " + readPidFromFile(config.getServerPidFile()));
}
System.out.println("Wrapper PID: " + wrapperPid);
System.out.println("Wrapper Running: " + isPidRunning(wrapperPid));
System.out.println("Server PID: " + serverPid);
System.out.println("Server Running: " + isPidRunning(serverPid));
}

public boolean isRunning() {
return isPidRunning(readPidFromFile(config.getWrapperPidFile())) ||
isPidRunning(readPidFromFile(config.getServerPidFile()));
return isPidRunning(readPidFromFile(config.getWrapperPidFile()))
&& isPidRunning(readPidFromFile(config.getServerPidFile()));
}

public int getWrapperPid() {
Expand Down

0 comments on commit 3202c09

Please sign in to comment.