ZEPPELIN-3668. Can't hide Spark Jobs (Spark UI) button
zjffdu committed Jul 31, 2018
1 parent 76e0027 commit fff4f96
Showing 5 changed files with 50 additions and 25 deletions.
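In brief: the "SPARK JOB" button could not be hidden because buildSparkJobUrl read spark.ui.enabled from the listener's per-job Properties, where SparkConf entries never appear; the commit drops that parameter and gates the button on the driver's SparkConf instead (see the Spark 2.x shim hunk at the end). After the fix, hiding the button is a one-property change. A minimal sketch mirroring the new test below; the class name and main wrapper are illustrative, not part of the commit:

import java.util.Properties;
import org.apache.zeppelin.spark.SparkInterpreter;

public class DisableSparkUiExample {
  public static void main(String[] args) throws Exception {
    Properties properties = new Properties();
    properties.setProperty("spark.master", "local");
    properties.setProperty("spark.app.name", "test");
    properties.setProperty("spark.ui.enabled", "false");  // hides the "SPARK JOB" button

    SparkInterpreter interpreter = new SparkInterpreter(properties);
    // In the test, an InterpreterGroup and InterpreterContext are set before open().
    interpreter.open();
  }
}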
@@ -34,6 +34,7 @@
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.junit.After;
+import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

@@ -44,7 +45,6 @@
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -53,6 +53,7 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;


@@ -66,7 +67,7 @@ public class NewSparkInterpreterTest {
// catch the interpreter output in onUpdate
private InterpreterResultMessageOutput messageOutput;

-private RemoteInterpreterEventClient mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
+private RemoteInterpreterEventClient mockRemoteEventClient;

@Test
public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
@@ -163,7 +164,7 @@ public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {

result = interpreter.interpret(
"case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n" +
"val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
"val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
" s => Bank(s(0).toInt, \n" +
" s(1).replaceAll(\"\\\"\", \"\"),\n" +
" s(2).replaceAll(\"\\\"\", \"\"),\n" +
@@ -188,7 +189,7 @@ public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
"df.show()", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(output.contains(
"+---+----+\n" +
"+---+----+\n" +
"| _1| _2|\n" +
"+---+----+\n" +
"| 1| a|\n" +
@@ -203,7 +204,7 @@ public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
"df.show()", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(output.contains(
"+---+----+\n" +
"+---+----+\n" +
"| _1| _2|\n" +
"+---+----+\n" +
"| 1| a|\n" +
@@ -318,7 +319,7 @@ public void run() {
interpretThread.start();
boolean nonZeroProgress = false;
int progress = 0;
-while(interpretThread.isAlive()) {
+while (interpretThread.isAlive()) {
progress = interpreter.getProgress(context2);
assertTrue(progress >= 0);
if (progress != 0 && progress != 100) {
@@ -463,6 +464,35 @@ public void testSchedulePool() throws InterpreterException {
assertEquals(null, interpreter.getSparkContext().getLocalProperty("spark.scheduler.pool"));
}

+@Test
+public void testDisableSparkUI() throws InterpreterException {
+  Properties properties = new Properties();
+  properties.setProperty("spark.master", "local");
+  properties.setProperty("spark.app.name", "test");
+  properties.setProperty("zeppelin.spark.maxResult", "100");
+  properties.setProperty("zeppelin.spark.test", "true");
+  properties.setProperty("zeppelin.spark.useNew", "true");
+  properties.setProperty("spark.ui.enabled", "false");
+
+  interpreter = new SparkInterpreter(properties);
+  assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
+  interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
+  InterpreterContext.set(getInterpreterContext());
+  interpreter.open();
+
+  InterpreterContext context = getInterpreterContext();
+  InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum", context);
+  assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+  // spark job url is not sent
+  verify(mockRemoteEventClient, never()).onParaInfosReceived(any(Map.class));
+}
+
+@Before
+public void setUp() {
+  mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
+}
+
@After
public void tearDown() throws InterpreterException {
if (this.interpreter != null) {
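The test file pairs the @Before-created mock with Mockito's never() verification mode to assert that no paragraph info (and hence no job URL) was published. A standalone illustration of the same pattern; EventSink is a hypothetical stand-in for RemoteInterpreterEventClient:

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.util.Map;

public class NeverVerifyExample {
  interface EventSink {  // hypothetical stand-in for RemoteInterpreterEventClient
    void onParaInfosReceived(Map<String, String> infos);
  }

  public static void main(String[] args) {
    EventSink sink = mock(EventSink.class);
    // ... exercise code that should NOT publish paragraph infos ...
    verify(sink, never()).onParaInfosReceived(any(Map.class));  // passes only if never called
  }
}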
@@ -108,7 +108,6 @@ public String showDataFrame(Object obj, int maxResult) {
@PrepareForTest({BaseZeppelinContext.class, VersionInfo.class})
@PowerMockIgnore({"javax.net.*", "javax.security.*"})
public static class SingleTests {
-@Mock Properties mockProperties;
@Captor ArgumentCaptor<Map<String, String>> argumentCaptor;

SparkShims sparkShims;
@@ -130,7 +129,7 @@ public void setUp() {

@Test
public void runUnderLocalTest() {
-sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockProperties, mockContext);
+sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockContext);

Map<String, String> mapValue = argumentCaptor.getValue();
assertTrue(mapValue.keySet().contains("jobUrl"));
@@ -140,7 +139,7 @@ public void runUnderLocalTest() {
@Test
public void runUnderYarnTest() {

-sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockProperties, mockContext);
+sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockContext);

Map<String, String> mapValue = argumentCaptor.getValue();
assertTrue(mapValue.keySet().contains("jobUrl"));
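These test updates simply track the signature change in SparkShims (next hunk): buildSparkJobUrl no longer receives the job's Properties, so the mockProperties field goes away. The call shape, before and after, with names exactly as in the diff:

// before: sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockProperties, mockContext);
// after:
sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockContext);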
@@ -104,26 +104,20 @@ protected String getParagraphId(String jobgroupId) {
protected void buildSparkJobUrl(String master,
                                String sparkWebUrl,
                                int jobId,
-                               Properties jobProperties,
                                InterpreterContext context) {
-  String uiEnabled = jobProperties.getProperty("spark.ui.enabled");
  String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId;
-  // Button visible if Spark UI property not set, set as invalid boolean or true
-  boolean showSparkUI =
-      uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false");
  String version = VersionInfo.getVersion();
  if (master.toLowerCase().contains("yarn") && !supportYarn6615(version)) {
    jobUrl = sparkWebUrl + "/jobs";
  }
-  if (showSparkUI && jobUrl != null) {
-    Map<String, String> infos = new java.util.HashMap<String, String>();
-    infos.put("jobUrl", jobUrl);
-    infos.put("label", "SPARK JOB");
-    infos.put("tooltip", "View in Spark web UI");
-    infos.put("noteId", context.getNoteId());
-    infos.put("paraId", context.getParagraphId());
-    context.getIntpEventClient().onParaInfosReceived(infos);
-  }
+
+  Map<String, String> infos = new java.util.HashMap<String, String>();
+  infos.put("jobUrl", jobUrl);
+  infos.put("label", "SPARK JOB");
+  infos.put("tooltip", "View in Spark web UI");
+  infos.put("noteId", context.getNoteId());
+  infos.put("paraId", context.getParagraphId());
+  context.getIntpEventClient().onParaInfosReceived(infos);
}

/**
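One detail in the hunk above that predates this commit but explains the URL shapes: on YARN without the YARN-6615 fix (per that issue, the AM proxy filter dropped query parameters on redirect), a per-job deep link like /jobs/job?id=N would break, so the code links to the jobs overview instead. Condensed into a helper for illustration; supportYarn6615 is Zeppelin's own version check, assumed in scope, and VersionInfo is org.apache.hadoop.util.VersionInfo as in SparkShims:

// A sketch of the URL choice after this commit, not the exact Zeppelin method.
static String chooseJobUrl(String master, String sparkWebUrl, int jobId) {
  String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId;
  if (master.toLowerCase().contains("yarn") && !supportYarn6615(VersionInfo.getVersion())) {
    // Older YARN proxies lose the ?id= query parameter (YARN-6615),
    // so fall back to the jobs list page.
    jobUrl = sparkWebUrl + "/jobs";
  }
  return jobUrl;
}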
@@ -38,7 +38,7 @@ public void setupSparkListener(final String master,
sc.addSparkListener(new JobProgressListener(sc.getConf()) {
@Override
public void onJobStart(SparkListenerJobStart jobStart) {
-buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
+buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), context);
}
});
}
@@ -39,7 +39,9 @@ public void setupSparkListener(final String master,
sc.addSparkListener(new SparkListener() {
@Override
public void onJobStart(SparkListenerJobStart jobStart) {
-buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
+if (sc.getConf().getBoolean("spark.ui.enabled", true)) {
+  buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), context);
+}
}
});
}
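The effective gate lands here in the Spark 2.x shim; the Spark 1.x shim in the previous hunk only has its call updated to the new signature. The root cause appears to be that SparkListenerJobStart.properties() carries job-local properties (the scheduler pool, for example), not SparkConf entries, so the old jobProperties.getProperty("spark.ui.enabled") lookup could never see the setting and the button was always shown. The replacement rule in one line, with the old one for contrast; sc is the live SparkContext:

// Old rule, removed from SparkShims.buildSparkJobUrl: show the button when
//   uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false")
// New rule, read from the driver's SparkConf; the default of true keeps the
// button visible when the property is unset:
boolean showButton = sc.getConf().getBoolean("spark.ui.enabled", true);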
