Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "vec/sink/vmysql_table_writer.h"
#include "vec/sink/vtablet_sink.h"
#include "vec/sink/vmysql_table_sink.h"
#include "vec/sink/vresult_file_sink.h"

namespace doris {

Expand Down Expand Up @@ -91,12 +92,25 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.result_file_sink) {
return Status::InternalError("Missing result file sink.");
}
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink,
params.destinations, pool, params.sender_id, desc_tbl);

if (is_vec) {
if (params.__isset.destinations && params.destinations.size() > 0) {
tmp_sink = new doris::vectorized::VResultFileSink(
row_desc, output_exprs, thrift_sink.result_file_sink, params.destinations,
pool, params.sender_id, desc_tbl);
} else {
tmp_sink = new doris::vectorized::VResultFileSink(row_desc, output_exprs,
thrift_sink.result_file_sink);
}
} else {
tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink);
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
tmp_sink =
new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink,
params.destinations, pool, params.sender_id, desc_tbl);
} else {
tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink);
}
}
sink->reset(tmp_sink);
break;
Expand Down
483 changes: 427 additions & 56 deletions be/src/exec/parquet_writer.cpp

Large diffs are not rendered by default.

21 changes: 19 additions & 2 deletions be/src/exec/parquet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
#include "gen_cpp/Types_types.h"
#include "runtime/row_batch.h"
#include "runtime/tuple.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr_context.h"

