Skip to content
Permalink
Browse files

Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"

This reverts commit f5f21e8.
  • Loading branch information...
gatorsmile committed Feb 14, 2018
1 parent fd66a3b commit a5a8a86e213c34d6fb32f0ae52db24d8f1ef0905
@@ -445,29 +445,16 @@ case class FileSourceScanExec(
currentSize = 0
}

def addFile(file: PartitionedFile): Unit = {
currentFiles += file
currentSize += file.length + openCostInBytes
}

var frontIndex = 0
var backIndex = splitFiles.length - 1

while (frontIndex <= backIndex) {
addFile(splitFiles(frontIndex))
frontIndex += 1
while (frontIndex <= backIndex &&
currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
addFile(splitFiles(frontIndex))
frontIndex += 1
}
while (backIndex > frontIndex &&
currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
addFile(splitFiles(backIndex))
backIndex -= 1
// Assign files to partitions using "Next Fit Decreasing"
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
closePartition()
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
}
closePartition()

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
@@ -141,17 +141,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
checkScan(table.select('c1)) { partitions =>
// Files should be laid out [(file1, file6), (file2, file3), (file4, file5)]
assert(partitions.size == 3, "when checking partitions")
assert(partitions(0).files.size == 2, "when checking partition 1")
// Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
assert(partitions.size == 4, "when checking partitions")
assert(partitions(0).files.size == 1, "when checking partition 1")
assert(partitions(1).files.size == 2, "when checking partition 2")
assert(partitions(2).files.size == 2, "when checking partition 3")
assert(partitions(3).files.size == 1, "when checking partition 4")

// First partition reads (file1, file6)
// First partition reads (file1)
assert(partitions(0).files(0).start == 0)
assert(partitions(0).files(0).length == 2)
assert(partitions(0).files(1).start == 0)
assert(partitions(0).files(1).length == 1)

// Second partition reads (file2, file3)
assert(partitions(1).files(0).start == 0)
@@ -164,6 +163,10 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
assert(partitions(2).files(0).length == 1)
assert(partitions(2).files(1).start == 0)
assert(partitions(2).files(1).length == 1)

// Final partition reads (file6)
assert(partitions(3).files(0).start == 0)
assert(partitions(3).files(0).length == 1)
}

checkPartitionSchema(StructType(Nil))

0 comments on commit a5a8a86

Please sign in to comment.
You can’t perform that action at this time.