-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-7390: [C++][Dataset] Fix RecordBatchProjector race #6661
ARROW-7390: [C++][Dataset] Fix RecordBatchProjector race #6661
Conversation
I could not come up with an deterministic unit test for this, it also had to be run in release mode. I tested the issue with scanning the nyc dataset locally. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there multithreaded tests in the dataset tests? Did you try running them with TSAN enabled?
@@ -44,7 +44,10 @@ static inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it, | |||
RecordBatchProjector* projector, | |||
MemoryPool* pool) { | |||
return MakeMaybeMapIterator( | |||
[=](std::shared_ptr<RecordBatch> in) { return projector->Project(*in, pool); }, | |||
[=](std::shared_ptr<RecordBatch> in) { | |||
RecordBatchProjector local_projector{*projector}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... perhaps add a comment explaining why this is needed?
5cc5cb3
to
523ddfc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
cpp/src/arrow/dataset/projector.cc
Outdated
@@ -63,7 +63,7 @@ Status RecordBatchProjector::SetDefaultValue(FieldRef ref, | |||
|
|||
Result<std::shared_ptr<RecordBatch>> RecordBatchProjector::Project( | |||
const RecordBatch& batch, MemoryPool* pool) { | |||
if (from_ == nullptr || !batch.schema()->Equals(*from_)) { | |||
if (from_ == nullptr || !batch.schema()->Equals(*from_, false /*check_metadata*/)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: we should try to use /*param=*/
consistently (this is used elsewhere)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok!
+1 At some point we should examine Fragment::splitable() to determine whether ScanTasks from a single Fragment should be kept in a single thread. For such fragments local_projector will be unnecessary |
The RecordBatchProjector is shared accross ScanTasks of the same Fragment. The resize operation of missing columns is not thread safe. This change ensure that each ScanTask gets his own projector. The copy should not be costly since it's copying empty vectors and one shared pointer.
523ddfc
to
32ca42d
Compare
The RecordBatchProjector is shared accross ScanTasks of the same Fragment. The resize operation of missing columns is not thread safe. This change ensure that each ScanTask gets his own projector. The copy should not be costly since it's copying empty vectors and one shared pointer.