[SPARK-45384][CORE][SQL][SS][DSTREAM][CONNECT] Replace `TraversableOnce` with `IterableOnce`

### What changes were proposed in this pull request?
This PR replaces `TraversableOnce` with `IterableOnce` because `TraversableOnce` has been deprecated since Scala 2.13.0:

```
@deprecated("Use IterableOnce instead of TraversableOnce", "2.13.0")
type TraversableOnce[+A] = scala.collection.IterableOnce[A]
```
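
On Scala 2.13 the two names refer to the same type, so the substitution is source-compatible. As a minimal illustration (the `sumAll` helper below is hypothetical, not part of this PR):

```scala
// TraversableOnce[A] is only a deprecated alias for IterableOnce[A] on 2.13,
// so any collection or iterator can be passed where IterableOnce is expected.
def sumAll(xs: IterableOnce[Int]): Int = xs.iterator.sum

sumAll(List(1, 2, 3))     // collections are IterableOnce: returns 6
sumAll(Iterator(4, 5, 6)) // iterators too: returns 15
```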

Additionally, this PR renames two functions (see the sketch after this list):
- rename `UdfUtils#traversableOnceToSeq` to `UdfUtils#iterableOnceToSeq`
- rename `GenerateExec#codeGenTraversableOnce` to `GenerateExec#codeGenIterableOnce`
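
The renamed `UdfUtils#iterableOnceToSeq` keeps its old shape; only the parameter type's name changes. A usage sketch (the `splitWords` value is hypothetical, and `UdfUtils` itself is `private[sql]`):

```scala
// Adapts an A => IterableOnce[B] function into one returning a materialized
// Seq[B], mirroring the helper as it appears later in this diff.
def iterableOnceToSeq[A, B](f: A => IterableOnce[B]): A => Seq[B] =
  value => f(value).iterator.toSeq

val splitWords = iterableOnceToSeq((s: String) => s.split(" ").toSeq)
splitWords("a b c") // Seq("a", "b", "c")
```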

### Why are the changes needed?
Clean up deprecated Scala API usage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43186 from LuciferYang/SPARK-45384.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
LuciferYang committed Oct 1, 2023
1 parent 58c24a5 commit b70c88b
Showing 34 changed files with 88 additions and 88 deletions.
```diff
@@ -2733,7 +2733,7 @@ class Dataset[T] private[sql] (
    * @group typedrel
    * @since 3.5.0
    */
-  def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U] =
+  def flatMap[U: Encoder](func: T => IterableOnce[U]): Dataset[U] =
     mapPartitions(UdfUtils.flatMapFuncToMapPartitionsAdaptor(func))
 
   /**
@@ -2775,9 +2775,9 @@ class Dataset[T] private[sql] (
    * @since 3.5.0
    */
   @deprecated("use flatMap() or select() with functions.explode() instead", "3.5.0")
-  def explode[A <: Product: TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
+  def explode[A <: Product: TypeTag](input: Column*)(f: Row => IterableOnce[A]): DataFrame = {
     val generator = ScalarUserDefinedFunction(
-      UdfUtils.traversableOnceToSeq(f),
+      UdfUtils.iterableOnceToSeq(f),
       UnboundRowEncoder :: Nil,
       ScalaReflection.encoderFor[Seq[A]])
     select(col("*"), functions.inline(generator(struct(input: _*))))
@@ -2807,9 +2807,9 @@ class Dataset[T] private[sql] (
    */
   @deprecated("use flatMap() or select() with functions.explode() instead", "3.5.0")
   def explode[A, B: TypeTag](inputColumn: String, outputColumn: String)(
-      f: A => TraversableOnce[B]): DataFrame = {
+      f: A => IterableOnce[B]): DataFrame = {
     val generator = ScalarUserDefinedFunction(
-      UdfUtils.traversableOnceToSeq(f),
+      UdfUtils.iterableOnceToSeq(f),
       Nil,
       ScalaReflection.encoderFor[Seq[B]])
     select(col("*"), functions.explode(generator(col(inputColumn))).as((outputColumn)))
```
```diff
@@ -111,7 +111,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable {
    *
    * @since 3.5.0
    */
-  def flatMapGroups[U: Encoder](f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
+  def flatMapGroups[U: Encoder](f: (K, Iterator[V]) => IterableOnce[U]): Dataset[U] = {
     flatMapSortedGroups()(f)
   }
 
@@ -162,7 +162,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable {
    * @since 3.5.0
    */
   def flatMapSortedGroups[U: Encoder](sortExprs: Column*)(
-      f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
+      f: (K, Iterator[V]) => IterableOnce[U]): Dataset[U] = {
     throw new UnsupportedOperationException
   }
 
@@ -397,7 +397,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable {
    * @since 3.5.0
    */
   def cogroup[U, R: Encoder](other: KeyValueGroupedDataset[K, U])(
-      f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
     cogroupSorted(other)()()(f)
   }
 
@@ -433,7 +433,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable {
    */
   def cogroupSorted[U, R: Encoder](other: KeyValueGroupedDataset[K, U])(thisSortExprs: Column*)(
       otherSortExprs: Column*)(
-      f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
     throw new UnsupportedOperationException
   }
 
@@ -865,7 +865,7 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
   }
 
   override def flatMapSortedGroups[U: Encoder](sortExprs: Column*)(
-      f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
+      f: (K, Iterator[V]) => IterableOnce[U]): Dataset[U] = {
     // Apply mapValues changes to the udf
     val nf =
       if (valueMapFunc == UdfUtils.identical()) f else UdfUtils.mapValuesAdaptor(f, valueMapFunc)
@@ -881,7 +881,7 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
 
   override def cogroupSorted[U, R: Encoder](other: KeyValueGroupedDataset[K, U])(
       thisSortExprs: Column*)(otherSortExprs: Column*)(
-      f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
     assert(other.isInstanceOf[KeyValueGroupedDatasetImpl[K, U, _, _]])
     val otherImpl = other.asInstanceOf[KeyValueGroupedDatasetImpl[K, U, _, _]]
     // Apply mapValues changes to the udf
```
```diff
@@ -627,7 +627,7 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
   }
 
   test("Dataset result collection") {
-    def checkResult(rows: TraversableOnce[java.lang.Long], expectedValues: Long*): Unit = {
+    def checkResult(rows: IterableOnce[java.lang.Long], expectedValues: Long*): Unit = {
       rows.toIterator.zipAll(expectedValues.iterator, null, null).foreach {
         case (actual, expected) => assert(actual === expected)
       }
```
```diff
@@ -45,7 +45,7 @@ private[sql] object UdfUtils extends Serializable {
   }
 
   def flatMapFuncToMapPartitionsAdaptor[T, U](
-      f: T => TraversableOnce[U]): Iterator[T] => Iterator[U] = _.flatMap(f)
+      f: T => IterableOnce[U]): Iterator[T] => Iterator[U] = _.flatMap(f)
 
   def filterFuncToScalaFunc[T](f: FilterFunction[T]): T => Boolean = f.call
 
@@ -62,38 +62,38 @@ private[sql] object UdfUtils extends Serializable {
   def foreachBatchFuncToScalaFunc[D](f: VoidFunction2[D, java.lang.Long]): (D, Long) => Unit =
     (d, i) => f.call(d, i)
 
-  def flatMapFuncToScalaFunc[T, U](f: FlatMapFunction[T, U]): T => TraversableOnce[U] = x =>
+  def flatMapFuncToScalaFunc[T, U](f: FlatMapFunction[T, U]): T => IterableOnce[U] = x =>
     f.call(x).asScala
 
   def flatMapGroupsFuncToScalaFunc[K, V, U](
-      f: FlatMapGroupsFunction[K, V, U]): (K, Iterator[V]) => TraversableOnce[U] = (key, data) =>
+      f: FlatMapGroupsFunction[K, V, U]): (K, Iterator[V]) => IterableOnce[U] = (key, data) =>
     f.call(key, data.asJava).asScala
 
   def mapGroupsFuncToScalaFunc[K, V, U](f: MapGroupsFunction[K, V, U]): (K, Iterator[V]) => U =
     (key, data) => f.call(key, data.asJava)
 
   def coGroupFunctionToScalaFunc[K, V, U, R](
-      f: CoGroupFunction[K, V, U, R]): (K, Iterator[V], Iterator[U]) => TraversableOnce[R] =
+      f: CoGroupFunction[K, V, U, R]): (K, Iterator[V], Iterator[U]) => IterableOnce[R] =
     (key, left, right) => f.call(key, left.asJava, right.asJava).asScala
 
   def mapGroupsFuncToFlatMapAdaptor[K, V, U](
-      f: (K, Iterator[V]) => U): (K, Iterator[V]) => TraversableOnce[U] = {
+      f: (K, Iterator[V]) => U): (K, Iterator[V]) => IterableOnce[U] = {
     (key: K, it: Iterator[V]) => Iterator(f(key, it))
   }
 
   def mapValuesAdaptor[K, V, U, IV](
-      f: (K, Iterator[V]) => TraversableOnce[U],
-      valueMapFunc: IV => V): (K, Iterator[IV]) => TraversableOnce[U] = {
+      f: (K, Iterator[V]) => IterableOnce[U],
+      valueMapFunc: IV => V): (K, Iterator[IV]) => IterableOnce[U] = {
     (k: K, itr: Iterator[IV]) =>
       {
         f(k, itr.map(v => valueMapFunc(v)))
       }
   }
 
   def mapValuesAdaptor[K, V, U, R, IV, IU](
-      f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R],
+      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R],
       valueMapFunc: IV => V,
-      otherValueMapFunc: IU => U): (K, Iterator[IV], Iterator[IU]) => TraversableOnce[R] = {
+      otherValueMapFunc: IU => U): (K, Iterator[IV], Iterator[IU]) => IterableOnce[R] = {
     (k: K, itr: Iterator[IV], otherItr: Iterator[IU]) =>
       {
         f(k, itr.map(v => valueMapFunc(v)), otherItr.map(u => otherValueMapFunc(u)))
@@ -131,7 +131,7 @@ private[sql] object UdfUtils extends Serializable {
 
   def noOp[V, K](): V => K = _ => null.asInstanceOf[K]
 
-  def traversableOnceToSeq[A, B](f: A => TraversableOnce[B]): A => Seq[B] = { value =>
+  def iterableOnceToSeq[A, B](f: A => IterableOnce[B]): A => Seq[B] = { value =>
     f(value).toSeq
   }
 
```
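These adaptors type-check unchanged because `Iterator#flatMap` in Scala 2.13 itself takes an `A => IterableOnce[B]`. A standalone sketch of the same pattern (hypothetical names, not from this PR):

```scala
// Mirrors flatMapFuncToMapPartitionsAdaptor above: Iterator#flatMap accepts
// f: T => IterableOnce[U] directly, so the adaptor is just _.flatMap(f).
def flatMapAdaptor[T, U](f: T => IterableOnce[U]): Iterator[T] => Iterator[U] =
  _.flatMap(f)

flatMapAdaptor[String, Char](_.toList)(Iterator("ab", "c")).toList
// List('a', 'b', 'c')
```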
```diff
@@ -664,7 +664,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       SerializeFromObject(udf.outputNamedExpression, flatMapGroupsWithState)
     } else {
       val mapped = new MapGroups(
-        udf.function.asInstanceOf[(Any, Iterator[Any]) => TraversableOnce[Any]],
+        udf.function.asInstanceOf[(Any, Iterator[Any]) => IterableOnce[Any]],
         udf.inputDeserializer(ds.groupingAttributes),
         ds.valueDeserializer,
         ds.groupingAttributes,
@@ -721,7 +721,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       rel.getOtherSortingExpressionsList)
 
     val mapped = CoGroup(
-      udf.function.asInstanceOf[(Any, Iterator[Any], Iterator[Any]) => TraversableOnce[Any]],
+      udf.function.asInstanceOf[(Any, Iterator[Any], Iterator[Any]) => IterableOnce[Any]],
       // The `leftGroup` and `rightGroup` are guaranteed te be of same schema, so it's safe to
       // resolve the `keyDeserializer` based on either of them, here we pick the left one.
       udf.inputDeserializer(left.groupingAttributes),
```
```diff
@@ -332,7 +332,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Aggregate the values of each key, using given combine functions and a neutral "zero value".
    * This function can return a different result type, U, than the type of the values in this RDD,
    * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
-   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * as in scala.IterableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
    * instead of creating a new U.
@@ -347,7 +347,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Aggregate the values of each key, using given combine functions and a neutral "zero value".
    * This function can return a different result type, U, than the type of the values in this RDD,
    * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
-   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * as in scala.IterableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
    * instead of creating a new U.
```
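The doc comment above describes the two combine functions only in prose; for reference, a minimal sketch of them in use (assumes an existing `SparkContext` named `sc`):

```scala
// seqOp merges a V into the per-partition accumulator U; combOp merges two
// U's across partitions, as the doc comment describes.
val sums = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
  .aggregateByKey(0)(
    (u, v) => u + v,    // seqOp: fold a value into the accumulator
    (u1, u2) => u1 + u2 // combOp: merge two partial accumulators
  )
sums.collect() // Array(("a", 3), ("b", 3)) in some order
```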
```diff
@@ -418,7 +418,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Aggregate the elements of each partition, and then the results for all the partitions, using
    * given combine functions and a neutral "zero value". This function can return a different result
    * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
-   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
+   * and one operation for merging two U's, as in scala.IterableOnce. Both of these functions are
    * allowed to modify and return their first argument instead of creating a new U to avoid memory
    * allocation.
    */
```
```diff
@@ -149,7 +149,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Aggregate the values of each key, using given combine functions and a neutral "zero value".
    * This function can return a different result type, U, than the type of the values in this RDD,
    * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
-   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * as in scala.IterableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
    * instead of creating a new U.
@@ -174,7 +174,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Aggregate the values of each key, using given combine functions and a neutral "zero value".
    * This function can return a different result type, U, than the type of the values in this RDD,
    * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
-   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * as in scala.IterableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
    * instead of creating a new U.
@@ -188,7 +188,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Aggregate the values of each key, using given combine functions and a neutral "zero value".
    * This function can return a different result type, U, than the type of the values in this RDD,
    * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
-   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * as in scala.IterableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
    * instead of creating a new U.
@@ -757,7 +757,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Pass each value in the key-value pair RDD through a flatMap function without changing the
    * keys; this also retains the original RDD's partitioning.
    */
-  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {
+  def flatMapValues[U](f: V => IterableOnce[U]): RDD[(K, U)] = self.withScope {
     val cleanF = self.context.clean(f)
     new MapPartitionsRDD[(K, U), (K, V)](self,
       (context, pid, iter) => iter.flatMap { case (k, v) =>
```
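Besides the doc-only edits, `flatMapValues` is the one real signature change in this file; its behavior is unchanged. A quick sketch (again assuming `sc: SparkContext`):

```scala
// Each value expands to zero or more values; keys and the original
// partitioning are preserved.
val pairs = sc.parallelize(Seq(("a", "x y"), ("b", "z")))
pairs.flatMapValues(_.split(" ").toSeq).collect()
// Array(("a", "x"), ("a", "y"), ("b", "z"))
```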
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
```diff
@@ -420,7 +420,7 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD by first applying a function to all elements of this
    * RDD, and then flattening the results.
    */
-  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
+  def flatMap[U: ClassTag](f: T => IterableOnce[U]): RDD[U] = withScope {
     val cleanF = sc.clean(f)
     new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
   }
@@ -1204,7 +1204,7 @@ abstract class RDD[T: ClassTag](
    * Aggregate the elements of each partition, and then the results for all the partitions, using
    * given combine functions and a neutral "zero value". This function can return a different result
    * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
-   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
+   * and one operation for merging two U's, as in scala.IterableOnce. Both of these functions are
    * allowed to modify and return their first argument instead of creating a new U to avoid memory
    * allocation.
    *
```
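For `aggregate`, U may differ from T; a short sketch computing a mean as a `(sum, count)` pair (assumes `sc: SparkContext`):

```scala
// seqOp merges a T (an Int) into the U accumulator; combOp merges two U's.
val (sum, count) = sc.parallelize(1 to 100).aggregate((0, 0))(
  (u, x) => (u._1 + x, u._2 + 1),      // merge an element into (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2) // merge two partial (sum, count)s
)
val mean = sum.toDouble / count // 50.5
```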
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/StatCounter.scala
```diff
@@ -27,7 +27,7 @@ import org.apache.spark.annotation.Since
  *
  * @constructor Initialize the StatCounter with the given values.
  */
-class StatCounter(values: TraversableOnce[Double]) extends Serializable {
+class StatCounter(values: IterableOnce[Double]) extends Serializable {
   private var n: Long = 0 // Running count of our values
   private var mu: Double = 0 // Running mean of our values
   private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)
@@ -51,7 +51,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   }
 
   /** Add multiple values into this StatCounter, updating the internal statistics. */
-  def merge(values: TraversableOnce[Double]): StatCounter = {
+  def merge(values: IterableOnce[Double]): StatCounter = {
     values.foreach(v => merge(v))
     this
   }
@@ -155,7 +155,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
 
 object StatCounter {
   /** Build a StatCounter from a list of values. */
-  def apply(values: TraversableOnce[Double]): StatCounter = new StatCounter(values)
+  def apply(values: IterableOnce[Double]): StatCounter = new StatCounter(values)
 
   /** Build a StatCounter from a list of values passed as variable-length arguments. */
   def apply(values: Double*): StatCounter = new StatCounter(values)
```
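`StatCounter` now accepts any `IterableOnce[Double]` at construction and in `merge`; a short usage sketch:

```scala
import org.apache.spark.util.StatCounter

// Both the factory and merge accept any IterableOnce[Double].
val stats = StatCounter(Seq(1.0, 2.0, 3.0))
stats.merge(Iterator(4.0, 5.0))
println(s"count=${stats.count} mean=${stats.mean} stdev=${stats.stdev}")
```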
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
```diff
@@ -916,7 +916,7 @@ private[spark] object Utils
    * result in a new collection. Unlike scala.util.Random.shuffle, this method
    * uses a local random number generator, avoiding inter-thread contention.
    */
-  def randomize[T: ClassTag](seq: TraversableOnce[T]): Seq[T] = {
+  def randomize[T: ClassTag](seq: IterableOnce[T]): Seq[T] = {
     randomizeInPlace(seq.toArray)
   }
 
```
```diff
@@ -81,7 +81,7 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
     this
   }
 
-  def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
+  def ++= (values: IterableOnce[T]): CompactBuffer[T] = {
     values match {
       // Optimize merging of CompactBuffers, used in cogroup and groupByKey
       case compactBuf: CompactBuffer[T] =>
```
```diff
@@ -50,7 +50,7 @@ private[spark] object Utils extends SparkCollectionUtils {
    * Callers must ensure that all the input iterators are already sorted by
    * the same ordering `ord`, otherwise the result is likely to be incorrect.
    */
-  def mergeOrdered[T](inputs: Iterable[TraversableOnce[T]])(
+  def mergeOrdered[T](inputs: Iterable[IterableOnce[T]])(
       implicit ord: Ordering[T]): Iterator[T] = {
     val ordering = new GuavaOrdering[T] {
       override def compare(l: T, r: T): Int = ord.compare(l, r)
```
```diff
@@ -275,13 +275,13 @@ case class UnresolvedGenerator(name: FunctionIdentifier, children: Seq[Expressio
   override def prettyName: String = name.unquotedString
   override def toString: String = s"'$name(${children.mkString(", ")})"
 
-  override def eval(input: InternalRow = null): TraversableOnce[InternalRow] =
+  override def eval(input: InternalRow = null): IterableOnce[InternalRow] =
     throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
     throw QueryExecutionErrors.cannotGenerateCodeForExpressionError(this)
 
-  override def terminate(): TraversableOnce[InternalRow] =
+  override def terminate(): IterableOnce[InternalRow] =
     throw QueryExecutionErrors.cannotTerminateGeneratorError(this)
 
   override protected def withNewChildrenInternal(
```
```diff
@@ -415,7 +415,7 @@ package object dsl {
 
     def cogroup[Key: Encoder, Left: Encoder, Right: Encoder, Result: Encoder](
         otherPlan: LogicalPlan,
-        func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result],
+        func: (Key, Iterator[Left], Iterator[Right]) => IterableOnce[Result],
         leftGroup: Seq[Attribute],
         rightGroup: Seq[Attribute],
        leftAttr: Seq[Attribute],
```
```diff
@@ -146,7 +146,7 @@ case class PythonUDAF(
 }
 
 abstract class UnevaluableGenerator extends Generator {
-  final override def eval(input: InternalRow): TraversableOnce[InternalRow] =
+  final override def eval(input: InternalRow): IterableOnce[InternalRow] =
     throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
 
   final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
```
