Skip to content

Commit

Permalink
ZEPPELIN-3336. Remove sending spark web url to interpreter setting page
Browse files Browse the repository at this point in the history
### What is this PR for?
This PR removes the Spark web url from the interpreter setting page, as it doesn't make sense to show it there: there would be many Spark interpreter processes associated with one interpreter setting when it is in isolated mode.

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3336

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zjffdu@apache.org>

Closes #3031 from zjffdu/ZEPPELIN-3336 and squashes the following commits:

e2bf02e [Jeff Zhang] ZEPPELIN-3336. Remove sending spark web url to interpreter setting page
  • Loading branch information
zjffdu committed Jun 26, 2018
1 parent 93531b5 commit adf83a3
Show file tree
Hide file tree
Showing 46 changed files with 18 additions and 1,109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public AbstractSparkInterpreter(Properties properties) {

public abstract JavaSparkContext getJavaSparkContext();

public abstract void populateSparkWebUrl(InterpreterContext ctx);

public abstract SparkZeppelinContext getZeppelinContext();

public abstract String getSparkUIUrl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public BaseZeppelinContext buildZeppelinContext() {
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
InterpreterContext.set(context);
sparkInterpreter.populateSparkWebUrl(context);
String jobGroupId = Utils.buildJobGroupId(context);
String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
z.setGui(context.getGui());
z.setNoteGui(context.getNoteGui());
z.setInterpreterContext(context);
populateSparkWebUrl(context);
String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
return innerInterpreter.interpret(st, context);
Expand Down Expand Up @@ -217,27 +216,6 @@ private String extractScalaVersion() throws IOException, InterruptedException {
}
}

public void populateSparkWebUrl(InterpreterContext ctx) {
Map<String, String> infos = new java.util.HashMap<>();
infos.put("url", sparkUrl);
String uiEnabledProp = properties.getProperty("spark.ui.enabled", "true");
java.lang.Boolean uiEnabled = java.lang.Boolean.parseBoolean(
uiEnabledProp.trim());
if (!uiEnabled) {
infos.put("message", "Spark UI disabled");
} else {
if (StringUtils.isNotBlank(sparkUrl)) {
infos.put("message", "Spark UI enabled");
} else {
infos.put("message", "No spark url defined");
}
}
if (ctx != null) {
LOGGER.debug("Sending metadata to Zeppelin server: {}", infos.toString());
ctx.getIntpEventClient().onMetaInfosReceived(infos);
}
}

public boolean isSparkContextInitialized() {
return this.sc != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -42,30 +41,10 @@
import org.apache.spark.SparkEnv;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.repl.SparkILoop;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Pool;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
import org.apache.spark.scheduler.SparkListenerBlockUpdated;
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.SparkUI;
import org.apache.spark.scheduler.SparkListener;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
Expand All @@ -74,7 +53,6 @@
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.resource.ResourcePool;
Expand All @@ -91,14 +69,8 @@
import scala.None;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.convert.WrapAsJava$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashSet;
import scala.reflect.io.AbstractFile;
import scala.tools.nsc.Global;
import scala.tools.nsc.Settings;
Expand Down Expand Up @@ -908,28 +880,6 @@ private Results.Result interpret(String line) {
new Object[] {line});
}

public void populateSparkWebUrl(InterpreterContext ctx) {
sparkUrl = getSparkUIUrl();
Map<String, String> infos = new java.util.HashMap<>();
infos.put("url", sparkUrl);
String uiEnabledProp = getProperty("spark.ui.enabled", "true");
java.lang.Boolean uiEnabled = java.lang.Boolean.parseBoolean(
uiEnabledProp.trim());
if (!uiEnabled) {
infos.put("message", "Spark UI disabled");
} else {
if (StringUtils.isNotBlank(sparkUrl)) {
infos.put("message", "Spark UI enabled");
} else {
infos.put("message", "No spark url defined");
}
}
if (ctx != null) {
logger.info("Sending metadata to Zeppelin server: {}", infos.toString());
ctx.getIntpEventClient().onMetaInfosReceived(infos);
}
}

private List<File> currentClassPath() {
List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
Expand Down Expand Up @@ -1080,7 +1030,6 @@ public InterpreterResult interpret(String line, InterpreterContext context) {
return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString()
+ " is not supported");
}
populateSparkWebUrl(context);
z.setInterpreterContext(context);
if (line == null || line.trim().length() == 0) {
return new InterpreterResult(Code.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ protected BaseZeppelinContext createZeppelinContext() {
@Override
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
sparkInterpreter.populateSparkWebUrl(context);
return super.interpret(st, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,6 @@ public JavaSparkContext getJavaSparkContext() {
return delegation.getJavaSparkContext();
}

@Override
public void populateSparkWebUrl(InterpreterContext ctx) {
delegation.populateSparkWebUrl(ctx);
}

@Override
public SparkZeppelinContext getZeppelinContext() {
return delegation.getZeppelinContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public void open() throws InterpreterException {
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext)
throws InterpreterException {

sparkInterpreter.populateSparkWebUrl(interpreterContext);
String jobGroup = Utils.buildJobGroupId(interpreterContext);
String jobDesc = "Started by: " +
Utils.getUserName(interpreterContext.getAuthenticationInfo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public InterpreterResult interpret(String st, InterpreterContext context)
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
}

sparkInterpreter.populateSparkWebUrl(context);
sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
sqlc = sparkInterpreter.getSQLContext();
SparkContext sc = sqlc.sparkContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ public static void testPySpark(final Interpreter interpreter, RemoteInterpreterE
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
String sparkVersion = context.out.toInterpreterResultMessage().get(0).getData();
// spark url is sent
verify(mockIntpEventClient).onMetaInfosReceived(any(Map.class));

context = createInterpreterContext(mockIntpEventClient);
result = interpreter.interpret("sc.range(1,10).sum()", context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ public void testSparkInterpreter() throws IOException, InterruptedException, Int
InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("a: String = hello world\n", output);
// spark web url is sent
verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));

result = interpreter.interpret("print(a)", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.resource.LocalResourcePool;
Expand All @@ -43,10 +42,8 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertEquals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ public void testSparkRInterpreter() throws InterpreterException, InterruptedExce
InterpreterResult result = sparkRInterpreter.interpret("1+1", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("2"));
// spark web url is sent
verify(mockRemoteIntpEventClient).onMetaInfosReceived(any(Map.class));

result = sparkRInterpreter.interpret("sparkR.version()", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -32,7 +31,6 @@
import org.apache.hadoop.util.VersionInfo;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -44,7 +42,6 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@

import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -294,14 +294,6 @@ public synchronized void onAppStatusUpdate(String noteId, String paragraphId, St
}
}

public synchronized void onMetaInfosReceived(Map<String, String> infos) {
try {
intpEventServiceClient.sendMetaInfo(intpGroupId, gson.toJson(infos));
} catch (TException e) {
LOGGER.warn("Fail to sendMetaInfo: " + infos, e);
}
}

public synchronized void onParaInfosReceived(Map<String, String> infos) {
try {
intpEventServiceClient.sendParagraphInfo(intpGroupId, gson.toJson(infos));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-6-19")
public class AngularObjectId implements org.apache.thrift.TBase<AngularObjectId, AngularObjectId._Fields>, java.io.Serializable, Cloneable, Comparable<AngularObjectId> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AngularObjectId");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-6-19")
public class AppOutputAppendEvent implements org.apache.thrift.TBase<AppOutputAppendEvent, AppOutputAppendEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppOutputAppendEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputAppendEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-6-19")
public class AppOutputUpdateEvent implements org.apache.thrift.TBase<AppOutputUpdateEvent, AppOutputUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppOutputUpdateEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputUpdateEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-6-19")
public class AppStatusUpdateEvent implements org.apache.thrift.TBase<AppStatusUpdateEvent, AppStatusUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppStatusUpdateEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppStatusUpdateEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-6-19")
public class InterpreterCompletion implements org.apache.thrift.TBase<InterpreterCompletion, InterpreterCompletion._Fields>, java.io.Serializable, Cloneable, Comparable<InterpreterCompletion> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-6-19")
public class OutputAppendEvent implements org.apache.thrift.TBase<OutputAppendEvent, OutputAppendEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputAppendEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputAppendEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-6-19")
public class OutputUpdateAllEvent implements org.apache.thrift.TBase<OutputUpdateAllEvent, OutputUpdateAllEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputUpdateAllEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateAllEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-6-19")
public class OutputUpdateEvent implements org.apache.thrift.TBase<OutputUpdateEvent, OutputUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputUpdateEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-6-19")
public class RegisterInfo implements org.apache.thrift.TBase<RegisterInfo, RegisterInfo._Fields>, java.io.Serializable, Cloneable, Comparable<RegisterInfo> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RegisterInfo");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-6-19")
public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");

Expand Down

0 comments on commit adf83a3

Please sign in to comment.