namespace doris {
class FileWriter;
Expand Down Expand Up @@ -70,10 +72,17 @@ class ParquetOutputStream : public arrow::io::OutputStream {
// a wrapper of parquet output stream
class ParquetWriterWrapper {
public:
ParquetWriterWrapper(FileWriter* file_writer, const std::vector<ExprContext*>& output_expr_ctxs,
ParquetWriterWrapper(FileWriter* file_writer, const std::vector<ExprContext*>* output_expr_ctxs,
const std::map<std::string, std::string>& properties,
const std::vector<std::vector<std::string>>& schema,
bool output_object_data);

ParquetWriterWrapper(FileWriter* file_writer,
const std::vector<vectorized::VExprContext*>* output_vexpr_ctxs,
const std::map<std::string, std::string>& properties,
const std::vector<std::vector<std::string>>& schema,
bool output_object_data);

virtual ~ParquetWriterWrapper();

Status write(const RowBatch& row_batch);
Expand All @@ -82,6 +91,10 @@ class ParquetWriterWrapper {

Status _write_one_row(TupleRow* row);

Status write(vectorized::Block& block);

Status _write_one_row(vectorized::Block& block, size_t row);

void close();

void parse_properties(const std::map<std::string, std::string>& propertie_map);
Expand All @@ -92,12 +105,16 @@ class ParquetWriterWrapper {

int64_t written_len();

Status error_msg(const std::string& field_type, const std::string& parquert_type, int index);

private:
std::shared_ptr<ParquetOutputStream> _outstream;
std::shared_ptr<parquet::WriterProperties> _properties;
std::shared_ptr<parquet::schema::GroupNode> _schema;
std::unique_ptr<parquet::ParquetFileWriter> _writer;
const std::vector<ExprContext*>& _output_expr_ctxs;
const std::vector<ExprContext*>* _output_expr_ctxs;
const std::vector<vectorized::VExprContext*>* _output_vexpr_ctxs;
std::vector<int> _column_ids;
std::vector<std::vector<std::string>> _str_schema;
int64_t _cur_writed_rows = 0;
parquet::RowGroupWriter* _rg_writer;
Expand Down
12 changes: 6 additions & 6 deletions be/src/runtime/file_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;

// deprecated
FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts,
const std::vector<ExprContext*>& output_expr_ctxs,
const std::vector<ExprContext*>* output_expr_ctxs,
RuntimeProfile* parent_profile, BufferControlBlock* sinker,
bool output_object_data)
: _file_opts(file_opts),
Expand All @@ -70,7 +70,7 @@ FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts,
FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts,
const TStorageBackendType::type storage_type,
const TUniqueId fragment_instance_id,
const std::vector<ExprContext*>& output_expr_ctxs,
const std::vector<ExprContext*>* output_expr_ctxs,
RuntimeProfile* parent_profile, BufferControlBlock* sinker,
RowBatch* output_batch, bool output_object_data)
: _file_opts(file_opts),
Expand Down Expand Up @@ -253,9 +253,9 @@ Status FileResultWriter::_write_csv_file(const RowBatch& batch) {
Status FileResultWriter::_write_one_row_as_csv(TupleRow* row) {
{
SCOPED_TIMER(_convert_tuple_timer);
int num_columns = _output_expr_ctxs.size();
int num_columns = _output_expr_ctxs->size();
for (int i = 0; i < num_columns; ++i) {
void* item = _output_expr_ctxs[i]->get_value(row);
void* item = (*_output_expr_ctxs)[i]->get_value(row);

if (item == nullptr) {
_plain_text_outstream << NULL_IN_CSV;
Expand All @@ -265,7 +265,7 @@ Status FileResultWriter::_write_one_row_as_csv(TupleRow* row) {
continue;
}

switch (_output_expr_ctxs[i]->root()->type().type) {
switch ((*_output_expr_ctxs)[i]->root()->type().type) {
case TYPE_BOOLEAN:
case TYPE_TINYINT:
_plain_text_outstream << (int)*static_cast<int8_t*>(item);
Expand Down Expand Up @@ -330,7 +330,7 @@ Status FileResultWriter::_write_one_row_as_csv(TupleRow* row) {
const DecimalV2Value decimal_val(
reinterpret_cast<const PackedInt128*>(item)->value);
std::string decimal_str;
int output_scale = _output_expr_ctxs[i]->root()->output_scale();
int output_scale = (*_output_expr_ctxs)[i]->root()->output_scale();
decimal_str = decimal_val.to_string(output_scale);
_plain_text_outstream << decimal_str;
break;
Expand Down
34 changes: 17 additions & 17 deletions be/src/runtime/file_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "gen_cpp/Types_types.h"
#include "runtime/result_writer.h"
#include "runtime/runtime_state.h"

#include "vec/exprs/vexpr_context.h"
namespace doris {

class ExprContext;
Expand Down Expand Up @@ -75,16 +75,16 @@ struct ResultFileOptions {

class BufferControlBlock;
// write result to file
class FileResultWriter final : public ResultWriter {
class FileResultWriter : public ResultWriter {
public:
FileResultWriter(const ResultFileOptions* file_option,
const std::vector<ExprContext*>& output_expr_ctxs,
const std::vector<ExprContext*>* output_expr_ctxs,
RuntimeProfile* parent_profile, BufferControlBlock* sinker,
bool output_object_data);
FileResultWriter(const ResultFileOptions* file_option,
const TStorageBackendType::type storage_type,
const TUniqueId fragment_instance_id,
const std::vector<ExprContext*>& output_expr_ctxs,
const std::vector<ExprContext*>* output_expr_ctxs,
RuntimeProfile* parent_profile, BufferControlBlock* sinker,
RowBatch* output_batch, bool output_object_data);
virtual ~FileResultWriter();
Expand All @@ -96,41 +96,41 @@ class FileResultWriter final : public ResultWriter {
// file result writer always return statistic result in one row
virtual int64_t get_written_rows() const override { return 1; }

private:
// if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer
// if eos, write the data even if buffer is not full.
virtual Status _flush_plain_text_outstream(bool eos);
virtual Status _create_file_writer(const std::string& file_name);
// save result into batch rather than send it
virtual Status _fill_result_batch();
// close file writer, and if !done, it will create new writer for next file.
// if only_close is true, this method will just close the file writer and return.
virtual Status _close_file_writer(bool done, bool only_close = false);

protected:
Status _write_csv_file(const RowBatch& batch);
Status _write_parquet_file(const RowBatch& batch);
Status _write_one_row_as_csv(TupleRow* row);

// if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer
// if eos, write the data even if buffer is not full.
Status _flush_plain_text_outstream(bool eos);
void _init_profile();

Status _create_file_writer(const std::string& file_name);
Status _create_next_file_writer();
Status _create_success_file();
// get next export file name
Status _get_next_file_name(std::string* file_name);
Status _get_success_file_name(std::string* file_name);
Status _get_file_url(std::string* file_url);
std::string _file_format_to_name();
// close file writer, and if !done, it will create new writer for next file.
// if only_close is true, this method will just close the file writer and return.
Status _close_file_writer(bool done, bool only_close = false);
// create a new file if current file size exceed limit
Status _create_new_file_if_exceed_size();
// send the final statistic result
Status _send_result();
// save result into batch rather than send it
Status _fill_result_batch();

private:
protected:
RuntimeState* _state; // not owned, set when init
const ResultFileOptions* _file_opts;
TStorageBackendType::type _storage_type;
TUniqueId _fragment_instance_id;
const std::vector<ExprContext*>& _output_expr_ctxs;

const std::vector<ExprContext*>* _output_expr_ctxs;
// If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
// If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
FileWriter* _file_writer = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/result_file_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ Status ResultFileSink::prepare(RuntimeState* state) {
state->fragment_instance_id(), _buf_size, &_sender));
// create writer
_writer.reset(new (std::nothrow) FileResultWriter(
_file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
_file_opts.get(), _storage_type, state->fragment_instance_id(), &_output_expr_ctxs,
_profile, _sender.get(), nullptr, state->return_object_data_as_binary()));
} else {
// init channel
Expand All @@ -115,7 +115,7 @@ Status ResultFileSink::prepare(RuntimeState* state) {
// create writer
_output_batch = new RowBatch(_output_row_descriptor, 1024, _mem_tracker.get());
_writer.reset(new (std::nothrow) FileResultWriter(
_file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
_file_opts.get(), _storage_type, state->fragment_instance_id(), &_output_expr_ctxs,
_profile, nullptr, _output_batch, state->return_object_data_as_binary()));
}
RETURN_IF_ERROR(_writer->init(state));
Expand Down
12 changes: 6 additions & 6 deletions be/src/runtime/result_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/uid_util.h"

#include "vec/exprs/vexpr.h"

namespace doris {
Expand Down Expand Up @@ -62,11 +61,12 @@ Status ResultSink::prepare_exprs(RuntimeState* state) {

Status ResultSink::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));
std::stringstream title;
title << "DataBufferSender (dst_fragment_instance_id="
<< print_id(state->fragment_instance_id()) << ")";

fmt::memory_buffer title;
fmt::format_to(title, "DataBufferSender ( dst_fragment_instance_id= {} ).",
print_id(state->fragment_instance_id()));
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
_profile = state->obj_pool()->add(new RuntimeProfile(fmt::to_string(title.data())));
// prepare output_expr
RETURN_IF_ERROR(prepare_exprs(state));

Expand All @@ -83,7 +83,7 @@ Status ResultSink::prepare(RuntimeState* state) {
// deprecated
case TResultSinkType::FILE:
CHECK(_file_opts.get() != nullptr);
_writer.reset(new (std::nothrow) FileResultWriter(_file_opts.get(), _output_expr_ctxs,
_writer.reset(new (std::nothrow) FileResultWriter(_file_opts.get(), &_output_expr_ctxs,
_profile, _sender.get(),
state->return_object_data_as_binary()));
break;
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ set(VEC_FILES
olap/block_reader.cpp
sink/mysql_result_writer.cpp
sink/result_sink.cpp
sink/vresult_file_sink.cpp
sink/vfile_result_writer.cpp
sink/vdata_stream_sender.cpp
sink/vtablet_sink.cpp
sink/vmysql_table_writer.cpp
Expand Down
10 changes: 5 additions & 5 deletions be/src/vec/sink/result_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "runtime/runtime_state.h"
#include "vec/exprs/vexpr.h"
#include "vec/sink/mysql_result_writer.h"
#include "vec/sink/vfile_result_writer.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -77,11 +78,10 @@ Status VResultSink::prepare(RuntimeState* state) {
break;
case TResultSinkType::FILE:
CHECK(_file_opts.get() != nullptr);
return Status::InternalError("Unsupport vfile result sink type");
// TODO:
/* _writer.reset(new (std::nothrow) FileResultWriter(_file_opts.get(), _output_expr_ctxs,*/
/*_profile, _sender.get()));*/
// break;
_writer.reset(new (std::nothrow) VFileResultWriter(_file_opts.get(), &_output_vexpr_ctxs,
_profile, _sender.get(),
state->return_object_data_as_binary()));
break;
default:
return Status::InternalError("Unknown result sink type");
}
Expand Down
9 changes: 9 additions & 0 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ void VDataStreamSender::Channel::ch_roll_pb_block() {
_ch_cur_pb_block = (_ch_cur_pb_block == &_ch_pb_block1 ? &_ch_pb_block2 : &_ch_pb_block1);
}

VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc)
: _sender_id(sender_id),
_pool(pool),
_row_desc(row_desc),
_cur_pb_block(&_pb_block1),
_serialize_batch_timer(nullptr),
_bytes_sent_counter(nullptr),
_local_bytes_send_counter(nullptr) {}

VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
Expand Down
8 changes: 5 additions & 3 deletions be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ namespace vectorized {
class VExprContext;
class VPartitionInfo;

class VDataStreamSender final : public DataSink {
class VDataStreamSender : public DataSink {
public:
VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc);

VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size, bool send_query_statistics_with_every_batch);

~VDataStreamSender();
virtual ~VDataStreamSender();

virtual Status init(const TDataSink& thrift_sink) override;

Expand All @@ -72,7 +74,7 @@ class VDataStreamSender final : public DataSink {
private:
void _roll_pb_block();

private:
protected:
class Channel;

Status get_partition_column_result(Block* block, int* result) const {
Expand Down
Loading