[SPARK-42748][CONNECT] Server-side Artifact Management
### What changes were proposed in this pull request?

This PR adds server-side artifact management as a follow-up to the client-side artifact transfer introduced in #40256.

Note: The artifacts added on the server are visible to **all users** of the cluster. This is a limitation of the current Spark architecture (unisolated classloaders).

Apart from storing generic artifacts, we handle jars and classfiles in specific ways:

- Jars:
  - Jars may be added but not removed or overwritten.
  - Added jars are visible to **all** users/tasks/queries.
- Classfiles:
  - Classfiles may not be explicitly removed but are allowed to be overwritten.
  - We piggyback on top of the REPL architecture to serve classfiles to the executors:
    - If a REPL is initialized, classfiles are stored in the existing `spark.repl.class.outputDir`, which is already served to executors through `spark.repl.class.uri`.
    - If a REPL is not being used, we use a custom directory (root: `sparkContext.sparkConnectArtifactDirectory`) to store classfiles and point `spark.repl.class.uri` at it. A sketch of this resolution order follows this list.
  - Class files are visible to **all** users/tasks/queries.
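
For illustration, here is a minimal Scala sketch of the resolution order described above. It mirrors the logic added in `SparkConnectArtifactManager` further below; `resolveClassFileDirectory` is a hypothetical helper introduced only for this example and is not part of the PR.

```scala
import java.nio.file.{Path, Paths}

import org.apache.spark.SparkEnv

// Reuse the REPL's class output directory when a REPL is attached; otherwise fall back
// to a "classes" directory under the Spark Connect artifact root.
def resolveClassFileDirectory(artifactRoot: Path): Path =
  SparkEnv.get.conf
    .getOption("spark.repl.class.outputDir") // set when a REPL is attached
    .map(p => Paths.get(p))
    .getOrElse(artifactRoot.resolve("classes")) // Spark Connect-managed fallback
```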

### Why are the changes needed?

#40256 implements the client-side transfer of artifacts to the server, but the server currently does not process these requests.

We need a server-side management mechanism to handle the storage of these artifacts on the driver, as well as to perform further processing (such as adding jars and moving class files to the right directories).

### Does this PR introduce _any_ user-facing change?

Yes, a new experimental API, but no behavioural changes.
A new method, `sparkConnectArtifactDirectory`, is accessible through `SparkContext`; it returns the directory storing all artifacts added through Spark Connect.
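
Below is a minimal usage sketch of the new accessor, assuming it returns a `java.io.File` (the server code in this PR only relies on calling `.toPath` on it):

```scala
import org.apache.spark.SparkContext

// Inspect where Spark Connect artifacts are staged on the driver.
val sc: SparkContext = SparkContext.getOrCreate()
val artifactDir = sc.sparkConnectArtifactDirectory
println(s"Spark Connect artifacts are staged under: ${artifactDir.toPath}")
```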

### How was this patch tested?

New unit tests.

Closes #40368 from vicennial/SPARK-42748.

Lead-authored-by: vicennial <venkata.gudesa@databricks.com>
Co-authored-by: Venkata Sai Akhil Gudesa <venkata.gudesa@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
vicennial authored and hvanhovell committed Mar 23, 2023
1 parent 631e8eb commit ec02224
Showing 26 changed files with 930 additions and 46 deletions.
@@ -69,7 +69,7 @@ class PlanGenerationTestSuite
// Borrowed from SparkFunSuite
private val regenerateGoldenFiles: Boolean = System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"

protected val queryFilePath: Path = commonResourcePath.resolve("queries")
protected val queryFilePath: Path = commonResourcePath.resolve("query-tests/queries")

// A relative path to /connector/connect/server, used by `ProtoToParsedPlanTestSuite` to run
// with the datasource.
@@ -75,7 +75,7 @@ class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
}

private val CHUNK_SIZE: Int = 32 * 1024
protected def artifactFilePath: Path = baseResourcePath.resolve("artifact-tests")
protected def artifactFilePath: Path = commonResourcePath.resolve("artifact-tests")
protected def artifactCrcPath: Path = artifactFilePath.resolve("crc")

private def getCrcValues(filePath: Path): Seq[Long] = {
@@ -52,7 +52,6 @@ trait ConnectFunSuite extends AnyFunSuite { // scalastyle:ignore funsuite
"common",
"src",
"test",
"resources",
"query-tests").toAbsolutePath
"resources").toAbsolutePath
}
}
Binary file not shown.
@@ -0,0 +1 @@
553633018
7 changes: 7 additions & 0 deletions connector/connect/server/pom.xml
@@ -233,6 +233,13 @@
<version>2.1.214</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connect.artifact

import java.net.{URL, URLClassLoader}
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._

import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.Utils

/**
* The Artifact Manager for the [[SparkConnectService]].
*
* This class handles the storage of artifacts as well as preparing the artifacts for use.
* Currently, jars and classfile artifacts undergo additional processing:
* - Jars are automatically added to the underlying [[SparkContext]] and are accessible by all
* users of the cluster.
* - Class files are moved into a common directory that is shared among all users of the
* cluster. Note: Under a multi-user setup, class file conflicts may occur between user
* classes as the class file directory is shared.
*/
class SparkConnectArtifactManager private[connect] {

// The base directory where all artifacts are stored.
// Note: If a REPL is attached to the cluster, class file artifacts are stored in the
// REPL's output directory.
private[connect] lazy val artifactRootPath = SparkContext.getActive match {
case Some(sc) =>
sc.sparkConnectArtifactDirectory.toPath
case None =>
throw new RuntimeException("SparkContext is uninitialized!")
}
private[connect] lazy val artifactRootURI = {
val fileServer = SparkEnv.get.rpcEnv.fileServer
fileServer.addDirectory("artifacts", artifactRootPath.toFile)
}

// The base directory where all class files are stored.
// Note: If a REPL is attached to the cluster, we piggyback on the existing REPL output
// directory to store class file artifacts.
private[connect] lazy val classArtifactDir = SparkEnv.get.conf
.getOption("spark.repl.class.outputDir")
.map(p => Paths.get(p))
.getOrElse(artifactRootPath.resolve("classes"))

private[connect] lazy val classArtifactUri: String =
SparkEnv.get.conf.getOption("spark.repl.class.uri") match {
case Some(uri) => uri
case None =>
throw new RuntimeException("Class artifact URI had not been initialised in SparkContext!")
}

private val jarsList = new CopyOnWriteArrayList[Path]

/**
* Get the URLs of all jar artifacts added through the [[SparkConnectService]].
*
* @return
*/
def getSparkConnectAddedJars: Seq[URL] = jarsList.asScala.map(_.toUri.toURL).toSeq

/**
* Add and prepare a staged artifact (i.e. an artifact that has been rebuilt locally from bytes
* over the wire) for use.
*
* @param session
*   the [[SparkSession]] against which the artifact is registered
* @param remoteRelativePath
*   the artifact's relative path as specified by the client
* @param serverLocalStagingPath
*   the server-local staging path holding the artifact's bytes
*/
private[connect] def addArtifact(
session: SparkSession,
remoteRelativePath: Path,
serverLocalStagingPath: Path): Unit = {
require(!remoteRelativePath.isAbsolute)
if (remoteRelativePath.startsWith("classes/")) {
// Move class files to common location (shared among all users)
val target = classArtifactDir.resolve(remoteRelativePath.toString.stripPrefix("classes/"))
Files.createDirectories(target.getParent)
// Allow overwriting class files to capture updates to classes.
Files.move(serverLocalStagingPath, target, StandardCopyOption.REPLACE_EXISTING)
} else {
val target = artifactRootPath.resolve(remoteRelativePath)
Files.createDirectories(target.getParent)
// Disallow overwriting jars because Spark doesn't support removing jars that were
// previously added.
if (Files.exists(target)) {
throw new RuntimeException(
s"Duplicate Jar: $remoteRelativePath. " +
s"Jars cannot be overwritten.")
}
Files.move(serverLocalStagingPath, target)
if (remoteRelativePath.startsWith("jars")) {
// Add the jar to the underlying SparkContext (visible to all users).
session.sessionState.resourceLoader.addJar(target.toString)
jarsList.add(target)
}
}
}
}

object SparkConnectArtifactManager {

private var _activeArtifactManager: SparkConnectArtifactManager = _

/**
* Obtain the active artifact manager or create a new artifact manager.
*
* @return
*/
def getOrCreateArtifactManager: SparkConnectArtifactManager = {
if (_activeArtifactManager == null) {
_activeArtifactManager = new SparkConnectArtifactManager
}
_activeArtifactManager
}

private lazy val artifactManager = getOrCreateArtifactManager

/**
* Obtain a classloader that contains jar and classfile artifacts on the classpath.
*
* @return
*/
def classLoaderWithArtifacts: ClassLoader = {
val urls = artifactManager.getSparkConnectAddedJars :+
artifactManager.classArtifactDir.toUri.toURL
new URLClassLoader(urls.toArray, Utils.getContextOrSparkClassLoader)
}

/**
* Run a segment of code utilising a classloader that contains jar and classfile artifacts on
* the classpath.
*
* @param thunk
* @tparam T
* @return
*/
def withArtifactClassLoader[T](thunk: => T): T = {
Utils.withContextClassLoader(classLoaderWithArtifacts) {
thunk
}
}
}
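
For orientation, here is a hedged usage sketch of the manager defined above (not part of this PR): it runs a block of code with the added jars and class files visible on the context classloader. The planner change below uses the same manager by passing `classLoaderWithArtifacts` to `Utils.deserialize` for Scala UDFs.

```scala
import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager

// "com.example.UserHelper" is an assumed class name, uploaded earlier as a class file
// artifact by a Spark Connect client.
val helper = SparkConnectArtifactManager.withArtifactClassLoader {
  val cls = Class.forName(
    "com.example.UserHelper",
    true,
    Thread.currentThread().getContextClassLoader)
  cls.getDeclaredConstructor().newInstance()
}
```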
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, JoinType, L
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, CommandResult, Deduplicate, Except, Intersect, LocalRelation, LogicalPlan, Project, Sample, Sort, SubqueryAlias, Union, Unpivot, UnresolvedHint}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput, LiteralValueProtoConverter, UdfPacket}
import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
import org.apache.spark.sql.connect.plugin.SparkConnectPluginRegistry
@@ -988,7 +989,9 @@ class SparkConnectPlanner(val session: SparkSession) {
private def transformScalarScalaUDF(fun: proto.CommonInlineUserDefinedFunction): ScalaUDF = {
val udf = fun.getScalarScalaUdf
val udfPacket =
Utils.deserialize[UdfPacket](udf.getPayload.toByteArray, Utils.getContextOrSparkClassLoader)
Utils.deserialize[UdfPacket](
udf.getPayload.toByteArray,
SparkConnectArtifactManager.classLoaderWithArtifacts)
ScalaUDF(
function = udfPacket.function,
dataType = udfPacket.outputEncoder.dataType,
