Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions python/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
**/PythonInterpreterPandasSqlTest.java,
**/PythonInterpreterMatplotlibTest.java
</python.test.exclude>
<pypi.repo.url>https://pypi.python.org/packages</pypi.repo.url>
<python.py4j.repo.folder>/64/5c/01e13b68e8caafece40d549f232c9b5677ad1016071a48d04cc3895acaa3</python.py4j.repo.folder>
<grpc.version>1.4.0</grpc.version>
<plugin.shade.version>2.4.1</plugin.shade.version>
</properties>
Expand Down Expand Up @@ -137,35 +135,12 @@
</executions>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>wagon-maven-plugin</artifactId>
<version>1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals><goal>download-single</goal></goals>
<configuration>
<url>${pypi.repo.url}${python.py4j.repo.folder}</url>
<fromFile>py4j-${python.py4j.version}.zip</fromFile>
<toFile>${project.build.directory}/../../interpreter/python/py4j-${python.py4j.version}.zip</toFile>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<phase>package</phase>
<configuration>
<target>
<unzip src="${project.build.directory}/../../interpreter/python/py4j-${python.py4j.version}.zip"
dest="${project.build.directory}/../../interpreter/python"/>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,16 +243,21 @@ private void setupJVMGateway(int jvmGatewayPort) throws IOException {
private void launchIPythonKernel(int ipythonPort)
throws IOException, URISyntaxException {
// copy the python scripts to a temp directory, then launch ipython kernel in that folder
File tmpPythonScriptFolder = Files.createTempDirectory("zeppelin_ipython").toFile();
File pythonWorkDir = Files.createTempDirectory("zeppelin_ipython").toFile();
String[] ipythonScripts = {"ipython_server.py", "ipython_pb2.py", "ipython_pb2_grpc.py"};
for (String ipythonScript : ipythonScripts) {
URL url = getClass().getClassLoader().getResource("grpc/python"
+ "/" + ipythonScript);
FileUtils.copyURLToFile(url, new File(tmpPythonScriptFolder, ipythonScript));
FileUtils.copyURLToFile(url, new File(pythonWorkDir, ipythonScript));
}

//TODO(zjffdu) don't do hard code on py4j here
File py4jDestFile = new File(pythonWorkDir, "py4j-src-0.9.2.zip");
FileUtils.copyURLToFile(getClass().getClassLoader().getResource(
"python/py4j-src-0.9.2.zip"), py4jDestFile);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, 2.3 is running with Py4J 0.10.6

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this detect any mismatch here? check spark version or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is fine to use py4j 0.9.2 here for IPythonInterpreter, as for IPySparkInterpreter it would use the py4j of spark instead of py4j 0.9.2

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why Spark doesn't ship py4j zip file version-agnostic? Filed https://issues.apache.org/jira/browse/SPARK-23965

Copy link
Member

@HyukjinKwon HyukjinKwon Apr 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a strong reason to rename or make a link for Spark's Py4J within Spark. Also, to be clear, I think It's an orthogonal issue with the current change here, if I am not mistaken.


CommandLine cmd = CommandLine.parse(pythonExecutable);
cmd.addArgument(tmpPythonScriptFolder.getAbsolutePath() + "/ipython_server.py");
cmd.addArgument(pythonWorkDir.getAbsolutePath() + "/ipython_server.py");
cmd.addArgument(ipythonPort + "");
DefaultExecutor executor = new DefaultExecutor();
ProcessLogOutputStream processOutput = new ProcessLogOutputStream(LOGGER);
Expand All @@ -261,20 +266,12 @@ private void launchIPythonKernel(int ipythonPort)
executor.setWatchdog(watchDog);

if (useBuiltinPy4j) {
String py4jLibPath = null;
if (System.getenv("ZEPPELIN_HOME") != null) {
py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator
+ PythonInterpreter.ZEPPELIN_PY4JPATH;
} else {
Path workingPath = Paths.get("..").toAbsolutePath();
py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH;
}
if (additionalPythonPath != null) {
// put the py4j at the end, because additionalPythonPath may already contain py4j.
// e.g. PySparkInterpreter
additionalPythonPath = additionalPythonPath + ":" + py4jLibPath;
additionalPythonPath = additionalPythonPath + ":" + py4jDestFile.getAbsolutePath();
} else {
additionalPythonPath = py4jLibPath;
additionalPythonPath = py4jDestFile.getAbsolutePath();
}
}

Expand Down Expand Up @@ -326,7 +323,7 @@ protected Map<String, String> setupIPythonEnv() throws IOException {
@Override
public void close() throws InterpreterException {
if (watchDog != null) {
LOGGER.debug("Kill IPython Process");
LOGGER.info("Kill IPython Process");
ipythonClient.stop(StopRequest.newBuilder().build());
watchDog.destroyProcess();
gatewayServer.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@

/**
* Conda support
* TODO(zjffdu) Add removing conda env
*/
public class PythonCondaInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(PythonCondaInterpreter.class);
private static Logger logger = LoggerFactory.getLogger(PythonCondaInterpreter.class);
public static final String ZEPPELIN_PYTHON = "zeppelin.python";
public static final String CONDA_PYTHON_PATH = "/bin/python";
public static final String DEFAULT_ZEPPELIN_PYTHON = "python";
Expand Down Expand Up @@ -145,33 +146,22 @@ private void changePythonEnvironment(String envName)
}
}
setCurrentCondaEnvName(envName);
python.setPythonCommand(binPath);
python.setPythonExec(binPath);
}

private void restartPythonProcess() throws InterpreterException {
PythonInterpreter python = getPythonInterpreter();
logger.debug("Restarting PythonInterpreter");
Interpreter python =
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
python.close();
python.open();
}

protected PythonInterpreter getPythonInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
PythonInterpreter python = null;
Interpreter p =
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());

while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
python = (PythonInterpreter) p;

if (lazy != null) {
lazy.open();
}
return python;
return (PythonInterpreter) ((LazyOpenInterpreter)p).getInnerInterpreter();
}

