Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <utility>

#include "common/config.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vjdbc_table_sink.h"
#include "vec/sink/vmemory_scratch_sink.h"
Expand Down Expand Up @@ -161,6 +162,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
return Status::NotSupported("MULTI_CAST_DATA_STREAM_SINK only support in pipeline engine");
}

default: {
std::stringstream error_msg;
Expand Down Expand Up @@ -302,6 +306,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
auto multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>(
row_desc, pool, thrift_sink.multi_cast_stream_sink.sinks.size());
sink->reset(new vectorized::MultiCastDataStreamSink(multi_cast_data_streamer));
break;
}

default: {
std::stringstream error_msg;
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ set(PIPELINE_FILES
exec/union_source_operator.cpp
exec/data_queue.cpp
exec/select_operator.cpp
exec/empty_source_operator.cpp)
exec/empty_source_operator.cpp
exec/multi_cast_data_streamer.cpp
exec/multi_cast_data_stream_source.cpp)

if (WITH_MYSQL)
set(PIPELINE_FILES
Expand Down
27 changes: 20 additions & 7 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,33 @@ class DataSink;
namespace doris::pipeline {

ExchangeSinkOperatorBuilder::ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink,
PipelineFragmentContext* context)
: DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink), _context(context) {}
PipelineFragmentContext* context,
int mult_cast_id)
: DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink),
_context(context),
_mult_cast_id(mult_cast_id) {}

OperatorPtr ExchangeSinkOperatorBuilder::build_operator() {
return std::make_shared<ExchangeSinkOperator>(this, _sink, _context);
return std::make_shared<ExchangeSinkOperator>(this, _sink, _context, _mult_cast_id);
}

ExchangeSinkOperator::ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink,
PipelineFragmentContext* context)
: DataSinkOperator(operator_builder, sink), _context(context) {}
PipelineFragmentContext* context, int mult_cast_id)
: DataSinkOperator(operator_builder, sink),
_context(context),
_mult_cast_id(mult_cast_id) {}

Status ExchangeSinkOperator::init(const TDataSink& tsink) {
RETURN_IF_ERROR(_sink->init(tsink));
_dest_node_id = tsink.stream_sink.dest_node_id;
// -1 means not the mult cast stream sender
if (_mult_cast_id == -1) {
RETURN_IF_ERROR(_sink->init(tsink));
_dest_node_id = tsink.stream_sink.dest_node_id;
} else {
TDataSink new_t_sink;
new_t_sink.stream_sink = tsink.multi_cast_stream_sink.sinks[_mult_cast_id];
RETURN_IF_ERROR(_sink->init(new_t_sink));
_dest_node_id = tsink.multi_cast_stream_sink.sinks[_mult_cast_id].dest_node_id;
}
return Status::OK();
}

Expand Down
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,21 @@ class PipelineFragmentContext;
class ExchangeSinkOperatorBuilder final
: public DataSinkOperatorBuilder<vectorized::VDataStreamSender> {
public:
ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, PipelineFragmentContext* context);
ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, PipelineFragmentContext* context,
int mult_cast_id = -1);

OperatorPtr build_operator() override;

private:
PipelineFragmentContext* _context;
int _mult_cast_id = -1;
};

// Now local exchange is not supported since VDataStreamRecvr is considered as a pipeline broker.
class ExchangeSinkOperator final : public DataSinkOperator<ExchangeSinkOperatorBuilder> {
public:
ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink,
PipelineFragmentContext* context);
PipelineFragmentContext* context, int mult_cast_id);
Status init(const TDataSink& tsink) override;

Status prepare(RuntimeState* state) override;
Expand All @@ -65,6 +67,7 @@ class ExchangeSinkOperator final : public DataSinkOperator<ExchangeSinkOperatorB
int _dest_node_id = -1;
RuntimeState* _state = nullptr;
PipelineFragmentContext* _context;
int _mult_cast_id = -1;
};

} // namespace pipeline
Expand Down
47 changes: 47 additions & 0 deletions be/src/pipeline/exec/multi_cast_data_stream_sink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "operator.h"
#include "vec/sink/multi_cast_data_stream_sink.h"

namespace doris::pipeline {

class MultiCastDataStreamSinkOperatorBuilder final
: public DataSinkOperatorBuilder<vectorized::MultiCastDataStreamSink> {
public:
MultiCastDataStreamSinkOperatorBuilder(int32_t id, DataSink* sink)
: DataSinkOperatorBuilder(id, "MultiCastDataStreamSinkOperator", sink) {}

OperatorPtr build_operator() override;
};

class MultiCastDataStreamSinkOperator final
: public DataSinkOperator<MultiCastDataStreamSinkOperatorBuilder> {
public:
MultiCastDataStreamSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
: DataSinkOperator(operator_builder, sink) {}

bool can_write() override { return true; }
};

OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() {
return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
}

} // namespace doris::pipeline
64 changes: 64 additions & 0 deletions be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "multi_cast_data_stream_source.h"

