From 4b06a9e667c0ab3d32aa122766998ca2f6d938f7 Mon Sep 17 00:00:00 2001
From: lei wang
Date: Tue, 11 Apr 2017 23:18:14 +0800
Subject: [PATCH 1/6] Improvement

Improve the Spark SQL interpreter so a paragraph can run multiple SQL
statements separated by semicolons.
---
 .../zeppelin/spark/SparkSqlInterpreter.java   | 62 ++++++++++++++++---
 1 file changed, 54 insertions(+), 8 deletions(-)

diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index d2de9a18b15..b10610b9694 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -19,10 +19,10 @@
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -112,7 +112,11 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
       // to def sql(sqlText: String): DataFrame (1.3 and later).
       // Therefore need to use reflection to keep binary compatibility for all spark versions.
       Method sqlMethod = sqlc.getClass().getMethod("sql", String.class);
-      rdd = sqlMethod.invoke(sqlc, st);
+      for (String statement : splitSqlScript(st)) {
+        if (StringUtils.isNotBlank(statement)) {
+          rdd = sqlMethod.invoke(sqlc, statement);
+        }
+      }
     } catch (InvocationTargetException ite) {
       if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace"))) {
         throw new InterpreterException(ite);
@@ -131,11 +135,53 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
     return new InterpreterResult(Code.SUCCESS, msg);
   }
 
-  @Override
-  public void cancel(InterpreterContext context) {
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    SQLContext sqlc = sparkInterpreter.getSQLContext();
-    SparkContext sc = sqlc.sparkContext();
+  //split sql script to single statement list
+  public static List<String> splitSqlScript(String script) {
+    List<String> queries = new LinkedList<>();
+    StringBuilder query = new StringBuilder();
+    List<Character> quoteList = Arrays.asList('\'', '"', '`');
+    Stack<String> operatorStack = new Stack<>();
+    char lastCharacter = ' ';
+    for (char character : script.toCharArray()) {
+      if (';' == character && lastCharacter != '\\' && operatorStack.isEmpty()) {
+        if (query.length() > 0) {
+          queries.add(query.toString());
+          query.setLength(0);
+        }
+      } else {
+        query.append(character);
+        if (operatorStack.isEmpty()) {
+          if ('-' == character && '-' == lastCharacter) {
+            operatorStack.push("--");
+          } else if (lastCharacter == '/' && '*' == character) {
+            operatorStack.push("/*");
+          } else if (quoteList.contains(character) && lastCharacter != '\\') {
+            operatorStack.push(String.valueOf(character));
+          }
+        } else {
+          if ('\n' == character && "--".equals(operatorStack.peek())) {
+            operatorStack.pop();
+          } else if (lastCharacter == '*' && character == '/'
+              && operatorStack.peek().equals("/*")) {
+            operatorStack.pop();
+          } else if (quoteList.contains(character)
+              && operatorStack.peek().equals(String.valueOf(character))) {
+            operatorStack.pop();
+          }
+        }
+      }
+      lastCharacter = character;
+    }
+    if (query.length() > 0) {
+      queries.add(query.toString());
+    }
+    return queries;
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
SQLContext sqlc = getSparkInterpreter().getSQLContext(); + SparkContext sc = sqlc.sparkContext(); sc.cancelJobGroup(Utils.buildJobGroupId(context)); } From fb15033db21d7819ad362b5bddfbdb8cf7da09e8 Mon Sep 17 00:00:00 2001 From: LeiWang Date: Tue, 11 Apr 2017 23:36:51 +0800 Subject: [PATCH 2/6] Update SparkSqlInterpreter.java update code style --- .../zeppelin/spark/SparkSqlInterpreter.java | 233 +++++++++--------- 1 file changed, 117 insertions(+), 116 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index b10610b9694..638179a65f9 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -42,99 +42,100 @@ * Spark SQL interpreter for Zeppelin. */ public class SparkSqlInterpreter extends Interpreter { - private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class); + private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class); - public static final String MAX_RESULTS = "zeppelin.spark.maxResult"; + public static final String MAX_RESULTS = "zeppelin.spark.maxResult"; - AtomicInteger num = new AtomicInteger(0); + AtomicInteger num = new AtomicInteger(0); - private int maxResult; + private int maxResult; - public SparkSqlInterpreter(Properties property) { - super(property); - } + public SparkSqlInterpreter(Properties property) { + super(property); + } - @Override - public void open() { - this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS)); - } + @Override + public void open() { + this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS)); + } - private SparkInterpreter getSparkInterpreter() { - LazyOpenInterpreter lazy = null; - SparkInterpreter spark = null; - Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); + private SparkInterpreter getSparkInterpreter() { + LazyOpenInterpreter lazy = null; + SparkInterpreter spark = null; + Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); + + while (p instanceof WrappedInterpreter) { + if (p instanceof LazyOpenInterpreter) { + lazy = (LazyOpenInterpreter) p; + } + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + spark = (SparkInterpreter) p; - while (p instanceof WrappedInterpreter) { - if (p instanceof LazyOpenInterpreter) { - lazy = (LazyOpenInterpreter) p; - } - p = ((WrappedInterpreter) p).getInnerInterpreter(); + if (lazy != null) { + lazy.open(); + } + return spark; } - spark = (SparkInterpreter) p; - if (lazy != null) { - lazy.open(); + public boolean concurrentSQL() { + return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL")); } - return spark; - } - public boolean concurrentSQL() { - return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL")); - } + @Override + public void close() { + } - @Override - public void close() {} + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + SQLContext sqlc = null; + SparkInterpreter sparkInterpreter = getSparkInterpreter(); - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - SQLContext sqlc = null; - SparkInterpreter sparkInterpreter = getSparkInterpreter(); + if (sparkInterpreter.isUnsupportedSparkVersion()) { + return new InterpreterResult(Code.ERROR, "Spark " + + sparkInterpreter.getSparkVersion().toString() + " is not supported"); + } - if 
(sparkInterpreter.isUnsupportedSparkVersion()) { - return new InterpreterResult(Code.ERROR, "Spark " - + sparkInterpreter.getSparkVersion().toString() + " is not supported"); - } + sparkInterpreter.populateSparkWebUrl(context); + sqlc = getSparkInterpreter().getSQLContext(); + SparkContext sc = sqlc.sparkContext(); + if (concurrentSQL()) { + sc.setLocalProperty("spark.scheduler.pool", "fair"); + } else { + sc.setLocalProperty("spark.scheduler.pool", null); + } - sparkInterpreter.populateSparkWebUrl(context); - sqlc = getSparkInterpreter().getSQLContext(); - SparkContext sc = sqlc.sparkContext(); - if (concurrentSQL()) { - sc.setLocalProperty("spark.scheduler.pool", "fair"); - } else { - sc.setLocalProperty("spark.scheduler.pool", null); - } + sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false); + Object rdd = null; + try { + // method signature of sqlc.sql() is changed + // from def sql(sqlText: String): SchemaRDD (1.2 and prior) + // to def sql(sqlText: String): DataFrame (1.3 and later). + // Therefore need to use reflection to keep binary compatibility for all spark versions. + Method sqlMethod = sqlc.getClass().getMethod("sql", String.class); + for (String statement : splitSqlScript(st)) { + if (StringUtils.isNotBlank(statement)) { + rdd = sqlMethod.invoke(sqlc, statement); + } + } + } catch (InvocationTargetException ite) { + if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace"))) { + throw new InterpreterException(ite); + } + logger.error("Invocation target exception", ite); + String msg = ite.getTargetException().getMessage() + + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace"; + return new InterpreterResult(Code.ERROR, msg); + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException e) { + throw new InterpreterException(e); + } - sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false); - Object rdd = null; - try { - // method signature of sqlc.sql() is changed - // from def sql(sqlText: String): SchemaRDD (1.2 and prior) - // to def sql(sqlText: String): DataFrame (1.3 and later). - // Therefore need to use reflection to keep binary compatibility for all spark versions. 
- Method sqlMethod = sqlc.getClass().getMethod("sql", String.class); - for (String statement : splitSqlScript(st)) { - if (StringUtils.isNotBlank(statement)) { - rdd = sqlMethod.invoke(sqlc, statement); - } - } - } catch (InvocationTargetException ite) { - if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace"))) { - throw new InterpreterException(ite); - } - logger.error("Invocation target exception", ite); - String msg = ite.getTargetException().getMessage() - + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace"; - return new InterpreterResult(Code.ERROR, msg); - } catch (NoSuchMethodException | SecurityException | IllegalAccessException - | IllegalArgumentException e) { - throw new InterpreterException(e); + String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult); + sc.clearJobGroup(); + return new InterpreterResult(Code.SUCCESS, msg); } - String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult); - sc.clearJobGroup(); - return new InterpreterResult(Code.SUCCESS, msg); - } - //split sql script to single statement list public static List splitSqlScript(String script) { List queries = new LinkedList(); @@ -183,47 +184,47 @@ public void cancel(InterpreterContext context) { SQLContext sqlc = getSparkInterpreter().getSQLContext(); SparkContext sc = sqlc.sparkContext(); - sc.cancelJobGroup(Utils.buildJobGroupId(context)); - } - - @Override - public FormType getFormType() { - return FormType.SIMPLE; - } - - - @Override - public int getProgress(InterpreterContext context) { - SparkInterpreter sparkInterpreter = getSparkInterpreter(); - return sparkInterpreter.getProgress(context); - } - - @Override - public Scheduler getScheduler() { - if (concurrentSQL()) { - int maxConcurrency = 10; - return SchedulerFactory.singleton().createOrGetParallelScheduler( - SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency); - } else { - // getSparkInterpreter() calls open() inside. - // That means if SparkInterpreter is not opened, it'll wait until SparkInterpreter open. - // In this moment UI displays 'READY' or 'FINISHED' instead of 'PENDING' or 'RUNNING'. - // It's because of scheduler is not created yet, and scheduler is created by this function. - // Therefore, we can still use getSparkInterpreter() here, but it's better and safe - // to getSparkInterpreter without opening it. - - Interpreter intp = - getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); - if (intp != null) { - return intp.getScheduler(); - } else { - return null; - } + sc.cancelJobGroup(Utils.buildJobGroupId(context)); + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + + @Override + public int getProgress(InterpreterContext context) { + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + return sparkInterpreter.getProgress(context); } - } - @Override - public List completion(String buf, int cursor) { - return null; - } + @Override + public Scheduler getScheduler() { + if (concurrentSQL()) { + int maxConcurrency = 10; + return SchedulerFactory.singleton().createOrGetParallelScheduler( + SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency); + } else { + // getSparkInterpreter() calls open() inside. + // That means if SparkInterpreter is not opened, it'll wait until SparkInterpreter open. + // In this moment UI displays 'READY' or 'FINISHED' instead of 'PENDING' or 'RUNNING'. + // It's because of scheduler is not created yet, and scheduler is created by this function. 
+ // Therefore, we can still use getSparkInterpreter() here, but it's better and safe + // to getSparkInterpreter without opening it. + + Interpreter intp = + getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); + if (intp != null) { + return intp.getScheduler(); + } else { + return null; + } + } + } + + @Override + public List completion(String buf, int cursor) { + return null; + } } From 2196f658aaa2ed240a0a214bf5598549566fdfa6 Mon Sep 17 00:00:00 2001 From: LeiWang Date: Tue, 11 Apr 2017 23:50:33 +0800 Subject: [PATCH 3/6] Update SparkSqlInterpreter.java update code style --- .../zeppelin/spark/SparkSqlInterpreter.java | 318 +++++++++--------- 1 file changed, 158 insertions(+), 160 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 638179a65f9..e929ac0d4a4 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -42,189 +42,187 @@ * Spark SQL interpreter for Zeppelin. */ public class SparkSqlInterpreter extends Interpreter { - private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class); + private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class); - public static final String MAX_RESULTS = "zeppelin.spark.maxResult"; + public static final String MAX_RESULTS = "zeppelin.spark.maxResult"; - AtomicInteger num = new AtomicInteger(0); + AtomicInteger num = new AtomicInteger(0); - private int maxResult; + private int maxResult; - public SparkSqlInterpreter(Properties property) { - super(property); - } - - @Override - public void open() { - this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS)); - } + public SparkSqlInterpreter(Properties property) { + super(property); + } - private SparkInterpreter getSparkInterpreter() { - LazyOpenInterpreter lazy = null; - SparkInterpreter spark = null; - Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); + @Override + public void open() { + this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS)); + } - while (p instanceof WrappedInterpreter) { - if (p instanceof LazyOpenInterpreter) { - lazy = (LazyOpenInterpreter) p; - } - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - spark = (SparkInterpreter) p; - - if (lazy != null) { - lazy.open(); - } - return spark; - } + private SparkInterpreter getSparkInterpreter() { + LazyOpenInterpreter lazy = null; + SparkInterpreter spark = null; + Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); - public boolean concurrentSQL() { - return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL")); + while (p instanceof WrappedInterpreter) { + if (p instanceof LazyOpenInterpreter) { + lazy = (LazyOpenInterpreter) p; + } + p = ((WrappedInterpreter) p).getInnerInterpreter(); } + spark = (SparkInterpreter) p; - @Override - public void close() { + if (lazy != null) { + lazy.open(); } + return spark; + } - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - SQLContext sqlc = null; - SparkInterpreter sparkInterpreter = getSparkInterpreter(); + public boolean concurrentSQL() { + return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL")); + } - if (sparkInterpreter.isUnsupportedSparkVersion()) { - return new InterpreterResult(Code.ERROR, "Spark " - + 
sparkInterpreter.getSparkVersion().toString() + " is not supported"); - } + @Override + public void close() {} - sparkInterpreter.populateSparkWebUrl(context); - sqlc = getSparkInterpreter().getSQLContext(); - SparkContext sc = sqlc.sparkContext(); - if (concurrentSQL()) { - sc.setLocalProperty("spark.scheduler.pool", "fair"); - } else { - sc.setLocalProperty("spark.scheduler.pool", null); - } + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + SQLContext sqlc = null; + SparkInterpreter sparkInterpreter = getSparkInterpreter(); - sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false); - Object rdd = null; - try { - // method signature of sqlc.sql() is changed - // from def sql(sqlText: String): SchemaRDD (1.2 and prior) - // to def sql(sqlText: String): DataFrame (1.3 and later). - // Therefore need to use reflection to keep binary compatibility for all spark versions. - Method sqlMethod = sqlc.getClass().getMethod("sql", String.class); - for (String statement : splitSqlScript(st)) { - if (StringUtils.isNotBlank(statement)) { - rdd = sqlMethod.invoke(sqlc, statement); - } - } - } catch (InvocationTargetException ite) { - if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace"))) { - throw new InterpreterException(ite); - } - logger.error("Invocation target exception", ite); - String msg = ite.getTargetException().getMessage() - + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace"; - return new InterpreterResult(Code.ERROR, msg); - } catch (NoSuchMethodException | SecurityException | IllegalAccessException - | IllegalArgumentException e) { - throw new InterpreterException(e); - } - - String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult); - sc.clearJobGroup(); - return new InterpreterResult(Code.SUCCESS, msg); + if (sparkInterpreter.isUnsupportedSparkVersion()) { + return new InterpreterResult(Code.ERROR, "Spark " + + sparkInterpreter.getSparkVersion().toString() + " is not supported"); } - //split sql script to single statement list - public static List splitSqlScript(String script) { - List queries = new LinkedList(); - StringBuilder query = new StringBuilder(); - List quoteList = Arrays.asList('\'', '"', '`'); - Stack operatorStack = new Stack(); - char lastCharacter = ' '; - for (char character : script.toCharArray()) { - if (';' == character && lastCharacter != '\\' && operatorStack.isEmpty()) { - if (query.length() > 0) { - queries.add(query.toString()); - query.setLength(0); - } - } else { - query.append(character); - if (operatorStack.isEmpty()) { - if ('-' == character && '-' == lastCharacter) { - operatorStack.push("--"); - } else if (lastCharacter == '/' && '*' == character) { - operatorStack.push("/*"); - } else if (quoteList.contains(character) && lastCharacter != '\\') { - operatorStack.push(String.valueOf(character)); - } - } else { - if ('\n' == character && "--".equals(operatorStack.peek())) { - operatorStack.pop(); - } else if (lastCharacter == '*' && character == '/' - && operatorStack.peek().equals("/*")) { - operatorStack.pop(); - } else if (quoteList.contains(character) - && operatorStack.peek().equals(String.valueOf(character))) { - operatorStack.pop(); - } - } - } - lastCharacter = character; - } - if (query.length() > 0) { - queries.add(query.toString()); - } - return queries; - } - - @Override - public void cancel(InterpreterContext context) { - SQLContext sqlc = getSparkInterpreter().getSQLContext(); - SparkContext sc = sqlc.sparkContext(); - - 
sc.cancelJobGroup(Utils.buildJobGroupId(context)); + sparkInterpreter.populateSparkWebUrl(context); + sqlc = getSparkInterpreter().getSQLContext(); + SparkContext sc = sqlc.sparkContext(); + if (concurrentSQL()) { + sc.setLocalProperty("spark.scheduler.pool", "fair"); + } else { + sc.setLocalProperty("spark.scheduler.pool", null); } - @Override - public FormType getFormType() { - return FormType.SIMPLE; - } - - - @Override - public int getProgress(InterpreterContext context) { - SparkInterpreter sparkInterpreter = getSparkInterpreter(); - return sparkInterpreter.getProgress(context); + sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false); + Object rdd = null; + try { + // method signature of sqlc.sql() is changed + // from def sql(sqlText: String): SchemaRDD (1.2 and prior) + // to def sql(sqlText: String): DataFrame (1.3 and later). + // Therefore need to use reflection to keep binary compatibility for all spark versions. + Method sqlMethod = sqlc.getClass().getMethod("sql", String.class); + for (String statement : splitSqlScript(st)) { + if (StringUtils.isNotBlank(statement)) { + rdd = sqlMethod.invoke(sqlc, statement); + } + } + } catch (InvocationTargetException ite) { + if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace"))) { + throw new InterpreterException(ite); + } + logger.error("Invocation target exception", ite); + String msg = ite.getTargetException().getMessage() + + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace"; + return new InterpreterResult(Code.ERROR, msg); + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException e) { + throw new InterpreterException(e); } - @Override - public Scheduler getScheduler() { - if (concurrentSQL()) { - int maxConcurrency = 10; - return SchedulerFactory.singleton().createOrGetParallelScheduler( - SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency); + String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult); + sc.clearJobGroup(); + return new InterpreterResult(Code.SUCCESS, msg); + } + + //split sql script to single statement list + public static List splitSqlScript(String script) { + List queries = new LinkedList(); + StringBuilder query = new StringBuilder(); + List quoteList = Arrays.asList('\'', '"', '`'); + Stack operatorStack = new Stack(); + char lastCharacter = ' '; + for (char character : script.toCharArray()) { + if (';' == character && lastCharacter != '\\' && operatorStack.isEmpty()) { + if (query.length() > 0) { + queries.add(query.toString()); + query.setLength(0); + } + } else { + query.append(character); + if (operatorStack.isEmpty()) { + if ('-' == character && '-' == lastCharacter) { + operatorStack.push("--"); + } else if (lastCharacter == '/' && '*' == character) { + operatorStack.push("/*"); + } else if (quoteList.contains(character) && lastCharacter != '\\') { + operatorStack.push(String.valueOf(character)); + } } else { - // getSparkInterpreter() calls open() inside. - // That means if SparkInterpreter is not opened, it'll wait until SparkInterpreter open. - // In this moment UI displays 'READY' or 'FINISHED' instead of 'PENDING' or 'RUNNING'. - // It's because of scheduler is not created yet, and scheduler is created by this function. - // Therefore, we can still use getSparkInterpreter() here, but it's better and safe - // to getSparkInterpreter without opening it. 
- - Interpreter intp = - getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); - if (intp != null) { - return intp.getScheduler(); - } else { - return null; - } + if ('\n' == character && "--".equals(operatorStack.peek())) { + operatorStack.pop(); + } else if (lastCharacter == '*' && character == '/' + && operatorStack.peek().equals("/*")) { + operatorStack.pop(); + } else if (quoteList.contains(character) + && operatorStack.peek().equals(String.valueOf(character))) { + operatorStack.pop(); + } } + } + lastCharacter = character; } - - @Override - public List completion(String buf, int cursor) { + if (query.length() > 0) { + queries.add(query.toString()); + } + return queries; + } + + @Override + public void cancel(InterpreterContext context) { + SQLContext sqlc = getSparkInterpreter().getSQLContext(); + SparkContext sc = sqlc.sparkContext(); + sc.cancelJobGroup(Utils.buildJobGroupId(context)); + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + + @Override + public int getProgress(InterpreterContext context) { + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + return sparkInterpreter.getProgress(context); + } + + @Override + public Scheduler getScheduler() { + if (concurrentSQL()) { + int maxConcurrency = 10; + return SchedulerFactory.singleton().createOrGetParallelScheduler( + SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency); + } else { + // getSparkInterpreter() calls open() inside. + // That means if SparkInterpreter is not opened, it'll wait until SparkInterpreter open. + // In this moment UI displays 'READY' or 'FINISHED' instead of 'PENDING' or 'RUNNING'. + // It's because of scheduler is not created yet, and scheduler is created by this function. + // Therefore, we can still use getSparkInterpreter() here, but it's better and safe + // to getSparkInterpreter without opening it. 
+
+      Interpreter intp =
+          getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+      if (intp != null) {
+        return intp.getScheduler();
+      } else {
+        return null;
+      }
+    }
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor) {
+    return null;
+  }
 }

From 796ff8fb33eae97bfd1a2d1affc7189ff4fc6ffa Mon Sep 17 00:00:00 2001
From: LeiWang
Date: Tue, 11 Apr 2017 23:54:02 +0800
Subject: [PATCH 4/6] Update SparkSqlInterpreter.java

update code style
---
 .../java/org/apache/zeppelin/spark/SparkSqlInterpreter.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index e929ac0d4a4..d6c8639b462 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -180,8 +180,10 @@ public static List<String> splitSqlScript(String script) {
 
   @Override
   public void cancel(InterpreterContext context) {
-    SQLContext sqlc = getSparkInterpreter().getSQLContext();
+    SparkInterpreter sparkInterpreter = getSparkInterpreter();
+    SQLContext sqlc = sparkInterpreter.getSQLContext();
     SparkContext sc = sqlc.sparkContext();
+
     sc.cancelJobGroup(Utils.buildJobGroupId(context));
   }

From 2f87249b481b498144a4b6574482be3a31051afe Mon Sep 17 00:00:00 2001
From: LeiWang
Date: Tue, 11 Apr 2017 23:57:49 +0800
Subject: [PATCH 5/6] Update SparkSqlInterpreter.java

remove comments
---
 .../main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index d6c8639b462..ac409a95c78 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -135,7 +135,6 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
     return new InterpreterResult(Code.SUCCESS, msg);
   }
 
-  //split sql script to single statement list
   public static List<String> splitSqlScript(String script) {
     List<String> queries = new LinkedList<>();
     StringBuilder query = new StringBuilder();

From f90ee721ade766e2ef4b8ebc450e575778d130cb Mon Sep 17 00:00:00 2001
From: LeiWang
Date: Wed, 12 Apr 2017 00:51:30 +0800
Subject: [PATCH 6/6] Update SparkSqlInterpreter.java

update code style
---
 .../java/org/apache/zeppelin/spark/SparkSqlInterpreter.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index ac409a95c78..490a64076fc 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -142,7 +142,7 @@ public static List<String> splitSqlScript(String script) {
     Stack<String> operatorStack = new Stack<>();
     char lastCharacter = ' ';
     for (char character : script.toCharArray()) {
-      if (';' == character && lastCharacter != '\\' && operatorStack.isEmpty()) {
+      if (';' == character && operatorStack.isEmpty()) {
         if (query.length() > 0) {
           queries.add(query.toString());
           query.setLength(0);
@@ -154,7 +154,7 @@ public static List<String> splitSqlScript(String script) {
             operatorStack.push("--");
           } else if (lastCharacter == '/' && '*' == character) {
             operatorStack.push("/*");
-          } else if (quoteList.contains(character) && lastCharacter != '\\') {
+          } else if (quoteList.contains(character)) {
             operatorStack.push(String.valueOf(character));
           }
         } else {
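
A few examples may help reviewers see what this series does end to end. First, the splitter itself: a semicolon only separates statements when the operator stack is empty, i.e. outside single/double/backtick quotes, outside "--" line comments, and outside "/* */" block comments. The following JUnit 4 sketch is hypothetical (written for this review, not part of the series); the expected values come from tracing splitSqlScript as it stands after PATCH 6/6, with the backslash-escape checks removed.

import static org.junit.Assert.assertEquals;

import java.util.List;

import org.apache.zeppelin.spark.SparkSqlInterpreter;
import org.junit.Test;

// Hypothetical review sketch, not a test from this patch series.
public class SplitSqlScriptExampleTest {

  @Test
  public void splitsOnlyOnTopLevelSemicolons() {
    String paragraph = "select * from logs;\n"
        + "select ';' as sep from logs; -- a comment; same comment\n"
        + "select /* a ; in a block comment */ count(1) from logs";

    List<String> statements = SparkSqlInterpreter.splitSqlScript(paragraph);

    // Semicolons inside quotes and comments do not split statements;
    // a line comment stays attached to the statement that follows it.
    assertEquals(3, statements.size());
    assertEquals("select * from logs", statements.get(0));
    assertEquals("\nselect ';' as sep from logs", statements.get(1));
  }
}

Note that the fragments are not trimmed: the second statement keeps its leading newline. Whitespace-only fragments are filtered out later by the StringUtils.isNotBlank check in interpret(), so a trailing semicolon does not produce an empty query.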
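Second, the reflection call that the new loop wraps. As the in-code comment says, sqlc.sql() returns SchemaRDD on Spark 1.2 and earlier but DataFrame on Spark 1.3 and later, so the interpreter resolves the method at runtime instead of compiling against either return type. A stripped-down sketch of that pattern follows; the helper class and method name here are invented for illustration and do not exist in the patch.

import java.lang.reflect.Method;

import org.apache.spark.sql.SQLContext;

// Illustrative sketch of the binary-compatibility pattern used in interpret().
public class ReflectiveSqlCall {

  // Resolve sql(String) at runtime and hold the result as a plain Object,
  // so this code never names SchemaRDD (Spark <= 1.2) or DataFrame (>= 1.3).
  public static Object runSql(SQLContext sqlc, String statement) throws Exception {
    Method sqlMethod = sqlc.getClass().getMethod("sql", String.class);
    return sqlMethod.invoke(sqlc, statement);
  }
}

The returned Object is later handed to ZeppelinContext.showDF for rendering, which keeps the whole path free of a compile-time dependency on either Spark type.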
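Finally, one behavioral detail worth spelling out: inside the loop, rdd is overwritten by every non-blank statement, so all statements in a paragraph execute in order, but only the result of the last one is passed to ZeppelinContext.showDF and rendered. For example, a paragraph like `create table t2 as select * from t1; select count(1) from t2` (illustrative table names) runs both statements and displays only the count.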