Commit 1131564

Update

zjffdu committed Apr 11, 2022
1 parent 0ad77e0 commit 1131564

Showing 11 changed files with 37 additions and 446 deletions.
22 changes: 16 additions & 6 deletions spark/README.md
@@ -5,14 +5,24 @@ Spark interpreter is the first and most important interpreter of Zeppelin. It su

# Module structure of Spark interpreter


* interpreter - This module is the entry module of the Spark interpreter. All the interpreter interfaces are defined here, but the implementation is delegated to the scala-xxx module, depending on the Scala version of the current Spark.
* spark-scala-parent - Parent module for each scala module
* scala-2.11 - Scala module for scala 2.11
* interpreter
- This module is the entry module of the Spark interpreter. All the interpreters are defined here. SparkInterpreter is the most important one: the SparkContext/SparkSession is created in it, and the other interpreters (PySparkInterpreter, IPySparkInterpreter, SparkRInterpreter, etc.) all depend on SparkInterpreter (see the sketch after this list).
Due to incompatibility between Scala versions, there is a separate scala-x module for each supported Scala version.
Due to incompatibility between Spark versions, there is a separate spark-shims module for each supported Spark version.
* spark-scala-parent
- Parent module for each Scala module
* scala-2.11
- Scala module for Scala 2.11
* scala-2.12
- Scala module for Scala 2.12
* scala-2.13
- Scala module for Scala 2.13
* spark-shims
- Parent module for each Spark module
* spark2-shims
- Shims module for Spark2
* spark3-shims
- Shims module for Spark3
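To make the delegation concrete, here is a minimal sketch of how the entry module can resolve a Scala implementation at runtime. The class-name pattern follows the scala-2.1x modules in this commit, but the package name is an assumption for illustration, and the real `loadSparkScalaInterpreter` (see the SparkInterpreter diff below) also passes the SparkConf, dependencies, properties, interpreter group, class loader, and output directory to the constructor.

```java
// Illustrative sketch only, not the project's actual loader: the interpreter
// entry module picks a scala-2.1x implementation class by the Scala version
// of the running Spark. Constructor handling is omitted here.
public class ScalaModuleLookup {
  public static Class<?> findInterpreterClass(String scalaVersion)
      throws ClassNotFoundException {
    // e.g. "2.12" -> "org.apache.zeppelin.spark.SparkScala212Interpreter"
    // (package name assumed for illustration)
    String simpleName =
        "SparkScala" + scalaVersion.replace(".", "") + "Interpreter";
    return Class.forName("org.apache.zeppelin.spark." + simpleName);
  }
}
```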


# How to build Spark interpreter
@@ -79,7 +79,7 @@ public SQLContext getSqlContext() {
return this.sqlContext;
}

public Object getSparkSession() {
public SparkSession getSparkSession() {
return this.sparkSession;
}

@@ -22,6 +22,7 @@
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.zeppelin.interpreter.AbstractInterpreter;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -73,7 +74,7 @@ public class SparkInterpreter extends AbstractInterpreter {
private SparkContext sc;
private JavaSparkContext jsc;
private SQLContext sqlContext;
private Object sparkSession;
private SparkSession sparkSession;

private SparkVersion sparkVersion;
private String scalaVersion;
@@ -187,14 +188,14 @@ private AbstractSparkScalaInterpreter loadSparkScalaInterpreter(SparkConf conf)
.newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), innerInterpreterClazz.getClassLoader(), scalaShellOutputDir);
}

@Override
public void close() throws InterpreterException {
LOGGER.info("Close SparkInterpreter");
if (SESSION_NUM.decrementAndGet() == 0 && innerInterpreter != null) {
innerInterpreter.close();
innerInterpreterClazz = null;
}
innerInterpreter = null;
}

@Override
@@ -248,22 +249,15 @@ public SparkContext getSparkContext() {
return this.sc;
}

/**
* Must use Object, because its API signature in Spark 1.x is different from
* that of Spark 2.x.
* e.g. SQLContext.sql(sql) returns a different type.
*
* @return
*/
public Object getSQLContext() {
public SQLContext getSQLContext() {
return sqlContext;
}

public JavaSparkContext getJavaSparkContext() {
return this.jsc;
}

public Object getSparkSession() {
public SparkSession getSparkSession() {
return sparkSession;
}
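Because both getters above now return concrete Spark types instead of `Object`, downstream code can call the session and context APIs directly, without casts or reflection. A minimal sketch, assuming an already-opened `SparkInterpreter` instance supplied by the caller:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class TypedAccessExample {
  // Sketch only: 'interpreter' is assumed to be an initialized, open
  // SparkInterpreter instance.
  static void runSample(SparkInterpreter interpreter) {
    SparkSession session = interpreter.getSparkSession(); // typed, no cast
    Dataset<Row> df = session.sql("SELECT 1 AS id");      // compile-time checked
    df.show();
  }
}
```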

@@ -297,11 +291,11 @@ private String extractScalaVersion(SparkConf conf) throws InterpreterException {
}
}

public boolean isScala211() throws InterpreterException {
public boolean isScala211() {
return scalaVersion.equals("2.11");
}

public boolean isScala212() throws InterpreterException {
public boolean isScala212() {
return scalaVersion.equals("2.12");
}

@@ -148,9 +148,4 @@ public ZeppelinContext getZeppelinContext() {
return sparkInterpreter.getZeppelinContext();
}

@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
return new ArrayList<>();
}
}
@@ -20,6 +20,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SQLContext;
import org.apache.zeppelin.interpreter.AbstractInterpreter;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -57,7 +58,7 @@ public void open() throws InterpreterException {
this.sqlSplitter = new SqlSplitter();
}

public boolean concurrentSQL() {
private boolean concurrentSQL() {
return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL"));
}
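As the method body shows, concurrent SQL execution is driven entirely by an interpreter property. A hypothetical sketch of enabling it; the property name comes from the diff above, but how properties reach the interpreter is environment-specific:

```java
import java.util.Properties;

class ConcurrentSqlConfigExample {
  // Hypothetical illustration: concurrentSQL() simply reads this property,
  // so parallel SQL execution is opted into via configuration.
  static Properties enableConcurrentSql(Properties props) {
    props.setProperty("zeppelin.spark.concurrentSQL", "true");
    return props;
  }
}
```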

@@ -83,7 +84,7 @@ public InterpreterResult internalInterpret(String st, InterpreterContext context
}
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
Object sqlContext = sparkInterpreter.getSQLContext();
SQLContext sqlContext = sparkInterpreter.getSQLContext();
SparkContext sc = sparkInterpreter.getSparkContext();

List<String> sqls = sqlSplitter.splitSql(st);
@@ -99,11 +100,10 @@ public InterpreterResult internalInterpret(String st, InterpreterContext context
// TODO(zjffdu) scala 2.12,2.13 still doesn't work for codegen (ZEPPELIN-4627)
Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader());
}
Method method = sqlContext.getClass().getMethod("sql", String.class);
for (String sql : sqls) {
curSql = sql;
String result = sparkInterpreter.getZeppelinContext()
.showData(method.invoke(sqlContext, sql), maxResult);
.showData(sqlContext.sql(sql), maxResult);
context.out.write(result);
}
context.out.flush();
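This hunk removes the last reflective call in the SQL path: with `getSQLContext()` now typed, `sql()` can be invoked directly. A minimal before/after sketch; the reflective form mirrors the deleted line, and `sqlContext` and `sql` are named as in the diff:

```java
import java.lang.reflect.Method;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

class SqlCallExample {
  static Dataset<Row> run(SQLContext sqlContext, String sql) throws Exception {
    // Before: resolved at runtime, because the context was only known as Object.
    Method method = sqlContext.getClass().getMethod("sql", String.class);
    @SuppressWarnings("unchecked")
    Dataset<Row> viaReflection = (Dataset<Row>) method.invoke(sqlContext, sql);

    // After: a plain, compile-time-checked call on the typed SQLContext.
    return sqlContext.sql(sql);
  }
}
```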
@@ -161,7 +161,6 @@ public FormType getFormType() {
return FormType.SIMPLE;
}


@Override
public int getProgress(InterpreterContext context) throws InterpreterException {
return sparkInterpreter.getProgress(context);
Expand Down
Expand Up @@ -27,83 +27,17 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Utility and helper functions for the Spark Interpreter
*/
class Utils {
public static Logger logger = LoggerFactory.getLogger(Utils.class);
public static String DEPRRECATED_MESSAGE =
private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
private static String DEPRECATED_MESSAGE =
"%html <font color=\"red\">Spark lower than 2.2 is deprecated, " +
"if you don't want to see this message, please set " +
"zeppelin.spark.deprecateMsg.show to false.</font>";

static Object invokeMethod(Object o, String name) {
return invokeMethod(o, name, new Class[]{}, new Object[]{});
}

static Object invokeMethod(Object o, String name, Class<?>[] argTypes, Object[] params) {
try {
return o.getClass().getMethod(name, argTypes).invoke(o, params);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
logger.error(e.getMessage(), e);
}
return null;
}

static Object invokeStaticMethod(Class<?> c, String name, Class<?>[] argTypes, Object[] params) {
try {
return c.getMethod(name, argTypes).invoke(null, params);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
logger.error(e.getMessage(), e);
}
return null;
}

static Object invokeStaticMethod(Class<?> c, String name) {
return invokeStaticMethod(c, name, new Class[]{}, new Object[]{});
}

static Class<?> findClass(String name) {
return findClass(name, false);
}

static Class<?> findClass(String name, boolean silence) {
try {
return Class.forName(name);
} catch (ClassNotFoundException e) {
if (!silence) {
logger.error(e.getMessage(), e);
}
return null;
}
}

static Object instantiateClass(String name, Class<?>[] argTypes, Object[] params) {
try {
Constructor<?> constructor = Utils.class.getClassLoader()
.loadClass(name).getConstructor(argTypes);
return constructor.newInstance(params);
} catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException |
InstantiationException | InvocationTargetException e) {
logger.error(e.getMessage(), e);
}
return null;
}

// function works after intp is initialized
static boolean isScala2_10() {
try {
Class.forName("org.apache.spark.repl.SparkIMain");
return true;
} catch (ClassNotFoundException e) {
return false;
} catch (IncompatibleClassChangeError e) {
return false;
}
}

public static String buildJobGroupId(InterpreterContext context) {
String uName = "anonymous";
@@ -136,7 +70,7 @@ public static void printDeprecateMessage(SparkVersion sparkVersion,
&& Boolean.parseBoolean(
properties.getProperty("zeppelin.spark.deprecatedMsg.show", "true"))) {
try {
context.out.write(DEPRRECATED_MESSAGE);
context.out.write(DEPRECATED_MESSAGE);
context.out.write("%text ");
} catch (IOException e) {
throw new InterpreterException(e);
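All of the removed helpers existed so one binary could face both Spark 1.x and 2.x by resolving classes and methods at runtime. With a single supported API generation, call sites can use plain typed code. A sketch under that assumption; the commented "before" form is illustrative, not a line from this repository:

```java
import org.apache.spark.sql.SparkSession;

class DirectCallExample {
  static SparkSession currentSession() {
    // Before (illustrative): something like
    //   Class<?> clazz = Utils.findClass("org.apache.spark.sql.SparkSession");
    //   Object builder = Utils.invokeStaticMethod(clazz, "builder"); ...
    // After: the standard typed builder call suffices.
    return SparkSession.builder().getOrCreate();
  }
}
```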
@@ -139,7 +139,7 @@ class SparkScala211Interpreter(conf: SparkConf,
@throws[InterpreterException]
def scalaInterpretQuietly(code: String): Unit = {
scalaInterpret(code) match {
case success@scala.tools.nsc.interpreter.Results.Success =>
case scala.tools.nsc.interpreter.Results.Success =>
// do nothing
case scala.tools.nsc.interpreter.Results.Error =>
throw new InterpreterException("Fail to run code: " + code)
@@ -140,7 +140,7 @@ class SparkScala212Interpreter(conf: SparkConf,
@throws[InterpreterException]
def scalaInterpretQuietly(code: String): Unit = {
scalaInterpret(code) match {
case success@scala.tools.nsc.interpreter.Results.Success =>
case scala.tools.nsc.interpreter.Results.Success =>
// do nothing
case scala.tools.nsc.interpreter.Results.Error =>
throw new InterpreterException("Fail to run code: " + code)
@@ -114,7 +114,7 @@ class SparkScala213Interpreter(conf: SparkConf,
@throws[InterpreterException]
def scalaInterpretQuietly(code: String): Unit = {
scalaInterpret(code) match {
case success@scala.tools.nsc.interpreter.Results.Success =>
case scala.tools.nsc.interpreter.Results.Success =>
// do nothing
case scala.tools.nsc.interpreter.Results.Error =>
throw new InterpreterException("Fail to run code: " + code)