#include <functional>

#include "common/status.h"
#include "pipeline/exec/multi_cast_data_streamer.h"
#include "pipeline/exec/operator.h"
#include "vec/core/block.h"

namespace doris::pipeline {

MultiCastDataStreamerSourceOperatorBuilder::MultiCastDataStreamerSourceOperatorBuilder(
int32_t id, const int consumer_id, std::shared_ptr<MultiCastDataStreamer>& data_streamer)
: OperatorBuilderBase(id, "MultiCastDataStreamerSourceOperator"),
_consumer_id(consumer_id),
_multi_cast_data_streamer(data_streamer) {};

OperatorPtr MultiCastDataStreamerSourceOperatorBuilder::build_operator() {
return std::make_shared<MultiCastDataStreamerSourceOperator>(this, _consumer_id,
_multi_cast_data_streamer);
}

const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() {
return _multi_cast_data_streamer->row_desc();
}

MultiCastDataStreamerSourceOperator::MultiCastDataStreamerSourceOperator(
OperatorBuilderBase* operator_builder, const int consumer_id,
std::shared_ptr<MultiCastDataStreamer>& data_streamer)
: OperatorBase(operator_builder),
_consumer_id(consumer_id),
_multi_cast_data_streamer(data_streamer) {};

bool MultiCastDataStreamerSourceOperator::can_read() {
return _multi_cast_data_streamer->can_read(_consumer_id);
}

Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
bool eos = false;
_multi_cast_data_streamer->pull(_consumer_id, block, &eos);
if (eos) {
source_state = SourceState::FINISHED;
}
return Status::OK();
}
} // namespace doris::pipeline
78 changes: 78 additions & 0 deletions be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <stdint.h>

#include <memory>

#include "common/status.h"
#include "operator.h"

namespace doris {
class ExecNode;
class RuntimeState;

namespace vectorized {
class Block;
} // namespace vectorized

namespace pipeline {
class MultiCastDataStreamer;

class MultiCastDataStreamerSourceOperatorBuilder final : public OperatorBuilderBase {
public:
MultiCastDataStreamerSourceOperatorBuilder(int32_t id, const int consumer_id,
std::shared_ptr<MultiCastDataStreamer>&);

bool is_source() const override { return true; }

OperatorPtr build_operator() override;

const RowDescriptor& row_desc() override;

private:
const int _consumer_id;
std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
};

class MultiCastDataStreamerSourceOperator final : public OperatorBase {
public:
MultiCastDataStreamerSourceOperator(OperatorBuilderBase* operator_builder,
const int consumer_id,
std::shared_ptr<MultiCastDataStreamer>& data_streamer);

Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;

Status prepare(RuntimeState* state) override { return Status::OK(); };

Status open(RuntimeState* state) override { return Status::OK(); };

Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override {
return Status::OK();
}

bool can_read() override;

private:
const int _consumer_id;
std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
};

} // namespace pipeline
} // namespace doris
75 changes: 75 additions & 0 deletions be/src/pipeline/exec/multi_cast_data_streamer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "multi_cast_data_streamer.h"

#include "runtime/runtime_state.h"

namespace doris::pipeline {

MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size)
: _used_count(used_count), _mem_size(mem_size) {
_block = vectorized::Block::create_unique(block->get_columns_with_type_and_name());
block->clear();
}

void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
std::lock_guard l(_mutex);
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
if (pos_to_pull != _multi_cast_blocks.end()) {
if (pos_to_pull->_used_count == 1) {
DCHECK(pos_to_pull == _multi_cast_blocks.begin());
pos_to_pull->_block->swap(*block);

_cumulative_mem_size -= pos_to_pull->_mem_size;
pos_to_pull++;
_multi_cast_blocks.pop_front();
} else {
pos_to_pull->_used_count--;
pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
(void)vectorized::MutableBlock(block).merge(*pos_to_pull->_block);
pos_to_pull++;
}
}
*eos = _eos and pos_to_pull == _multi_cast_blocks.end();
}

void MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
auto rows = block->rows();
COUNTER_UPDATE(_process_rows, rows);

auto block_mem_size = block->allocated_bytes();
std::lock_guard l(_mutex);
int need_process_count = _cast_sender_count - _opened_sender_count;
// TODO: if the [queue back block rows + block->rows()] < batch_size, better
// do merge block. but need check the need_process_count and used_count whether
// equal
_multi_cast_blocks.emplace_back(block, need_process_count, block_mem_size);
_cumulative_mem_size += block_mem_size;
COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, _peak_mem_usage->value()));

auto end = _multi_cast_blocks.end();
end--;
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
_sender_pos_to_read[i] = end;
}
}
_eos = eos;
}

} // namespace doris::pipeline
Loading