[SPARK-39601][YARN][FOLLOWUP] YarnClusterSchedulerBackend should call super.stop()

### What changes were proposed in this pull request?

This is a followup of apache/spark#38622; I just noticed that `YarnClusterSchedulerBackend#stop` missed calling `super.stop()`.

### Why are the changes needed?

This follows up the previous change; without it, Spark may not shut down properly in YARN cluster mode.
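
For context, here is a minimal sketch of the pattern the fix restores, using simplified stand-in classes (the real `YarnClusterSchedulerBackend` lives under `resource-managers/yarn` and is not part of the hunks shown below):

```scala
// Minimal, self-contained sketch; class and method names are simplified stand-ins,
// not the actual Spark YARN backend signatures.
class SchedulerBackendLike {
  def stop(): Unit = println("base stop(): release executors, shut down RPC endpoints")
}

class YarnClusterBackendLike extends SchedulerBackendLike {
  override def stop(): Unit = {
    try {
      println("YARN-specific teardown")
    } finally {
      super.stop() // the call this followup adds back; without it the base shutdown never runs
    }
  }
}

object StopDemo extends App {
  new YarnClusterBackendLike().stop()
}
```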

### Does this PR introduce _any_ user-facing change?

No, unreleased change.

### How was this patch tested?

Existing UT.

Closes #39053 from pan3793/SPARK-39601-followup.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
a0x8o committed Dec 13, 2022
1 parent f8da47d commit 1abe784
Showing 306 changed files with 8,725 additions and 1,453 deletions.
1 change: 1 addition & 0 deletions appveyor.yml
@@ -28,6 +28,7 @@ only_commits:
files:
- appveyor.yml
- dev/appveyor-install-dependencies.ps1
- build/spark-build-info.ps1
- R/
- sql/core/src/main/scala/org/apache/spark/sql/api/r/
- core/src/main/scala/org/apache/spark/api/r/
46 changes: 46 additions & 0 deletions build/spark-build-info.ps1
@@ -0,0 +1,46 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This script generates the build info for Spark and places it into the spark-version-info.properties file.
# Arguments:
# ResourceDir - The target directory where the properties file will be created. [./core/target/extra-resources]
# SparkVersion - The current version of Spark

param(
# The resource directory.
[Parameter(Position = 0)]
[String]
$ResourceDir,

# The Spark version.
[Parameter(Position = 1)]
[String]
$SparkVersion
)

$null = New-Item -Type Directory -Force $ResourceDir
$SparkBuildInfoPath = $ResourceDir.TrimEnd('\').TrimEnd('/') + '\spark-version-info.properties'

$SparkBuildInfoContent =
"version=$SparkVersion
user=$($Env:USERNAME)
revision=$(git rev-parse HEAD)
branch=$(git rev-parse --abbrev-ref HEAD)
date=$([DateTime]::UtcNow | Get-Date -UFormat +%Y-%m-%dT%H:%M:%SZ)
url=$(git config --get remote.origin.url)"

Set-Content -Path $SparkBuildInfoPath -Value $SparkBuildInfoContent
@@ -179,4 +179,18 @@ public static PooledByteBufAllocator createPooledByteBufAllocator(
allowCache ? PooledByteBufAllocator.defaultUseCacheForAllThreads() : false
);
}

/**
* The ByteBuf allocator prefers to allocate direct ByteBufs iff both Spark allows creating
* direct ByteBufs and Netty enables directBufferPreferred.
*/
public static boolean preferDirectBufs(TransportConf conf) {
boolean allowDirectBufs;
if (conf.sharedByteBufAllocators()) {
allowDirectBufs = conf.preferDirectBufsForSharedByteBufAllocators();
} else {
allowDirectBufs = conf.preferDirectBufs();
}
return allowDirectBufs && PlatformDependent.directBufferPreferred();
}
}
5 changes: 5 additions & 0 deletions common/network-yarn/pom.xml
@@ -136,6 +136,11 @@
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
@@ -70,10 +70,6 @@ case class AvroScan(

override def hashCode(): Int = super.hashCode()

override def description(): String = {
super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
}

override def getMetaData(): Map[String, String] = {
super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
}
6 changes: 3 additions & 3 deletions connector/connect/README.md
@@ -32,15 +32,15 @@ for example, compiling `connect` module on CentOS 6 or CentOS 7 which the default
specifying the user-defined `protoc` and `protoc-gen-grpc-java` binary files as follows:

```bash
export CONNECT_PROTOC_EXEC_PATH=/path-to-protoc-exe
export SPARK_PROTOC_EXEC_PATH=/path-to-protoc-exe
export CONNECT_PLUGIN_EXEC_PATH=/path-to-protoc-gen-grpc-java-exe
./build/mvn -Phive -Puser-defined-protoc clean package
```

or

```bash
export CONNECT_PROTOC_EXEC_PATH=/path-to-protoc-exe
export SPARK_PROTOC_EXEC_PATH=/path-to-protoc-exe
export CONNECT_PLUGIN_EXEC_PATH=/path-to-protoc-gen-grpc-java-exe
./build/sbt -Puser-defined-protoc clean package
```
@@ -82,7 +82,7 @@ To use the release version of Spark Connect:

```bash
# Run all Spark Connect Python tests as a module.
./python/run-tests --module pyspark-connect
./python/run-tests --module pyspark-connect --parallelism 1
```


4 changes: 2 additions & 2 deletions connector/connect/common/pom.xml
@@ -193,7 +193,7 @@
<profile>
<id>user-defined-protoc</id>
<properties>
<connect.protoc.executable.path>${env.CONNECT_PROTOC_EXEC_PATH}</connect.protoc.executable.path>
<spark.protoc.executable.path>${env.SPARK_PROTOC_EXEC_PATH}</spark.protoc.executable.path>
<connect.plugin.executable.path>${env.CONNECT_PLUGIN_EXEC_PATH}</connect.plugin.executable.path>
</properties>
<build>
@@ -203,7 +203,7 @@
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocExecutable>${connect.protoc.executable.path}</protocExecutable>
<protocExecutable>${spark.protoc.executable.path}</protocExecutable>
<pluginId>grpc-java</pluginId>
<pluginExecutable>${connect.plugin.executable.path}</pluginExecutable>
<protoSourceRoot>src/main/protobuf</protoSourceRoot>
@@ -43,7 +43,11 @@ message Expression {
Expression expr = 1;

// (Required) The data type that the expression is to be cast to.
DataType cast_to_type = 2;
oneof cast_to_type {
DataType type = 2;
// If this is set, the server will use the Catalyst parser to parse this string into a DataType.
string type_str = 3;
}
}

message Literal {
@@ -20,6 +20,7 @@ syntax = 'proto3';
package spark.connect;

import "spark/connect/expressions.proto";
import "spark/connect/types.proto";

option java_multiple_files = true;
option java_package = "org.apache.spark.connect.proto";
@@ -54,6 +55,7 @@ message Relation {
Tail tail = 22;
WithColumns with_columns = 23;
Hint hint = 24;
Unpivot unpivot = 25;

// NA functions
NAFill fill_na = 90;
@@ -304,6 +306,17 @@ message LocalRelation {
// Local collection data serialized into Arrow IPC streaming format which contains
// the schema of the data.
bytes data = 1;

// (Optional) The user provided schema.
//
// The server side will update the column names and data types according to this schema.
oneof schema {

DataType datatype = 2;

// The server will use the Catalyst parser to parse this string into a DataType.
string datatype_str = 3;
}
}

// Relation of type [[Sample]] that samples a fraction of the dataset.
@@ -570,3 +583,21 @@ message Hint {
// (Optional) Hint parameters.
repeated Expression.Literal parameters = 3;
}

// Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns set.
message Unpivot {
// (Required) The input relation.
Relation input = 1;

// (Required) Id columns.
repeated Expression ids = 2;

// (Optional) Value columns to unpivot.
repeated Expression values = 3;

// (Required) Name of the variable column.
string variable_column_name = 4;

// (Required) Name of the value column.
string value_column_name = 5;
}
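
As a rough illustration of the wide-to-long reshaping this message encodes, here is a small plain-Scala sketch (the column names `q1`/`q2` are hypothetical, and this is not the Connect client API):

```scala
// Mirrors ids = [id], values = [q1, q2],
// variable_column_name = "variable", value_column_name = "value".
case class WideRow(id: Int, q1: Double, q2: Double)
case class LongRow(id: Int, variable: String, value: Double)

object UnpivotDemo extends App {
  val wide = Seq(WideRow(1, 1.0, 2.0), WideRow(2, 3.0, 4.0))
  // Each wide row produces one long row per value column.
  val long = wide.flatMap(w => Seq(LongRow(w.id, "q1", w.q1), LongRow(w.id, "q2", w.q2)))
  long.foreach(println)
  // LongRow(1,q1,1.0), LongRow(1,q2,2.0), LongRow(2,q1,3.0), LongRow(2,q2,4.0)
}
```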
67 changes: 67 additions & 0 deletions connector/connect/server/pom.xml
@@ -55,6 +55,12 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-connect-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -106,19 +112,80 @@
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- #if scala-2.13 --><!--
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
--><!-- #endif scala-2.13 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>failureaccess</artifactId>
<version>${guava.failureaccess.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>${tomcat.annotations.api.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
@@ -38,8 +38,10 @@ private[spark] object Connect {

val CONNECT_GRPC_ARROW_MAX_BATCH_SIZE =
ConfigBuilder("spark.connect.grpc.arrow.maxBatchSize")
.doc("When using Apache Arrow, limit the maximum size of one arrow batch that " +
"can be sent from server side to client side.")
.doc(
"When using Apache Arrow, limit the maximum size of one arrow batch that " +
"can be sent from server side to client side. Currently, we conservatively use 70% " +
"of it because the size is not accurate but estimated.")
.version("3.4.0")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("4m")
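
(A hedged reading of the new wording: with the default of 4m, the effective per-batch cap would be roughly 0.7 × 4 MiB ≈ 2.8 MiB, since batch sizes are estimated rather than measured exactly.)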
@@ -96,7 +96,17 @@ package object dsl {
Expression.Cast
.newBuilder()
.setExpr(expr)
.setCastToType(dataType))
.setType(dataType))
.build()

def cast(dataType: String): Expression =
Expression
.newBuilder()
.setCast(
Expression.Cast
.newBuilder()
.setExpr(expr)
.setTypeStr(dataType))
.build()
}

@@ -709,6 +719,53 @@ package object dsl {
.build()
}

def unpivot(
ids: Seq[Expression],
values: Seq[Expression],
variableColumnName: String,
valueColumnName: String): Relation = {
Relation
.newBuilder()
.setUnpivot(
Unpivot
.newBuilder()
.setInput(logicalPlan)
.addAllIds(ids.asJava)
.addAllValues(values.asJava)
.setVariableColumnName(variableColumnName)
.setValueColumnName(valueColumnName))
.build()
}

def unpivot(
ids: Seq[Expression],
variableColumnName: String,
valueColumnName: String): Relation = {
Relation
.newBuilder()
.setUnpivot(
Unpivot
.newBuilder()
.setInput(logicalPlan)
.addAllIds(ids.asJava)
.setVariableColumnName(variableColumnName)
.setValueColumnName(valueColumnName))
.build()
}

def melt(
ids: Seq[Expression],
values: Seq[Expression],
variableColumnName: String,
valueColumnName: String): Relation =
unpivot(ids, values, variableColumnName, valueColumnName)

def melt(
ids: Seq[Expression],
variableColumnName: String,
valueColumnName: String): Relation =
unpivot(ids, variableColumnName, valueColumnName)

private def createSetOperation(
left: Relation,
right: Relation,