From 4087cfe2b17fa0327c5a73d51aad44ce1e74776b Mon Sep 17 00:00:00 2001
From: Igor Drozdov
Date: Mon, 20 Mar 2017 17:16:22 +0300
Subject: [PATCH 1/4] Interpret scala code as a single piece
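
The Scala REPL used to receive paragraph text line by line, with an ad-hoc
heuristic deciding which lines belong together (leading dots, comments,
closing braces, "object" lines for companion objects). Passing the whole
paragraph to interpret() as a single piece removes the heuristic and lets
the Scala compiler decide where expressions end. As a sketch of the kind of
snippet this fixes (the regression test added in the second commit uses the
same shape), a companion object must be compiled together with its class to
see the class's private members, which line-by-line interpretation cannot
guarantee:

    class Foo(private val privateMember: Any = null)
    object Foo {
      def foo = new Foo().privateMember
    }

REPL output is now buffered per invocation (a ByteArrayOutputStream) and
appended to the paragraph output afterwards, so a paragraph produces
separate TEXT result messages: printed output first, then the REPL's own
echo such as import statements. InterpreterResultMessage gains equals() and
hashCode() so tests can compare whole messages.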
System.setProperty("spark.repl.fallback", "true"); if (sparkVersion.oldSqlContextImplicits()) { interpret("import sqlContext._"); } else { @@ -1162,118 +1166,72 @@ boolean isUnsupportedSparkVersion() { return enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion(); } - /** - * Interpret a single line. - */ @Override - public InterpreterResult interpret(String line, InterpreterContext context) { + public InterpreterResult interpret(String code, InterpreterContext context) { if (isUnsupportedSparkVersion()) { return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString() + " is not supported"); } populateSparkWebUrl(context); z.setInterpreterContext(context); - if (line == null || line.trim().length() == 0) { + if (StringUtils.isBlank(code)) { return new InterpreterResult(Code.SUCCESS); } - return interpret(line.split("\n"), context); + return doInterpret(code, context); } - public InterpreterResult interpret(String[] lines, InterpreterContext context) { + public InterpreterResult doInterpret(String code, InterpreterContext context) { synchronized (this) { z.setGui(context.getGui()); sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false); - InterpreterResult r = interpretInput(lines, context); + InterpreterResult r = interpretInput(code, context); sc.clearJobGroup(); return r; } } - public InterpreterResult interpretInput(String[] lines, InterpreterContext context) { + public InterpreterResult interpretInput(String code, InterpreterContext context) { SparkEnv.set(env); - String[] linesToRun = new String[lines.length]; - for (int i = 0; i < lines.length; i++) { - linesToRun[i] = lines[i]; - } - Console.setOut(context.out); - out.setInterpreterOutput(context.out); + ByteArrayOutputStream replOutput = new ByteArrayOutputStream(); + out.setInterpreterOutput(replOutput); context.out.clear(); - Code r = null; - String incomplete = ""; - boolean inComment = false; - - for (int l = 0; l < linesToRun.length; l++) { - String s = linesToRun[l]; - // check if next line starts with "." (but not ".." or "./") it is treated as an invocation - if (l + 1 < linesToRun.length) { - String nextLine = linesToRun[l + 1].trim(); - boolean continuation = false; - if (nextLine.isEmpty() - || nextLine.startsWith("//") // skip empty line or comment - || nextLine.startsWith("}") - || nextLine.startsWith("object")) { // include "} object" for Scala companion object - continuation = true; - } else if (!inComment && nextLine.startsWith("/*")) { - inComment = true; - continuation = true; - } else if (inComment && nextLine.lastIndexOf("*/") >= 0) { - inComment = false; - continuation = true; - } else if (nextLine.length() > 1 - && nextLine.charAt(0) == '.' - && nextLine.charAt(1) != '.' // ".." 
- && nextLine.charAt(1) != '/') { // "./" - continuation = true; - } else if (inComment) { - continuation = true; - } - if (continuation) { - incomplete += s + "\n"; - continue; - } - } - scala.tools.nsc.interpreter.Results.Result res = null; - try { - res = interpret(incomplete + s); - } catch (Exception e) { - sc.clearJobGroup(); - out.setInterpreterOutput(null); - logger.info("Interpreter exception", e); - return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); - } + scala.tools.nsc.interpreter.Results.Result res; + try { + res = interpret(code); - r = getResultCode(res); + Code r = getResultCode(res); if (r == Code.ERROR) { - sc.clearJobGroup(); - out.setInterpreterOutput(null); return new InterpreterResult(r, ""); - } else if (r == Code.INCOMPLETE) { - incomplete += s + "\n"; - } else { - incomplete = ""; } - } - // make sure code does not finish with comment - if (r == Code.INCOMPLETE) { - scala.tools.nsc.interpreter.Results.Result res = null; - res = interpret(incomplete + "\nprint(\"\")"); - r = getResultCode(res); - } + // make sure code does not finish with comment + if (r == Code.INCOMPLETE) { + res = interpret(code + "\nprint(\"\")"); + r = getResultCode(res); + } - if (r == Code.INCOMPLETE) { - sc.clearJobGroup(); - out.setInterpreterOutput(null); - return new InterpreterResult(r, "Incomplete expression"); - } else { - sc.clearJobGroup(); - putLatestVarInResourcePool(context); + if (r == Code.INCOMPLETE) { + return new InterpreterResult(r, "Incomplete expression"); + } else { + putLatestVarInResourcePool(context); + return new InterpreterResult(Code.SUCCESS); + } + } catch (Exception e) { + logger.info("Interpreter exception", e); + return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); + } finally { + try { + context.out.setType(InterpreterResult.Type.TEXT); + context.out.write(replOutput.toByteArray()); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } out.setInterpreterOutput(null); - return new InterpreterResult(Code.SUCCESS); + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessage.java index f137ca56be7..60efb1b7f06 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessage.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessage.java @@ -22,8 +22,8 @@ * Interpreter result message */ public class InterpreterResultMessage implements Serializable { - InterpreterResult.Type type; - String data; + private final InterpreterResult.Type type; + private final String data; public InterpreterResultMessage(InterpreterResult.Type type, String data) { this.type = type; @@ -38,6 +38,24 @@ public String getData() { return data; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + InterpreterResultMessage that = (InterpreterResultMessage) o; + + if (type != that.type) return false; + return data.equals(that.data); + } + + @Override + public int hashCode() { + int result = type.hashCode(); + result = 31 * result + data.hashCode(); + return result; + } + public String toString() { return "%" + type.name().toLowerCase() + " " + data; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java index 6f2a0b4059a..5729ec838dd 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java @@ -21,6 +21,7 @@ import org.slf4j.Logger; import java.io.IOException; +import java.io.OutputStream; /** * Output Stream integrated with InterpreterOutput. @@ -29,18 +30,18 @@ */ public class InterpreterOutputStream extends LogOutputStream { private Logger logger; - InterpreterOutput interpreterOutput; - boolean ignoreLeadingNewLinesFromScalaReporter = false; + private OutputStream interpreterOutput; + private boolean ignoreLeadingNewLinesFromScalaReporter = false; public InterpreterOutputStream(Logger logger) { this.logger = logger; } - public InterpreterOutput getInterpreterOutput() { + public OutputStream getInterpreterOutput() { return interpreterOutput; } - public void setInterpreterOutput(InterpreterOutput interpreterOutput) { + public void setInterpreterOutput(OutputStream interpreterOutput) { this.interpreterOutput = interpreterOutput; } diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index ac5cb2b1256..e7784e33712 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -329,6 +329,12 @@ test + + org.hamcrest + hamcrest-all + test + + org.mockito mockito-all diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 27143522b70..6b720a5920b 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -16,8 +16,15 @@ */ package org.apache.zeppelin.rest; +import static org.hamcrest.Matchers.anything; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.io.File; @@ -28,6 +35,7 @@ import org.apache.commons.io.FileUtils; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Paragraph; @@ -91,10 +99,11 @@ public void scalaOutputTest() throws IOException { p.setAuthenticationInfo(anonymous); note.run(p.getId()); waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals("import java.util.Date\n" + - "import java.net.URL\n" + - "hello\n", p.getResult().message().get(0).getData()); + assertThat(p.getStatus(), is(Status.FINISHED)); + assertThat(p.getResult().message(), contains( + new InterpreterResultMessage(InterpreterResult.Type.TEXT, "hello\n"), + new InterpreterResultMessage(InterpreterResult.Type.TEXT, "import java.util.Date\nimport java.net.URL\n") + )); ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } @@ -136,23 +145,25 @@ public void sparkSQLTest() throws IOException { p.setAuthenticationInfo(anonymous); note.run(p.getId()); waitForFinish(p); - 
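
The companion object is defined on a different line than its class and
reaches into the class's private constructor member. This compiles only
when the class and the object end up in the same compilation unit, so the
test fails if the paragraph is fed to the REPL line by line instead of as
a single piece.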
---
 .../zeppelin/spark/SparkInterpreterTest.java       | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 3a31e5dd845..790ad6d1784 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -43,6 +43,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.hamcrest.CoreMatchers.is;
+
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class SparkInterpreterTest {
@@ -327,4 +329,16 @@ public void testParagraphUrls() {
 
     assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job?id="));
   }
+
+  @Test
+  public void define_companion_object_in_different_line() throws IOException {
+    // Companion object has access to class' private members
+    InterpreterResult result = repl.interpret(
+        "class Foo(private val privateMember: Any = null)\n" +
+            "object Foo {\n" +
+            "  def foo = new Foo().privateMember\n" +
+            "}\n",
+        context);
+    assertThat(result.code(), is(Code.SUCCESS));
+  }
 }

From c7e5c58d333bea498fdd9edfbe7548efd75458b3 Mon Sep 17 00:00:00 2001
From: Igor Drozdov
Date: Mon, 22 May 2017 16:07:46 +0300
Subject: [PATCH 3/4] fixup Interpret scala code as a single piece
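
Set spark.repl.fallback (the workaround for SPARK-14146) once in open() and
only for the Scala 2.10 REPL, instead of inside the Spark 1.x implicit
import branch. The "code ends with a comment" retry that re-interpreted the
snippet with a trailing print("") is dropped as well; an INCOMPLETE result
is now reported directly as an incomplete expression.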
---
 .../zeppelin/spark/SparkInterpreter.java           | 20 +++++++-------------
 1 file changed, 7 insertions(+), 13 deletions(-)

diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 3d49768385c..f60844efe01 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -905,14 +905,17 @@ public void open() {
 
     interpret("import org.apache.spark.SparkContext._");
 
+    if (Utils.isScala2_10()) {
+      // see https://issues.apache.org/jira/browse/SPARK-14146
+      System.setProperty("spark.repl.fallback", "true");
+    }
+
     if (importImplicit()) {
       if (Utils.isSpark2()) {
         interpret("import spark.implicits._");
         interpret("import spark.sql");
         interpret("import org.apache.spark.sql.functions._");
       } else {
-        // see https://issues.apache.org/jira/browse/SPARK-14146
-        System.setProperty("spark.repl.fallback", "true");
         if (sparkVersion.oldSqlContextImplicits()) {
           interpret("import sqlContext._");
         } else {
@@ -1198,23 +1201,14 @@ public InterpreterResult interpretInput(String code, InterpreterContext context) {
     out.setInterpreterOutput(replOutput);
     context.out.clear();
 
-    scala.tools.nsc.interpreter.Results.Result res;
     try {
-      res = interpret(code);
+      scala.tools.nsc.interpreter.Results.Result res = interpret(code);
 
       Code r = getResultCode(res);
 
       if (r == Code.ERROR) {
         return new InterpreterResult(r, "");
-      }
-
-      // make sure code does not finish with comment
-      if (r == Code.INCOMPLETE) {
-        res = interpret(code + "\nprint(\"\")");
-        r = getResultCode(res);
-      }
-
-      if (r == Code.INCOMPLETE) {
+      } else if (r == Code.INCOMPLETE) {
         return new InterpreterResult(r, "Incomplete expression");
       } else {
         putLatestVarInResourcePool(context);

From 12b3d3303865da426e9759cffa90a27bef346eac Mon Sep 17 00:00:00 2001
From: Igor Drozdov
Date: Mon, 5 Jun 2017 12:05:01 +0300
Subject: [PATCH 4/4] Don't use hamcrest in tests
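
InterpreterResultMessage has equals()/hashCode() since the first commit, so
plain JUnit assertEquals on the result messages expresses the same checks;
the hamcrest matchers are not needed in this test.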
---
 .../rest/ZeppelinSparkClusterTest.java             | 51 ++++++++-----------
 1 file changed, 20 insertions(+), 31 deletions(-)

diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 6b720a5920b..9de9a6d9a83 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -16,15 +16,8 @@
  */
 package org.apache.zeppelin.rest;
 
-import static org.hamcrest.Matchers.anything;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.hasProperty;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -99,11 +92,12 @@ public void scalaOutputTest() throws IOException {
     p.setAuthenticationInfo(anonymous);
     note.run(p.getId());
     waitForFinish(p);
-    assertThat(p.getStatus(), is(Status.FINISHED));
-    assertThat(p.getResult().message(), contains(
-        new InterpreterResultMessage(InterpreterResult.Type.TEXT, "hello\n"),
-        new InterpreterResultMessage(InterpreterResult.Type.TEXT, "import java.util.Date\nimport java.net.URL\n")
-    ));
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertEquals(2, p.getResult().message().size());
+    assertEquals(new InterpreterResultMessage(InterpreterResult.Type.TEXT, "hello\n"),
+        p.getResult().message().get(0));
+    assertEquals(new InterpreterResultMessage(InterpreterResult.Type.TEXT,
+        "import java.util.Date\nimport java.net.URL\n"), p.getResult().message().get(1));
     ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
   }
@@ -145,10 +139,9 @@ public void sparkSQLTest() throws IOException {
     p.setAuthenticationInfo(anonymous);
     note.run(p.getId());
     waitForFinish(p);
-    assertThat(p.getStatus(), is(Status.FINISHED));
-    assertThat(p.getResult().message(), contains(
-        hasProperty("data", containsString("Array[org.apache.spark.sql.Row] = Array([hello,20])"))
-    ));
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertTrue(p.getResult().message().get(0).getData().contains(
+        "Array[org.apache.spark.sql.Row] = Array([hello,20])"));
 
     // test display DataFrame
     p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     config = p.getConfig();
     config.put("enabled", true);
     p.setConfig(config);
     p.setAuthenticationInfo(anonymous);
     note.run(p.getId());
     waitForFinish(p);
-    assertThat(p.getStatus(), is(Status.FINISHED));
-    assertThat(p.getResult().message(), contains(
-        is(new InterpreterResultMessage(InterpreterResult.Type.TABLE, "_1\t_2\nhello\t20\n")),
-        anything()
-    ));
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertEquals(new InterpreterResultMessage(InterpreterResult.Type.TABLE, "_1\t_2\nhello\t20\n"),
+        p.getResult().message().get(0));
 
     // test display DataSet
     if (sparkVersion >= 20) {
@@ -176,11 +167,9 @@ public void sparkSQLTest() throws IOException {
       p.setAuthenticationInfo(anonymous);
      note.run(p.getId());
       waitForFinish(p);
-      assertThat(p.getStatus(), is(Status.FINISHED));
-      assertThat(p.getResult().message(), contains(
-          is(new InterpreterResultMessage(InterpreterResult.Type.TABLE, "_1\t_2\nhello\t20\n")),
-          anything()
-      ));
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals(new InterpreterResultMessage(InterpreterResult.Type.TABLE, "_1\t_2\nhello\t20\n"),
+          p.getResult().message().get(0));
     }
     ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
   }
@@ -535,17 +524,17 @@ public void testSparkZeppelinContextDynamicForms() throws IOException {
     note.run(p.getId());
     waitForFinish(p);
 
-    assertThat(p.getStatus(), is(Status.FINISHED));
+    assertEquals(Status.FINISHED, p.getStatus());
     Iterator<String> formIter = p.settings.getForms().keySet().iterator();
     assert(formIter.next().equals("my_input"));
     assert(formIter.next().equals("my_select"));
     assert(formIter.next().equals("my_checkbox"));
 
     // check dynamic forms values
-    assertThat(p.getResult().message(), contains(
-        new InterpreterResultMessage(InterpreterResult.Type.TEXT, "default_name\n1\n2\n"),
-        new InterpreterResultMessage(InterpreterResult.Type.TEXT, "items: Seq[Object] = Buffer(2)\n")
-    ));
+    assertEquals(new InterpreterResultMessage(InterpreterResult.Type.TEXT, "default_name\n1\n2\n"),
+        p.getResult().message().get(0));
+    assertEquals(new InterpreterResultMessage(InterpreterResult.Type.TEXT,
+        "items: Seq[Object] = Buffer(2)\n"), p.getResult().message().get(1));
   }
 
   @Test