From a29a51844645b4facc1b76319145dbf234921e68 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 10 Sep 2024 16:26:08 -0400 Subject: [PATCH 1/8] Add shims to support SparkContext and RDD --- connector/connect/client/jvm/pom.xml | 5 + .../apache/spark/sql/DataFrameReader.scala | 10 ++ .../scala/org/apache/spark/sql/Dataset.scala | 8 ++ .../org/apache/spark/sql/SparkSession.scala | 33 ++++++- .../scala/org/apache/spark/sql/package.scala | 3 + pom.xml | 1 + sql/api/pom.xml | 6 ++ .../spark/sql/api/DataFrameReader.scala | 31 ++++++ .../org/apache/spark/sql/api/Dataset.scala | 29 ++++++ .../apache/spark/sql/api/SparkSession.scala | 97 +++++++++++++++++++ sql/catalyst/pom.xml | 6 ++ sql/connect/server/pom.xml | 4 + sql/connect/shims/README.md | 1 + sql/connect/shims/pom.xml | 41 ++++++++ .../org/apache/spark/api/java/shims.scala | 19 ++++ .../scala/org/apache/spark/rdd/shims.scala | 19 ++++ .../main/scala/org/apache/spark/shims.scala | 19 ++++ .../apache/spark/sql/DataFrameReader.scala | 23 +---- .../scala/org/apache/spark/sql/Dataset.scala | 20 +--- .../org/apache/spark/sql/SparkSession.scala | 74 ++------------ 20 files changed, 341 insertions(+), 108 deletions(-) create mode 100644 sql/connect/shims/README.md create mode 100644 sql/connect/shims/pom.xml create mode 100644 sql/connect/shims/src/main/scala/org/apache/spark/api/java/shims.scala create mode 100644 sql/connect/shims/src/main/scala/org/apache/spark/rdd/shims.scala create mode 100644 sql/connect/shims/src/main/scala/org/apache/spark/shims.scala diff --git a/connector/connect/client/jvm/pom.xml b/connector/connect/client/jvm/pom.xml index be358f317481e..31c15ee7964c5 100644 --- a/connector/connect/client/jvm/pom.xml +++ b/connector/connect/client/jvm/pom.xml @@ -45,6 +45,11 @@ spark-sql-api_${scala.binary.version} ${project.version} + + org.apache.spark + spark-connect-shims_${scala.binary.version} + ${project.version} + org.apache.spark spark-sketch_${scala.binary.version} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index c3ee7030424eb..07543a2c887db 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -22,7 +22,9 @@ import java.util.Properties import scala.jdk.CollectionConverters._ import org.apache.spark.annotation.Stable +import org.apache.spark.api.java.JavaRDD import org.apache.spark.connect.proto.Parse.ParseFormat +import org.apache.spark.rdd.RDD import org.apache.spark.sql.connect.common.DataTypeProtoConverter import org.apache.spark.sql.types.StructType @@ -139,6 +141,14 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) def json(jsonDataset: Dataset[String]): DataFrame = parse(jsonDataset, ParseFormat.PARSE_FORMAT_JSON) + /** @inheritdoc */ + override def json(jsonRDD: JavaRDD[String]): Dataset[Row] = + throwRddNotSupportedException() + + /** @inheritdoc */ + override def json(jsonRDD: RDD[String]): Dataset[Row] = + throwRddNotSupportedException() + /** @inheritdoc */ override def csv(path: String): DataFrame = super.csv(path) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index f5606215be89d..1c91e0bf927d1 100644 --- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -26,8 +26,10 @@ import scala.util.control.NonFatal import org.apache.spark.SparkException import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.java.function._ import org.apache.spark.connect.proto +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ @@ -1479,4 +1481,10 @@ class Dataset[T] private[sql] ( /** @inheritdoc */ @scala.annotation.varargs override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, exprs: _*) + + /** @inheritdoc */ + override def rdd: RDD[T] = throwRddNotSupportedException() + + /** @inheritdoc */ + override def toJavaRDD: JavaRDD[T] = throwRddNotSupportedException() } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 209ec88618c43..b5e31bf6642e0 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -29,10 +29,13 @@ import com.google.common.cache.{CacheBuilder, CacheLoader} import io.grpc.ClientInterceptor import org.apache.arrow.memory.RootAllocator +import org.apache.spark.SparkContext import org.apache.spark.annotation.{DeveloperApi, Experimental, Since} +import org.apache.spark.api.java.JavaRDD import org.apache.spark.connect.proto import org.apache.spark.connect.proto.ExecutePlanResponse import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalog.Catalog import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection} import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder} @@ -84,10 +87,14 @@ class SparkSession private[sql] ( private[sql] val observationRegistry = new ConcurrentHashMap[Long, Observation]() - private[sql] def hijackServerSideSessionIdForTesting(suffix: String) = { + private[sql] def hijackServerSideSessionIdForTesting(suffix: String): Unit = { client.hijackServerSideSessionIdForTesting(suffix) } + /** @inheritdoc */ + override def sparkContext: SparkContext = + throw new UnsupportedOperationException("sparkContext is not supported in Spark Connect.") + /** * Runtime configuration interface for Spark. 
* @@ -152,6 +159,30 @@ class SparkSession private[sql] ( createDataset(data.asScala.toSeq) } + /** @inheritdoc */ + override def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = + throwRddNotSupportedException() + + /** @inheritdoc */ + override def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = + throwRddNotSupportedException() + + /** @inheritdoc */ + override def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = + throwRddNotSupportedException() + + /** @inheritdoc */ + override def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = + throwRddNotSupportedException() + + /** @inheritdoc */ + override def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = + throwRddNotSupportedException() + + /** @inheritdoc */ + override def createDataset[T: Encoder](data: RDD[T]): Dataset[T] = + throwRddNotSupportedException() + /** @inheritdoc */ @Experimental def sql(sqlText: String, args: Array[_]): DataFrame = newDataFrame { builder => diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala index 154f2b0405fcd..886d8cc6c9c5b 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala @@ -52,4 +52,7 @@ package object sql { f(builder) column(builder.build()) } + + private[sql] def throwRddNotSupportedException(): Nothing = + throw new UnsupportedOperationException("RDDs are not supported in Spark Connect.") } diff --git a/pom.xml b/pom.xml index 9780204a121bc..f02df742c70a5 100644 --- a/pom.xml +++ b/pom.xml @@ -84,6 +84,7 @@ common/utils common/variant common/tags + sql/connect/shims core graphx mllib diff --git a/sql/api/pom.xml b/sql/api/pom.xml index 54cdc96fc40a2..9c50a2567c5fe 100644 --- a/sql/api/pom.xml +++ b/sql/api/pom.xml @@ -58,6 +58,12 @@ spark-sketch_${scala.binary.version} ${project.version} + + org.apache.spark + spark-connect-shims_${scala.binary.version} + ${project.version} + compile + org.json4s json4s-jackson_${scala.binary.version} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala index 6e6ab7b9d95a4..3ea59888b895e 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala @@ -21,6 +21,8 @@ import scala.jdk.CollectionConverters._ import _root_.java.util import org.apache.spark.annotation.Stable +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, SparkCharVarcharUtils} @@ -308,6 +310,35 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { */ def json(jsonDataset: DS[String]): DS[Row] + /** + * Loads a `JavaRDD[String]` storing JSON objects (JSON + * Lines text format or newline-delimited JSON) and returns the result as + * a `DataFrame`. + * + * Unless the schema is specified using `schema` function, this function goes through the + * input once to determine the input schema. + * + * @note this method is not supported in Spark Connect. 
+ * @param jsonRDD input RDD with one JSON object per record + * @since 1.4.0 + */ + @deprecated("Use json(Dataset[String]) instead.", "2.2.0") + def json(jsonRDD: JavaRDD[String]): DS[Row] + + /** + * Loads an `RDD[String]` storing JSON objects (JSON Lines + * text format or newline-delimited JSON) and returns the result as a `DataFrame`. + * + * Unless the schema is specified using `schema` function, this function goes through the + * input once to determine the input schema. + * + * @note this method is not supported in Spark Connect. + * @param jsonRDD input RDD with one JSON object per record + * @since 1.4.0 + */ + @deprecated("Use json(Dataset[String]) instead.", "2.2.0") + def json(jsonRDD: RDD[String]): DS[Row] + /** * Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other * overloaded `csv()` method for more details. diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala index fb8b6f2f483a1..08bc50f757824 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala @@ -22,7 +22,9 @@ import scala.reflect.runtime.universe.TypeTag import _root_.java.util import org.apache.spark.annotation.{DeveloperApi, Stable} +import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.java.function.{FilterFunction, FlatMapFunction, ForeachFunction, ForeachPartitionFunction, MapFunction, MapPartitionsFunction, ReduceFunction} +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{functions, AnalysisException, Column, DataFrameWriter, DataFrameWriterV2, Encoder, MergeIntoWriter, Observation, Row, TypedColumn} import org.apache.spark.sql.internal.{ToScalaUDF, UDFAdaptors} import org.apache.spark.sql.types.{Metadata, StructType} @@ -3055,4 +3057,31 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 1.6.0 */ def write: DataFrameWriter[T] + + /** + * Represents the content of the Dataset as an `RDD` of `T`. + * + * @note this method is not supported in Spark Connect. + * @group basic + * @since 1.6.0 + */ + def rdd: RDD[T] + + /** + * Returns the content of the Dataset as a `JavaRDD` of `T`s. + * + * @note this method is not supported in Spark Connect. + * @group basic + * @since 1.6.0 + */ + def toJavaRDD: JavaRDD[T] + + /** + * Returns the content of the Dataset as a `JavaRDD` of `T`s. + * + * @note this method is not supported in Spark Connect. + * @group basic + * @since 1.6.0 + */ + def javaRDD: JavaRDD[T] = toJavaRDD } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala index cf502c746d24e..70a608f2896a6 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala @@ -25,7 +25,10 @@ import _root_.java.lang import _root_.java.net.URI import _root_.java.util +import org.apache.spark.SparkContext import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Encoder, Row} import org.apache.spark.sql.types.StructType @@ -51,6 +54,13 @@ import org.apache.spark.sql.types.StructType */ abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends Serializable with Closeable { + /** + * The Spark context associated with this Spark session. 
+ * + * @note this method is not supported in Spark Connect. + */ + def sparkContext: SparkContext + /** * The version of Spark on which this application is running. * @@ -134,6 +144,82 @@ abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends Serializable with C */ def createDataFrame(data: util.List[_], beanClass: Class[_]): DS[Row] + /** + * Creates a `DataFrame` from an RDD of Product (e.g. case classes, tuples). + * + * @note this method is not supported in Spark Connect. + * @since 2.0.0 + */ + def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DS[Row] + + /** + * :: DeveloperApi :: + * Creates a `DataFrame` from an `RDD` containing [[Row]]s using the given schema. + * It is important to make sure that the structure of every [[Row]] of the provided RDD matches + * the provided schema. Otherwise, there will be runtime exception. + * Example: + * {{{ + * import org.apache.spark.sql._ + * import org.apache.spark.sql.types._ + * val sparkSession = new org.apache.spark.sql.SparkSession(sc) + * + * val schema = + * StructType( + * StructField("name", StringType, false) :: + * StructField("age", IntegerType, true) :: Nil) + * + * val people = + * sc.textFile("examples/src/main/resources/people.txt").map( + * _.split(",")).map(p => Row(p(0), p(1).trim.toInt)) + * val dataFrame = sparkSession.createDataFrame(people, schema) + * dataFrame.printSchema + * // root + * // |-- name: string (nullable = false) + * // |-- age: integer (nullable = true) + * + * dataFrame.createOrReplaceTempView("people") + * sparkSession.sql("select name from people").collect.foreach(println) + * }}} + * + * @note this method is not supported in Spark Connect. + * @since 2.0.0 + */ + @DeveloperApi + def createDataFrame(rowRDD: RDD[Row], schema: StructType): DS[Row] + + /** + * :: DeveloperApi :: + * Creates a `DataFrame` from a `JavaRDD` containing [[Row]]s using the given schema. + * It is important to make sure that the structure of every [[Row]] of the provided RDD matches + * the provided schema. Otherwise, there will be runtime exception. + * + * @note this method is not supported in Spark Connect. + * @since 2.0.0 + */ + @DeveloperApi + def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DS[Row] + + /** + * Applies a schema to an RDD of Java Beans. + * + * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, + * SELECT * queries will return the columns in an undefined order. + * + * @since 2.0.0 + */ + def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DS[Row] + + /** + * Applies a schema to an RDD of Java Beans. + * + * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, + * SELECT * queries will return the columns in an undefined order. + * + * @note this method is not supported in Spark Connect. + * @since 2.0.0 + */ + def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DS[Row] + /* ------------------------------- * | Methods for creating DataSets | * ------------------------------- */ @@ -191,6 +277,17 @@ abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends Serializable with C */ def createDataset[T: Encoder](data: util.List[T]): DS[T] + /** + * Creates a [[Dataset]] from an RDD of a given type. This method requires an + * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation) + * that is generally created automatically through implicits from a `SparkSession`, or can be + * created explicitly by calling static methods on `Encoders`. 
+ * + * @note this method is not supported in Spark Connect. + * @since 2.0.0 + */ + def createDataset[T: Encoder](data: RDD[T]): DS[T] + /** * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a * range from 0 to `end` (exclusive) with step value 1. diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index ac944eb9d8070..d7b54cfa74738 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -44,6 +44,12 @@ org.apache.spark spark-sql-api_${scala.binary.version} ${project.version} + + + org.apache.spark + spark-connect-shims_${scala.binary.version} + + org.apache.spark diff --git a/sql/connect/server/pom.xml b/sql/connect/server/pom.xml index 110065fcd9722..07598ad620f93 100644 --- a/sql/connect/server/pom.xml +++ b/sql/connect/server/pom.xml @@ -52,6 +52,10 @@ spark-connect-common_${scala.binary.version} ${project.version} + + org.apache.spark + spark-connect-shims_${scala.binary.version} + com.google.guava guava diff --git a/sql/connect/shims/README.md b/sql/connect/shims/README.md new file mode 100644 index 0000000000000..07b593dd04b4b --- /dev/null +++ b/sql/connect/shims/README.md @@ -0,0 +1 @@ +This module defines shims used by the interface defined in sql/api. diff --git a/sql/connect/shims/pom.xml b/sql/connect/shims/pom.xml new file mode 100644 index 0000000000000..6bb12a927738c --- /dev/null +++ b/sql/connect/shims/pom.xml @@ -0,0 +1,41 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent_2.13 + 4.0.0-SNAPSHOT + ../../../pom.xml + + + spark-connect-shims_2.13 + jar + Spark Project Connect Shims + https://spark.apache.org/ + + connect-shims + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + diff --git a/sql/connect/shims/src/main/scala/org/apache/spark/api/java/shims.scala b/sql/connect/shims/src/main/scala/org/apache/spark/api/java/shims.scala new file mode 100644 index 0000000000000..45fae00247485 --- /dev/null +++ b/sql/connect/shims/src/main/scala/org/apache/spark/api/java/shims.scala @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.api.java + +class JavaRDD[T] diff --git a/sql/connect/shims/src/main/scala/org/apache/spark/rdd/shims.scala b/sql/connect/shims/src/main/scala/org/apache/spark/rdd/shims.scala new file mode 100644 index 0000000000000..b23f83fa9185c --- /dev/null +++ b/sql/connect/shims/src/main/scala/org/apache/spark/rdd/shims.scala @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.rdd + +class RDD[T] diff --git a/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala b/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala new file mode 100644 index 0000000000000..813b8e4859c28 --- /dev/null +++ b/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark + +class SparkContext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index f105a77cf253b..9f1f10d1fc714 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -174,30 +174,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) @scala.annotation.varargs override def json(paths: String*): DataFrame = super.json(paths: _*) - /** - * Loads a `JavaRDD[String]` storing JSON objects (JSON - * Lines text format or newline-delimited JSON) and returns the result as - * a `DataFrame`. - * - * Unless the schema is specified using `schema` function, this function goes through the - * input once to determine the input schema. - * - * @param jsonRDD input RDD with one JSON object per record - * @since 1.4.0 - */ + /** @inheritdoc */ @deprecated("Use json(Dataset[String]) instead.", "2.2.0") def json(jsonRDD: JavaRDD[String]): DataFrame = json(jsonRDD.rdd) - /** - * Loads an `RDD[String]` storing JSON objects (JSON Lines - * text format or newline-delimited JSON) and returns the result as a `DataFrame`. - * - * Unless the schema is specified using `schema` function, this function goes through the - * input once to determine the input schema. 
- * - * @param jsonRDD input RDD with one JSON object per record - * @since 1.4.0 - */ + /** @inheritdoc */ @deprecated("Use json(Dataset[String]) instead.", "2.2.0") def json(jsonRDD: RDD[String]): DataFrame = { json(sparkSession.createDataset(jsonRDD)(Encoders.STRING)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 0fab60a948423..1deaa6ded25a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1540,12 +1540,7 @@ class Dataset[T] private[sql]( sparkSession.sessionState.executePlan(deserialized) } - /** - * Represents the content of the Dataset as an `RDD` of `T`. - * - * @group basic - * @since 1.6.0 - */ + /** @inheritdoc */ lazy val rdd: RDD[T] = { val objectType = exprEnc.deserializer.dataType rddQueryExecution.toRdd.mapPartitions { rows => @@ -1553,20 +1548,9 @@ class Dataset[T] private[sql]( } } - /** - * Returns the content of the Dataset as a `JavaRDD` of `T`s. - * @group basic - * @since 1.6.0 - */ + /** @inheritdoc */ def toJavaRDD: JavaRDD[T] = rdd.toJavaRDD() - /** - * Returns the content of the Dataset as a `JavaRDD` of `T`s. - * @group basic - * @since 1.6.0 - */ - def javaRDD: JavaRDD[T] = toJavaRDD - protected def createTempView( viewName: String, replace: Boolean, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index a7fb71d95d147..64d391608e8b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -284,11 +284,7 @@ class SparkSession private( new Dataset(self, LocalRelation(encoder.schema), encoder) } - /** - * Creates a `DataFrame` from an RDD of Product (e.g. case classes, tuples). - * - * @since 2.0.0 - */ + /** @inheritdoc */ def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = withActive { val encoder = Encoders.product[A] Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder)) @@ -301,37 +297,7 @@ class SparkSession private( Dataset.ofRows(self, LocalRelation.fromProduct(attributeSeq, data)) } - /** - * :: DeveloperApi :: - * Creates a `DataFrame` from an `RDD` containing [[Row]]s using the given schema. - * It is important to make sure that the structure of every [[Row]] of the provided RDD matches - * the provided schema. Otherwise, there will be runtime exception. 
- * Example: - * {{{ - * import org.apache.spark.sql._ - * import org.apache.spark.sql.types._ - * val sparkSession = new org.apache.spark.sql.SparkSession(sc) - * - * val schema = - * StructType( - * StructField("name", StringType, false) :: - * StructField("age", IntegerType, true) :: Nil) - * - * val people = - * sc.textFile("examples/src/main/resources/people.txt").map( - * _.split(",")).map(p => Row(p(0), p(1).trim.toInt)) - * val dataFrame = sparkSession.createDataFrame(people, schema) - * dataFrame.printSchema - * // root - * // |-- name: string (nullable = false) - * // |-- age: integer (nullable = true) - * - * dataFrame.createOrReplaceTempView("people") - * sparkSession.sql("select name from people").collect.foreach(println) - * }}} - * - * @since 2.0.0 - */ + /** @inheritdoc */ @DeveloperApi def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = withActive { val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType] @@ -343,14 +309,7 @@ class SparkSession private( internalCreateDataFrame(catalystRows.setName(rowRDD.name), schema) } - /** - * :: DeveloperApi :: - * Creates a `DataFrame` from a `JavaRDD` containing [[Row]]s using the given schema. - * It is important to make sure that the structure of every [[Row]] of the provided RDD matches - * the provided schema. Otherwise, there will be runtime exception. - * - * @since 2.0.0 - */ + /** @inheritdoc */ @DeveloperApi def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = { val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType] @@ -364,14 +323,7 @@ class SparkSession private( Dataset.ofRows(self, LocalRelation.fromExternalRows(toAttributes(replaced), rows.asScala.toSeq)) } - /** - * Applies a schema to an RDD of Java Beans. - * - * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, - * SELECT * queries will return the columns in an undefined order. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = withActive { val attributeSeq: Seq[AttributeReference] = getSchema(beanClass) val className = beanClass.getName @@ -382,14 +334,7 @@ class SparkSession private( Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd.setName(rdd.name))(self)) } - /** - * Applies a schema to an RDD of Java Beans. - * - * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, - * SELECT * queries will return the columns in an undefined order. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = { createDataFrame(rdd.rdd, beanClass) } @@ -424,14 +369,7 @@ class SparkSession private( Dataset[T](self, plan) } - /** - * Creates a [[Dataset]] from an RDD of a given type. This method requires an - * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation) - * that is generally created automatically through implicits from a `SparkSession`, or can be - * created explicitly by calling static methods on [[Encoders]]. 
- * - * @since 2.0.0 - */ + /** @inheritdoc */ def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = { Dataset[T](self, ExternalRDD(data, self)) } From 91bd6a22c875c686cca4cb6fea2682a13ef2f416 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 25 Sep 2024 00:18:02 -0400 Subject: [PATCH 2/8] Fix SBT build --- project/SparkBuild.scala | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 2f390cb70baa8..a5eea66400714 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -45,24 +45,24 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile - val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, tokenProviderKafka010, sqlKafka010, avro, protobuf) = Seq( - "catalyst", "sql", "hive", "hive-thriftserver", "token-provider-kafka-0-10", "sql-kafka-0-10", "avro", "protobuf" - ).map(ProjectRef(buildLocation, _)) + val sqlProjects@Seq(sqlApi, catalyst, sql, hive, hiveThriftServer, tokenProviderKafka010, sqlKafka010, avro, protobuf) = + Seq("sql-api", "catalyst", "sql", "hive", "hive-thriftserver", "token-provider-kafka-0-10", + "sql-kafka-0-10", "avro", "protobuf").map(ProjectRef(buildLocation, _)) val streamingProjects@Seq(streaming, streamingKafka010) = Seq("streaming", "streaming-kafka-0-10").map(ProjectRef(buildLocation, _)) - val connectCommon = ProjectRef(buildLocation, "connect-common") - val connect = ProjectRef(buildLocation, "connect") - val connectClient = ProjectRef(buildLocation, "connect-client-jvm") + val connectProjects@Seq(connectCommon, connect, connectClient, connectShims) = + Seq("connect-common", "connect", "connect-client-jvm", "connect-shims") + .map(ProjectRef(buildLocation, _)) val allProjects@Seq( core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, - commonUtils, sqlApi, variant, _* + commonUtils, variant, _* ) = Seq( "core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe", - "tags", "sketch", "kvstore", "common-utils", "sql-api", "variant" - ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ Seq(connectCommon, connect, connectClient) + "tags", "sketch", "kvstore", "common-utils", "variant" + ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ connectProjects val optionallyEnabledProjects@Seq(kubernetes, yarn, sparkGangliaLgpl, streamingKinesisAsl, @@ -354,13 +354,19 @@ object SparkBuild extends PomBuild { // Note ordering of these settings matter. /* Enable shared settings on all projects */ - (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ copyJarsProjects ++ Seq(spark, tools)) - .foreach(enable(sharedSettings ++ DependencyOverrides.settings ++ + val everyProject = (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ copyJarsProjects ++ Seq(spark, tools)) + everyProject.foreach(enable(sharedSettings ++ DependencyOverrides.settings ++ ExcludedDependencies.settings ++ Checkstyle.settings)) /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) + /** Remove connect shims from every project except the ones used by the connect client. 
*/ + val projectsWithShims = Set(sqlApi, connectCommon, connectClient) + everyProject.filterNot(projectsWithShims).foreach { + enable(ExcludeShims.settings) + } + val mimaProjects = allProjects.filterNot { x => Seq( spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, networkYarn, @@ -1083,6 +1089,18 @@ object ExcludedDependencies { ) } +/** + * This excludes the spark-connect-shims module from a module. This is needed because SBT (or the + * POM reader) does not seem to respect module exclusions. + */ +object ExcludeShims { + lazy val settings = Seq( + Compile / internalDependencyClasspath ~= { cp => + cp.filterNot(_.data.name.contains("spark-connect-shims")) + } + ) +} + /** * Project to pull previous artifacts of Spark for generating Mima excludes. */ From 7f9993c688ad2e57506bc233dcbd08ea22fba33f Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 25 Sep 2024 20:37:58 -0400 Subject: [PATCH 3/8] Fix SBT build --- project/SparkBuild.scala | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a5eea66400714..678c88deef279 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -354,19 +354,13 @@ object SparkBuild extends PomBuild { // Note ordering of these settings matter. /* Enable shared settings on all projects */ - val everyProject = (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ copyJarsProjects ++ Seq(spark, tools)) - everyProject.foreach(enable(sharedSettings ++ DependencyOverrides.settings ++ - ExcludedDependencies.settings ++ Checkstyle.settings)) + (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ copyJarsProjects ++ Seq(spark, tools)) + .foreach(enable(sharedSettings ++ DependencyOverrides.settings ++ + ExcludedDependencies.settings ++ Checkstyle.settings ++ ExcludeShims.settings)) /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) - /** Remove connect shims from every project except the ones used by the connect client. */ - val projectsWithShims = Set(sqlApi, connectCommon, connectClient) - everyProject.filterNot(projectsWithShims).foreach { - enable(ExcludeShims.settings) - } - val mimaProjects = allProjects.filterNot { x => Seq( spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, networkYarn, @@ -1090,14 +1084,22 @@ object ExcludedDependencies { } /** - * This excludes the spark-connect-shims module from a module. This is needed because SBT (or the - * POM reader) does not seem to respect module exclusions. + * This excludes the spark-connect-shims module from a module when it has a dependency on + * spark-core. This is needed because SBT (or the POM reader) does not seem to respect module + * exclusions. 
*/ object ExcludeShims { - lazy val settings = Seq( - Compile / internalDependencyClasspath ~= { cp => - cp.filterNot(_.data.name.contains("spark-connect-shims")) + def excludeShims(classpath: Seq[Attributed[File]]): Seq[Attributed[File]] = { + if (classpath.exists(_.data.name.contains("spark-core"))) { + classpath.filterNot(_.data.name.contains("spark-connect-shims")) + } else { + classpath } + } + + lazy val settings = Seq( + Compile / internalDependencyClasspath ~= excludeShims _, + Test / internalDependencyClasspath ~= excludeShims _, ) } From 42f53643d44ef4ae8e2c187cd0314c02d0fce454 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 25 Sep 2024 22:33:31 -0400 Subject: [PATCH 4/8] Style --- .../org/apache/spark/sql/SparkSession.scala | 2 +- .../spark/sql/api/DataFrameReader.scala | 31 ++++++----- .../org/apache/spark/sql/api/Dataset.scala | 9 ++-- .../apache/spark/sql/api/SparkSession.scala | 51 ++++++++++--------- 4 files changed, 51 insertions(+), 42 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index cc9688f1145db..f5798133c60d1 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -152,7 +152,7 @@ class SparkSession private[sql] ( } /** @inheritdoc */ - override def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = + override def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): DataFrame = throwRddNotSupportedException() /** @inheritdoc */ diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala index 03d48593e92cd..8c88387714228 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala @@ -311,30 +311,33 @@ abstract class DataFrameReader { */ def json(jsonDataset: DS[String]): Dataset[Row] - /** - * Loads a `JavaRDD[String]` storing JSON objects (JSON - * Lines text format or newline-delimited JSON) and returns the result as - * a `DataFrame`. + /** + * Loads a `JavaRDD[String]` storing JSON objects (JSON Lines + * text format or newline-delimited JSON) and returns the result as a `DataFrame`. * - * Unless the schema is specified using `schema` function, this function goes through the - * input once to determine the input schema. + * Unless the schema is specified using `schema` function, this function goes through the input + * once to determine the input schema. * - * @note this method is not supported in Spark Connect. - * @param jsonRDD input RDD with one JSON object per record + * @note + * this method is not supported in Spark Connect. + * @param jsonRDD + * input RDD with one JSON object per record * @since 1.4.0 */ @deprecated("Use json(Dataset[String]) instead.", "2.2.0") def json(jsonRDD: JavaRDD[String]): DS[Row] /** - * Loads an `RDD[String]` storing JSON objects (JSON Lines - * text format or newline-delimited JSON) and returns the result as a `DataFrame`. + * Loads an `RDD[String]` storing JSON objects (JSON Lines text + * format or newline-delimited JSON) and returns the result as a `DataFrame`. * - * Unless the schema is specified using `schema` function, this function goes through the - * input once to determine the input schema. 
+ * Unless the schema is specified using `schema` function, this function goes through the input + * once to determine the input schema. * - * @note this method is not supported in Spark Connect. - * @param jsonRDD input RDD with one JSON object per record + * @note + * this method is not supported in Spark Connect. + * @param jsonRDD + * input RDD with one JSON object per record * @since 1.4.0 */ @deprecated("Use json(Dataset[String]) instead.", "2.2.0") diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala index 4ec13535b9817..c277b4cab85c1 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala @@ -3104,7 +3104,8 @@ abstract class Dataset[T] extends Serializable { /** * Represents the content of the Dataset as an `RDD` of `T`. * - * @note this method is not supported in Spark Connect. + * @note + * this method is not supported in Spark Connect. * @group basic * @since 1.6.0 */ @@ -3113,7 +3114,8 @@ abstract class Dataset[T] extends Serializable { /** * Returns the content of the Dataset as a `JavaRDD` of `T`s. * - * @note this method is not supported in Spark Connect. + * @note + * this method is not supported in Spark Connect. * @group basic * @since 1.6.0 */ @@ -3122,7 +3124,8 @@ abstract class Dataset[T] extends Serializable { /** * Returns the content of the Dataset as a `JavaRDD` of `T`s. * - * @note this method is not supported in Spark Connect. + * @note + * this method is not supported in Spark Connect. * @group basic * @since 1.6.0 */ diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala index 14f0fe28adc8e..f03f7a60c262b 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala @@ -58,7 +58,8 @@ abstract class SparkSession extends Serializable with Closeable { /** * The Spark context associated with this Spark session. * - * @note this method is not supported in Spark Connect. + * @note + * this method is not supported in Spark Connect. */ def sparkContext: SparkContext @@ -159,17 +160,16 @@ abstract class SparkSession extends Serializable with Closeable { /** * Creates a `DataFrame` from an RDD of Product (e.g. case classes, tuples). * - * @note this method is not supported in Spark Connect. + * @note + * this method is not supported in Spark Connect. * @since 2.0.0 */ - def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): Dataset[Row] + def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): Dataset[Row] /** - * :: DeveloperApi :: - * Creates a `DataFrame` from an `RDD` containing [[Row]]s using the given schema. - * It is important to make sure that the structure of every [[Row]] of the provided RDD matches - * the provided schema. Otherwise, there will be runtime exception. - * Example: + * :: DeveloperApi :: Creates a `DataFrame` from an `RDD` containing [[Row]]s using the given + * schema. It is important to make sure that the structure of every [[Row]] of the provided RDD + * matches the provided schema. Otherwise, there will be runtime exception. 
Example: * {{{ * import org.apache.spark.sql._ * import org.apache.spark.sql.types._ @@ -193,19 +193,20 @@ abstract class SparkSession extends Serializable with Closeable { * sparkSession.sql("select name from people").collect.foreach(println) * }}} * - * @note this method is not supported in Spark Connect. + * @note + * this method is not supported in Spark Connect. * @since 2.0.0 */ @DeveloperApi def createDataFrame(rowRDD: RDD[Row], schema: StructType): Dataset[Row] /** - * :: DeveloperApi :: - * Creates a `DataFrame` from a `JavaRDD` containing [[Row]]s using the given schema. - * It is important to make sure that the structure of every [[Row]] of the provided RDD matches - * the provided schema. Otherwise, there will be runtime exception. + * :: DeveloperApi :: Creates a `DataFrame` from a `JavaRDD` containing [[Row]]s using the given + * schema. It is important to make sure that the structure of every [[Row]] of the provided RDD + * matches the provided schema. Otherwise, there will be runtime exception. * - * @note this method is not supported in Spark Connect. + * @note + * this method is not supported in Spark Connect. * @since 2.0.0 */ @DeveloperApi @@ -214,8 +215,8 @@ abstract class SparkSession extends Serializable with Closeable { /** * Applies a schema to an RDD of Java Beans. * - * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, - * SELECT * queries will return the columns in an undefined order. + * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries + * will return the columns in an undefined order. * * @since 2.0.0 */ @@ -224,10 +225,11 @@ abstract class SparkSession extends Serializable with Closeable { /** * Applies a schema to an RDD of Java Beans. * - * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, - * SELECT * queries will return the columns in an undefined order. + * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries + * will return the columns in an undefined order. * - * @note this method is not supported in Spark Connect. + * @note + * this method is not supported in Spark Connect. * @since 2.0.0 */ def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row] @@ -290,12 +292,13 @@ abstract class SparkSession extends Serializable with Closeable { def createDataset[T: Encoder](data: util.List[T]): Dataset[T] /** - * Creates a [[Dataset]] from an RDD of a given type. This method requires an - * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation) - * that is generally created automatically through implicits from a `SparkSession`, or can be - * created explicitly by calling static methods on `Encoders`. + * Creates a [[Dataset]] from an RDD of a given type. This method requires an encoder (to + * convert a JVM object of type `T` to and from the internal Spark SQL representation) that is + * generally created automatically through implicits from a `SparkSession`, or can be created + * explicitly by calling static methods on `Encoders`. * - * @note this method is not supported in Spark Connect. + * @note + * this method is not supported in Spark Connect. 
* @since 2.0.0 */ def createDataset[T: Encoder](data: RDD[T]): Dataset[T] From d08b3cb08e894b5b512cee4d87da7ac210a17cd7 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Thu, 26 Sep 2024 13:53:52 -0400 Subject: [PATCH 5/8] MiMa/Docs/Implicits --- project/SparkBuild.scala | 2 +- .../org/apache/spark/sql/api/SQLImplicits.scala | 9 +++++++++ .../org/apache/spark/sql/api/SparkSession.scala | 14 ++++++++------ .../scala/org/apache/spark/sql/SQLImplicits.scala | 12 ------------ 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index bc4ac67e41b6c..6d57ec8af7149 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -369,7 +369,7 @@ object SparkBuild extends PomBuild { Seq( spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, networkYarn, unsafe, tags, tokenProviderKafka010, sqlKafka010, connectCommon, connect, connectClient, - variant + variant, connectShims ).contains(x) } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SQLImplicits.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SQLImplicits.scala index f6b44e168390a..5e022570d3ca7 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/SQLImplicits.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SQLImplicits.scala @@ -23,6 +23,7 @@ import scala.reflect.runtime.universe.TypeTag import _root_.java +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{ColumnName, DatasetHolder, Encoder, Encoders} import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder @@ -278,6 +279,14 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits with Serializable { new DatasetHolder(session.createDataset(s).asInstanceOf[DS[T]]) } + /** + * Creates a [[Dataset]] from an RDD. + * + * @since 1.6.0 + */ + implicit def rddToDatasetHolder[T: Encoder](rdd: RDD[T]): DatasetHolder[T, DS] = + new DatasetHolder(session.createDataset(rdd).asInstanceOf[DS[T]]) + /** * An implicit conversion that turns a Scala `Symbol` into a [[org.apache.spark.sql.Column]]. * @since 1.3.0 diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala index f03f7a60c262b..5c68605ddf042 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala @@ -167,9 +167,10 @@ abstract class SparkSession extends Serializable with Closeable { def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): Dataset[Row] /** - * :: DeveloperApi :: Creates a `DataFrame` from an `RDD` containing [[Row]]s using the given - * schema. It is important to make sure that the structure of every [[Row]] of the provided RDD - * matches the provided schema. Otherwise, there will be runtime exception. Example: + * :: DeveloperApi :: Creates a `DataFrame` from an `RDD` containing + * [[org.apache.spark.sql.Row]]s using the given schema. It is important to make sure that the + * structure of every [[org.apache.spark.sql.Row]] of the provided RDD matches the provided + * schema. Otherwise, there will be runtime exception. 
Example: * {{{ * import org.apache.spark.sql._ * import org.apache.spark.sql.types._ @@ -201,9 +202,10 @@ abstract class SparkSession extends Serializable with Closeable { def createDataFrame(rowRDD: RDD[Row], schema: StructType): Dataset[Row] /** - * :: DeveloperApi :: Creates a `DataFrame` from a `JavaRDD` containing [[Row]]s using the given - * schema. It is important to make sure that the structure of every [[Row]] of the provided RDD - * matches the provided schema. Otherwise, there will be runtime exception. + * :: DeveloperApi :: Creates a `DataFrame` from a `JavaRDD` containing + * [[org.apache.spark.sql.Row]]s using the given schema. It is important to make sure that the + * structure of every [[org.apache.spark.sql.Row]] of the provided RDD matches the provided + * schema. Otherwise, there will be runtime exception. * * @note * this method is not supported in Spark Connect. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index 1bc7e3ee98e76..b6ed50447109d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -17,21 +17,9 @@ package org.apache.spark.sql -import scala.language.implicitConversions - -import org.apache.spark.rdd.RDD - /** @inheritdoc */ abstract class SQLImplicits extends api.SQLImplicits { type DS[U] = Dataset[U] protected def session: SparkSession - - /** - * Creates a [[Dataset]] from an RDD. - * - * @since 1.6.0 - */ - implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T, Dataset] = - new DatasetHolder(session.createDataset(rdd)) } From d776b972e7b4f1002f77bff02a659a49df1ac432 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 30 Sep 2024 13:46:49 -0700 Subject: [PATCH 6/8] Exclude shims from runtime classpath --- project/SparkBuild.scala | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 6d57ec8af7149..72a577ea5aae0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1088,22 +1088,26 @@ object ExcludedDependencies { } /** - * This excludes the spark-connect-shims module from a module when it has a dependency on - * spark-core. This is needed because SBT (or the POM reader) does not seem to respect module - * exclusions. + * This excludes the spark-connect-shims module from a module when it is not part of the connect + * client dependencies. 
*/ object ExcludeShims { - def excludeShims(classpath: Seq[Attributed[File]]): Seq[Attributed[File]] = { - if (classpath.exists(_.data.name.contains("spark-core"))) { - classpath.filterNot(_.data.name.contains("spark-connect-shims")) - } else { - classpath - } - } - + val shimmedProjects = Set("spark-sql-api", "spark-connect-common", "spark-connect-client-jvm") + val classPathFilter = TaskKey[Classpath => Classpath]("filter for classpath") lazy val settings = Seq( - Compile / internalDependencyClasspath ~= excludeShims _, - Test / internalDependencyClasspath ~= excludeShims _, + classPathFilter := { + if (!shimmedProjects(moduleName.value)) { + cp => cp.filterNot(_.data.name.contains("spark-connect-shims")) + } else { + identity _ + } + }, + Compile / internalDependencyClasspath := + classPathFilter.value((Compile / internalDependencyClasspath).value), + Runtime / internalDependencyClasspath := + classPathFilter.value((Runtime / internalDependencyClasspath).value), + Test / internalDependencyClasspath := + classPathFilter.value((Test / internalDependencyClasspath).value), ) } From 15be1b570d4e27d62e612e0f3e5ac0b4184d07ad Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 30 Sep 2024 14:42:28 -0700 Subject: [PATCH 7/8] Fix docs --- project/SparkBuild.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 8f7b4432b3b30..4f4336fe461ac 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1480,10 +1480,12 @@ object SparkUnidoc extends SharedUnidocSettings { lazy val settings = baseSettings ++ Seq( (ScalaUnidoc / unidoc / unidocProjectFilter) := inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes, - yarn, tags, streamingKafka010, sqlKafka010, connectCommon, connect, connectClient, protobuf), + yarn, tags, streamingKafka010, sqlKafka010, connectCommon, connect, connectClient, + connectShims, protobuf), (JavaUnidoc / unidoc / unidocProjectFilter) := inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes, - yarn, tags, streamingKafka010, sqlKafka010, connectCommon, connect, connectClient, protobuf), + yarn, tags, streamingKafka010, sqlKafka010, connectCommon, connect, connectClient, + connectShims, protobuf), ) } From 82b65627037d65891a4a2e9b7e01fae4a632c3ef Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 8 Oct 2024 14:39:07 -0400 Subject: [PATCH 8/8] Fix build... again... --- project/SparkBuild.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 4f4336fe461ac..5882fcbf336b0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1104,10 +1104,16 @@ object ExcludeShims { }, Compile / internalDependencyClasspath := classPathFilter.value((Compile / internalDependencyClasspath).value), + Compile / internalDependencyAsJars := + classPathFilter.value((Compile / internalDependencyAsJars).value), Runtime / internalDependencyClasspath := classPathFilter.value((Runtime / internalDependencyClasspath).value), + Runtime / internalDependencyAsJars := + classPathFilter.value((Runtime / internalDependencyAsJars).value), Test / internalDependencyClasspath := classPathFilter.value((Test / internalDependencyClasspath).value), + Test / internalDependencyAsJars := + classPathFilter.value((Test / internalDependencyAsJars).value), ) }