Skip to content

Commit

Permalink
ZEPPELIN-3668. Can't hide Spark Jobs (Spark UI) button
Browse files Browse the repository at this point in the history
This is to fix the bug of unable to hide spark jobs.

[Bug Fix]

* [ ] - Task

* https://issues.apache.org/jira/browse/ZEPPELIN-3668

* Unit test added
* Verified it manually as well.

* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjffdu@apache.org>

Closes #3104 from zjffdu/ZEPPELIN-3668 and squashes the following commits:

c7ddecc [Jeff Zhang] ZEPPELIN-3668. Can't hide Spark Jobs (Spark UI) button

(cherry picked from commit 343fd17)
Signed-off-by: Jeff Zhang <zjffdu@apache.org>
  • Loading branch information
zjffdu committed Aug 1, 2018
1 parent 489549f commit b80e179
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 21 deletions.
Expand Up @@ -54,6 +54,7 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;


Expand All @@ -67,7 +68,7 @@ public class NewSparkInterpreterTest {
// catch the interpreter output in onUpdate
private InterpreterResultMessageOutput messageOutput;

private RemoteEventClient mockRemoteEventClient;
private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);

@Test
public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
Expand Down Expand Up @@ -158,7 +159,7 @@ public void testSparkInterpreter() throws IOException, InterruptedException, Int

result = interpreter.interpret(
"case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n" +
"val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
"val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
" s => Bank(s(0).toInt, \n" +
" s(1).replaceAll(\"\\\"\", \"\"),\n" +
" s(2).replaceAll(\"\\\"\", \"\"),\n" +
Expand Down Expand Up @@ -305,7 +306,7 @@ public void run() {
interpretThread.start();
boolean nonZeroProgress = false;
int progress = 0;
while(interpretThread.isAlive()) {
while (interpretThread.isAlive()) {
progress = interpreter.getProgress(context2);
assertTrue(progress >= 0);
if (progress != 0 && progress != 100) {
Expand Down Expand Up @@ -422,6 +423,31 @@ public void testDisableReplOutput() throws InterpreterException {
assertEquals("hello world", output);
}

@Test
public void testDisableSparkUI() throws InterpreterException {
  // Configure a new-style local Spark interpreter with the Spark web UI disabled.
  Properties props = new Properties();
  props.setProperty("spark.master", "local");
  props.setProperty("spark.app.name", "test");
  props.setProperty("zeppelin.spark.maxResult", "100");
  props.setProperty("zeppelin.spark.test", "true");
  props.setProperty("zeppelin.spark.useNew", "true");
  props.setProperty("spark.ui.enabled", "false");

  interpreter = new SparkInterpreter(props);
  assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
  interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
  InterpreterContext.set(getInterpreterContext());
  interpreter.open();

  InterpreterContext ctx = getInterpreterContext();
  InterpreterResult res = interpreter.interpret("sc.range(1, 10).sum", ctx);
  assertEquals(InterpreterResult.Code.SUCCESS, res.code());

  // With spark.ui.enabled=false no paragraph info (i.e. no spark job url) may be
  // published to the client — verify the event was never fired.
  verify(mockRemoteEventClient, never())
      .onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
}

@After
public void tearDown() throws InterpreterException {
if (this.interpreter != null) {
Expand Down
Expand Up @@ -129,15 +129,13 @@ public void setUp() {
@Test
public void runUnerLocalTest() {
  // In local mode the built job url must point at the per-job page of the Spark UI.
  sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockProperties);

  Map<String, String> captured = argumentCaptor.getValue();
  assertTrue(captured.containsKey("jobUrl"));
  assertTrue(captured.get("jobUrl").contains("/jobs/job?id="));
}

@Test
public void runUnerYarnTest() {

sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockProperties);

Map<String, String> mapValue = argumentCaptor.getValue();
Expand Down
Expand Up @@ -99,7 +99,6 @@ protected String getParagraphId(String jobgroupId) {
protected void buildSparkJobUrl(
String master, String sparkWebUrl, int jobId, Properties jobProperties) {
String jobGroupId = jobProperties.getProperty("spark.jobGroup.id");
String uiEnabled = jobProperties.getProperty("spark.ui.enabled");
String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId;

String version = VersionInfo.getVersion();
Expand All @@ -109,18 +108,15 @@ protected void buildSparkJobUrl(

String noteId = getNoteId(jobGroupId);
String paragraphId = getParagraphId(jobGroupId);
// Button visible if Spark UI property not set, set as invalid boolean or true
boolean showSparkUI = uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false");
if (showSparkUI) {
RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient();
Map<String, String> infos = new java.util.HashMap<>();
infos.put("jobUrl", jobUrl);
infos.put("label", "SPARK JOB");
infos.put("tooltip", "View in Spark web UI");
if (eventClient != null) {
eventClient.onParaInfosReceived(noteId, paragraphId, infos);
}
RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient();
Map<String, String> infos = new java.util.HashMap<>();
infos.put("jobUrl", jobUrl);
infos.put("label", "SPARK JOB");
infos.put("tooltip", "View in Spark web UI");
if (eventClient != null) {
eventClient.onParaInfosReceived(noteId, paragraphId, infos);
}

}

/**
Expand Down
Expand Up @@ -28,11 +28,13 @@
public class Spark1Shims extends SparkShims {

public void setupSparkListener(final String master, final String sparkWebUrl) {
  // NOTE(review): the scraped diff showed both the old and the new declaration of
  // `sc`; only the post-commit `final` one can compile (it is captured by the
  // anonymous listener below).
  final SparkContext sc = SparkContext.getOrCreate();
  sc.addSparkListener(new JobProgressListener(sc.getConf()) {
    @Override
    public void onJobStart(SparkListenerJobStart jobStart) {
      // ZEPPELIN-3668: only publish the spark job url when the Spark UI is
      // enabled; otherwise the (hidden) UI would still get a dangling button.
      if (sc.getConf().getBoolean("spark.ui.enabled", true)) {
        buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties());
      }
    }
  });
}
Expand Down
Expand Up @@ -28,11 +28,13 @@
public class Spark2Shims extends SparkShims {

public void setupSparkListener(final String master, final String sparkWebUrl) {
  // NOTE(review): the scraped diff showed both the old and the new declaration of
  // `sc`; only the post-commit `final` one can compile (it is captured by the
  // anonymous listener below).
  final SparkContext sc = SparkContext.getOrCreate();
  sc.addSparkListener(new SparkListener() {
    @Override
    public void onJobStart(SparkListenerJobStart jobStart) {
      // ZEPPELIN-3668: only publish the spark job url when the Spark UI is
      // enabled; otherwise the (hidden) UI would still get a dangling button.
      if (sc.getConf().getBoolean("spark.ui.enabled", true)) {
        buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties());
      }
    }
  });
}
Expand Down

0 comments on commit b80e179

Please sign in to comment.