Commit

update
zjffdu committed Apr 7, 2022
1 parent 3aab59b commit 2b9fe2c
Showing 10 changed files with 98 additions and 168 deletions.
18 changes: 18 additions & 0 deletions spark/README.md
@@ -0,0 +1,18 @@
# Spark Interpreter

The Spark interpreter is the first and most important interpreter in Zeppelin. It supports multiple versions of Spark and multiple versions of Scala.


# Module structure of Spark interpreter


* interpreter - This module is the entry module of the Spark interpreter. All the interpreter interfaces are defined here, but the implementation is delegated to the scala-xxx module that matches the Scala version of the current Spark (see the sketch after this list).
* spark-scala-parent - Parent module shared by the scala-xxx modules
* scala-2.11 - Scala module for Scala 2.11
* scala-2.12 - Scala module for Scala 2.12
* scala-2.13 - Scala module for Scala 2.13
* spark-shims - Common shim layer over differences between Spark versions
* spark2-shims - Shims implementation for Spark 2.x
* spark3-shims - Shims implementation for Spark 3.x

# How to build Spark interpreter
AbstractSparkScalaInterpreter.java
@@ -19,25 +19,33 @@

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.kotlin.KotlinInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* This is a bridge class that bridges the communication between the Java side and the Scala side.
* The Java side relies on this abstract class, which is implemented by the different Scala versions.
*/
public abstract class AbstractSparkScalaInterpreter {

-  private static AtomicInteger SESSION_NUM = new AtomicInteger(0);
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSparkScalaInterpreter.class);
+  private static final AtomicInteger SESSION_NUM = new AtomicInteger(0);

protected SparkConf conf;
protected SparkContext sc;
@@ -92,13 +100,6 @@ protected void open() {

public abstract void close() throws InterpreterException;

-  public int getProgress(InterpreterContext context) throws InterpreterException {
-    return getProgress(Utils.buildJobGroupId(context), context);
-  }
-
-  public abstract int getProgress(String jobGroup,
-                                  InterpreterContext context) throws InterpreterException;

public void cancel(InterpreterContext context) throws InterpreterException {
getSparkContext().cancelJobGroup(Utils.buildJobGroupId(context));
}
@@ -118,4 +119,37 @@ public abstract List<InterpreterCompletion> completion(String buf,
InterpreterContext interpreterContext);

public abstract ClassLoader getScalaShellClassLoader();

public int getProgress(InterpreterContext context) throws InterpreterException {
String jobGroup = Utils.buildJobGroupId(context);
// Each paragraph has one unique jobGroup, and one paragraph may run multiple times.
// So only look at the first job that matches the jobGroup.
Optional<SparkJobInfo> jobInfoOptional = Arrays.stream(sc.statusTracker().getJobIdsForGroup(jobGroup))
.mapToObj(jobId -> sc.statusTracker().getJobInfo(jobId))
.filter(jobInfo -> jobInfo.isDefined())
.map(jobInfo -> jobInfo.get())
.findFirst();
if (jobInfoOptional.isPresent()) {
List<SparkStageInfo> stageInfoList = Arrays.stream(jobInfoOptional.get().stageIds())
.mapToObj(stageId -> sc.statusTracker().getStageInfo(stageId))
.filter(stageInfo -> stageInfo.isDefined())
.map(stageInfo -> stageInfo.get())
.collect(Collectors.toList());
int taskCount = stageInfoList.stream()
.mapToInt(SparkStageInfo::numTasks)
.sum();
int completedTaskCount = stageInfoList.stream()
.mapToInt(SparkStageInfo::numCompletedTasks)
.sum();
LOGGER.debug("Total TaskCount: {}", taskCount);
LOGGER.debug("Completed TaskCount: {}", completedTaskCount);
if (taskCount == 0) {
return 0;
} else {
return 100 * completedTaskCount / taskCount;
}
} else {
return 0;
}
}
}
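The lookup above only finds those jobs because the interpreter tags everything a paragraph runs with one job group id; the `cancelJobGroup` call in `cancel()` relies on the same tagging. A minimal, self-contained sketch of that pairing, with a hypothetical group id standing in for `Utils.buildJobGroupId(context)`:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class JobGroupProgressDemo {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setMaster("local[2]").setAppName("progress-demo"));

    // The interpreter sets the job group before executing paragraph code,
    // so every Spark job the paragraph triggers carries this id.
    String jobGroup = "demo-paragraph-group"; // hypothetical id
    jsc.setJobGroup(jobGroup, "paragraph run");

    // Any job submitted from this thread now belongs to jobGroup, which is
    // exactly what getJobIdsForGroup() in getProgress() matches on.
    jsc.parallelize(java.util.Arrays.asList(1, 2, 3, 4), 4)
        .map(x -> x * x)
        .collect();

    int[] jobIds = jsc.statusTracker().getJobIdsForGroup(jobGroup);
    System.out.println("jobs tagged with " + jobGroup + ": " + jobIds.length);

    jsc.clearJobGroup();
    jsc.stop();
  }
}
```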
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.spark;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The per-Scala-version JobProgressUtil.progress helpers are removed in this
 * commit; the progress calculation now lives in
 * AbstractSparkScalaInterpreter#getProgress.
 */
public class JobProgressUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(JobProgressUtil.class);

}
SparkInterpreter.java
@@ -228,7 +228,7 @@ public FormType getFormType() {

@Override
public int getProgress(InterpreterContext context) throws InterpreterException {
-    return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context);
+    return innerInterpreter.getProgress(context);
}

public ZeppelinContext getZeppelinContext() {

This file was deleted.

@@ -160,10 +160,6 @@ class SparkScala211Interpreter(conf: SparkConf,
}
}

-  override def getProgress(jobGroup: String, context: InterpreterContext): Int = {
-    JobProgressUtil.progress(sc, jobGroup)
-  }

// for use in java side
private def bind(name: String,
tpe: String,

This file was deleted.

@@ -163,10 +163,6 @@ class SparkScala212Interpreter(conf: SparkConf,
def interpret(code: String): InterpreterResult =
interpret(code, InterpreterContext.get())

-  override def getProgress(jobGroup: String, context: InterpreterContext): Int = {
-    JobProgressUtil.progress(sc, jobGroup)
-  }

// for use in java side
private def bind(name: String,
tpe: String,

This file was deleted.

@@ -159,10 +159,6 @@ class SparkScala213Interpreter(conf: SparkConf,
}
}

-  override def getProgress(jobGroup: String, context: InterpreterContext): Int = {
-    JobProgressUtil.progress(sc, jobGroup)
-  }

// for use in java side
private def bind(name: String,
tpe: String,
