[ZEPPELIN-4692]. zeppelin pyspark doesn't print java output
zjffdu committed Mar 25, 2020
1 parent 8988723 commit 57e6c16
Showing 5 changed files with 59 additions and 23 deletions.
@@ -296,8 +296,12 @@ public void testIPythonPlotting() throws InterpreterException, InterruptedExcept
"df = pd.DataFrame(np.random.randn(1000, 4), index=idx, columns=list('ABCD')).cumsum()\n" +
"import hvplot.pandas\n" +
"df.hvplot()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(context.out.toInterpreterResultMessage().get(0).getData(),
InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
for (InterpreterResultMessage message : interpreterResultMessages) {
System.out.println(message);
}
assertEquals(4, interpreterResultMessages.size());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());

@@ -23,11 +23,13 @@
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.python.IPythonInterpreter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.Map;
 import java.util.Properties;

@@ -91,25 +93,35 @@ public ZeppelinContext buildZeppelinContext() {
   @Override
   public InterpreterResult interpret(String st,
                                      InterpreterContext context) throws InterpreterException {
-    Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
-    InterpreterContext.set(context);
-    String jobGroupId = Utils.buildJobGroupId(context);
-    String jobDesc = Utils.buildJobDesc(context);
-    String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
-    InterpreterResult result = super.interpret(setJobGroupStmt, context);
-    if (result.code().equals(InterpreterResult.Code.ERROR)) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
+    // redirect java stdout/stderr to interpreter output, because pyspark may call java code.
+    PrintStream originalStdout = System.out;
+    PrintStream originalStderr = System.err;
+    try {
+      System.setOut(new PrintStream(context.out));
+      System.setErr(new PrintStream(context.out));
+      Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
+      InterpreterContext.set(context);
+      String jobGroupId = Utils.buildJobGroupId(context);
+      String jobDesc = Utils.buildJobDesc(context);
+      String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
+      InterpreterResult result = super.interpret(setJobGroupStmt, context);
+      if (result.code().equals(InterpreterResult.Code.ERROR)) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
+      }
+      String pool = "None";
+      if (context.getLocalProperties().containsKey("pool")) {
+        pool = "'" + context.getLocalProperties().get("pool") + "'";
+      }
+      String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+      result = super.interpret(setPoolStmt, context);
+      if (result.code().equals(InterpreterResult.Code.ERROR)) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
+      }
+      return super.interpret(st, context);
+    } finally {
+      System.setOut(originalStdout);
+      System.setErr(originalStderr);
     }
-    String pool = "None";
-    if (context.getLocalProperties().containsKey("pool")) {
-      pool = "'" + context.getLocalProperties().get("pool") + "'";
-    }
-    String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
-    result = super.interpret(setPoolStmt, context);
-    if (result.code().equals(InterpreterResult.Code.ERROR)) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
-    }
-    return super.interpret(st, context);
   }

   @Override
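The heart of the fix is the save/redirect/restore pattern wrapped around interpret(): System.setOut and System.setErr swap the JVM-wide stdout/stderr for the duration of the paragraph, so anything Java code prints, including output triggered from Python through the py4j gateway, lands in context.out and becomes part of the paragraph result, while the finally block restores the original streams even when interpretation throws. Below is a minimal standalone sketch of the same pattern, with a ByteArrayOutputStream standing in for Zeppelin's context.out (class and variable names here are illustrative, not from the commit):

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class StdoutRedirectSketch {
  public static void main(String[] args) {
    ByteArrayOutputStream captured = new ByteArrayOutputStream(); // stand-in for context.out
    PrintStream originalStdout = System.out;
    PrintStream originalStderr = System.err;
    try {
      // JVM-wide redirection: any Java code that prints while this is active,
      // e.g. code reached from Python via py4j, writes into `captured`.
      System.setOut(new PrintStream(captured));
      System.setErr(new PrintStream(captured));
      System.out.println("hello world");
    } finally {
      // Restore the real streams no matter what the guarded code did.
      System.setOut(originalStdout);
      System.setErr(originalStderr);
    }
    originalStdout.print("captured: " + captured); // -> captured: hello world
  }
}

One caveat worth noting: System.setOut is process-global, so while the redirect is active every thread in the JVM shares it, not just the paragraph being interpreted.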

@@ -25,13 +25,15 @@
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.python.IPythonInterpreter;
 import org.apache.zeppelin.python.PythonInterpreter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.File;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -125,8 +127,18 @@ protected ZeppelinContext createZeppelinContext() {
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
-    Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
-    return super.interpret(st, context);
+    // redirect java stdout/stderr to interpreter output, because pyspark may call java code.
+    PrintStream originalStdout = System.out;
+    PrintStream originalStderr = System.err;
+    try {
+      System.setOut(new PrintStream(context.out));
+      System.setErr(new PrintStream(context.out));
+      Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
+      return super.interpret(st, context);
+    } finally {
+      System.setOut(originalStdout);
+      System.setErr(originalStderr);
+    }
   }

   @Override
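PySparkInterpreter receives the same guard as IPySparkInterpreter above: the streams are swapped only for the duration of super.interpret(st, context) and restored in the finally block, so a failing paragraph cannot leave later JVM output pointing at a stale context.out.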

@@ -217,6 +217,14 @@ public void run() {
     assertTrue(completions.size() > 0);
     completions.contains(new InterpreterCompletion("sc", "sc", ""));

+    // python call java via py4j
+    context = createInterpreterContext(mockIntpEventClient);
+    result = interpreter.interpret("sc._jvm.java.lang.System.out.println(\"hello world\")", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("hello world\n", interpreterResultMessages.get(0).getData());
+
     // pyspark streaming TODO(zjffdu) disable pyspark streaming test temporary
     context = createInterpreterContext(mockIntpEventClient);
     // result = interpreter.interpret(
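This new test exercises the reported bug end to end: sc._jvm.java.lang.System.out.println executes on the JVM side of the py4j gateway, and the assertions verify that its output now surfaces as the paragraph's single result message ("hello world\n") instead of disappearing into the interpreter process's console.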
4 changes: 2 additions & 2 deletions testing/install_external_dependencies.sh
@@ -36,11 +36,11 @@ if [[ -n "$PYTHON" ]] ; then

if [[ "$PYTHON" == "2" ]] ; then
pip install -q numpy==1.14.5 pandas==0.21.1 matplotlib==2.1.1 scipy==1.2.1 grpcio==1.19.0 bkzep==0.6.1 hvplot==0.5.2 \
protobuf==3.7.0 pandasql==0.7.3 ipython==5.8.0 ipykernel==4.10.0 bokeh==1.3.4
protobuf==3.7.0 pandasql==0.7.3 ipython==5.8.0 ipykernel==4.10.0 bokeh==1.3.4 panel==0.6.0 holoviews==1.12.3
else
pip install -q pycodestyle==2.5.0
pip install -q numpy==1.17.3 pandas==0.25.0 scipy==1.3.1 grpcio==1.19.0 bkzep==0.6.1 hvplot==0.5.2 protobuf==3.10.0 \
pandasql==0.7.3 ipython==7.8.0 matplotlib==3.0.3 ipykernel==5.1.2 jupyter_client==5.3.4 bokeh==1.3.4 pycodestyle==2.5.0 apache_beam==2.15.0
pandasql==0.7.3 ipython==7.8.0 matplotlib==3.0.3 ipykernel==5.1.2 jupyter_client==5.3.4 bokeh==1.3.4 panel==0.6.0 holoviews==1.12.3 pycodestyle==2.5.0 apache_beam==2.15.0
fi

if [[ -n "$TENSORFLOW" ]] ; then