From 5c0cca08545afae0da7646f990218e3214f22616 Mon Sep 17 00:00:00 2001 From: Alexandre Archambault Date: Thu, 13 Oct 2022 11:42:35 +0200 Subject: [PATCH 1/6] Run spark standalone tests in 2.13 too --- .../integration/SparkTestDefinitions.scala | 143 +++++++++++++++++- .../scala/cli/integration/SparkTests212.scala | 89 +---------- .../scala/cli/integration/SparkTests213.scala | 2 +- 3 files changed, 146 insertions(+), 88 deletions(-) diff --git a/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala b/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala index 11264f88fc..ffe2606634 100644 --- a/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala +++ b/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala @@ -1,7 +1,146 @@ package scala.cli.integration -class SparkTestDefinitions extends ScalaCliSuite { +import com.eed3si9n.expecty.Expecty.expect - protected lazy val extraOptions: Seq[String] = TestUtil.extraOptions +import java.io.File +import java.util.Locale + +import scala.util.Properties + +object SparkTestDefinitions { + + def lightweightSparkDistribVersionOpt = Option("0.0.4") + + final class Spark(val sparkVersion: String, val scalaVersion: String) { + private def sbv = scalaVersion.split('.').take(2).mkString(".") + private var toDeleteOpt = Option.empty[os.Path] + lazy val sparkHome: os.Path = { + val url = lightweightSparkDistribVersionOpt match { + case Some(lightweightSparkDistribVersion) => + s"https://github.com/scala-cli/lightweight-spark-distrib/releases/download/v$lightweightSparkDistribVersion/spark-$sparkVersion-bin-hadoop2.7-scala$sbv.tgz" + case None => + // original URL (too heavyweight, often fails / times out…) + s"https://archive.apache.org/dist/spark/spark-$sparkVersion/spark-$sparkVersion-bin-hadoop2.7.tgz" + } + val baseDir = + os.Path(os.proc(TestUtil.cs, "get", "--archive", url).call().out.trim(), os.pwd) + val home = os.list(baseDir) match { + case Seq(dir) if os.isDir(dir) => dir + case _ => baseDir + } + if (lightweightSparkDistribVersionOpt.nonEmpty) { + val copy = os.temp.dir(prefix = home.last) / "home" + toDeleteOpt = Some(copy) + System.err.println(s"Copying $home over to $copy") + os.copy(home, copy) + val fetchJarsScript0 = copy / "fetch-jars.sh" + val cmd: Seq[os.Shellable] = + if (Properties.isWin) Seq("""C:\Program Files\Git\bin\bash.EXE""", fetchJarsScript0) + else Seq(fetchJarsScript0) + + System.err.println(s"Running $cmd") + os.proc(cmd).call(stdin = os.Inherit, stdout = os.Inherit) + System.err.println(s"Spark home $copy ready") + copy + } + else + home + } + def cleanUp(): Unit = + toDeleteOpt.foreach(os.remove.all(_)) + } + +} + +abstract class SparkTestDefinitions(val scalaVersionOpt: Option[String]) extends ScalaCliSuite + with TestScalaVersionArgs { + + import SparkTestDefinitions.* + + protected lazy val extraOptions: Seq[String] = scalaVersionArgs ++ TestUtil.extraOptions + + protected def defaultMaster = "local[4]" + protected def simpleJobInputs(spark: Spark) = TestInputs( + os.rel / "SparkJob.scala" -> + s"""//> using lib "org.apache.spark::spark-sql:${spark.sparkVersion}" + | + |import org.apache.spark._ + |import org.apache.spark.sql._ + | + |object SparkJob { + | def main(args: Array[String]): Unit = { + | val spark = SparkSession.builder() + | .appName("Test job") + | .getOrCreate() + | import spark.implicits._ + | def sc = spark.sparkContext + | val accum = sc.longAccumulator + | sc.parallelize(1 to 
10).foreach(x => accum.add(x)) + | println("Result: " + accum.value) + | } + |} + |""".stripMargin + ) + + private def maybeSetupWinutils(hadoopHome: os.Path): Unit = + if (Properties.isWin) { + val bin = hadoopHome / "bin" + os.makeDir.all(bin) + val res = os.proc( + TestUtil.cs, + "get", + "--archive", + "https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip" + ) + .call(cwd = hadoopHome) + val dataDir = os.Path(res.out.trim()) + val binStuffDir = os.list(dataDir) + .filter(os.isDir(_)) + .filter(_.last.startsWith("hadoop-")) + .headOption + .getOrElse { + sys.error(s"No hadoop-* directory found under $dataDir") + } + for (elem <- os.list(binStuffDir)) + os.copy.into(elem, bin) + } + + private def maybeHadoopHomeForWinutils(hadoopHome: os.Path): Map[String, String] = + if (Properties.isWin) { + // FIXME Maybe Scala CLI should handle that itself, when on Windows, + // for Spark >= 3.3.0 (maybe 3.3.0, > 3.0 for sure) + maybeSetupWinutils(hadoopHome) + val (pathVarName, currentPath) = + sys.env.find(_._1.toLowerCase(Locale.ROOT) == "path").getOrElse(("PATH", "")) + Map( + pathVarName -> s"${hadoopHome / "bin"}${File.pathSeparator}$currentPath", + "HADOOP_HOME" -> hadoopHome.toString + ) + } + else + Map.empty[String, String] + + def simpleRunStandaloneSparkJobTest( + scalaVersion: String, + sparkVersion: String, + needsWinUtils: Boolean = false + ): Unit = + simpleJobInputs(new Spark(sparkVersion, scalaVersion)).fromRoot { root => + val extraEnv = + if (needsWinUtils) maybeHadoopHomeForWinutils(root / "hadoop-home") + else Map.empty[String, String] + val res = os.proc(TestUtil.cli, "run", extraOptions, "--spark-standalone", "--jvm", "8", ".") + .call(cwd = root, env = extraEnv) + + val expectedOutput = "Result: 55" + + val output = res.out.trim().linesIterator.toVector + + expect(output.contains(expectedOutput)) + } + + test("run spark 3.3 standalone") { + simpleRunStandaloneSparkJobTest(actualScalaVersion, "3.3.0", needsWinUtils = true) + } } diff --git a/modules/integration/src/test/scala/scala/cli/integration/SparkTests212.scala b/modules/integration/src/test/scala/scala/cli/integration/SparkTests212.scala index f6fbe7cf1d..a4af156fb9 100644 --- a/modules/integration/src/test/scala/scala/cli/integration/SparkTests212.scala +++ b/modules/integration/src/test/scala/scala/cli/integration/SparkTests212.scala @@ -8,54 +8,9 @@ import java.util.Locale import scala.jdk.CollectionConverters._ import scala.util.Properties -object SparkTests212 { - - private def lightweightSparkDistribVersionOpt = Option("0.0.4") - - private final class Spark(val sparkVersion: String, val scalaVersion: String) { - private def sbv = scalaVersion.split('.').take(2).mkString(".") - private var toDeleteOpt = Option.empty[os.Path] - lazy val sparkHome: os.Path = { - val url = lightweightSparkDistribVersionOpt match { - case Some(lightweightSparkDistribVersion) => - s"https://github.com/scala-cli/lightweight-spark-distrib/releases/download/v$lightweightSparkDistribVersion/spark-$sparkVersion-bin-hadoop2.7-scala$sbv.tgz" - case None => - // original URL (too heavyweight, often fails / times out…) - s"https://archive.apache.org/dist/spark/spark-$sparkVersion/spark-$sparkVersion-bin-hadoop2.7.tgz" - } - val baseDir = - os.Path(os.proc(TestUtil.cs, "get", "--archive", url).call().out.trim(), os.pwd) - val home = os.list(baseDir) match { - case Seq(dir) if os.isDir(dir) => dir - case _ => baseDir - } - if (lightweightSparkDistribVersionOpt.nonEmpty) { - val copy = 
os.temp.dir(prefix = home.last) / "home" - toDeleteOpt = Some(copy) - System.err.println(s"Copying $home over to $copy") - os.copy(home, copy) - val fetchJarsScript0 = copy / "fetch-jars.sh" - val cmd: Seq[os.Shellable] = - if (Properties.isWin) Seq("""C:\Program Files\Git\bin\bash.EXE""", fetchJarsScript0) - else Seq(fetchJarsScript0) - - System.err.println(s"Running $cmd") - os.proc(cmd).call(stdin = os.Inherit, stdout = os.Inherit) - System.err.println(s"Spark home $copy ready") - copy - } - else - home - } - def cleanUp(): Unit = - toDeleteOpt.foreach(os.remove.all(_)) - } - -} +class SparkTests212 extends SparkTestDefinitions(scalaVersionOpt = Some(Constants.scala212)) { -class SparkTests212 extends SparkTestDefinitions { - - import SparkTests212.* + import SparkTestDefinitions.* private val spark30 = new Spark( "3.0.3", @@ -74,30 +29,6 @@ class SparkTests212 extends SparkTestDefinitions { spark24.cleanUp() } - private def defaultMaster = "local[4]" - private def simpleJobInputs(spark: Spark) = TestInputs( - os.rel / "SparkJob.scala" -> - s"""//> using lib "org.apache.spark::spark-sql:${spark.sparkVersion}" - |//> using scala "${spark.scalaVersion}" - | - |import org.apache.spark._ - |import org.apache.spark.sql._ - | - |object SparkJob { - | def main(args: Array[String]): Unit = { - | val spark = SparkSession.builder() - | .appName("Test job") - | .getOrCreate() - | import spark.implicits._ - | def sc = spark.sparkContext - | val accum = sc.longAccumulator - | sc.parallelize(1 to 10).foreach(x => accum.add(x)) - | println("Result: " + accum.value) - | } - |} - |""".stripMargin - ) - def simplePackageSparkJobTest(spark: Spark): Unit = simpleJobInputs(spark).fromRoot { root => val dest = os.rel / "SparkJob.jar" @@ -154,18 +85,6 @@ class SparkTests212 extends SparkTestDefinitions { expect(output.contains(expectedOutput)) } - def simpleRunStandaloneSparkJobTest(spark: Spark): Unit = - simpleJobInputs(spark).fromRoot { root => - val res = os.proc(TestUtil.cli, "run", extraOptions, "--spark-standalone", "--jvm", "8", ".") - .call(cwd = root) - - val expectedOutput = "Result: 55" - - val output = res.out.trim().linesIterator.toVector - - expect(output.contains(expectedOutput)) - } - test("package spark 2.4") { simplePackageSparkJobTest(spark24) } @@ -187,11 +106,11 @@ class SparkTests212 extends SparkTestDefinitions { } test("run spark 2.4 standalone") { - simpleRunStandaloneSparkJobTest(spark24) + simpleRunStandaloneSparkJobTest(spark24.scalaVersion, spark24.sparkVersion) } test("run spark 3.0 standalone") { - simpleRunStandaloneSparkJobTest(spark30) + simpleRunStandaloneSparkJobTest(spark30.scalaVersion, spark30.sparkVersion) } } diff --git a/modules/integration/src/test/scala/scala/cli/integration/SparkTests213.scala b/modules/integration/src/test/scala/scala/cli/integration/SparkTests213.scala index 1cbd5410d1..9e024dafcf 100644 --- a/modules/integration/src/test/scala/scala/cli/integration/SparkTests213.scala +++ b/modules/integration/src/test/scala/scala/cli/integration/SparkTests213.scala @@ -1,3 +1,3 @@ package scala.cli.integration -class SparkTests213 extends SparkTestDefinitions +class SparkTests213 extends SparkTestDefinitions(scalaVersionOpt = Some(Constants.scala213)) From a8f3dfbc2e467f7f3bcbb6c78708ba65d9b35f27 Mon Sep 17 00:00:00 2001 From: Alexandre Archambault Date: Thu, 25 Aug 2022 18:43:49 +0200 Subject: [PATCH 2/6] Add RunMode.HasRepl Only implemented by RunMode.Default for now. Makes subsequent commits more readable. 
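
For illustration, a minimal sketch (not part of this patch) of the hierarchy this
commit introduces and why narrowing the parameter type helps; the `replCommand`
helper and the launcher string it returns are hypothetical:

  sealed abstract class RunMode extends Product with Serializable
  object RunMode {
    sealed abstract class HasRepl extends RunMode
    case object Default               extends HasRepl
    case object SparkSubmit           extends RunMode
    case object StandaloneSparkSubmit extends RunMode
    case object HadoopJar             extends RunMode
  }

  // Code paths that can only ever launch a REPL accept the narrower type,
  // so this match is exhaustive with just Default for now, and the compiler
  // warns if a future HasRepl subtype is left unhandled.
  def replCommand(runMode: RunMode.HasRepl): Seq[String] =
    runMode match {
      case RunMode.Default => Seq("scala") // hypothetical REPL launcher
    }
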
--- .../main/scala/scala/cli/commands/Repl.scala | 50 ++++++++++++------- .../scala/cli/commands/run/RunMode.scala | 5 +- 2 files changed, 37 insertions(+), 18 deletions(-) diff --git a/modules/cli/src/main/scala/scala/cli/commands/Repl.scala b/modules/cli/src/main/scala/scala/cli/commands/Repl.scala index 0a4ae93b9c..c24d232d85 100644 --- a/modules/cli/src/main/scala/scala/cli/commands/Repl.scala +++ b/modules/cli/src/main/scala/scala/cli/commands/Repl.scala @@ -14,6 +14,7 @@ import scala.build.options.{BuildOptions, JavaOpt, Scope} import scala.cli.CurrentParams import scala.cli.commands.Run.{maybePrintSimpleScalacOutput, orPythonDetectionError} import scala.cli.commands.publish.ConfigUtil.* +import scala.cli.commands.run.RunMode import scala.cli.commands.util.CommonOps.* import scala.cli.commands.util.SharedOptionsUtil.* import scala.cli.config.{ConfigDb, Keys} @@ -52,6 +53,9 @@ object Repl extends ScalaCommand[ReplOptions] { ) } + private def runMode(options: ReplOptions): RunMode.HasRepl = + RunMode.Default + override def runCommand(options: ReplOptions, args: RemainingArgs): Unit = { val initialBuildOptions = buildOptionsOrExit(options) def default = Inputs.default().getOrElse { @@ -85,6 +89,7 @@ object Repl extends ScalaCommand[ReplOptions] { artifacts: Artifacts, classDir: Option[os.Path], allowExit: Boolean, + runMode: RunMode.HasRepl, buildOpt: Option[Build.Successful] ): Unit = { val res = runRepl( @@ -96,6 +101,7 @@ object Repl extends ScalaCommand[ReplOptions] { logger, allowExit = allowExit, options.sharedRepl.replDryRun, + runMode, buildOpt ) res match { @@ -107,13 +113,15 @@ object Repl extends ScalaCommand[ReplOptions] { } def doRunReplFromBuild( build: Build.Successful, - allowExit: Boolean + allowExit: Boolean, + runMode: RunMode.HasRepl ): Unit = doRunRepl( build.options, build.artifacts, build.outputOpt, allowExit, + runMode, Some(build) ) @@ -135,6 +143,7 @@ object Repl extends ScalaCommand[ReplOptions] { artifacts, None, allowExit = !options.sharedRepl.watch.watchMode, + runMode = runMode(options), buildOpt = None ) } @@ -160,9 +169,10 @@ object Repl extends ScalaCommand[ReplOptions] { ) { res => for (builds <- res.orReport(logger)) builds.main match { - case s: Build.Successful => doRunReplFromBuild(s, allowExit = false) - case _: Build.Failed => buildFailed(allowExit = false) - case _: Build.Cancelled => buildCancelled(allowExit = false) + case s: Build.Successful => + doRunReplFromBuild(s, allowExit = false, runMode = runMode(options)) + case _: Build.Failed => buildFailed(allowExit = false) + case _: Build.Cancelled => buildCancelled(allowExit = false) } } try WatchUtil.waitForCtrlC(() => watcher.schedule()) @@ -183,9 +193,10 @@ object Repl extends ScalaCommand[ReplOptions] { ) .orExit(logger) builds.main match { - case s: Build.Successful => doRunReplFromBuild(s, allowExit = true) - case _: Build.Failed => buildFailed(allowExit = true) - case _: Build.Cancelled => buildCancelled(allowExit = true) + case s: Build.Successful => + doRunReplFromBuild(s, allowExit = true, runMode = runMode(options)) + case _: Build.Failed => buildFailed(allowExit = true) + case _: Build.Cancelled => buildCancelled(allowExit = true) } } } @@ -208,6 +219,7 @@ object Repl extends ScalaCommand[ReplOptions] { logger: Logger, allowExit: Boolean, dryRun: Boolean, + runMode: RunMode.HasRepl, buildOpt: Option[Build.Successful] ): Either[BuildException, Unit] = either { @@ -363,16 +375,20 @@ object Repl extends ScalaCommand[ReplOptions] { case other => other } - if (shouldUseAmmonite) { - val 
replArtifacts = value(ammoniteArtifacts()) - val replArgs = ammoniteAdditionalArgs() ++ programArgs - maybeRunRepl(replArtifacts, replArgs) - } - else { - val replArtifacts = value(defaultArtifacts()) - val replArgs = additionalArgs ++ programArgs - maybeRunRepl(replArtifacts, replArgs) - } + if (shouldUseAmmonite) + runMode match { + case RunMode.Default => + val replArtifacts = value(ammoniteArtifacts()) + val replArgs = ammoniteAdditionalArgs() ++ programArgs + maybeRunRepl(replArtifacts, replArgs) + } + else + runMode match { + case RunMode.Default => + val replArtifacts = value(defaultArtifacts()) + val replArgs = additionalArgs ++ programArgs + maybeRunRepl(replArtifacts, replArgs) + } } final class ReplError(retCode: Int) diff --git a/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala b/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala index df883f7fa8..01eed9261c 100644 --- a/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala +++ b/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala @@ -3,7 +3,10 @@ package scala.cli.commands.run sealed abstract class RunMode extends Product with Serializable object RunMode { - case object Default extends RunMode + + sealed abstract class HasRepl extends RunMode + + case object Default extends HasRepl case object SparkSubmit extends RunMode case object StandaloneSparkSubmit extends RunMode case object HadoopJar extends RunMode From 803c64485fe22fe8c21b9d8641fd441f09fc802f Mon Sep 17 00:00:00 2001 From: Alexandre Archambault Date: Thu, 25 Aug 2022 18:43:56 +0200 Subject: [PATCH 3/6] Add RunMode.Spark --- modules/cli/src/main/scala/scala/cli/commands/Run.scala | 8 ++++---- .../src/main/scala/scala/cli/commands/run/RunMode.scala | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/modules/cli/src/main/scala/scala/cli/commands/Run.scala b/modules/cli/src/main/scala/scala/cli/commands/Run.scala index a5bf154761..c69403f660 100644 --- a/modules/cli/src/main/scala/scala/cli/commands/Run.scala +++ b/modules/cli/src/main/scala/scala/cli/commands/Run.scala @@ -69,7 +69,7 @@ object Run extends ScalaCommand[RunOptions] with BuildCommandHelpers { sharedJava.allJavaOpts.map(JavaOpt(_)).map(Positioned.commandLine), jvmIdOpt = baseOptions.javaOptions.jvmIdOpt.orElse { runMode(options) match { - case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar => + case _: RunMode.Spark | RunMode.HadoopJar => Some("8") case RunMode.Default => None } @@ -78,8 +78,8 @@ object Run extends ScalaCommand[RunOptions] with BuildCommandHelpers { internal = baseOptions.internal.copy( keepResolution = baseOptions.internal.keepResolution || { runMode(options) match { - case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar => true - case RunMode.Default => false + case _: RunMode.Spark | RunMode.HadoopJar => true + case RunMode.Default => false } } ), @@ -90,7 +90,7 @@ object Run extends ScalaCommand[RunOptions] with BuildCommandHelpers { scalaPyVersion = options.sharedRun.sharedPython.scalaPyVersion, addRunnerDependencyOpt = baseOptions.notForBloopOptions.addRunnerDependencyOpt.orElse { runMode(options) match { - case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar => + case _: RunMode.Spark | RunMode.HadoopJar => Some(false) case RunMode.Default => None } diff --git a/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala b/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala index 01eed9261c..314cda133c 100644 --- 
a/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala +++ b/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala @@ -5,9 +5,10 @@ sealed abstract class RunMode extends Product with Serializable object RunMode { sealed abstract class HasRepl extends RunMode + sealed abstract class Spark extends RunMode case object Default extends HasRepl - case object SparkSubmit extends RunMode - case object StandaloneSparkSubmit extends RunMode + case object SparkSubmit extends Spark + case object StandaloneSparkSubmit extends Spark case object HadoopJar extends RunMode } From aa3854f86aa936fda26ca303a72eb6d46d80f0f9 Mon Sep 17 00:00:00 2001 From: Alexandre Archambault Date: Thu, 13 Oct 2022 16:28:50 +0200 Subject: [PATCH 4/6] No need of explicit Java 8 when passing --spark --- .../test/scala/scala/cli/integration/SparkTestDefinitions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala b/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala index ffe2606634..9cb9034d50 100644 --- a/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala +++ b/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala @@ -129,7 +129,7 @@ abstract class SparkTestDefinitions(val scalaVersionOpt: Option[String]) extends val extraEnv = if (needsWinUtils) maybeHadoopHomeForWinutils(root / "hadoop-home") else Map.empty[String, String] - val res = os.proc(TestUtil.cli, "run", extraOptions, "--spark-standalone", "--jvm", "8", ".") + val res = os.proc(TestUtil.cli, "run", extraOptions, "--spark-standalone", ".") .call(cwd = root, env = extraEnv) val expectedOutput = "Result: 55" From b3a2e5d0fc5cc35a3b53b16a1b4ec7bdcf29fc86 Mon Sep 17 00:00:00 2001 From: Alexandre Archambault Date: Thu, 25 Aug 2022 18:44:03 +0200 Subject: [PATCH 5/6] Accept spark-submit arguments on the command-line --- .../scala/cli/commands/SharedRunOptions.scala | 5 ++ .../main/scala/scala/cli/commands/Run.scala | 10 ++-- .../scala/cli/commands/run/RunMode.scala | 19 ++++++-- .../scala/cli/commands/util/RunSpark.scala | 4 ++ .../integration/SparkTestDefinitions.scala | 46 +++++++++++++++++++ website/docs/reference/cli-options.md | 7 +++ 6 files changed, 82 insertions(+), 9 deletions(-) diff --git a/modules/cli-options/src/main/scala/scala/cli/commands/SharedRunOptions.scala b/modules/cli-options/src/main/scala/scala/cli/commands/SharedRunOptions.scala index 17cd9c4f95..3be8adc761 100644 --- a/modules/cli-options/src/main/scala/scala/cli/commands/SharedRunOptions.scala +++ b/modules/cli-options/src/main/scala/scala/cli/commands/SharedRunOptions.scala @@ -25,6 +25,11 @@ final case class SharedRunOptions( @ExtraName("spark") sparkSubmit: Option[Boolean] = None, @Group("Run") + @Hidden + @HelpMessage("[experimental] spark-submit arguments") + @ExtraName("submitArg") + submitArgument: List[String] = Nil, + @Group("Run") @HelpMessage("[experimental] Run as a Spark job, using a vanilla Spark distribution downloaded by Scala CLI") @ExtraName("sparkStandalone") standaloneSpark: Option[Boolean] = None, diff --git a/modules/cli/src/main/scala/scala/cli/commands/Run.scala b/modules/cli/src/main/scala/scala/cli/commands/Run.scala index c69403f660..3f48a2052b 100644 --- a/modules/cli/src/main/scala/scala/cli/commands/Run.scala +++ b/modules/cli/src/main/scala/scala/cli/commands/Run.scala @@ -32,9 +32,9 @@ object Run extends ScalaCommand[RunOptions] with 
BuildCommandHelpers { options.sharedRun.standaloneSpark.getOrElse(false) && !options.sharedRun.sparkSubmit.contains(false) ) - RunMode.StandaloneSparkSubmit + RunMode.StandaloneSparkSubmit(options.sharedRun.submitArgument) else if (options.sharedRun.sparkSubmit.getOrElse(false)) - RunMode.SparkSubmit + RunMode.SparkSubmit(options.sharedRun.submitArgument) else if (options.sharedRun.hadoopJar) RunMode.HadoopJar else @@ -472,24 +472,26 @@ object Run extends ScalaCommand[RunOptions] with BuildCommandHelpers { ) Right((proc, None)) } - case RunMode.SparkSubmit => + case mode: RunMode.SparkSubmit => value { RunSpark.run( build, mainClass, args, + mode.submitArgs, logger, allowExecve, showCommand, scratchDirOpt ) } - case RunMode.StandaloneSparkSubmit => + case mode: RunMode.StandaloneSparkSubmit => value { RunSpark.runStandalone( build, mainClass, args, + mode.submitArgs, logger, allowExecve, showCommand, diff --git a/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala b/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala index 314cda133c..9409d6e1ff 100644 --- a/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala +++ b/modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala @@ -5,10 +5,19 @@ sealed abstract class RunMode extends Product with Serializable object RunMode { sealed abstract class HasRepl extends RunMode - sealed abstract class Spark extends RunMode + sealed abstract class Spark extends RunMode { + def submitArgs: Seq[String] + def withSubmitArgs(args: Seq[String]): Spark + } - case object Default extends HasRepl - case object SparkSubmit extends Spark - case object StandaloneSparkSubmit extends Spark - case object HadoopJar extends RunMode + case object Default extends HasRepl + final case class SparkSubmit(submitArgs: Seq[String]) extends Spark { + def withSubmitArgs(args: Seq[String]): SparkSubmit = + copy(submitArgs = args) + } + final case class StandaloneSparkSubmit(submitArgs: Seq[String]) extends Spark { + def withSubmitArgs(args: Seq[String]): StandaloneSparkSubmit = + copy(submitArgs = args) + } + case object HadoopJar extends RunMode } diff --git a/modules/cli/src/main/scala/scala/cli/commands/util/RunSpark.scala b/modules/cli/src/main/scala/scala/cli/commands/util/RunSpark.scala index 38d7b86d2a..c0fa38fcaa 100644 --- a/modules/cli/src/main/scala/scala/cli/commands/util/RunSpark.scala +++ b/modules/cli/src/main/scala/scala/cli/commands/util/RunSpark.scala @@ -16,6 +16,7 @@ object RunSpark { build: Build.Successful, mainClass: String, args: Seq[String], + submitArgs: Seq[String], logger: Logger, allowExecve: Boolean, showCommand: Boolean, @@ -54,6 +55,7 @@ object RunSpark { Seq(submitCommand, "--class", mainClass) ++ jarsArgs ++ javaOpts.flatMap(opt => Seq("--driver-java-options", opt)) ++ + submitArgs ++ Seq(library.toString) ++ args val envUpdates = javaHomeInfo.envUpdates(sys.env) @@ -77,6 +79,7 @@ object RunSpark { build: Build.Successful, mainClass: String, args: Seq[String], + submitArgs: Seq[String], logger: Logger, allowExecve: Boolean, showCommand: Boolean, @@ -107,6 +110,7 @@ object RunSpark { Seq("--class", mainClass) ++ jarsArgs ++ javaOpts.flatMap(opt => Seq("--driver-java-options", opt)) ++ + submitArgs ++ Seq(library.toString) ++ args val envUpdates = javaHomeInfo.envUpdates(sys.env) diff --git a/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala b/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala index 9cb9034d50..52d633bd97 100644 --- 
a/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala +++ b/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala @@ -143,4 +143,50 @@ abstract class SparkTestDefinitions(val scalaVersionOpt: Option[String]) extends simpleRunStandaloneSparkJobTest(actualScalaVersion, "3.3.0", needsWinUtils = true) } + test("run spark spark-submit args") { + val jobName = "the test spark job" + val inputs = TestInputs( + os.rel / "SparkJob.scala" -> + s"""//> using lib "org.apache.spark::spark-sql:3.3.0" + | + |import org.apache.spark._ + |import org.apache.spark.sql._ + | + |object SparkJob { + | def main(args: Array[String]): Unit = { + | val spark = SparkSession.builder().getOrCreate() + | val name = spark.conf.get("spark.app.name") + | assert(name == "$jobName") + | import spark.implicits._ + | def sc = spark.sparkContext + | val accum = sc.longAccumulator + | sc.parallelize(1 to 10).foreach(x => accum.add(x)) + | println("Result: " + accum.value) + | } + |} + |""".stripMargin + ) + inputs.fromRoot { root => + val extraEnv = maybeHadoopHomeForWinutils(root / "hadoop-home") + val res = os.proc( + TestUtil.cli, + "run", + extraOptions, + "--spark-standalone", + ".", + "--submit-arg", + "--name", + "--submit-arg", + jobName + ) + .call(cwd = root, env = extraEnv) + + val expectedOutput = "Result: 55" + + val output = res.out.trim().linesIterator.toVector + + expect(output.contains(expectedOutput)) + } + } + } diff --git a/website/docs/reference/cli-options.md b/website/docs/reference/cli-options.md index d595899418..a70317b6a6 100644 --- a/website/docs/reference/cli-options.md +++ b/website/docs/reference/cli-options.md @@ -1021,6 +1021,13 @@ Aliases: `--spark` [Internal] [experimental] Run as a Spark job, using the spark-submit command +### `--submit-argument` + +Aliases: `--submit-arg` + +[Internal] +[experimental] spark-submit arguments + ### `--standalone-spark` Aliases: `--spark-standalone` From 109b1a2bbe6e30da241d52d483b0a5cd93a019fd Mon Sep 17 00:00:00 2001 From: Alexandre Archambault Date: Thu, 13 Oct 2022 11:02:17 +0200 Subject: [PATCH 6/6] Add dependencies in run --spark-standalone tests --- .../scala/cli/integration/SparkTestDefinitions.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala b/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala index 52d633bd97..ac3db8b6ef 100644 --- a/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala +++ b/modules/integration/src/test/scala/scala/cli/integration/SparkTestDefinitions.scala @@ -63,6 +63,8 @@ abstract class SparkTestDefinitions(val scalaVersionOpt: Option[String]) extends protected def simpleJobInputs(spark: Spark) = TestInputs( os.rel / "SparkJob.scala" -> s"""//> using lib "org.apache.spark::spark-sql:${spark.sparkVersion}" + |//> using lib "com.chuusai::shapeless:2.3.10" + |//> using lib "com.lihaoyi::pprint:0.7.3" | |import org.apache.spark._ |import org.apache.spark.sql._ @@ -75,7 +77,12 @@ abstract class SparkTestDefinitions(val scalaVersionOpt: Option[String]) extends | import spark.implicits._ | def sc = spark.sparkContext | val accum = sc.longAccumulator - | sc.parallelize(1 to 10).foreach(x => accum.add(x)) + | sc.parallelize(1 to 10).foreach { x => + | import shapeless._ + | val l = x :: HNil + | accum.add(l.head) + | } + | pprint.err.log(accum.value) | println("Result: " + accum.value) | } |}