public static String runCondaCommandForTextOutput(String title, List<String> commands)
Expand Down Expand Up @@ -392,27 +382,50 @@ public Scheduler getScheduler() {

public static String runCommand(List<String> commands)
throws IOException, InterruptedException {
logger.info("Starting shell commands: " + StringUtils.join(commands, " "));
Process process = Runtime.getRuntime().exec(commands.toArray(new String[0]));
StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream());
StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream());
errorGobbler.start();
outputGobbler.start();
if (process.waitFor() != 0) {
throw new IOException("Fail to run shell commands: " + StringUtils.join(commands, " "));
}
logger.info("Complete shell commands: " + StringUtils.join(commands, " "));
return outputGobbler.getOutput();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we had some launcher wrapper for something like this?

}

StringBuilder sb = new StringBuilder();
private static class StreamGobbler extends Thread {
InputStream is;
StringBuilder output = new StringBuilder();

ProcessBuilder builder = new ProcessBuilder(commands);
builder.redirectErrorStream(true);
Process process = builder.start();
InputStream stdout = process.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(stdout));
String line;
while ((line = br.readLine()) != null) {
sb.append(line);
sb.append("\n");
// reads everything from is until empty.
StreamGobbler(InputStream is) {
this.is = is;
}
int r = process.waitFor(); // Let the process finish.

if (r != 0) {
throw new RuntimeException("Failed to execute `" +
StringUtils.join(commands, " ") + "` exited with " + r);
public void run() {
try {
InputStreamReader isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
String line = null;
long startTime = System.currentTimeMillis();
while ( (line = br.readLine()) != null) {
output.append(line + "\n");
// logging per 5 seconds
if ((System.currentTimeMillis() - startTime) > 5000) {
logger.info(line);
startTime = System.currentTimeMillis();
}
}
} catch (IOException ioe) {
ioe.printStackTrace();
}
}

return sb.toString();
public String getOutput() {
return output.toString();
}
}

public static String runCommand(String ... command)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void close() {
@Override
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
File pythonScript = new File(getPythonInterpreter().getScriptPath());
File pythonWorkDir = getPythonInterpreter().getPythonWorkDir();
InterpreterOutput out = context.out;

Matcher activateMatcher = activatePattern.matcher(st);
Expand All @@ -73,26 +73,23 @@ public InterpreterResult interpret(String st, InterpreterContext context)
pull(out, image);

// mount pythonscript dir
String mountPythonScript = "-v " +
pythonScript.getParentFile().getAbsolutePath() +
":/_zeppelin_tmp ";
String mountPythonScript = "-v " + pythonWorkDir.getAbsolutePath() +
":/_python_workdir ";

// mount zeppelin dir
String mountPy4j = "-v " +
zeppelinHome.getAbsolutePath() +
String mountPy4j = "-v " + zeppelinHome.getAbsolutePath() +
":/_zeppelin ";

// set PYTHONPATH
String pythonPath = ":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PY4JPATH + ":" +
":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PYTHON_LIBS;
String pythonPath = ".:/_python_workdir/py4j-src-0.9.2.zip:/_python_workdir";

setPythonCommand("docker run -i --rm " +
mountPythonScript +
mountPy4j +
"-e PYTHONPATH=\"" + pythonPath + "\" " +
image + " " +
getPythonInterpreter().getPythonBindPath() + " " +
"/_zeppelin_tmp/" + pythonScript.getName());
getPythonInterpreter().getPythonExec() + " " +
"/_python_workdir/zeppelin_python.py");
restartPythonProcess();
out.clear();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + image + "\" activated");
Expand All @@ -108,7 +105,7 @@ public InterpreterResult interpret(String st, InterpreterContext context)

public void setPythonCommand(String cmd) throws InterpreterException {
PythonInterpreter python = getPythonInterpreter();
python.setPythonCommand(cmd);
python.setPythonExec(cmd);
}

private void printUsage(InterpreterOutput out) {
Expand Down
Loading