[SPARK-27852][Spark Core] updateBytesWritten() operation is missed #24720

Closed

whatlulumomo wants to merge 1 commit into apache:master from whatlulumomo:master

Conversation

@whatlulumomo commented May 27, 2019

What changes were proposed in this pull request?

One line of code may be missing in core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:

 override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
   if (!streamOpen) {
     open()
   }
   bs.write(kvBytes, offs, len)
+  updateBytesWritten()   // proposed addition: update the write metrics here
 }

Possible Patch Link

Commit: One updateBytesWritten operation may be missed in DiskBlockObjectWriter.scala

In DiskBlockObjectWriter.scala there are two overloaded write functions; one of them calls updateBytesWritten() while the other doesn't. I think writeMetrics should record all write operations, since some of this data is displayed in the Spark jobs UI, such as the shuffle read and shuffle write sizes.
@whatlulumomo reopened this May 27, 2019
@whatlulumomo changed the title from "[SPARK-27852][Spark Core] One updateBytesWritten operation may be miss…" to "[SPARK-27852][Spark Core] updateBytesWritten() operation is missed" on May 27, 2019
@AmplabJenkins

Can one of the admins verify this patch?

@srowen (Member) commented May 27, 2019

This would make it update metrics on every write. It appears this is purposely done only every 16,000 records for this reason.
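
For illustration, here is a minimal standalone sketch of that batching pattern. It is not the actual DiskBlockObjectWriter: the class and field names are invented for the example, and the 16384-record threshold is simply the figure srowen cites below (roughly the 16,000 mentioned here).

import java.io.ByteArrayOutputStream

// Minimal stand-in for "update metrics only every N records" (illustrative names only).
class BatchedMetricsWriter(recordsPerUpdate: Int = 16384) {
  private val out = new ByteArrayOutputStream()
  private var numRecordsWritten = 0L
  private var reportedPosition = 0L    // bytes already reported to the metrics
  private var bytesWrittenMetric = 0L  // what would eventually feed the UI

  // Hot path: write the bytes, no per-call metric bookkeeping.
  def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit =
    out.write(kvBytes, offs, len)

  // Called once per logical record by the caller.
  def recordWritten(): Unit = {
    numRecordsWritten += 1
    if (numRecordsWritten % recordsPerUpdate == 0) {
      updateBytesWritten()
    }
  }

  // Report only the delta since the last update.
  private def updateBytesWritten(): Unit = {
    val pos = out.size().toLong
    bytesWrittenMetric += pos - reportedPosition
    reportedPosition = pos
  }
}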

@whatlulumomo (Author) commented May 27, 2019

> This would make it update metrics on every write. It appears this is purposely done only every 16,000 records for this reason.

override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
  if (!streamOpen) {
    open()
  }
  bs.write(kvBytes, offs, len)
}

In this write function there is no code that updates the metrics or record counts, and bs.write(kvBytes, offs, len) doesn't do it either. That seems strange.

@srowen (Member) commented May 27, 2019

Look at recordWritten().

@whatlulumomo (Author) commented May 28, 2019

> Look at recordWritten().

recordWritten() is never called in override def write(kvBytes: Array[Byte], offs: Int, len: Int).

Here is the function body:

override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
  if (!streamOpen) {
    open()
  }
  bs.write(kvBytes, offs, len)
}

recordWritten() is called only in

def write(key: Any, value: Any) {
  if (!streamOpen) {
    open()
  }

  objOut.writeKey(key)
  objOut.writeValue(value)
  recordWritten()
}

@HyukjinKwon (Member)
So, are the metrics not getting updated? What's the issue this PR fixes?

@srowen (Member) commented May 28, 2019

@BestOreo the caller calls write potentially many times to write a record, then calls recordWritten, where metrics are updated every 16384 records. There's a write overload that writes a whole record at once and then calls recordWritten. We specifically don't want to update metrics several times for every record; that would be very slow.
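
To make that concrete, here is a hedged sketch of the caller pattern described above, reusing the BatchedMetricsWriter stand-in from the earlier sketch (again, not actual Spark code; writeRecord and the chunking are hypothetical):

// One logical record may arrive as several byte chunks; the metrics are touched
// at most once per record, and the byte counter only every 16384 records.
def writeRecord(writer: BatchedMetricsWriter, chunks: Seq[Array[Byte]]): Unit = {
  chunks.foreach(chunk => writer.write(chunk, 0, chunk.length))  // cheap hot path
  writer.recordWritten()  // per-record bookkeeping; bytes reported only periodically
}

// Moving updateBytesWritten() into write(), as this PR proposes, would run the
// metric update once per chunk rather than once per 16384 records.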

@srowen closed this May 28, 2019