[SPARK-21621][CORE] Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called

## What changes were proposed in this pull request?

We should reset `numRecordsWritten` to zero after `DiskBlockObjectWriter.commitAndGet` is called.
When `revertPartialWritesAndClose` is called, we decrease the number of written records in `ShuffleWriteMetrics`. Previously, we decreased the written records all the way to zero, which is wrong: we should only subtract the records written after the last `commitAndGet` call.
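
The effect of the reset can be illustrated with a minimal sketch. This is a simplified, hypothetical model and not the actual Spark classes: `ToyMetrics`, `ToyWriter`, and `ResetDemo` are names invented for illustration, and the real writer also tracks bytes and file positions, which are omitted here.

```scala
// Hypothetical stand-in for ShuffleWriteMetrics; only the record counter is modeled.
final class ToyMetrics {
  private var records = 0L
  def incRecordsWritten(n: Long): Unit = records += n
  def decRecordsWritten(n: Long): Unit = records -= n
  def recordsWritten: Long = records
}

// Hypothetical stand-in for DiskBlockObjectWriter.
final class ToyWriter(metrics: ToyMetrics) {
  // Counts records written since the last commit (or since creation).
  private var numRecordsWritten = 0

  def write(): Unit = {
    numRecordsWritten += 1
    metrics.incRecordsWritten(1)
  }

  def commitAndGet(): Unit = {
    // The fix: committed records must not be subtracted by a later revert.
    numRecordsWritten = 0
  }

  def revertPartialWrites(): Unit = {
    // Only the uncommitted records are removed from the metrics.
    metrics.decRecordsWritten(numRecordsWritten)
    numRecordsWritten = 0
  }
}

object ResetDemo {
  // Write one record, commit, write two more, then revert the uncommitted ones.
  def run(): Long = {
    val metrics = new ToyMetrics
    val writer = new ToyWriter(metrics)
    writer.write()
    writer.commitAndGet()
    writer.write()
    writer.write()
    writer.revertPartialWrites()
    metrics.recordsWritten
  }
}
```

In this model, `ResetDemo.run()` leaves one record in the metrics, the committed one. Without the reset in `commitAndGet`, the revert would subtract all three records and the metric would drop to zero.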

## How was this patch tested?
Modified existing test.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Xianyang Liu <xianyang.liu@intel.com>

Closes apache#18830 from ConeyLiu/DiskBlockObjectWriter.
ConeyLiu authored and cloud-fan committed Aug 7, 2017
1 parent 39e044e commit 534a063
Showing 2 changed files with 3 additions and 0 deletions.
@@ -95,6 +95,7 @@ private[spark] class DiskBlockObjectWriter(
/**
* Keep track of number of records written and also use this to periodically
* output bytes written since the latter is expensive to do for each record.
* And we reset it after every commitAndGet called.
*/
private var numRecordsWritten = 0

@@ -185,6 +186,7 @@ private[spark] class DiskBlockObjectWriter(
// In certain compression codecs, more bytes are written after streams are closed
writeMetrics.incBytesWritten(committedPosition - reportedPosition)
reportedPosition = committedPosition
numRecordsWritten = 0
fileSegment
} else {
new FileSegment(file, committedPosition, 0)
@@ -116,6 +116,7 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.revertPartialWritesAndClose()
assert(firstSegment.length === file.length())
assert(writeMetrics.bytesWritten === file.length())
assert(writeMetrics.recordsWritten == 1)
}

test("calling revertPartialWritesAndClose() after commit() should have no effect") {