From 479468d296c1d87692a2269e8eff77ebb9909235 Mon Sep 17 00:00:00 2001
From: SaurabhChawla
Date: Fri, 19 Mar 2021 22:37:13 +0530
Subject: [PATCH] Add support for Sparklens on Spark 3.0.0 and later versions
 of Spark, built with Scala 2.12

Author: SaurabhChawla
Date: Fri Mar 19 22:37:13 2021 +0530
Committer: Saurabh Chawla
---
 README.md                                          | 25 +++++++++++++++----
 build.sbt                                          |  6 ++---
 .../qubole/sparklens/QuboleJobListener.scala       |  3 ++-
 .../sparklens/helper/HDFSConfigHelper.scala        | 17 ++++++++-----
 version.sbt                                        |  2 +-
 5 files changed, 37 insertions(+), 16 deletions(-)

diff --git a/README.md b/README.md
index f470999..6cc7257 100644
--- a/README.md
+++ b/README.md
@@ -92,16 +92,31 @@ Note: Apart from the console based report, you can also get an UI based report
 `--conf spark.sparklens.report.email=<email>` along with other relevant confs mentioned below.
 This functionality is available in Sparklens 0.3.2 and above.
 
-Use the following arguments to `spark-submit` or `spark-shell`:
+Use the following arguments to `spark-submit` or `spark-shell` for Spark 3.0.0 and later versions of Spark:
+```
+--packages qubole:sparklens:0.4.0-s_2.12
+--conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
+```
+
+Use the following arguments to `spark-submit` or `spark-shell` for Spark 2.4.x and earlier versions of Spark:
 ```
 --packages qubole:sparklens:0.3.2-s_2.11
 --conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
 ```
+
 #### 2. Run from Sparklens offline data ####
 
 You can choose not to run sparklens inside the app, but at a later time. Run your app as above
-with additional configuration parameters:
+with additional configuration parameters.
+For Spark 3.0.0 and later versions of Spark:
+```
+--packages qubole:sparklens:0.4.0-s_2.12
+--conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
+--conf spark.sparklens.reporting.disabled=true
+```
+
+For Spark 2.4.x and earlier versions of Spark:
 ```
 --packages qubole:sparklens:0.3.2-s_2.11
 --conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
 ```
@@ -111,7 +126,7 @@ with additional configuration parameters:
 This will not run reporting, but instead create a Sparklens JSON file for the application which is stored in the **spark.sparklens.data.dir** directory (by default, **/tmp/sparklens/**).
 Note that this will be stored on HDFS by default. To save this file to s3, please set **spark.sparklens.data.dir** to s3 path.
 This data file can now be used to run Sparklens reporting independently, using `spark-submit` command as follows:
 
-`./bin/spark-submit --packages qubole:sparklens:0.3.2-s_2.11 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename>`
+`./bin/spark-submit --packages qubole:sparklens:0.4.0-s_2.12 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename>`
 
 `<filename>` should be replaced by the full path of sparklens json file. If the file is on s3 use the full s3 path. For files on local file system, use file:// prefix with the local file location. HDFS is supported as well.
@@ -124,11 +139,11 @@ running via `sparklens-json-file` above) with another option specifying that is
 event history file. This file can be in any of the formats the event history files supports, i.e.
 **text, snappy, lz4 or lzf**.
 Note the extra `source=history` parameter in this example:
 
-`./bin/spark-submit --packages qubole:sparklens:0.3.2-s_2.11 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename> source=history`
+`./bin/spark-submit --packages qubole:sparklens:0.4.0-s_2.12 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename> source=history`
 
 It is also possible to convert an event history file to a Sparklens json file using the following command:
 
-`./bin/spark-submit --packages qubole:sparklens:0.3.2-s_2.11 --class com.qubole.sparklens.app.EventHistoryToSparklensJson qubole-dummy-arg <source directory> <target directory>`
+`./bin/spark-submit --packages qubole:sparklens:0.4.0-s_2.12 --class com.qubole.sparklens.app.EventHistoryToSparklensJson qubole-dummy-arg <source directory> <target directory>`
 
 EventHistoryToSparklensJson is designed to work on local file system only. Please make sure that the source and target directories are on local file system.

diff --git a/build.sbt b/build.sbt
index 814fa1f..7fd29b5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,13 +1,13 @@
 name := "sparklens"
 
 organization := "com.qubole"
 
-scalaVersion := "2.11.8"
+scalaVersion := "2.12.10"
 
-crossScalaVersions := Seq("2.10.6", "2.11.8")
+crossScalaVersions := Seq("2.10.6", "2.11.12", "2.12.10")
 
 spName := "qubole/sparklens"
 
-sparkVersion := "2.0.0"
+sparkVersion := "3.0.1"
 
 spAppendScalaVersion := true

diff --git a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
index 1a0c356..7953e82 100644
--- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
+++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
@@ -257,7 +257,8 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
     if (stageCompleted.stageInfo.failureReason.isDefined) {
       //stage failed
       val si = stageCompleted.stageInfo
-      failedStages += s""" Stage ${si.stageId} attempt ${si.attemptId} in job ${stageIDToJobID(si.stageId)} failed.
+      // StageInfo.attemptId is deprecated; attemptNumber is used from Spark 3.0.0 onwards
+      failedStages += s""" Stage ${si.stageId} attempt ${si.attemptNumber} in job ${stageIDToJobID(si.stageId)} failed.
      Stage tasks: ${si.numTasks}
      """
       stageTimeSpan.finalUpdate()

diff --git a/src/main/scala/com/qubole/sparklens/helper/HDFSConfigHelper.scala b/src/main/scala/com/qubole/sparklens/helper/HDFSConfigHelper.scala
index ea93ba8..80dc525 100644
--- a/src/main/scala/com/qubole/sparklens/helper/HDFSConfigHelper.scala
+++ b/src/main/scala/com/qubole/sparklens/helper/HDFSConfigHelper.scala
@@ -24,11 +24,16 @@ import org.apache.spark.deploy.SparkHadoopUtil
 object HDFSConfigHelper {
 
   def getHadoopConf(sparkConfOptional:Option[SparkConf]): Configuration = {
-    if (sparkConfOptional.isDefined) {
-      SparkHadoopUtil.get.newConfiguration(sparkConfOptional.get)
-    }else {
-      val sparkConf = new SparkConf()
-      SparkHadoopUtil.get.newConfiguration(sparkConf)
-    }
+    // From Spark 3.0.0, SparkHadoopUtil is private[spark] and usable only within Spark
+    // itself, so reflection is used here to access its newConfiguration method
+    val sparkHadoopUtilClass = Class.forName("org.apache.spark.deploy.SparkHadoopUtil")
+    val sparkHadoopUtil = sparkHadoopUtilClass.newInstance()
+    val newConfigurationMethod = sparkHadoopUtilClass.getMethod("newConfiguration", classOf[SparkConf])
+    if (sparkConfOptional.isDefined) {
+      newConfigurationMethod.invoke(sparkHadoopUtil, sparkConfOptional.get).asInstanceOf[Configuration]
+    } else {
+      val sparkConf = new SparkConf()
+      newConfigurationMethod.invoke(sparkHadoopUtil, sparkConf).asInstanceOf[Configuration]
+    }
   }
 }

diff --git a/version.sbt b/version.sbt
index 8ed576b..9f0e5a5 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-version in ThisBuild := "0.3.2"
+version in ThisBuild := "0.4.0"
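
The reflection in HDFSConfigHelper is the least obvious part of this patch. Below is a minimal, standalone sketch of the same technique that can be compiled and run outside the patch; the object name `ReflectiveHadoopConf` is hypothetical, and the only assumption is that spark-core (2.x or 3.x) is on the classpath. Unlike the patch it uses `getDeclaredConstructor().newInstance()`, which avoids the `Class.newInstance()` deprecation on newer JDKs.

```
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Standalone sketch (hypothetical object name) of the reflection approach
// used by HDFSConfigHelper in this patch.
object ReflectiveHadoopConf {
  def newHadoopConf(conf: SparkConf): Configuration = {
    // SparkHadoopUtil is private[spark] from Spark 3.0.0, so it is looked up
    // by name at runtime instead of being referenced at compile time.
    val clazz = Class.forName("org.apache.spark.deploy.SparkHadoopUtil")
    val instance = clazz.getDeclaredConstructor().newInstance()
    val method = clazz.getMethod("newConfiguration", classOf[SparkConf])
    // Equivalent to new SparkHadoopUtil().newConfiguration(conf)
    method.invoke(instance, conf).asInstanceOf[Configuration]
  }
}
```

Because `private[spark]` compiles to a public class in bytecode, the runtime lookup works on both the 2.x and 3.x lines, which is what lets a single sparklens binary stay compatible across Spark versions.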
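
The README changes can also be smoke-tested without spark-submit by registering the listener programmatically, which has the same effect as the `--conf spark.extraListeners=...` argument shown above. This is a hypothetical harness (the object name and the job are made up for illustration); it assumes the sparklens 0.4.0-s_2.12 jar and Spark 3.x are on the driver classpath.

```
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Hypothetical smoke test for the Spark 3 / Scala 2.12 build.
object SparklensSmokeTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("sparklens-smoke-test")
      .setMaster("local[2]")
      // Same effect as --conf spark.extraListeners=... on spark-submit
      .set("spark.extraListeners", "com.qubole.sparklens.QuboleJobListener")

    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Run a trivial job so the listener has at least one job and stage to analyze.
    println(spark.sparkContext.parallelize(1 to 1000).map(_ * 2).sum())
    spark.stop() // the Sparklens report is printed when the application ends
  }
}
```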