[SPARK-18012][SQL] Simplify WriterContainer #15551
Conversation
cc @cloud-fan, @liancheng, @hvanhovell What do you think about the high-level idea? I got rid of WriterContainer and the associated OOP, since there isn't really any polymorphism in the old code. There was an "if" branch that decided which object to instantiate. I basically moved that "if" and the object hierarchy into an if branch in executeTask. I'm hoping this provides a clearer high-level control flow.
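The shape of that dispatch can be sketched in a few lines. This is an illustrative Python sketch of the pattern only — the helper names, fields, and grouping logic are hypothetical stand-ins, not the actual Spark code:

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class WriteJobDescription:
    # Hypothetical stand-in for the job description each task receives.
    partition_columns: List[str] = field(default_factory=list)
    bucket_spec: Optional[object] = None

def execute_single_directory_write_task(description, rows):
    # Single-directory path: all rows land in one output location.
    return {(): list(rows)}

def execute_dynamic_partition_write_task(description, rows):
    # Dynamic-partition path: rows are grouped by their partition key.
    grouped = {}
    for row in rows:
        key = tuple(row[c] for c in description.partition_columns)
        grouped.setdefault(key, []).append(row)
    return grouped

def execute_task(description, rows):
    # The former WriterContainer class hierarchy collapses into this branch:
    # no polymorphism, just an "if" that picks the write path.
    if not description.partition_columns and description.bucket_spec is None:
        return execute_single_directory_write_task(description, rows)
    else:
        return execute_dynamic_partition_write_task(description, rows)
```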
object WriteOutput extends Logging {

  def write(
      sparkSession: SparkSession,
I'm not too happy that this function has a lot of arguments.
  }
}

class WriteJobDescription(
This includes all the description needed in each task for the write job.
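The "lots of arguments" concern above and this description class are two sides of the same refactor: bundle everything a task needs into one serializable object instead of a long parameter list. A Python sketch of the idea — the field names here are hypothetical stand-ins, not the real class:

```python
from dataclasses import dataclass
from typing import List, Optional

# Before: each task function took a long positional argument list,
# which is easy to misorder and hard to extend.
def write_task_before(uuid, path, all_columns, partition_columns,
                      non_partition_columns, bucket_spec, is_append):
    return path

# After: one immutable description object carries everything a task needs.
@dataclass(frozen=True)
class WriteJobDescription:
    uuid: str
    path: str
    all_columns: List[str]
    partition_columns: List[str]
    non_partition_columns: List[str]
    bucket_spec: Optional[object]
    is_append: bool

def write_task_after(description: WriteJobDescription):
    return description.path
```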
      writer.writeInternal(internalRow)
    }

    try {
Looks like I can remove this try/catch.
        committer,
        iterator)
    } else {
      executeDynamicPartitionWriteTask(
Note to self: it'd be better if we could move the task commit and abort logic into this function.
      description.outputFormatClass, taskAttemptContext, description.path, description.isAppend)
    committer.setupTask(taskAttemptContext)

    if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
Now we will run this if-else on the executor side, right? Not a big deal, and worth it to make the code simpler.
yup.
Test build #67180 has finished for PR 15551 at commit
Test build #67182 has finished for PR 15551 at commit
Test build #67183 has finished for PR 15551 at commit
Test build #67209 has finished for PR 15551 at commit
Test build #67214 has finished for PR 15551 at commit
Overall LGTM, left some minor comments and questions.
      SparkHadoopMapRedUtil.commitTask(committer, taskAttemptContext, jobId.getId, taskId.getId)
    })(catchBlock = {
      // If there is an error, release resource and then abort the task
      try {
Can we eliminate this `try` and just put the `finally` clause into the `finallyBlock` of `tryWithSafeFinallyAndFailureCallbacks`?
No, because we must not abort the task when there is no error, which is what would happen if we put it in `finally`.
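The distinction being discussed — abort only on failure, release resources always — can be sketched as follows. This is an illustrative Python sketch of the pattern, not the actual Spark helper; the event names are made up for the demonstration:

```python
events = []

def run_write_task(fail: bool):
    # Why abort lives in the catch block, not finally:
    # - commit must run only on success,
    # - abort must run only on failure,
    # - resource release must run either way.
    events.clear()
    try:
        if fail:
            raise RuntimeError("write failed")
        events.append("commit")   # success path only
    except BaseException:
        events.append("abort")    # failure path only
        raise
    finally:
        events.append("release")  # runs either way
```

Putting "abort" into the `finally` block would make it run on the success path too, which is exactly the problem described above.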
    } catch {
      case t: Throwable =>
        throw new SparkException("Task failed while writing rows", t)
    }
Seems that the only purpose of this outermost `try` is to wrap a `SparkException` over any thrown exception. Curious about the contract here: when do we want a `SparkException` rather than some arbitrary exception class?
I'm preserving old behavior here.
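The preserved contract — whatever the task body throws surfaces as one well-known exception type, with the original error attached as the cause — can be sketched like this. Illustrative Python only; these are not the actual Spark classes:

```python
class SparkException(Exception):
    """Illustrative stand-in for a single, well-known failure type."""

def run_task(body):
    # Any failure inside the task body is re-raised as SparkException,
    # so callers can catch one type while the original error survives
    # as the cause for debugging.
    try:
        return body()
    except BaseException as t:
        raise SparkException("Task failed while writing rows") from t
```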
    // Returns the data columns to be written given an input row
    val getOutputRow =
      UnsafeProjection.create(description.nonPartitionColumns, description.allColumns)
Nit: Indentation is off.
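The projection above can be read as a plain function: given the full input row, keep only the data columns. This Python sketch mirrors the idea of `UnsafeProjection.create(nonPartitionColumns, allColumns)` only loosely; the helper name is hypothetical:

```python
def make_output_projection(non_partition_columns, all_columns):
    # Build a function that maps a full input row to just the data
    # columns to be written (partition columns stripped out).
    indices = [all_columns.index(c) for c in non_partition_columns]
    return lambda row: tuple(row[i] for i in indices)
```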
Test build #67223 has finished for PR 15551 at commit
Test build #67226 has finished for PR 15551 at commit
Test build #67227 has finished for PR 15551 at commit
LGTM
cc @tejasapatil FYI on the change
@rxin: Thanks for notifying me.
## What changes were proposed in this pull request?

This patch refactors WriterContainer to simplify the logic and make the control flow more obvious. The previous code setup made it pretty difficult to track the actual dependencies on variables and setups, because the driver side and the executor side were using the same set of variables.

## How was this patch tested?

N/A - this should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes apache#15551 from rxin/writercontainer-refactor.