From b80e179680836b2b768fc046e75ffe62c852e63d Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Tue, 31 Jul 2018 09:04:57 +0800 Subject: [PATCH] ZEPPELIN-3668. Can't hide Spark Jobs (Spark UI) button This is to fix the bug of unable to hide spark jobs. [Bug Fix] * [ ] - Task * https://issues.apache.org/jira/browse/ZEPPELIN-3668 * Unit test added * Verify it manually as well. * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang Closes #3104 from zjffdu/ZEPPELIN-3668 and squashes the following commits: c7ddecc6e [Jeff Zhang] ZEPPELIN-3668. Can't hide Spark Jobs (Spark UI) button (cherry picked from commit 343fd178edf85bb7880ebd4fcecf0b11a7f38561) Signed-off-by: Jeff Zhang --- .../spark/NewSparkInterpreterTest.java | 32 +++++++++++++++++-- .../apache/zeppelin/spark/SparkShimsTest.java | 2 -- .../org/apache/zeppelin/spark/SparkShims.java | 20 +++++------- .../apache/zeppelin/spark/Spark1Shims.java | 6 ++-- .../apache/zeppelin/spark/Spark2Shims.java | 6 ++-- 5 files changed, 45 insertions(+), 21 deletions(-) diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java index a704a1d70a8..e6d47f467b2 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java @@ -54,6 +54,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -67,7 +68,7 @@ public class NewSparkInterpreterTest { // catch the interpreter output in onUpdate private InterpreterResultMessageOutput messageOutput; - private RemoteEventClient mockRemoteEventClient; + private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class); @Test public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException { @@ -158,7 +159,7 @@ public void testSparkInterpreter() throws IOException, InterruptedException, Int result = interpreter.interpret( "case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n" + - "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" + + "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" + " s => Bank(s(0).toInt, \n" + " s(1).replaceAll(\"\\\"\", \"\"),\n" + " s(2).replaceAll(\"\\\"\", \"\"),\n" + @@ -305,7 +306,7 @@ public void run() { interpretThread.start(); boolean nonZeroProgress = false; int progress = 0; - while(interpretThread.isAlive()) { + while (interpretThread.isAlive()) { progress = interpreter.getProgress(context2); assertTrue(progress >= 0); if (progress != 0 && progress != 100) { @@ -422,6 +423,31 @@ public void testDisableReplOutput() throws InterpreterException { assertEquals("hello world", output); } + @Test + public void testDisableSparkUI() throws InterpreterException { + Properties properties = new Properties(); + properties.setProperty("spark.master", "local"); + properties.setProperty("spark.app.name", "test"); + properties.setProperty("zeppelin.spark.maxResult", "100"); + properties.setProperty("zeppelin.spark.test", "true"); + properties.setProperty("zeppelin.spark.useNew", "true"); + properties.setProperty("spark.ui.enabled", "false"); + + interpreter = new SparkInterpreter(properties); + assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter); + interpreter.setInterpreterGroup(mock(InterpreterGroup.class)); + InterpreterContext.set(getInterpreterContext()); + interpreter.open(); + + InterpreterContext context = getInterpreterContext(); + InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + // spark job url is not sent + verify(mockRemoteEventClient, never()).onParaInfosReceived(any(String.class), + any(String.class), any(Map.class)); + } + @After public void tearDown() throws InterpreterException { if (this.interpreter != null) { diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java index 25afd4e1169..154835bd494 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java @@ -129,7 +129,6 @@ public void setUp() { @Test public void runUnerLocalTest() { sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockProperties); - Map mapValue = argumentCaptor.getValue(); assertTrue(mapValue.keySet().contains("jobUrl")); assertTrue(mapValue.get("jobUrl").contains("/jobs/job?id=")); @@ -137,7 +136,6 @@ public void runUnerLocalTest() { @Test public void runUnerYarnTest() { - sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockProperties); Map mapValue = argumentCaptor.getValue(); diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java index 1d7323b0b33..3e877be9086 100644 --- a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java +++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java @@ -99,7 +99,6 @@ protected String getParagraphId(String jobgroupId) { protected void buildSparkJobUrl( String master, String sparkWebUrl, int jobId, Properties jobProperties) { String jobGroupId = jobProperties.getProperty("spark.jobGroup.id"); - String uiEnabled = jobProperties.getProperty("spark.ui.enabled"); String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId; String version = VersionInfo.getVersion(); @@ -109,18 +108,15 @@ protected void buildSparkJobUrl( String noteId = getNoteId(jobGroupId); String paragraphId = getParagraphId(jobGroupId); - // Button visible if Spark UI property not set, set as invalid boolean or true - boolean showSparkUI = uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false"); - if (showSparkUI) { - RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient(); - Map infos = new java.util.HashMap<>(); - infos.put("jobUrl", jobUrl); - infos.put("label", "SPARK JOB"); - infos.put("tooltip", "View in Spark web UI"); - if (eventClient != null) { - eventClient.onParaInfosReceived(noteId, paragraphId, infos); - } + RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient(); + Map infos = new java.util.HashMap<>(); + infos.put("jobUrl", jobUrl); + infos.put("label", "SPARK JOB"); + infos.put("tooltip", "View in Spark web UI"); + if (eventClient != null) { + eventClient.onParaInfosReceived(noteId, paragraphId, infos); } + } /** diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java index c84218801ac..091eb658cd2 100644 --- a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java +++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java @@ -28,11 +28,13 @@ public class Spark1Shims extends SparkShims { public void setupSparkListener(final String master, final String sparkWebUrl) { - SparkContext sc = SparkContext.getOrCreate(); + final SparkContext sc = SparkContext.getOrCreate(); sc.addSparkListener(new JobProgressListener(sc.getConf()) { @Override public void onJobStart(SparkListenerJobStart jobStart) { - buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties()); + if (sc.getConf().getBoolean("spark.ui.enabled", true)) { + buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties()); + } } }); } diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java index 5a6c5b20fd6..53e714e19d8 100644 --- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java +++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java @@ -28,11 +28,13 @@ public class Spark2Shims extends SparkShims { public void setupSparkListener(final String master, final String sparkWebUrl) { - SparkContext sc = SparkContext.getOrCreate(); + final SparkContext sc = SparkContext.getOrCreate(); sc.addSparkListener(new SparkListener() { @Override public void onJobStart(SparkListenerJobStart jobStart) { - buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties()); + if (sc.getConf().getBoolean("spark.ui.enabled", true)) { + buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties()); + } } }); }