
[SPARK-27680][CORE][SQL][GRAPHX] Remove usage of Traversable

## What changes were proposed in this pull request?

This removes usage of `Traversable`, which no longer exists in Scala 2.13. The change is almost entirely internal; the one user-visible piece is `SparkConf.setAll`, which gains an `Iterable` overload while the existing `Traversable` overload is deprecated. See additional comments below.
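
For most callers `setAll` behaves exactly as before, since the usual argument types are already `Iterable`. A minimal sketch (the configuration keys and values here are illustrative, not part of the patch):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()

// A Map[String, String] is an Iterable[(String, String)], so an existing
// call site like this compiles unchanged against the new overload.
conf.setAll(Map(
  "spark.executor.memory" -> "2g",
  "spark.sql.shuffle.partitions" -> "64"
))
```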

## How was this patch tested?

Existing tests.

Closes #24584 from srowen/SPARK-27680.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
srowen committed May 14, 2019
1 parent 695dbe2 commit a10608cb82907157b1858aa77eac585b1ed37baf
@@ -168,6 +168,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
}

/** Set multiple parameters together */
+ def setAll(settings: Iterable[(String, String)]): SparkConf = {
+   settings.foreach { case (k, v) => set(k, v) }
+   this
+ }
+
+ /**
+  * Set multiple parameters together
+  */
+ @deprecated("Use setAll(Iterable) instead", "3.0.0")
def setAll(settings: Traversable[(String, String)]): SparkConf = {
settings.foreach { case (k, v) => set(k, v) }
this
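
On Scala 2.12, where `Iterable` extends `Traversable`, overload resolution picks the new `Iterable` method for ordinary collections because it is more specific; only an argument that is a `Traversable` but not an `Iterable` falls through to the deprecated overload. A sketch of that edge case (2.12 only, with a hypothetical one-pair collection):

```scala
// A collection that implements only Traversable's abstract member, foreach.
// Compiles only on Scala 2.12, where Traversable still exists.
class PairsOnly extends Traversable[(String, String)] {
  def foreach[U](f: ((String, String)) => U): Unit =
    f("spark.ui.enabled" -> "false")
}

new SparkConf().setAll(new PairsOnly)  // lands on the deprecated overload
```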
@@ -2556,7 +2556,7 @@ object SparkContext extends Logging {
private[spark] val DRIVER_IDENTIFIER = "driver"


- private implicit def arrayToArrayWritable[T <: Writable : ClassTag](arr: Traversable[T])
+ private implicit def arrayToArrayWritable[T <: Writable : ClassTag](arr: Iterable[T])
: ArrayWritable = {
def anyToWritable[U <: Writable](u: U): Writable = u

@@ -31,7 +31,7 @@ import scala.collection.immutable.IndexedSeq
*/
private[spark] class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) {
require(startIdx < endIdx)
- def this(data: Traversable[Double]) = this(data.toArray, 0, data.size)
+ def this(data: Iterable[Double]) = this(data.toArray, 0, data.size)
java.util.Arrays.sort(data, startIdx, endIdx)
val length = endIdx - startIdx

@@ -42,7 +42,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va
* given from 0 to 1
* @param probabilities
*/
- def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities)
+ def getQuantiles(probabilities: Iterable[Double] = defaultProbabilities)
: IndexedSeq[Double] = {
probabilities.toIndexedSeq.map { p: Double => data(closestIndex(p)) }
}
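
`getQuantiles` maps each probability to a position in the sorted array; the private `closestIndex` helper is not shown in this diff, so the formula in this standalone sketch is an assumption about its behavior:

```scala
val data = Array(9.0, 11.1, 12.0, 15.5, 30.2)  // already sorted

// Assumed index arithmetic: floor(p * length), clamped to the last element.
def quantile(p: Double): Double =
  data(math.min((p * data.length).toInt, data.length - 1))

println(Seq(0.0, 0.25, 0.5, 0.75, 1.0).map(quantile))
// List(9.0, 11.1, 12.0, 15.5, 30.2)
```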
@@ -75,15 +75,15 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va

private[spark] object Distribution {

- def apply(data: Traversable[Double]): Option[Distribution] = {
+ def apply(data: Iterable[Double]): Option[Distribution] = {
if (data.size > 0) {
Some(new Distribution(data))
} else {
None
}
}

- def showQuantiles(out: PrintStream = System.out, quantiles: Traversable[Double]) {
+ def showQuantiles(out: PrintStream = System.out, quantiles: Iterable[Double]) {
// scalastyle:off println
out.println("min\t25%\t50%\t75%\tmax")
quantiles.foreach{q => out.print(q + "\t")}
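
Putting the pieces of this object together, in-package usage looks roughly like the sketch below (`Distribution` is `private[spark]`, so this only compiles inside the `org.apache.spark` namespace; the sample data is made up):

```scala
import org.apache.spark.util.Distribution

val latenciesMs: Iterable[Double] = Seq(12.0, 15.5, 9.0, 30.2, 11.1)
Distribution(latenciesMs) match {
  case Some(d) => Distribution.showQuantiles(System.out, d.getQuantiles())
  case None    => println("no data points")
}
```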
@@ -309,7 +309,7 @@ private[spark] object JsonProtocol {

private lazy val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses")

- def accumulablesToJson(accumulables: Traversable[AccumulableInfo]): JArray = {
+ def accumulablesToJson(accumulables: Iterable[AccumulableInfo]): JArray = {
JArray(accumulables
.filterNot(_.name.exists(accumulableBlacklist.contains))
.toList.map(accumulableInfoToJson))
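
The filter works on `Option[String]` names: `_.name.exists(...)` is true only when a name is present and blacklisted, so unnamed accumulables survive the `filterNot`. A standalone sketch with a hypothetical record type:

```scala
case class Info(name: Option[String], value: Long)  // hypothetical

val blacklist = Set("internal.metrics.updatedBlockStatuses")
val infos = Seq(
  Info(Some("internal.metrics.updatedBlockStatuses"), 1L),
  Info(Some("user.counter"), 2L),
  Info(None, 3L))

// Keeps user.counter and the unnamed entry.
println(infos.filterNot(_.name.exists(blacklist.contains)))
```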
@@ -535,7 +535,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
/**
* Assert that the given list of numbers has an average that is greater than zero.
*/
- private def checkNonZeroAvg(m: Traversable[Long], msg: String) {
+ private def checkNonZeroAvg(m: Iterable[Long], msg: String) {
assert(m.sum / m.size.toDouble > 0.0, msg)
}

@@ -83,7 +83,7 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
UserGroupInformation.getCurrentUser.addCredentials(creds)
}

- protected def setSparkEnv(settings: Traversable[(String, String)]): Unit = {
+ protected def setSparkEnv(settings: Iterable[(String, String)]): Unit = {
val conf = new SparkConf().setAll(settings)
val env = mock(classOf[SparkEnv])
doReturn(conf).when(env).conf
@@ -55,7 +55,7 @@ object LabelPropagation {
val count1Val = count1.getOrElse(i, 0L)
val count2Val = count2.getOrElse(i, 0L)
i -> (count1Val + count2Val)
- }(collection.breakOut) // more efficient alternative to [[collection.Traversable.toMap]]
+ }(collection.breakOut)
}
def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
if (message.isEmpty) attr else message.maxBy(_._2)._1
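
The deleted comment is stale either way: `collection.breakOut` is itself gone in 2.13, where the idiom becomes an explicit build of the target type. A standalone sketch of the two styles, using made-up counts:

```scala
// Scala 2.12: breakOut builds the Map directly from map's output,
// avoiding an intermediate Set[(Long, Long)] followed by .toMap.
val counts: Map[Long, Long] =
  (Set(1L, 2L) ++ Set(2L, 3L)).map(i => i -> i * 10)(collection.breakOut)

// The Scala 2.13 equivalent: go through an iterator, build the Map once.
val counts213: Map[Long, Long] =
  (Set(1L, 2L) ++ Set(2L, 3L)).iterator.map(i => i -> i * 10).toMap
```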
@@ -36,7 +36,7 @@ object ShortestPaths extends Serializable {
private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
(spmap1.keySet ++ spmap2.keySet).map {
k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
- }(collection.breakOut) // more efficient alternative to [[collection.Traversable.toMap]]
+ }(collection.breakOut)
}

/**
@@ -63,7 +63,7 @@ object AttributeSet {
* when the transformation was a no-op).
*/
class AttributeSet private (val baseSet: Set[AttributeEquals])
- extends Traversable[Attribute] with Serializable {
+ extends Iterable[Attribute] with Serializable {

override def hashCode: Int = baseSet.hashCode()
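
This is the one inheritance change in the patch, and it moves the abstract member: `Traversable` is defined by `foreach`, while `Iterable` is defined by `iterator`. A minimal sketch of the pattern with a hypothetical set-backed collection:

```scala
class WrappedSet[A](baseSet: Set[A]) extends Iterable[A] {
  // Iterable's single abstract method; foreach, map, filter, etc.
  // are all derived from it.
  override def iterator: Iterator[A] = baseSet.iterator
}

val s = new WrappedSet(Set(1, 2, 3))
s.foreach(println)
println(s.filter(_ > 1))  // standard collection ops still work
```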

@@ -99,7 +99,7 @@ class AttributeSet private (val baseSet: Set[AttributeEquals])
* Returns a new [[AttributeSet]] that does not contain any of the [[Attribute Attributes]] found
* in `other`.
*/
- def --(other: Traversable[NamedExpression]): AttributeSet = {
+ def --(other: Iterable[NamedExpression]): AttributeSet = {
other match {
case otherSet: AttributeSet =>
new AttributeSet(baseSet -- otherSet.baseSet)
@@ -253,7 +253,7 @@ abstract class Expression extends TreeNode[Expression] {
def prettyName: String = nodeName.toLowerCase(Locale.ROOT)

protected def flatArguments: Iterator[Any] = productIterator.flatMap {
- case t: Traversable[_] => t
+ case t: Iterable[_] => t
case single => single :: Nil
}

@@ -183,7 +183,7 @@ trait Block extends TreeNode[Block] with JavaCode {
def doTransform(arg: Any): AnyRef = arg match {
case e: ExprValue => transform(e)
case Some(value) => Some(doTransform(value))
- case seq: Traversable[_] => seq.map(doTransform)
+ case seq: Iterable[_] => seq.map(doTransform)
case other: AnyRef => other
}

@@ -119,7 +119,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
case m: Map[_, _] => m
case d: DataType => d // Avoid unpacking Structs
case stream: Stream[_] => stream.map(recursiveTransform).force
- case seq: Traversable[_] => seq.map(recursiveTransform)
+ case seq: Iterable[_] => seq.map(recursiveTransform)
case other: AnyRef => other
case null => null
}
@@ -142,16 +142,16 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
/** Returns all of the expressions present in this query plan operator. */
final def expressions: Seq[Expression] = {
// Recursively find all expressions from a traversable.
- def seqToExpressions(seq: Traversable[Any]): Traversable[Expression] = seq.flatMap {
+ def seqToExpressions(seq: Iterable[Any]): Iterable[Expression] = seq.flatMap {
case e: Expression => e :: Nil
- case s: Traversable[_] => seqToExpressions(s)
+ case s: Iterable[_] => seqToExpressions(s)
case other => Nil
}

productIterator.flatMap {
case e: Expression => e :: Nil
case s: Some[_] => seqToExpressions(s.toSeq)
- case seq: Traversable[_] => seqToExpressions(seq)
+ case seq: Iterable[_] => seqToExpressions(seq)
case other => Nil
}.toSeq
}
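
The recursion walks a case class's `productIterator`, flattening nested collections while collecting the target type. A standalone sketch of the same shape, with a hypothetical node class and `String` standing in for `Expression`:

```scala
case class Node(name: String, children: Seq[Any])  // hypothetical

def collectStrings(seq: Iterable[Any]): Iterable[String] = seq.flatMap {
  case s: String      => s :: Nil
  case i: Iterable[_] => collectStrings(i)
  case _              => Nil
}

val n = Node("root", Seq("a", Seq("b", Some("c"))))
println(collectStrings(n.productIterator.toSeq).toList)
// List(root, a, b): Option is not an Iterable, so Some("c") is skipped
// here; that is why the real code handles the Some[_] case explicitly.
```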
@@ -353,7 +353,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}.view.force // `mapValues` is lazy and we need to force it to materialize
case d: DataType => d // Avoid unpacking Structs
case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
- case args: Traversable[_] => args.map(mapChild)
+ case args: Iterable[_] => args.map(mapChild)
case nonChild: AnyRef => nonChild
case null => null
}
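
Case order matters here: a `Stream` is also an `Iterable`, so its case must come first, and `.force` materializes the mapped result eagerly. A standalone sketch with an identity transform standing in for `mapChild`:

```scala
def mapChild(child: Any): Any = child  // stand-in transform

val args: Any = Stream(1, 2, 3)
val mapped = args match {
  // Must precede the Iterable case, and must be forced: Stream.map is lazy.
  case s: Stream[_]   => s.map(mapChild).force
  case i: Iterable[_] => i.map(mapChild)
  case other          => other
}
println(mapped)  // Stream(1, 2, 3)
```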
