-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BugFix] Fix consistency problem #8896
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -150,7 +150,9 @@ class DataStreamRecvr::SenderQueue { | |
|
||
typedef std::list<ChunkItem> ChunkQueue; | ||
ChunkQueue _chunk_queue; | ||
#ifdef DEBUG | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this variable is only used in debug mode? |
||
bool _is_pipeline_level_shuffle_init = false; | ||
#endif | ||
bool _is_pipeline_level_shuffle = false; | ||
std::vector<bool> _has_chunks_per_driver_sequence; | ||
serde::ProtobufChunkMeta _chunk_meta; | ||
|
@@ -471,12 +473,21 @@ Status DataStreamRecvr::SenderQueue::add_chunks(const PTransmitChunkParams& requ | |
ChunkQueue chunks; | ||
size_t total_chunk_bytes = 0; | ||
faststring uncompressed_buffer; | ||
bool prev_is_pipeline_level_shuffle = _is_pipeline_level_shuffle; | ||
|
||
#ifdef DEBUG | ||
{ | ||
std::lock_guard<std::mutex> l(_lock); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need to keep so many codes only for debugging? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These code is only for DCHECK, the logic of pipeline level shuffle is too implied and complex, and I think adding |
||
bool prev_is_pipeline_level_shuffle = _is_pipeline_level_shuffle; | ||
_is_pipeline_level_shuffle = | ||
_recvr->_is_pipeline && request.has_is_pipeline_level_shuffle() && request.is_pipeline_level_shuffle(); | ||
// _is_pipeline_level_shuffle must be stable after first assignment | ||
DCHECK(!_is_pipeline_level_shuffle_init || (prev_is_pipeline_level_shuffle == _is_pipeline_level_shuffle)); | ||
_is_pipeline_level_shuffle_init = true; | ||
} | ||
#else | ||
_is_pipeline_level_shuffle = | ||
_recvr->_is_pipeline && request.has_is_pipeline_level_shuffle() && request.is_pipeline_level_shuffle(); | ||
// _is_pipeline_level_shuffle must be stable after first assignment | ||
DCHECK(!_is_pipeline_level_shuffle_init || (prev_is_pipeline_level_shuffle == _is_pipeline_level_shuffle)); | ||
_is_pipeline_level_shuffle_init = true; | ||
#endif | ||
|
||
if (use_pass_through) { | ||
ChunkUniquePtrVector swap_chunks; | ||
|
@@ -604,12 +615,20 @@ Status DataStreamRecvr::SenderQueue::add_chunks_and_keep_order(const PTransmitCh | |
size_t total_chunk_bytes = 0; | ||
faststring uncompressed_buffer; | ||
ChunkQueue local_chunk_queue; | ||
bool prev_is_pipeline_level_shuffle = _is_pipeline_level_shuffle; | ||
#ifdef DEBUG | ||
{ | ||
std::lock_guard<std::mutex> l(_lock); | ||
bool prev_is_pipeline_level_shuffle = _is_pipeline_level_shuffle; | ||
_is_pipeline_level_shuffle = | ||
_recvr->_is_pipeline && request.has_is_pipeline_level_shuffle() && request.is_pipeline_level_shuffle(); | ||
// _is_pipeline_level_shuffle must be stable after first assignment | ||
DCHECK(!_is_pipeline_level_shuffle_init || (prev_is_pipeline_level_shuffle == _is_pipeline_level_shuffle)); | ||
_is_pipeline_level_shuffle_init = true; | ||
} | ||
#else | ||
_is_pipeline_level_shuffle = | ||
_recvr->_is_pipeline && request.has_is_pipeline_level_shuffle() && request.is_pipeline_level_shuffle(); | ||
// _is_pipeline_level_shuffle must be stable after first assignment | ||
DCHECK(!_is_pipeline_level_shuffle_init || (prev_is_pipeline_level_shuffle == _is_pipeline_level_shuffle)); | ||
_is_pipeline_level_shuffle_init = true; | ||
#endif | ||
|
||
if (use_pass_through) { | ||
ChunkUniquePtrVector swap_chunks; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This condition here is redundant, ExchangeSinkOperator's ctor will take both of destionation number and dest dop into consideration.