[SPARK-38337][CORE][SQL][DSTREAM][MLLIB] Replace toIterator with iterator for IterableLike/IterableOnce to cleanup deprecated api usage #35665
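For context before the per-file diff, here is a minimal standalone sketch of the change applied throughout this PR (a hypothetical example, not taken from the diff itself; the object name is made up): Scala 2.13 deprecates toIterator on IterableLike/IterableOnce, and iterator is the drop-in replacement.

// Hypothetical sketch of the cleanup; not part of the PR's diff.
// Scala 2.13 deprecates IterableOnce#toIterator in favor of iterator.
object ToIteratorCleanupExample {
  def main(args: Array[String]): Unit = {
    val xs = Seq(1, 2, 3)

    // Before: still compiles, but emits a deprecation warning under Scala 2.13
    // val it = xs.toIterator

    // After: the non-deprecated equivalent, returning the same lazy Iterator
    val it: Iterator[Int] = xs.iterator

    println(it.mkString(", ")) // prints: 1, 2, 3
  }
}

Because iterator returns the same lazy Iterator over the same elements, every replacement in the files below is a mechanical one-for-one swap (shown as -/+ pairs).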

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1177,7 +1177,7 @@ private[spark] class MapOutputTrackerMaster(
override def getMapSizesForMergeResult(
shuffleId: Int,
partitionId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
- Seq.empty.toIterator
+ Seq.empty.iterator
}

// This method is only called in local-mode. Since push based shuffle won't be
@@ -1186,7 +1186,7 @@ private[spark] class MapOutputTrackerMaster(
shuffleId: Int,
partitionId: Int,
chunkTracker: RoaringBitmap): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
- Seq.empty.toIterator
+ Seq.empty.iterator
}

// This method is only called in local-mode.
@@ -245,7 +245,7 @@ private[spark] object PythonRDD extends Logging {
out.writeInt(1)

// Write the next object and signal end of data for this iteration
- writeIteratorToStream(partitionArray.toIterator, out)
+ writeIteratorToStream(partitionArray.iterator, out)
out.writeInt(SpecialLengths.END_OF_DATA_SECTION)
out.flush()
} else {
@@ -76,13 +76,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {

private def formatMasterResourcesInUse(aliveWorkers: Array[WorkerInfo]): String = {
val totalInfo = aliveWorkers.map(_.resourcesInfo)
- .flatMap(_.toIterator)
+ .flatMap(_.iterator)
.groupBy(_._1) // group by resource name
.map { case (rName, rInfoArr) =>
rName -> rInfoArr.map(_._2.addresses.size).sum
}
val usedInfo = aliveWorkers.map(_.resourcesInfoUsed)
- .flatMap(_.toIterator)
+ .flatMap(_.iterator)
.groupBy(_._1) // group by resource name
.map { case (rName, rInfoArr) =>
rName -> rInfoArr.map(_._2.addresses.size).sum
@@ -117,7 +117,7 @@ private[scheduler] class TaskSetExcludelist(
// over the limit, exclude this task from the entire host.
val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
execsWithFailuresOnNode += exec
- val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
+ val failuresOnHost = execsWithFailuresOnNode.iterator.flatMap { exec =>
execToFailures.get(exec).map { failures =>
// We count task attempts here, not the number of unique executors with failures. This is
// because jobs are aborted based on the number task attempts; if we counted unique
@@ -1872,7 +1872,7 @@ private[spark] class BlockManager(
serializerManager.dataSerializeStream(
blockId,
out,
- elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
+ elements.iterator)(info.classTag.asInstanceOf[ClassTag[T]])
}
case Right(bytes) =>
diskStore.putBytes(blockId, bytes)
@@ -305,7 +305,7 @@ private[spark] class MemoryStore(
val unrolledIterator = if (valuesHolder.vector != null) {
valuesHolder.vector.iterator
} else {
- valuesHolder.arrayValues.toIterator
+ valuesHolder.arrayValues.iterator
}

Left(new PartiallyUnrolledIterator(
@@ -138,5 +138,5 @@ class RDDOperationScopeSuite extends SparkFunSuite with BeforeAndAfter {

private class MyCoolRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
override def getPartitions: Array[Partition] = Array.empty
- override def compute(p: Partition, context: TaskContext): Iterator[Int] = { Nil.toIterator }
+ override def compute(p: Partition, context: TaskContext): Iterator[Int] = { Nil.iterator }
}
@@ -111,7 +111,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId)
(shuffleBlockId, byteOutputStream.size().toLong, mapId)
}
- Seq((localBlockManagerId, shuffleBlockIdsAndSizes)).toIterator
+ Seq((localBlockManagerId, shuffleBlockIdsAndSizes)).iterator
}

// Create a mocked shuffle handle to pass into HashShuffleReader.
@@ -103,7 +103,7 @@ class SortShuffleWriterSuite
mapId = 2,
context,
shuffleExecutorComponents)
- writer.write(records.toIterator)
+ writer.write(records.iterator)
writer.stop(success = true)
val dataFile = shuffleBlockResolver.getDataFile(shuffleId, 2)
val writeMetrics = context.taskMetrics().shuffleWriteMetrics
@@ -160,7 +160,7 @@ class SortShuffleWriterSuite
context,
new LocalDiskShuffleExecutorComponents(
conf, shuffleBlockResolver._blockManager, shuffleBlockResolver))
- writer.write(records.toIterator)
+ writer.write(records.iterator)
val sorterMethod = PrivateMethod[ExternalSorter[_, _, _]](Symbol("sorter"))
val sorter = writer.invokePrivate(sorterMethod())
val expectSpillSize = if (doSpill) records.size else 0
@@ -201,7 +201,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
transfer,
blockManager.getOrElse(createMockBlockManager()),
mapOutputTracker,
- blocksByAddress.toIterator,
+ blocksByAddress.iterator,
(_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in),
maxBytesInFlight,
maxReqsInFlight,
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -464,7 +464,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {

test("get iterator size") {
val empty = Seq[Int]()
- assert(Utils.getIteratorSize(empty.toIterator) === 0L)
+ assert(Utils.getIteratorSize(empty.iterator) === 0L)
val iterator = Iterator.range(0, 5)
assert(Utils.getIteratorSize(iterator) === 5L)
}
@@ -82,7 +82,7 @@ private[image] class ImageFileFormat extends FileFormat with DataSourceRegister
}
val resultOpt = ImageSchema.decode(origin, bytes)
val filteredResult = if (imageSourceOptions.dropInvalid) {
- resultOpt.toIterator
+ resultOpt.iterator
} else {
Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))
}
@@ -78,7 +78,7 @@ private[evaluation] object AreaUnderCurve {
* @param curve an iterator over ordered 2D points stored in pairs representing a curve
*/
def of(curve: Iterable[(Double, Double)]): Double = {
- curve.toIterator.sliding(2).withPartial(false).aggregate(0.0)(
+ curve.iterator.sliding(2).withPartial(false).aggregate(0.0)(
seqop = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points),
combop = _ + _
)
@@ -67,7 +67,7 @@ private[fpm] class LocalPrefixSpan(
count >= minCount
}.sorted
// project and recursively call genFreqPatterns
- freqItems.toIterator.flatMap { case (item, count) =>
+ freqItems.iterator.flatMap { case (item, count) =>
val newPrefix = prefix :+ item
Iterator.single((newPrefix, count)) ++ {
val projected = postfixes.map(_.project(item)).filter(_.nonEmpty)
@@ -220,7 +220,7 @@ object PrefixSpan extends Logging {
data.flatMap { itemsets =>
val uniqItems = mutable.Set.empty[Item]
itemsets.foreach(set => uniqItems ++= set)
- uniqItems.toIterator.map((_, 1L))
+ uniqItems.iterator.map((_, 1L))
}.reduceByKey(_ + _).filter { case (_, count) =>
count >= minCount
}.sortBy(-_._2).map(_._1).collect()
@@ -478,7 +478,7 @@ object PrefixSpan extends Logging {
}
i += 1
}
- prefixes.toIterator
+ prefixes.iterator
}

/** Tests whether this postfix is non-empty. */
@@ -591,7 +591,7 @@ case class GetViewColumnByNameAndOrdinal(
override def dataType: DataType = throw new UnresolvedException("dataType")
override def nullable: Boolean = throw new UnresolvedException("nullable")
override lazy val resolved = false
- override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).toIterator
+ override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).iterator
}

/**
@@ -828,7 +828,7 @@ case class MapObjects private(

private def executeFuncOnCollection(inputCollection: Seq[_]): Iterator[_] = {
val row = new GenericInternalRow(1)
- inputCollection.toIterator.map { element =>
+ inputCollection.iterator.map { element =>
row.update(0, element)
lambdaFunction.eval(row)
}
@@ -100,7 +100,7 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
wrappedCharException.initCause(e)
handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, wrappedCharException)
}
- }.reduceOption(typeMerger).toIterator
+ }.reduceOption(typeMerger).iterator
}

// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
@@ -186,7 +186,7 @@ object ObjectSerializerPruning extends Rule[LogicalPlan] {
serializer: NamedExpression,
prunedDataType: DataType): NamedExpression = {
val prunedStructTypes = collectStructType(prunedDataType, ArrayBuffer.empty[StructType])
- .toIterator
+ .iterator

def transformer: PartialFunction[Expression, Expression] = {
case m: ExternalMapToCatalyst =>
@@ -57,7 +57,7 @@ class FailureSafeParser[IN](

def parse(input: IN): Iterator[InternalRow] = {
try {
- rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+ rawParser.apply(input).iterator.map(row => toResultRow(Some(row), () => null))
} catch {
case e: BadRecordException => mode match {
case PermissiveMode =>
@@ -43,7 +43,7 @@ class StringKeyHashMap[T](normalizer: (String) => String) {

def remove(key: String): Option[T] = base.remove(normalizer(key))

- def iterator: Iterator[(String, T)] = base.toIterator
+ def iterator: Iterator[(String, T)] = base.iterator

def clear(): Unit = base.clear()
}
@@ -44,7 +44,7 @@ object StringUtils extends Logging {
* @return the equivalent Java regular expression of the pattern
*/
def escapeLikeRegex(pattern: String, escapeChar: Char): String = {
- val in = pattern.toIterator
+ val in = pattern.iterator
val out = new StringBuilder()

def fail(message: String) = throw QueryCompilationErrors.invalidPatternError(pattern, message)
@@ -54,7 +54,7 @@ trait SQLKeywordUtils extends SparkFunSuite with SQLHelper {
val default = (_: String) => Nil
var startTagFound = false
var parseFinished = false
- val lineIter = sqlSyntaxDefs.toIterator
+ val lineIter = sqlSyntaxDefs.iterator
while (!parseFinished && lineIter.hasNext) {
val line = lineIter.next()
if (line.trim.startsWith(startTag)) {
@@ -206,7 +206,7 @@ class GenerateUnsafeRowJoinerSuite extends SparkFunSuite {
if (actualFixedLength !== expectedFixedLength) {
actualFixedLength.grouped(8)
.zip(expectedFixedLength.grouped(8))
- .zip(mergedSchema.fields.toIterator)
+ .zip(mergedSchema.fields.iterator)
.foreach {
case ((actual, expected), field) =>
assert(actual === expected, s"Fixed length sections are not equal for field $field")
@@ -83,7 +83,7 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {

override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

- override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.toIterator
+ override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.iterator

override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray

@@ -124,7 +124,7 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)

override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

- override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.toIterator
+ override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.iterator

override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray

@@ -469,7 +469,7 @@ case class AlterTableAddPartitionCommand(
// Also the request to metastore times out when adding lot of partitions in one shot.
// we should split them into smaller batches
val batchSize = conf.getConf(SQLConf.ADD_PARTITION_BATCH_SIZE)
- parts.toIterator.grouped(batchSize).foreach { batch =>
+ parts.iterator.grouped(batchSize).foreach { batch =>
catalog.createPartitions(table.identifier, batch, ignoreIfExists = ifNotExists)
}

@@ -772,7 +772,7 @@ case class RepairTableCommand(
// we should split them into smaller batches. Since Hive client is not thread safe, we cannot
// do this in parallel.
val batchSize = spark.conf.get(SQLConf.ADD_PARTITION_BATCH_SIZE)
- partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
+ partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch =>
val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
val parts = batch.map { case (spec, location) =>
val params = partitionStats.get(location.toString).map {
@@ -93,7 +93,7 @@ class FileScanRDD(
inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
}

- private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+ private[this] val files = split.asInstanceOf[FilePartition].files.iterator
private[this] var currentFile: PartitionedFile = null
private[this] var currentIterator: Iterator[Object] = null

@@ -147,7 +147,7 @@ object OrcUtils extends Logging {
: Option[StructType] = {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
- files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+ files.iterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
case Some(schema) =>
logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
toCatalystSchema(schema)
@@ -26,7 +26,7 @@ abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
assert(partition.isInstanceOf[FilePartition])
val filePartition = partition.asInstanceOf[FilePartition]
- val iter = filePartition.files.toIterator.map { file =>
+ val iter = filePartition.files.iterator.map { file =>
PartitionedFileReader(file, buildReader(file))
}
new FilePartitionReader[InternalRow](iter)
@@ -35,7 +35,7 @@ abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
assert(partition.isInstanceOf[FilePartition])
val filePartition = partition.asInstanceOf[FilePartition]
- val iter = filePartition.files.toIterator.map { file =>
+ val iter = filePartition.files.iterator.map { file =>
PartitionedFileReader(file, buildColumnarReader(file))
}
new FilePartitionReader[ColumnarBatch](iter)
@@ -48,7 +48,7 @@ abstract class V2CommandExec extends SparkPlan {
*/
override def executeCollect(): Array[InternalRow] = result.toArray

- override def executeToIterator(): Iterator[InternalRow] = result.toIterator
+ override def executeToIterator(): Iterator[InternalRow] = result.iterator

override def executeTake(limit: Int): Array[InternalRow] = result.take(limit).toArray

@@ -356,7 +356,7 @@ case class BroadcastNestedLoopJoinExec(
i += 1
}
}
- Seq(matched).toIterator
+ Seq(matched).iterator
}

matchedBuildRows.fold(new BitSet(relation.value.length))(_ | _)
@@ -1377,7 +1377,7 @@ class ArrowConvertersSuite extends SharedSparkSession {
val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))

val ctx = TaskContext.empty()
- val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx)
+ val batchIter = ArrowConverters.toBatchIterator(inputRows.iterator, schema, 5, null, ctx)
val outputRowIter = ArrowConverters.fromBatchIterator(batchIter, schema, null, ctx)

var count = 0
@@ -1398,7 +1398,7 @@

val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
val ctx = TaskContext.empty()
- val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx)
+ val batchIter = ArrowConverters.toBatchIterator(inputRows.iterator, schema, 5, null, ctx)

// Write batches to Arrow stream format as a byte array
val out = new ByteArrayOutputStream()
@@ -488,7 +488,7 @@ class FileIndexSuite extends SharedSparkSession {
new Path("file")), Array(new BlockLocation()))
)
when(dfs.listLocatedStatus(path)).thenReturn(new RemoteIterator[LocatedFileStatus] {
- val iter = statuses.toIterator
+ val iter = statuses.iterator
override def hasNext: Boolean = iter.hasNext
override def next(): LocatedFileStatus = iter.next
})
@@ -629,7 +629,7 @@ class HashedRelationSuite extends SharedSparkSession {

test("EmptyHashedRelation override methods behavior test") {
val buildKey = Seq(BoundReference(0, LongType, false))
- val hashed = HashedRelation(Seq.empty[InternalRow].toIterator, buildKey, 1, mm)
+ val hashed = HashedRelation(Seq.empty[InternalRow].iterator, buildKey, 1, mm)
assert(hashed == EmptyHashedRelation)

val key = InternalRow(1L)
@@ -93,7 +93,7 @@ private[hive] object OrcFileOperator extends Logging {
: Option[StructType] = {
// Take the first file where we can open a valid reader if we can find one. Otherwise just
// return None to indicate we can't infer the schema.
- paths.toIterator.map(getFileReader(_, conf, ignoreCorruptFiles)).collectFirst {
+ paths.iterator.map(getFileReader(_, conf, ignoreCorruptFiles)).collectFirst {
case Some(reader) =>
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName