[SPARK-39298][CORE][SQL][DSTREAM][GRAPHX][ML][MLLIB][SS][YARN] Replace constructing ranges of collection indices manually with .indices #36679

Closed
wants to merge 5 commits into from
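The change is mechanical throughout: every place that built an index range by hand as 0 until xs.length (or 0 until xs.size) now asks the collection for its own range via xs.indices. A minimal sketch of the idiom follows; the collection xs and the wrapper object are illustrative only, not taken from the diff.

object IndicesIdiomSketch {
  def main(args: Array[String]): Unit = {
    val xs = Array("a", "b", "c")

    // Before: build the index range manually from the collection's length.
    for (i <- 0 until xs.length) {
      println(s"$i -> ${xs(i)}")
    }

    // After: let the collection describe its own index range.
    for (i <- xs.indices) {
      println(s"$i -> ${xs(i)}")
    }

    // Both forms yield the same Range, so behaviour is unchanged;
    // .indices is shorter and cannot drift out of sync with the collection.
    assert((0 until xs.length) == xs.indices)
  }
}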
@@ -2281,7 +2281,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
val headers = row.getList[Row](row.fieldIndex("headers")).asScala
assert(headers.length === expected.length)

-(0 until expected.length).foreach { idx =>
+expected.indices.foreach { idx =>
val key = headers(idx).getAs[String]("key")
val value = headers(idx).getAs[Array[Byte]]("value")
assert(key === expected(idx)._1)
@@ -133,7 +133,7 @@ class KafkaDataConsumerSuite extends SparkFunSuite with MockitoSugar with Before
val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
topicPartition, kafkaParams, taskContext, useCache)
try {
-val rcvd = (0 until data.length).map { offset =>
+val rcvd = data.indices.map { offset =>
val bytes = consumer.get(offset, 10000).value()
new String(bytes)
}
@@ -96,7 +96,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
).map { bytes => new String(bytes).toInt }.collectPartitions()
assert(receivedData3.length === allRanges.size)
-for (i <- 0 until allRanges.size) {
+for (i <- allRanges.indices) {
assert(receivedData3(i).toSeq === shardIdToData(allRanges(i).shardId))
}
}
@@ -965,7 +965,7 @@ private[spark] class MapOutputTrackerMaster(
statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
if (parallelism <= 1) {
statuses.filter(_ != null).foreach { s =>
-for (i <- 0 until totalSizes.length) {
+for (i <- totalSizes.indices) {
totalSizes(i) += s.getSizeForBlock(i)
}
}
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2277,7 +2277,7 @@ class SparkContext(config: SparkConf) extends Logging {
* a result from one partition)
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
-runJob(rdd, func, 0 until rdd.partitions.length)
+runJob(rdd, func, rdd.partitions.indices)
}

/**
@@ -2289,7 +2289,7 @@ class SparkContext(config: SparkConf) extends Logging {
* a result from one partition)
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
-runJob(rdd, func, 0 until rdd.partitions.length)
+runJob(rdd, func, rdd.partitions.indices)
}

/**
@@ -2304,7 +2304,7 @@ class SparkContext(config: SparkConf) extends Logging {
rdd: RDD[T],
processPartition: (TaskContext, Iterator[T]) => U,
resultHandler: (Int, U) => Unit): Unit = {
-runJob[T, U](rdd, processPartition, 0 until rdd.partitions.length, resultHandler)
+runJob[T, U](rdd, processPartition, rdd.partitions.indices, resultHandler)
}

/**
@@ -2319,7 +2319,7 @@ class SparkContext(config: SparkConf) extends Logging {
processPartition: Iterator[T] => U,
resultHandler: (Int, U) => Unit): Unit = {
val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
-runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler)
+runJob[T, U](rdd, processFunc, rdd.partitions.indices, resultHandler)
}

/**
@@ -742,7 +742,7 @@ private[deploy] class Master(
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

// Now that we've decided how many cores to allocate on each worker, let's allocate them
-for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
+for (pos <- usableWorkers.indices if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
@@ -208,7 +208,7 @@ private[spark] object ExecutorMetricType {
var numberOfMetrics = 0
val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
metricGetters.foreach { m =>
-(0 until m.names.length).foreach { idx =>
+m.names.indices.foreach { idx =>
definedMetricsAndOffset += (m.names(idx) -> (idx + numberOfMetrics))
}
numberOfMetrics += m.names.length
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -36,7 +36,7 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo

override def getPartitions: Array[Partition] = {
assertValid()
-(0 until blockIds.length).map { i =>
+blockIds.indices.map { i =>
new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
}.toArray
}
@@ -110,7 +110,7 @@ class CoGroupedRDD[K: ClassTag](

override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
-for (i <- 0 until array.length) {
+for (i <- array.indices) {
// Each CoGroupPartition will have a dependency per contributing RDD
array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
// Assume each RDD contributed a single dependency, and get it
@@ -156,7 +156,7 @@ class NewHadoopRDD[K, V](
}

val result = new Array[Partition](rawSplits.size)
-for (i <- 0 until rawSplits.size) {
+for (i <- rawSplits.indices) {
result(i) =
new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
@@ -70,7 +70,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](

override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
-for (i <- 0 until array.length) {
+for (i <- array.indices) {
// Each CoGroupPartition will depend on rdd1 and rdd2
array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
dependencies(j) match {
@@ -380,7 +380,7 @@ private[spark] class TaskSchedulerImpl(
var minLaunchedLocality: Option[TaskLocality] = None
// nodes and executors that are excluded for the entire application have already been
// filtered out by this point
-for (i <- 0 until shuffledOffers.size) {
+for (i <- shuffledOffers.indices) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
val taskSetRpID = taskSet.taskSet.resourceProfileId
@@ -2040,7 +2040,7 @@ private[spark] object BlockManager {
}

val blockManagers = new HashMap[BlockId, Seq[String]]
-for (i <- 0 until blockIds.length) {
+for (i <- blockIds.indices) {
blockManagers(blockIds(i)) = blockLocations(i).map { loc =>
ExecutorCacheTaskLocation(loc.host, loc.executorId).toString
}
@@ -504,7 +504,7 @@ private[spark] class MemoryStore(
try {
logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
s"(${Utils.bytesToString(freedMemory)} bytes)")
-(0 until selectedBlocks.size).foreach { idx =>
+selectedBlocks.indices.foreach { idx =>
val blockId = selectedBlocks(idx)
val entry = entries.synchronized {
entries.get(blockId)
@@ -48,7 +48,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
val futureAction = sc.submitJob(
rdd,
(iter: Iterator[Int]) => iter.toArray,
-partitions.getOrElse(0 until rdd.partitions.length),
+partitions.getOrElse(rdd.partitions.indices),
{ case (_, _) => return }: (Int, Array[Int]) => Unit,
{ return }
)
@@ -117,7 +117,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
val r = ParallelCollectionRDD.slice(1 to 7, 4)
val nr = ParallelCollectionRDD.slice(1L to 7L, 4)
assert(r.size === 4)
-for (i <- 0 until r.size) {
+for (i <- r.indices) {
assert(r(i).size === nr(i).size)
}
}
@@ -126,7 +126,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
val r = ParallelCollectionRDD.slice(List(1, 2), 4)
val nr = ParallelCollectionRDD.slice(1L to 2L, 4)
assert(r.size === 4)
-for (i <- 0 until r.size) {
+for (i <- r.indices) {
assert(r(i).size === nr(i).size)
}
}
@@ -38,7 +38,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
@transient private lazy val parentPartitionMapping: Array[Int] = {
val n = parent.numPartitions
val result = new Array[Int](n)
-for (i <- 0 until partitionStartIndices.length) {
+for (i <- partitionStartIndices.indices) {
val start = partitionStartIndices(i)
val end = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n
for (j <- start until end) {
@@ -408,7 +408,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
sc.submitJob(
rdd,
(iter: Iterator[Int]) => iter.toArray,
-0 until rdd.partitions.length,
+rdd.partitions.indices,
{ case (_, _) => return }: (Int, Array[Int]) => Unit,
{ return }
)
@@ -1806,7 +1806,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
// now we should submit stage 1, and the map output from stage 0 should be registered

// check that we have all the map output for stage 0
-(0 until reduceRdd.partitions.length).foreach { reduceIdx =>
+reduceRdd.partitions.indices.foreach { reduceIdx =>
val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)
// really we should have already thrown an exception rather than fail either of these
// asserts, but just to be extra defensive let's double check the statuses are OK
@@ -139,14 +139,14 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Only one of two duplicate commit tasks should commit") {
val rdd = sc.parallelize(Seq(1), 1)
sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully _,
-0 until rdd.partitions.size)
+rdd.partitions.indices)
assert(tempDir.list().size === 1)
}

test("If commit fails, if task is retried it should not be locked, and will succeed.") {
val rdd = sc.parallelize(Seq(1), 1)
sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).failFirstCommitAttempt _,
-0 until rdd.partitions.size)
+rdd.partitions.indices)
assert(tempDir.list().size === 1)
}

@@ -178,7 +178,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
// send data to appender through the input stream, and wait for the data to be written
val allGeneratedFiles = new HashSet[String]()
val items = (1 to 10).map { _.toString * 10000 }
-for (i <- 0 until items.size) {
+for (i <- items.indices) {
testOutputStream.write(items(i).getBytes(StandardCharsets.UTF_8))
testOutputStream.flush()
allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles(
@@ -364,7 +364,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
): Seq[File] = {
// send data to appender through the input stream, and wait for the data to be written
val expectedText = textToAppend.mkString("")
-for (i <- 0 until textToAppend.size) {
+for (i <- textToAppend.indices) {
outputStream.write(textToAppend(i).getBytes(StandardCharsets.UTF_8))
outputStream.flush()
Thread.sleep(sleepTimeBetweenTexts)
@@ -771,7 +771,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
assert(info1.submissionTime === info2.submissionTime)
assert(info1.completionTime === info2.completionTime)
assert(info1.rddInfos.size === info2.rddInfos.size)
-(0 until info1.rddInfos.size).foreach { i =>
+info1.rddInfos.indices.foreach { i =>
assertEquals(info1.rddInfos(i), info2.rddInfos(i))
}
assert(info1.accumulables === info2.accumulables)
@@ -62,7 +62,7 @@ class PrefixComparatorsSuite extends SparkFunSuite with ScalaCheckPropertyChecks
test("Binary prefix comparator") {

def compareBinary(x: Array[Byte], y: Array[Byte]): Int = {
-for (i <- 0 until x.length; if i < y.length) {
+for (i <- x.indices; if i < y.length) {
val v1 = x(i) & 0xff
val v2 = y(i) & 0xff
val res = v1 - v2
@@ -37,12 +37,12 @@ object MultiBroadcastTest {
val num = if (args.length > 1) args(1).toInt else 1000000

val arr1 = new Array[Int](num)
-for (i <- 0 until arr1.length) {
+for (i <- arr1.indices) {
arr1(i) = i
}

val arr2 = new Array[Int](num)
-for (i <- 0 until arr2.length) {
+for (i <- arr2.indices) {
arr2(i) = i
}

@@ -38,7 +38,7 @@ object SparkKMeans {
var bestIndex = 0
var closest = Double.PositiveInfinity

-for (i <- 0 until centers.length) {
+for (i <- centers.indices) {
val tempDist = squaredDistance(p, centers(i))
if (tempDist < closest) {
closest = tempDist
@@ -28,7 +28,7 @@ private[graphx]
class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
extends Serializable {
def iterator: Iterator[(VertexId, VD)] =
-(0 until vids.length).iterator.map { i => (vids(i), attrs(i)) }
+vids.indices.iterator.map { i => (vids(i), attrs(i)) }
}

private[graphx]
@@ -32,7 +32,7 @@ class EdgeSuite extends SparkFunSuite {
// to ascending order
val sortedEdges = testEdges.sorted(Edge.lexicographicOrdering[Int])

-for (i <- 0 until testEdges.length) {
+for (i <- testEdges.indices) {
assert(sortedEdges(i) == testEdges(testEdges.length - i - 1))
}
}
8 changes: 4 additions & 4 deletions mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
@@ -480,7 +480,7 @@ private[ml] class FeedForwardModel private(
val layers = topology.layers
val layerModels = new Array[LayerModel](layers.length)
private var offset = 0
-for (i <- 0 until layers.length) {
+for (i <- layers.indices) {
layerModels(i) = layers(i).createModel(
new BDV[Double](weights.toArray, offset, 1, layers(i).weightSize))
offset += layers(i).weightSize
@@ -495,7 +495,7 @@ if (outputs == null || outputs(0).cols != currentBatchSize) {
if (outputs == null || outputs(0).cols != currentBatchSize) {
outputs = new Array[BDM[Double]](layers.length)
var inputSize = data.rows
-for (i <- 0 until layers.length) {
+for (i <- layers.indices) {
if (layers(i).inPlace) {
outputs(i) = outputs(i - 1)
} else {
@@ -542,7 +542,7 @@ private[ml] class FeedForwardModel private(
}
val cumGradientArray = cumGradient.toArray
var offset = 0
-for (i <- 0 until layerModels.length) {
+for (i <- layerModels.indices) {
val input = if (i == 0) data else outputs(i - 1)
layerModels(i).grad(deltas(i), input,
new BDV[Double](cumGradientArray, offset, 1, layers(i).weightSize))
@@ -601,7 +601,7 @@ private[ann] object FeedForwardModel {
val weights = BDV.zeros[Double](topology.layers.map(_.weightSize).sum)
var offset = 0
val random = new XORShiftRandom(seed)
-for (i <- 0 until layers.length) {
+for (i <- layers.indices) {
layerModels(i) = layers(i).
initModel(new BDV[Double](weights.data, offset, 1, layers(i).weightSize), random)
offset += layers(i).weightSize
@@ -243,7 +243,7 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui
// non-deterministic results when array contains both 0.0 and -0.0
// So that here we should first normalize all 0.0 and -0.0 to be 0.0
// See https://github.com/scala/bug/issues/11995
-for (i <- 0 until splits.length) {
+for (i <- splits.indices) {
if (splits(i) == -0.0) {
splits(i) = 0.0
}
@@ -367,7 +367,7 @@ class StringIndexerModel (
// This filters out any null values and also the input labels which are not in
// the dataset used for fitting.
private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = {
-val conditions: Seq[Column] = (0 until inputColNames.length).map { i =>
+val conditions: Seq[Column] = inputColNames.indices.map { i =>
val inputColName = inputColNames(i)
val labelToIndex = labelsToIndexArray(i)
// We have this additional lookup at `labelToIndex` when `handleInvalid` is set to
@@ -423,7 +423,7 @@ class StringIndexerModel (
dataset
}

-for (i <- 0 until outputColNames.length) {
+for (i <- outputColNames.indices) {
val inputColName = inputColNames(i)
val outputColName = outputColNames(i)
val labelToIndex = labelsToIndexArray(i)
@@ -410,7 +410,7 @@ object CrossValidatorModel extends MLReadable[CrossValidatorModel] {
val subModelsPath = new Path(path, "subModels")
for (splitIndex <- 0 until instance.getNumFolds) {
val splitPath = new Path(subModelsPath, s"fold${splitIndex.toString}")
-for (paramIndex <- 0 until instance.getEstimatorParamMaps.length) {
+for (paramIndex <- instance.getEstimatorParamMaps.indices) {
val modelPath = new Path(splitPath, paramIndex.toString).toString
instance.subModels(splitIndex)(paramIndex).asInstanceOf[MLWritable].save(modelPath)
}
@@ -442,7 +442,7 @@ object CrossValidatorModel extends MLReadable[CrossValidatorModel] {
Array.ofDim[Model[_]](estimatorParamMaps.length))
for (splitIndex <- 0 until numFolds) {
val splitPath = new Path(subModelsPath, s"fold${splitIndex.toString}")
-for (paramIndex <- 0 until estimatorParamMaps.length) {
+for (paramIndex <- estimatorParamMaps.indices) {
val modelPath = new Path(splitPath, paramIndex.toString).toString
_subModels(splitIndex)(paramIndex) =
DefaultParamsReader.loadParamsInstance(modelPath, sc)