Skip to content

Commit

Permalink
[BugFix] Fix consistency problem
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyehcf committed Jul 19, 2022
1 parent 7ee482c commit d7ba635
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
7 changes: 1 addition & 6 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,12 +546,7 @@ Status FragmentExecutor::_decompose_data_sink_to_operator(RuntimeState* runtime_
if (sender->get_partition_type() == TPartitionType::HASH_PARTITIONED ||
sender->get_partition_type() == TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED) {
dest_dop = t_stream_sink.dest_dop;

// UNPARTITIONED mode will be performed if both num of destination and dest dop is 1
// So we only enable pipeline level shuffle when num of destination or dest dop is greater than 1
if (sender->destinations().size() > 1 || dest_dop > 1) {
is_pipeline_level_shuffle = true;
}
is_pipeline_level_shuffle = true;
DCHECK_GT(dest_dop, 0);
}

Expand Down
35 changes: 27 additions & 8 deletions be/src/runtime/data_stream_recvr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ class DataStreamRecvr::SenderQueue {

typedef std::list<ChunkItem> ChunkQueue;
ChunkQueue _chunk_queue;
#ifdef DEBUG
bool _is_pipeline_level_shuffle_init = false;
#endif
bool _is_pipeline_level_shuffle = false;
std::vector<bool> _has_chunks_per_driver_sequence;
serde::ProtobufChunkMeta _chunk_meta;
Expand Down Expand Up @@ -471,12 +473,21 @@ Status DataStreamRecvr::SenderQueue::add_chunks(const PTransmitChunkParams& requ
ChunkQueue chunks;
size_t total_chunk_bytes = 0;
faststring uncompressed_buffer;
bool prev_is_pipeline_level_shuffle = _is_pipeline_level_shuffle;

#ifdef DEBUG
{
std::lock_guard<std::mutex> l(_lock);
bool prev_is_pipeline_level_shuffle = _is_pipeline_level_shuffle;
_is_pipeline_level_shuffle =
_recvr->_is_pipeline && request.has_is_pipeline_level_shuffle() && request.is_pipeline_level_shuffle();
// _is_pipeline_level_shuffle must be stable after first assignment
DCHECK(!_is_pipeline_level_shuffle_init || (prev_is_pipeline_level_shuffle == _is_pipeline_level_shuffle));
_is_pipeline_level_shuffle_init = true;
}
#else
_is_pipeline_level_shuffle =
_recvr->_is_pipeline && request.has_is_pipeline_level_shuffle() && request.is_pipeline_level_shuffle();
// _is_pipeline_level_shuffle must be stable after first assignment
DCHECK(!_is_pipeline_level_shuffle_init || (prev_is_pipeline_level_shuffle == _is_pipeline_level_shuffle));
_is_pipeline_level_shuffle_init = true;
#endif

if (use_pass_through) {
ChunkUniquePtrVector swap_chunks;
Expand Down Expand Up @@ -604,12 +615,20 @@ Status DataStreamRecvr::SenderQueue::add_chunks_and_keep_order(const PTransmitCh
size_t total_chunk_bytes = 0;
faststring uncompressed_buffer;
ChunkQueue local_chunk_queue;
bool prev_is_pipeline_level_shuffle = _is_pipeline_level_shuffle;
#ifdef DEBUG
{
std::lock_guard<std::mutex> l(_lock);
bool prev_is_pipeline_level_shuffle = _is_pipeline_level_shuffle;
_is_pipeline_level_shuffle =
_recvr->_is_pipeline && request.has_is_pipeline_level_shuffle() && request.is_pipeline_level_shuffle();
// _is_pipeline_level_shuffle must be stable after first assignment
DCHECK(!_is_pipeline_level_shuffle_init || (prev_is_pipeline_level_shuffle == _is_pipeline_level_shuffle));
_is_pipeline_level_shuffle_init = true;
}
#else
_is_pipeline_level_shuffle =
_recvr->_is_pipeline && request.has_is_pipeline_level_shuffle() && request.is_pipeline_level_shuffle();
// _is_pipeline_level_shuffle must be stable after first assignment
DCHECK(!_is_pipeline_level_shuffle_init || (prev_is_pipeline_level_shuffle == _is_pipeline_level_shuffle));
_is_pipeline_level_shuffle_init = true;
#endif

if (use_pass_through) {
ChunkUniquePtrVector swap_chunks;
Expand Down

0 comments on commit d7ba635

Please sign in to comment.