From 2b9fe2ca8b2e32864c7c59291709d330b00a92c7 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Fri, 8 Apr 2022 00:05:00 +0800 Subject: [PATCH] update --- spark/README.md | 18 +++++++ .../spark/AbstractSparkScalaInterpreter.java | 50 ++++++++++++++++--- .../zeppelin/spark/JobProgressUtil.java | 37 ++++++++++++++ .../zeppelin/spark/SparkInterpreter.java | 2 +- .../zeppelin/spark/JobProgressUtil.scala | 49 ------------------ .../spark/SparkScala211Interpreter.scala | 4 -- .../zeppelin/spark/JobProgressUtil.scala | 49 ------------------ .../spark/SparkScala212Interpreter.scala | 4 -- .../zeppelin/spark/JobProgressUtil.scala | 49 ------------------ .../spark/SparkScala213Interpreter.scala | 4 -- 10 files changed, 98 insertions(+), 168 deletions(-) create mode 100644 spark/README.md create mode 100644 spark/interpreter/src/main/java/org/apache/zeppelin/spark/JobProgressUtil.java delete mode 100644 spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala delete mode 100644 spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala delete mode 100644 spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala diff --git a/spark/README.md b/spark/README.md new file mode 100644 index 00000000000..ceec7ecabd0 --- /dev/null +++ b/spark/README.md @@ -0,0 +1,18 @@ +# Spark Interpreter + +Spark interpreter is the first and most important interpreter of Zeppelin. It supports multiple versions of Spark and multiple versions of Scala. + + +# Module structure of Spark interpreter + + +* interpreter - This module is the entry module of Spark interpreter. All the interpreter interfaces are defined here, but the implementation will be delegated to the scala-xxx module depends on the Scala version of current Spark. +* spark-scala-parent - Parent module for each scala module +* scala-2.11 - Scala module for scala 2.11 +* scala-2.12 +* scala-2.13 +* spark-shims +* spark2-shims +* spark3-shims + +# How to build Spark interpreter diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java index c057e9f61fc..aefc31195c9 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java @@ -19,17 +19,24 @@ import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; +import org.apache.spark.SparkJobInfo; +import org.apache.spark.SparkStageInfo; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.kotlin.KotlinInterpreter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.net.URLClassLoader; +import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * This is bridge class which bridge the communication between java side and scala side. @@ -37,7 +44,8 @@ */ public abstract class AbstractSparkScalaInterpreter { - private static AtomicInteger SESSION_NUM = new AtomicInteger(0); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSparkScalaInterpreter.class); + private static final AtomicInteger SESSION_NUM = new AtomicInteger(0); protected SparkConf conf; protected SparkContext sc; @@ -92,13 +100,6 @@ protected void open() { public abstract void close() throws InterpreterException; - public int getProgress(InterpreterContext context) throws InterpreterException { - return getProgress(Utils.buildJobGroupId(context), context); - } - - public abstract int getProgress(String jobGroup, - InterpreterContext context) throws InterpreterException; - public void cancel(InterpreterContext context) throws InterpreterException { getSparkContext().cancelJobGroup(Utils.buildJobGroupId(context)); } @@ -118,4 +119,37 @@ public abstract List completion(String buf, InterpreterContext interpreterContext); public abstract ClassLoader getScalaShellClassLoader(); + + public int getProgress(InterpreterContext context) throws InterpreterException { + String jobGroup = Utils.buildJobGroupId(context); + // Each paragraph has one unique jobGroup, and one paragraph may run multiple times. + // So only look for the first job which match the jobGroup + Optional jobInfoOptional = Arrays.stream(sc.statusTracker().getJobIdsForGroup(jobGroup)) + .mapToObj(jobId -> sc.statusTracker().getJobInfo(jobId)) + .filter(jobInfo -> jobInfo.isDefined()) + .map(jobInfo -> jobInfo.get()) + .findFirst(); + if (jobInfoOptional.isPresent()) { + List stageInfoList = Arrays.stream(jobInfoOptional.get().stageIds()) + .mapToObj(stageId -> sc.statusTracker().getStageInfo(stageId)) + .filter(stageInfo -> stageInfo.isDefined()) + .map(stageInfo -> stageInfo.get()) + .collect(Collectors.toList()); + int taskCount = stageInfoList.stream() + .map(stageInfo -> stageInfo.numTasks()) + .collect(Collectors.summingInt(Integer::intValue)); + int completedTaskCount = stageInfoList.stream() + .map(stageInfo -> stageInfo.numCompletedTasks()) + .collect(Collectors.summingInt(Integer::intValue)); + LOGGER.debug("Total TaskCount: " + taskCount); + LOGGER.debug("Completed TaskCount: " + completedTaskCount); + if (taskCount == 0) { + return 0; + } else { + return 100 * completedTaskCount / taskCount; + } + } else { + return 0; + } + } } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/JobProgressUtil.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/JobProgressUtil.java new file mode 100644 index 00000000000..e4547e53aad --- /dev/null +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/JobProgressUtil.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark; + +import org.apache.spark.SparkContext; +import org.apache.spark.SparkJobInfo; +import org.apache.spark.SparkStageInfo; +import org.apache.spark.scheduler.StageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + + +public class JobProgressUtil { + private static final Logger LOGGER = LoggerFactory.getLogger(JobProgressUtil.class); + + +} diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 7701ebf52d0..8696bf3f72a 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -228,7 +228,7 @@ public FormType getFormType() { @Override public int getProgress(InterpreterContext context) throws InterpreterException { - return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context); + return innerInterpreter.getProgress(context); } public ZeppelinContext getZeppelinContext() { diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala deleted file mode 100644 index 79018c89a0f..00000000000 --- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark - -import org.apache.spark.SparkContext -import org.slf4j.{Logger, LoggerFactory} - -object JobProgressUtil { - - protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) - - def progress(sc: SparkContext, jobGroup : String):Int = { - // Each paragraph has one unique jobGroup, and one paragraph may run multiple times. - // So only look for the first job which match the jobGroup - val jobInfo = sc.statusTracker - .getJobIdsForGroup(jobGroup) - .headOption - .flatMap(jobId => sc.statusTracker.getJobInfo(jobId)) - val stagesInfoOption = jobInfo.flatMap( jobInfo => Some(jobInfo.stageIds().flatMap(sc.statusTracker.getStageInfo))) - stagesInfoOption match { - case None => 0 - case Some(stagesInfo) => - val taskCount = stagesInfo.map(_.numTasks).sum - val completedTaskCount = stagesInfo.map(_.numCompletedTasks).sum - LOGGER.debug("Total TaskCount: " + taskCount) - LOGGER.debug("Completed TaskCount: " + completedTaskCount) - if (taskCount == 0) { - 0 - } else { - (100 * completedTaskCount.toDouble / taskCount).toInt - } - } - } -} diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala index cb8e99da4fd..01b59e34874 100644 --- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala +++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala @@ -160,10 +160,6 @@ class SparkScala211Interpreter(conf: SparkConf, } } - override def getProgress(jobGroup: String, context: InterpreterContext): Int = { - JobProgressUtil.progress(sc, jobGroup) - } - // for use in java side private def bind(name: String, tpe: String, diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala deleted file mode 100644 index 79018c89a0f..00000000000 --- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark - -import org.apache.spark.SparkContext -import org.slf4j.{Logger, LoggerFactory} - -object JobProgressUtil { - - protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) - - def progress(sc: SparkContext, jobGroup : String):Int = { - // Each paragraph has one unique jobGroup, and one paragraph may run multiple times. - // So only look for the first job which match the jobGroup - val jobInfo = sc.statusTracker - .getJobIdsForGroup(jobGroup) - .headOption - .flatMap(jobId => sc.statusTracker.getJobInfo(jobId)) - val stagesInfoOption = jobInfo.flatMap( jobInfo => Some(jobInfo.stageIds().flatMap(sc.statusTracker.getStageInfo))) - stagesInfoOption match { - case None => 0 - case Some(stagesInfo) => - val taskCount = stagesInfo.map(_.numTasks).sum - val completedTaskCount = stagesInfo.map(_.numCompletedTasks).sum - LOGGER.debug("Total TaskCount: " + taskCount) - LOGGER.debug("Completed TaskCount: " + completedTaskCount) - if (taskCount == 0) { - 0 - } else { - (100 * completedTaskCount.toDouble / taskCount).toInt - } - } - } -} diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala index 1d3870878cb..c6d9c5bd431 100644 --- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala +++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala @@ -163,10 +163,6 @@ class SparkScala212Interpreter(conf: SparkConf, def interpret(code: String): InterpreterResult = interpret(code, InterpreterContext.get()) - override def getProgress(jobGroup: String, context: InterpreterContext): Int = { - JobProgressUtil.progress(sc, jobGroup) - } - // for use in java side private def bind(name: String, tpe: String, diff --git a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala deleted file mode 100644 index 79018c89a0f..00000000000 --- a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark - -import org.apache.spark.SparkContext -import org.slf4j.{Logger, LoggerFactory} - -object JobProgressUtil { - - protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) - - def progress(sc: SparkContext, jobGroup : String):Int = { - // Each paragraph has one unique jobGroup, and one paragraph may run multiple times. - // So only look for the first job which match the jobGroup - val jobInfo = sc.statusTracker - .getJobIdsForGroup(jobGroup) - .headOption - .flatMap(jobId => sc.statusTracker.getJobInfo(jobId)) - val stagesInfoOption = jobInfo.flatMap( jobInfo => Some(jobInfo.stageIds().flatMap(sc.statusTracker.getStageInfo))) - stagesInfoOption match { - case None => 0 - case Some(stagesInfo) => - val taskCount = stagesInfo.map(_.numTasks).sum - val completedTaskCount = stagesInfo.map(_.numCompletedTasks).sum - LOGGER.debug("Total TaskCount: " + taskCount) - LOGGER.debug("Completed TaskCount: " + completedTaskCount) - if (taskCount == 0) { - 0 - } else { - (100 * completedTaskCount.toDouble / taskCount).toInt - } - } - } -} diff --git a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala index f279e7ed16a..4eeccff881d 100644 --- a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala +++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala @@ -159,10 +159,6 @@ class SparkScala213Interpreter(conf: SparkConf, } } - override def getProgress(jobGroup: String, context: InterpreterContext): Int = { - JobProgressUtil.progress(sc, jobGroup) - } - // for use in java side private def bind(name: String, tpe: String,