Skip to content
This repository was archived by the owner on Nov 17, 2023. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ml.dmlc.mxnet.spark

import java.io.IOException
import java.io.{IOException, InputStream, OutputStream}
import java.util.concurrent.atomic.AtomicReference

import ml.dmlc.mxnet.KVStoreServer
Expand Down Expand Up @@ -77,14 +77,46 @@ class ParameterServer(private val classpath: String,
private val logger: Logger = LoggerFactory.getLogger(classOf[ParameterServer])
private val trackerProcess: AtomicReference[Process] = new AtomicReference[Process]

/**
* A utility class to redirect the child process's stdout or stderr.
*/
private class RedirectThread(
in: InputStream,
out: OutputStream,
name: String,
propagateEof: Boolean = false)
extends Thread(name) {

setDaemon(true)
override def run() {
val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
out.write(buf, 0, len)
out.flush()
len = in.read(buf)
}
if (propagateEof) {
out.close()
}
}
}

def startProcess(): Boolean = {
val cp = if (classpath == null) "" else s"-cp $classpath"
val cmd = s"$java $jvmOpts $cp $runningClass " +
s"--role=$role --root-uri=$rootUri --root-port=$rootPort " +
s"--num-server=$numServer --num-worker=$numWorker"
logger.info(s"Start process: $cmd")
try {
trackerProcess.set(Runtime.getRuntime.exec(cmd))
val childProcess = Runtime.getRuntime.exec(cmd)
trackerProcess.set(childProcess)
val inputStream = childProcess.getInputStream
val errorStream = childProcess.getErrorStream
logger.info("Starting InputStream-Redirecter Thread")
new RedirectThread(inputStream, System.out, "InputStream-Redirecter", true).start()
logger.info("Starting ErrorStream-Redirecter Thread")
new RedirectThread(errorStream, System.err, "ErrorStream-Redirecter", true).start()
true
} catch {
case ioe: IOException =>
Expand Down