Skip to content

Commit

Permalink
[CARMEL-4005] Use collectAsIterator() instead of toLocalIterator() to…
Browse files Browse the repository at this point in the history
… minimize number of job launching (delta-io#8)
  • Loading branch information
LantaoJin authored and GitHub Enterprise committed Nov 2, 2020
1 parent ca22ec1 commit eab3180
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,6 @@ abstract class ConvertToDeltaCommandBase(
sessionCatalog: SessionCatalog): Unit = {
val newCatalog = catalogTable.copy(
provider = Some("delta"),
// TODO: Schema changes unfortunately don't get reflected in the HiveMetaStore. This should be
// fixed in Apache Spark
schema = new StructType(),
partitionColumnNames = catalogTable.partitionColumnNames,
bucketSpec = catalogTable.bucketSpec,
properties = Map.empty,
// TODO: Serde information also doesn't get removed
Expand Down Expand Up @@ -656,7 +652,7 @@ abstract class ConvertToDeltaCommandBase(
ds
}

// Returns the listed files as an iterator. Uses collectAsIterator() in place of
// toLocalIterator().asScala; per the commit description, this minimizes the number of
// jobs that get launched.
override def getFiles: Iterator[SerializableFileStatus] = list.collectAsIterator()

// Closing this object unpersists `list`.
override def close(): Unit = {
  list.unpersist()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ trait VacuumCommandImpl extends DeltaCommand {
* Attempts to delete the list of candidate files. Returns the number of files deleted.
*/
protected def delete(diff: Dataset[String], fs: FileSystem): Long = {
  // collectAsIterator() replaces toLocalIterator().asScala; per the commit description,
  // this minimizes the number of jobs that get launched.
  val fileResultSet = diff.collectAsIterator()
  // Convert each string to a path and count those for which tryDeleteNonRecursive
  // returns true.
  fileResultSet.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
}

Expand Down

0 comments on commit eab3180

Please sign in to comment.