diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 5a4eba9f1dd734..4554d9ee470102 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -40,6 +40,7 @@ #include "vec/sink/vmysql_table_writer.h" #include "vec/sink/vtablet_sink.h" #include "vec/sink/vmysql_table_sink.h" +#include "vec/sink/vresult_file_sink.h" namespace doris { @@ -91,12 +92,25 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink if (!thrift_sink.__isset.result_file_sink) { return Status::InternalError("Missing result file sink."); } - // Result file sink is not the top sink - if (params.__isset.destinations && params.destinations.size() > 0) { - tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink, - params.destinations, pool, params.sender_id, desc_tbl); + + if (is_vec) { + if (params.__isset.destinations && params.destinations.size() > 0) { + tmp_sink = new doris::vectorized::VResultFileSink( + row_desc, output_exprs, thrift_sink.result_file_sink, params.destinations, + pool, params.sender_id, desc_tbl); + } else { + tmp_sink = new doris::vectorized::VResultFileSink(row_desc, output_exprs, + thrift_sink.result_file_sink); + } } else { - tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink); + // Result file sink is not the top sink + if (params.__isset.destinations && params.destinations.size() > 0) { + tmp_sink = + new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink, + params.destinations, pool, params.sender_id, desc_tbl); + } else { + tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink); + } } sink->reset(tmp_sink); break; diff --git a/be/src/exec/parquet_writer.cpp b/be/src/exec/parquet_writer.cpp index bf8be94097474a..1c3f02cb0dcfac 100644 --- a/be/src/exec/parquet_writer.cpp +++ b/be/src/exec/parquet_writer.cpp @@ -32,6 +32,8 @@ #include "runtime/mem_pool.h" #include "util/thrift_util.h" #include "util/types.h" 
+#include "vec/core/materialize_block.h" +#include "vec/exprs/vexpr.h" namespace doris { @@ -89,11 +91,28 @@ void ParquetOutputStream::set_written_len(int64_t written_len) { /// ParquetWriterWrapper ParquetWriterWrapper::ParquetWriterWrapper(FileWriter* file_writer, - const std::vector& output_expr_ctxs, + const std::vector* output_expr_ctxs, const std::map& properties, const std::vector>& schema, bool output_object_data) : _output_expr_ctxs(output_expr_ctxs), + _output_vexpr_ctxs(nullptr), + _str_schema(schema), + _cur_writed_rows(0), + _rg_writer(nullptr), + _output_object_data(output_object_data) { + _outstream = std::shared_ptr(new ParquetOutputStream(file_writer)); + parse_properties(properties); + parse_schema(schema); + init_parquet_writer(); +} + +ParquetWriterWrapper::ParquetWriterWrapper( + FileWriter* file_writer, const std::vector* output_vexpr_ctxs, + const std::map& properties, + const std::vector>& schema, bool output_object_data) + : _output_expr_ctxs(nullptr), + _output_vexpr_ctxs(output_vexpr_ctxs), _str_schema(schema), _cur_writed_rows(0), _rg_writer(nullptr), @@ -203,6 +222,25 @@ Status ParquetWriterWrapper::write(const RowBatch& row_batch) { return Status::OK(); } +Status ParquetWriterWrapper::write(vectorized::Block& block) { + int num_columns = _output_vexpr_ctxs->size(); + _column_ids.resize(num_columns); + for (int i = 0; i < num_columns; ++i) { + int column_id = -1; + (*_output_vexpr_ctxs)[i]->execute(&block, &column_id); + _column_ids[i] = column_id; + } + + int num_rows = block.rows(); + materialize_block_inplace(block, _column_ids.begin(), _column_ids.end()); + + for (int i = 0; i < num_rows; ++i) { + RETURN_IF_ERROR(_write_one_row(block, i)); + _cur_writed_rows++; + } + return Status::OK(); +} + Status ParquetWriterWrapper::init_parquet_writer() { _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, _properties); if (_writer == nullptr) { @@ -224,20 +262,17 @@ parquet::RowGroupWriter* ParquetWriterWrapper::get_rg_writer() { 
} Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { - int num_columns = _output_expr_ctxs.size(); + int num_columns = _output_expr_ctxs->size(); if (num_columns != _str_schema.size()) { return Status::InternalError("project field size is not equal to schema column size"); } try { for (int index = 0; index < num_columns; ++index) { - void* item = _output_expr_ctxs[index]->get_value(row); - switch (_output_expr_ctxs[index]->root()->type().type) { + void* item = (*_output_expr_ctxs)[index]->get_value(row); + switch ((*_output_expr_ctxs)[index]->root()->type().type) { case TYPE_BOOLEAN: { if (_str_schema[index][1] != "boolean") { - std::stringstream ss; - ss << "project field type is boolean, but the definition type of column " - << _str_schema[index][2] << " is " << _str_schema[index][1]; - return Status::InvalidArgument(ss.str()); + return error_msg("boolean", "boolean", index); } parquet::RowGroupWriter* rgWriter = get_rg_writer(); parquet::BoolWriter* col_writer = @@ -250,15 +285,43 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { } break; } - case TYPE_TINYINT: - case TYPE_SMALLINT: + case TYPE_TINYINT: { + if (_str_schema[index][1] != "int32") { + return error_msg("tiny int", "int32", index); + } + + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::Int32Writer* col_writer = + static_cast(rgWriter->column(index)); + if (item != nullptr) { + int32_t data = *(static_cast(item)); + col_writer->WriteBatch(1, nullptr, nullptr, &data); + } else { + int32_t default_int32 = 0; + col_writer->WriteBatch(1, nullptr, nullptr, &default_int32); + } + break; + } + case TYPE_SMALLINT: { + if (_str_schema[index][1] != "int32") { + return error_msg("small int", "int32", index); + } + + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::Int32Writer* col_writer = + static_cast(rgWriter->column(index)); + if (item != nullptr) { + int32_t data = *(static_cast(item)); + col_writer->WriteBatch(1, nullptr, nullptr, &data); + } else { + 
int32_t default_int32 = 0; + col_writer->WriteBatch(1, nullptr, nullptr, &default_int32); + } + break; + } case TYPE_INT: { if (_str_schema[index][1] != "int32") { - std::stringstream ss; - ss << "project field type is tiny int/small int/int, should use int32, but the " - "definition type of column " - << _str_schema[index][2] << " is " << _str_schema[index][1]; - return Status::InvalidArgument(ss.str()); + return error_msg("int", "int32", index); } parquet::RowGroupWriter* rgWriter = get_rg_writer(); @@ -274,11 +337,7 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { } case TYPE_BIGINT: { if (_str_schema[index][1] != "int64") { - std::stringstream ss; - ss << "project field type is big int, should use int64, but the definition " - "type of column " - << _str_schema[index][2] << " is " << _str_schema[index][1]; - return Status::InvalidArgument(ss.str()); + return error_msg("big int", "int64", index); } parquet::RowGroupWriter* rgWriter = get_rg_writer(); parquet::Int64Writer* col_writer = @@ -303,10 +362,7 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { } case TYPE_FLOAT: { if (_str_schema[index][1] != "float") { - std::stringstream ss; - ss << "project field type is float, but the definition type of column " - << _str_schema[index][2] << " is " << _str_schema[index][1]; - return Status::InvalidArgument(ss.str()); + return error_msg("float", "float", index); } parquet::RowGroupWriter* rgWriter = get_rg_writer(); parquet::FloatWriter* col_writer = @@ -321,10 +377,7 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { } case TYPE_DOUBLE: { if (_str_schema[index][1] != "double") { - std::stringstream ss; - ss << "project field type is double, but the definition type of column " - << _str_schema[index][2] << " is " << _str_schema[index][1]; - return Status::InvalidArgument(ss.str()); + return error_msg("double", "double", index); } parquet::RowGroupWriter* rgWriter = get_rg_writer(); parquet::DoubleWriter* col_writer = @@ 
-340,11 +393,7 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { case TYPE_DATETIME: case TYPE_DATE: { if (_str_schema[index][1] != "int64") { - std::stringstream ss; - ss << "project field type is date/datetime, should use int64, but the " - "definition type of column " - << _str_schema[index][2] << " is " << _str_schema[index][1]; - return Status::InvalidArgument(ss.str()); + return error_msg("date/datetime", "int64", index); } parquet::RowGroupWriter* rgWriter = get_rg_writer(); parquet::Int64Writer* col_writer = @@ -364,11 +413,7 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { case TYPE_OBJECT: { if (_output_object_data) { if (_str_schema[index][1] != "byte_array") { - std::stringstream ss; - ss << "project field type is hll/bitmap, should use byte_array, but the " - "definition type of column " - << _str_schema[index][2] << " is " << _str_schema[index][1]; - return Status::InvalidArgument(ss.str()); + return error_msg("hll/bitmap", "byte_array", index); } parquet::RowGroupWriter* rgWriter = get_rg_writer(); parquet::ByteArrayWriter* col_writer = @@ -384,10 +429,10 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { col_writer->WriteBatch(1, nullptr, nullptr, &value); } } else { - std::stringstream ss; - ss << "unsupported file format: " - << _output_expr_ctxs[index]->root()->type().type; - return Status::InvalidArgument(ss.str()); + fmt::memory_buffer err_ss; + fmt::format_to(err_ss, "unsupported file format: : {}.", + (*_output_vexpr_ctxs)[index]->root()->type().type); + return Status::InvalidArgument(fmt::to_string(err_ss.data())); } break; } @@ -395,11 +440,7 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { case TYPE_VARCHAR: case TYPE_STRING: { if (_str_schema[index][1] != "byte_array") { - std::stringstream ss; - ss << "project field type is char/varchar, should use byte_array, but the " - "definition type of column " - << _str_schema[index][2] << " is " << _str_schema[index][1]; - return 
Status::InvalidArgument(ss.str()); + return error_msg("char/varchar", "byte_array", index); } parquet::RowGroupWriter* rgWriter = get_rg_writer(); parquet::ByteArrayWriter* col_writer = @@ -418,11 +459,7 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { } case TYPE_DECIMALV2: { if (_str_schema[index][1] != "byte_array") { - std::stringstream ss; - ss << "project field type is decimal v2, should use byte_array, but the " - "definition type of column " - << _str_schema[index][2] << " is " << _str_schema[index][1]; - return Status::InvalidArgument(ss.str()); + return error_msg("decimal v2", "byte_array", index); } parquet::RowGroupWriter* rgWriter = get_rg_writer(); parquet::ByteArrayWriter* col_writer = @@ -431,7 +468,7 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { const DecimalV2Value decimal_val( reinterpret_cast(item)->value); char decimal_buffer[MAX_DECIMAL_WIDTH]; - int output_scale = _output_expr_ctxs[index]->root()->output_scale(); + int output_scale = (*_output_expr_ctxs)[index]->root()->output_scale(); parquet::ByteArray value; value.ptr = reinterpret_cast(decimal_buffer); value.len = decimal_val.to_buffer(decimal_buffer, output_scale); @@ -443,9 +480,333 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) { break; } default: { - std::stringstream ss; - ss << "unsupported file format: " << _output_expr_ctxs[index]->root()->type().type; - return Status::InvalidArgument(ss.str()); + fmt::memory_buffer err_ss; + fmt::format_to(err_ss, "unsupported file format: : {}.", + (*_output_vexpr_ctxs)[index]->root()->type().type); + return Status::InvalidArgument(fmt::to_string(err_ss.data())); + } + } + } + } catch (const std::exception& e) { + LOG(WARNING) << "Parquet write error: " << e.what(); + return Status::InternalError(e.what()); + } + return Status::OK(); +} + +Status ParquetWriterWrapper::_write_one_row(vectorized::Block& block, size_t row) { + int num_columns = _output_vexpr_ctxs->size(); + if (num_columns != 
_str_schema.size()) { + return Status::InternalError("project field size is not equal to schema column size"); + } + try { + for (int index = 0; index < num_columns; ++index) { + bool is_null = false; + auto& column_ptr = block.get_by_position(_column_ids[index]).column; + auto& type_ptr = block.get_by_position(_column_ids[index]).type; + + vectorized::ColumnPtr column; + if (type_ptr->is_nullable()) { + column = assert_cast(*column_ptr) + .get_nested_column_ptr(); + if (column_ptr->is_null_at(row)) { + is_null = true; + } + } else { + is_null = false; + column = column_ptr; + } + + switch ((*_output_vexpr_ctxs)[index]->root()->result_type()) { + case TYPE_BOOLEAN: { + if (_str_schema[index][1] != "boolean") { + return error_msg("boolean", "boolean", index); + } + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::BoolWriter* col_writer = + static_cast(rgWriter->column(index)); + + if (is_null) { + bool default_bool = false; + col_writer->WriteBatch(1, nullptr, nullptr, &default_bool); + } else { + auto& data_column = assert_cast(*column); + uint8_t data = data_column.get_element(row); + col_writer->WriteBatch(1, nullptr, nullptr, + reinterpret_cast(&data)); + } + break; + } + + case TYPE_TINYINT: { + if (_str_schema[index][1] != "int32") { + return error_msg("tiny int", "int32", index); + } + + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::Int32Writer* col_writer = + static_cast(rgWriter->column(index)); + + if (is_null) { + int32_t default_int32 = 0; + col_writer->WriteBatch(1, nullptr, nullptr, &default_int32); + } else { + auto& data_column = assert_cast(*column); + int32_t data = data_column.get_element(row); + col_writer->WriteBatch(1, nullptr, nullptr, &data); + } + break; + } + + case TYPE_SMALLINT: { + if (_str_schema[index][1] != "int32") { + return error_msg("small int", "int32", index); + } + + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::Int32Writer* col_writer = + static_cast(rgWriter->column(index)); 
+ + if (is_null) { + int32_t default_int32 = 0; + col_writer->WriteBatch(1, nullptr, nullptr, &default_int32); + } else { + auto& data_column = assert_cast(*column); + int32_t data = data_column.get_element(row); + col_writer->WriteBatch(1, nullptr, nullptr, + reinterpret_cast(&data)); + } + break; + } + + case TYPE_INT: { + if (_str_schema[index][1] != "int32") { + return error_msg("int", "int32", index); + } + + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::Int32Writer* col_writer = + static_cast(rgWriter->column(index)); + + if (is_null) { + int32_t default_int32 = 0; + col_writer->WriteBatch(1, nullptr, nullptr, &default_int32); + } else { + auto& data = assert_cast(*column).get_data(); + col_writer->WriteBatch(1, nullptr, nullptr, (int32_t*)(&data[row])); + } + break; + } + + case TYPE_BIGINT: { + if (_str_schema[index][1] != "int64") { + return error_msg("big int", "int64", index); + } + + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::Int64Writer* col_writer = + static_cast(rgWriter->column(index)); + + if (is_null) { + int64_t default_int64 = 0; + col_writer->WriteBatch(1, nullptr, nullptr, &default_int64); + } else { + auto& data = assert_cast(*column).get_data(); + col_writer->WriteBatch(1, nullptr, nullptr, (int64_t*)(&data[row])); + } + break; + } + + case TYPE_LARGEINT: { + // TODO: not support int_128 + // It is better write a default value, because rg_writer need all columns has value before flush to disk. 
+ parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::Int64Writer* col_writer = + static_cast(rgWriter->column(index)); + int64_t default_int64 = 0; + col_writer->WriteBatch(1, nullptr, nullptr, &default_int64); + return Status::InvalidArgument("do not support large int type."); + } + + case TYPE_FLOAT: { + if (_str_schema[index][1] != "float") { + return error_msg("float", "float", index); + } + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::FloatWriter* col_writer = + static_cast(rgWriter->column(index)); + if (is_null) { + float_t default_float = 0.0; + col_writer->WriteBatch(1, nullptr, nullptr, &default_float); + + } else { + auto& data = assert_cast(*column).get_data(); + col_writer->WriteBatch(1, nullptr, nullptr, (float_t*)(&data[row])); + } + break; + } + + case TYPE_DOUBLE: { + if (_str_schema[index][1] != "double") { + return error_msg("double", "double", index); + } + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::DoubleWriter* col_writer = + static_cast(rgWriter->column(index)); + if (is_null) { + double_t default_double = 0.0; + col_writer->WriteBatch(1, nullptr, nullptr, &default_double); + + } else { + auto& data = assert_cast(*column).get_data(); + col_writer->WriteBatch(1, nullptr, nullptr, (double_t*)(&data[row])); + } + break; + } + + case TYPE_STRING: + case TYPE_CHAR: + case TYPE_VARCHAR: { + if (_str_schema[index][1] != "byte_array") { + return error_msg("char/varchar", "byte_array", index); + } + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::ByteArrayWriter* col_writer = + static_cast(rgWriter->column(index)); + if (is_null) { + parquet::ByteArray value; + col_writer->WriteBatch(1, nullptr, nullptr, &value); + + } else { + const auto& string_val = + assert_cast(*column).get_data_at(row); + parquet::ByteArray value; + value.ptr = reinterpret_cast(string_val.data); + value.len = string_val.size; + col_writer->WriteBatch(1, nullptr, nullptr, &value); + } + break; + } + + case 
TYPE_DECIMALV2: { + if (_str_schema[index][1] != "byte_array") { + return error_msg("decimal v2", "byte_array", index); + } + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::ByteArrayWriter* col_writer = + static_cast(rgWriter->column(index)); + if (is_null) { + parquet::ByteArray value; + col_writer->WriteBatch(1, nullptr, nullptr, &value); + + } else { + DecimalV2Value decimal_val = + (DecimalV2Value)assert_cast< + const vectorized::ColumnDecimal&>( + *column) + .get_data()[row]; + char decimal_buffer[MAX_DECIMAL_WIDTH]; + // TODO: Support decimal output_scale accuracy + //int output_scale = (*_output_vexpr_ctxs)[index]->root()->output_scale(); + int output_scale = -1; + parquet::ByteArray value; + value.ptr = reinterpret_cast(decimal_buffer); + value.len = decimal_val.to_buffer(decimal_buffer, output_scale); + col_writer->WriteBatch(1, nullptr, nullptr, &value); + } + break; + } + + case TYPE_DATE: + case TYPE_DATETIME: { + if (_str_schema[index][1] != "int64") { + return error_msg("date/datetime", "int64", index); + } + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::Int64Writer* col_writer = + static_cast(rgWriter->column(index)); + if (is_null) { + int64_t default_int64 = 0; + col_writer->WriteBatch(1, nullptr, nullptr, &default_int64); + + } else { + int64_t int_val = + assert_cast(*column).get_data()[row]; + vectorized::VecDateTimeValue value = + binary_cast(int_val); + int64_t timestamp = value.to_olap_datetime(); + col_writer->WriteBatch(1, nullptr, nullptr, ×tamp); + } + break; + } + + case TYPE_OBJECT: { + if (_output_object_data) { + if (_str_schema[index][1] != "byte_array") { + return error_msg("object", "byte_array", index); + } + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::ByteArrayWriter* col_writer = + static_cast(rgWriter->column(index)); + if (is_null) { + parquet::ByteArray value; + col_writer->WriteBatch(1, nullptr, nullptr, &value); + + } else { + auto date_column = assert_cast(*column); + 
BitmapValue& data = date_column.get_element(row); + std::string bitmap_str(data.getSizeInBytes(), '0'); + data.write(bitmap_str.data()); + parquet::ByteArray value; + value.ptr = reinterpret_cast(bitmap_str.c_str()); + value.len = bitmap_str.length(); + col_writer->WriteBatch(1, nullptr, nullptr, &value); + } + } else { + fmt::memory_buffer err_ss; + fmt::format_to(err_ss, "unsupported file format: : {}.", + (*_output_vexpr_ctxs)[index]->root()->type().type); + return Status::InvalidArgument(fmt::to_string(err_ss.data())); + } + break; + } + + case TYPE_HLL: { + if (_output_object_data) { + if (_str_schema[index][1] != "byte_array") { + return error_msg("hll", "byte_array", index); + } + parquet::RowGroupWriter* rgWriter = get_rg_writer(); + parquet::ByteArrayWriter* col_writer = + static_cast(rgWriter->column(index)); + if (is_null) { + parquet::ByteArray value; + col_writer->WriteBatch(1, nullptr, nullptr, &value); + + } else { + auto& data = + assert_cast(*column).get_data()[row]; + std::string hll_str(data.max_serialized_size(), '0'); + size_t actual_size = data.serialize((uint8_t*)hll_str.data()); + hll_str.resize(actual_size); + parquet::ByteArray value; + value.ptr = reinterpret_cast(hll_str.c_str()); + value.len = hll_str.length(); + col_writer->WriteBatch(1, nullptr, nullptr, &value); + } + } else { + fmt::memory_buffer err_ss; + fmt::format_to(err_ss, "unsupported file format: : {}.", + (*_output_vexpr_ctxs)[index]->root()->type().type); + return Status::InvalidArgument(fmt::to_string(err_ss.data())); + } + break; + } + default: { + fmt::memory_buffer err_ss; + fmt::format_to(err_ss, "unsupported file format: : {}.", + (*_output_vexpr_ctxs)[index]->root()->type().type); + return Status::InvalidArgument(fmt::to_string(err_ss.data())); } } } @@ -476,6 +837,16 @@ void ParquetWriterWrapper::close() { } } +Status ParquetWriterWrapper::error_msg(const std::string& field_type, + const std::string& parquert_type, int index) { + fmt::memory_buffer err_ss; + 
fmt::format_to(err_ss, + "project field type is {}, should use parquert_type {}, but the definition type " + "of column {} is {}.", + field_type, parquert_type, _str_schema[index][2], _str_schema[index][1]); + return Status::InvalidArgument(fmt::to_string(err_ss.data())); +} + ParquetWriterWrapper::~ParquetWriterWrapper() {} } // namespace doris diff --git a/be/src/exec/parquet_writer.h b/be/src/exec/parquet_writer.h index c076aed94101aa..c3c96935b9ba2f 100644 --- a/be/src/exec/parquet_writer.h +++ b/be/src/exec/parquet_writer.h @@ -39,6 +39,8 @@ #include "gen_cpp/Types_types.h" #include "runtime/row_batch.h" #include "runtime/tuple.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr_context.h" namespace doris { class FileWriter; @@ -70,10 +72,17 @@ class ParquetOutputStream : public arrow::io::OutputStream { // a wrapper of parquet output stream class ParquetWriterWrapper { public: - ParquetWriterWrapper(FileWriter* file_writer, const std::vector& output_expr_ctxs, + ParquetWriterWrapper(FileWriter* file_writer, const std::vector* output_expr_ctxs, const std::map& properties, const std::vector>& schema, bool output_object_data); + + ParquetWriterWrapper(FileWriter* file_writer, + const std::vector* output_vexpr_ctxs, + const std::map& properties, + const std::vector>& schema, + bool output_object_data); + virtual ~ParquetWriterWrapper(); Status write(const RowBatch& row_batch); @@ -82,6 +91,10 @@ class ParquetWriterWrapper { Status _write_one_row(TupleRow* row); + Status write(vectorized::Block& block); + + Status _write_one_row(vectorized::Block& block, size_t row); + void close(); void parse_properties(const std::map& propertie_map); @@ -92,12 +105,16 @@ class ParquetWriterWrapper { int64_t written_len(); + Status error_msg(const std::string& field_type, const std::string& parquert_type, int index); + private: std::shared_ptr _outstream; std::shared_ptr _properties; std::shared_ptr _schema; std::unique_ptr _writer; - const std::vector& _output_expr_ctxs; 
+ const std::vector* _output_expr_ctxs; + const std::vector* _output_vexpr_ctxs; + std::vector _column_ids; std::vector> _str_schema; int64_t _cur_writed_rows = 0; parquet::RowGroupWriter* _rg_writer; diff --git a/be/src/runtime/file_result_writer.cpp b/be/src/runtime/file_result_writer.cpp index 6b20f5dfd0a881..1bf4176fed7d13 100644 --- a/be/src/runtime/file_result_writer.cpp +++ b/be/src/runtime/file_result_writer.cpp @@ -47,7 +47,7 @@ const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; // deprecated FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts, - const std::vector& output_expr_ctxs, + const std::vector* output_expr_ctxs, RuntimeProfile* parent_profile, BufferControlBlock* sinker, bool output_object_data) : _file_opts(file_opts), @@ -70,7 +70,7 @@ FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts, FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts, const TStorageBackendType::type storage_type, const TUniqueId fragment_instance_id, - const std::vector& output_expr_ctxs, + const std::vector* output_expr_ctxs, RuntimeProfile* parent_profile, BufferControlBlock* sinker, RowBatch* output_batch, bool output_object_data) : _file_opts(file_opts), @@ -253,9 +253,9 @@ Status FileResultWriter::_write_csv_file(const RowBatch& batch) { Status FileResultWriter::_write_one_row_as_csv(TupleRow* row) { { SCOPED_TIMER(_convert_tuple_timer); - int num_columns = _output_expr_ctxs.size(); + int num_columns = _output_expr_ctxs->size(); for (int i = 0; i < num_columns; ++i) { - void* item = _output_expr_ctxs[i]->get_value(row); + void* item = (*_output_expr_ctxs)[i]->get_value(row); if (item == nullptr) { _plain_text_outstream << NULL_IN_CSV; @@ -265,7 +265,7 @@ Status FileResultWriter::_write_one_row_as_csv(TupleRow* row) { continue; } - switch (_output_expr_ctxs[i]->root()->type().type) { + switch ((*_output_expr_ctxs)[i]->root()->type().type) { case TYPE_BOOLEAN: case TYPE_TINYINT: 
_plain_text_outstream << (int)*static_cast(item); @@ -330,7 +330,7 @@ Status FileResultWriter::_write_one_row_as_csv(TupleRow* row) { const DecimalV2Value decimal_val( reinterpret_cast(item)->value); std::string decimal_str; - int output_scale = _output_expr_ctxs[i]->root()->output_scale(); + int output_scale = (*_output_expr_ctxs)[i]->root()->output_scale(); decimal_str = decimal_val.to_string(output_scale); _plain_text_outstream << decimal_str; break; diff --git a/be/src/runtime/file_result_writer.h b/be/src/runtime/file_result_writer.h index 95e329cf6fa2ac..8bc94f6f602bcf 100644 --- a/be/src/runtime/file_result_writer.h +++ b/be/src/runtime/file_result_writer.h @@ -21,7 +21,7 @@ #include "gen_cpp/Types_types.h" #include "runtime/result_writer.h" #include "runtime/runtime_state.h" - +#include "vec/exprs/vexpr_context.h" namespace doris { class ExprContext; @@ -75,16 +75,16 @@ struct ResultFileOptions { class BufferControlBlock; // write result to file -class FileResultWriter final : public ResultWriter { +class FileResultWriter : public ResultWriter { public: FileResultWriter(const ResultFileOptions* file_option, - const std::vector& output_expr_ctxs, + const std::vector* output_expr_ctxs, RuntimeProfile* parent_profile, BufferControlBlock* sinker, bool output_object_data); FileResultWriter(const ResultFileOptions* file_option, const TStorageBackendType::type storage_type, const TUniqueId fragment_instance_id, - const std::vector& output_expr_ctxs, + const std::vector* output_expr_ctxs, RuntimeProfile* parent_profile, BufferControlBlock* sinker, RowBatch* output_batch, bool output_object_data); virtual ~FileResultWriter(); @@ -96,17 +96,23 @@ class FileResultWriter final : public ResultWriter { // file result writer always return statistic result in one row virtual int64_t get_written_rows() const override { return 1; } -private: + // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer + // if eos, write the data even if 
buffer is not full. + virtual Status _flush_plain_text_outstream(bool eos); + virtual Status _create_file_writer(const std::string& file_name); + // save result into batch rather than send it + virtual Status _fill_result_batch(); + // close file writer, and if !done, it will create new writer for next file. + // if only_close is true, this method will just close the file writer and return. + virtual Status _close_file_writer(bool done, bool only_close = false); + +protected: Status _write_csv_file(const RowBatch& batch); Status _write_parquet_file(const RowBatch& batch); Status _write_one_row_as_csv(TupleRow* row); - // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer - // if eos, write the data even if buffer is not full. - Status _flush_plain_text_outstream(bool eos); void _init_profile(); - Status _create_file_writer(const std::string& file_name); Status _create_next_file_writer(); Status _create_success_file(); // get next export file name @@ -114,23 +120,17 @@ class FileResultWriter final : public ResultWriter { Status _get_success_file_name(std::string* file_name); Status _get_file_url(std::string* file_url); std::string _file_format_to_name(); - // close file writer, and if !done, it will create new writer for next file. - // if only_close is true, this method will just close the file writer and return. 
- Status _close_file_writer(bool done, bool only_close = false); // create a new file if current file size exceed limit Status _create_new_file_if_exceed_size(); // send the final statistic result Status _send_result(); - // save result into batch rather than send it - Status _fill_result_batch(); -private: +protected: RuntimeState* _state; // not owned, set when init const ResultFileOptions* _file_opts; TStorageBackendType::type _storage_type; TUniqueId _fragment_instance_id; - const std::vector& _output_expr_ctxs; - + const std::vector* _output_expr_ctxs; // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter. // If the result file format is Parquet, this _file_writer is owned by _parquet_writer. FileWriter* _file_writer = nullptr; diff --git a/be/src/runtime/result_file_sink.cpp b/be/src/runtime/result_file_sink.cpp index a26e4a38d70288..abfa8e2757af16 100644 --- a/be/src/runtime/result_file_sink.cpp +++ b/be/src/runtime/result_file_sink.cpp @@ -98,7 +98,7 @@ Status ResultFileSink::prepare(RuntimeState* state) { state->fragment_instance_id(), _buf_size, &_sender)); // create writer _writer.reset(new (std::nothrow) FileResultWriter( - _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs, + _file_opts.get(), _storage_type, state->fragment_instance_id(), &_output_expr_ctxs, _profile, _sender.get(), nullptr, state->return_object_data_as_binary())); } else { // init channel @@ -115,7 +115,7 @@ Status ResultFileSink::prepare(RuntimeState* state) { // create writer _output_batch = new RowBatch(_output_row_descriptor, 1024, _mem_tracker.get()); _writer.reset(new (std::nothrow) FileResultWriter( - _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs, + _file_opts.get(), _storage_type, state->fragment_instance_id(), &_output_expr_ctxs, _profile, nullptr, _output_batch, state->return_object_data_as_binary())); } RETURN_IF_ERROR(_writer->init(state)); diff --git 
a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp index 610f105074c5bb..1b855901a0286d 100644 --- a/be/src/runtime/result_sink.cpp +++ b/be/src/runtime/result_sink.cpp @@ -28,7 +28,6 @@ #include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "util/uid_util.h" - #include "vec/exprs/vexpr.h" namespace doris { @@ -62,11 +61,12 @@ Status ResultSink::prepare_exprs(RuntimeState* state) { Status ResultSink::prepare(RuntimeState* state) { RETURN_IF_ERROR(DataSink::prepare(state)); - std::stringstream title; - title << "DataBufferSender (dst_fragment_instance_id=" - << print_id(state->fragment_instance_id()) << ")"; + + fmt::memory_buffer title; + fmt::format_to(title, "DataBufferSender ( dst_fragment_instance_id= {} ).", + print_id(state->fragment_instance_id())); // create profile - _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); + _profile = state->obj_pool()->add(new RuntimeProfile(fmt::to_string(title.data()))); // prepare output_expr RETURN_IF_ERROR(prepare_exprs(state)); @@ -83,7 +83,7 @@ Status ResultSink::prepare(RuntimeState* state) { // deprecated case TResultSinkType::FILE: CHECK(_file_opts.get() != nullptr); - _writer.reset(new (std::nothrow) FileResultWriter(_file_opts.get(), _output_expr_ctxs, + _writer.reset(new (std::nothrow) FileResultWriter(_file_opts.get(), &_output_expr_ctxs, _profile, _sender.get(), state->return_object_data_as_binary())); break; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 9a5819143640e2..7f868fd82ca62b 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -163,6 +163,8 @@ set(VEC_FILES olap/block_reader.cpp sink/mysql_result_writer.cpp sink/result_sink.cpp + sink/vresult_file_sink.cpp + sink/vfile_result_writer.cpp sink/vdata_stream_sender.cpp sink/vtablet_sink.cpp sink/vmysql_table_writer.cpp diff --git a/be/src/vec/sink/result_sink.cpp b/be/src/vec/sink/result_sink.cpp index fda7702ab31335..693c84c921f6d0 100644 --- 
a/be/src/vec/sink/result_sink.cpp +++ b/be/src/vec/sink/result_sink.cpp @@ -24,6 +24,7 @@ #include "runtime/runtime_state.h" #include "vec/exprs/vexpr.h" #include "vec/sink/mysql_result_writer.h" +#include "vec/sink/vfile_result_writer.h" namespace doris { namespace vectorized { @@ -77,11 +78,10 @@ Status VResultSink::prepare(RuntimeState* state) { break; case TResultSinkType::FILE: CHECK(_file_opts.get() != nullptr); - return Status::InternalError("Unsupport vfile result sink type"); - // TODO: - /* _writer.reset(new (std::nothrow) FileResultWriter(_file_opts.get(), _output_expr_ctxs,*/ - /*_profile, _sender.get()));*/ -// break; + _writer.reset(new (std::nothrow) VFileResultWriter(_file_opts.get(), &_output_vexpr_ctxs, + _profile, _sender.get(), + state->return_object_data_as_binary())); + break; default: return Status::InternalError("Unknown result sink type"); } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index cc8555ca67c1ae..3926bfe1135ff8 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -242,6 +242,15 @@ void VDataStreamSender::Channel::ch_roll_pb_block() { _ch_cur_pb_block = (_ch_cur_pb_block == &_ch_pb_block1 ? 
&_ch_pb_block2 : &_ch_pb_block1); } +VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc) + : _sender_id(sender_id), + _pool(pool), + _row_desc(row_desc), + _cur_pb_block(&_pb_block1), + _serialize_batch_timer(nullptr), + _bytes_sent_counter(nullptr), + _local_bytes_send_counter(nullptr) {} + VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TDataStreamSink& sink, const std::vector& destinations, diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index d62ebeb68489ed..8c41ce854b8807 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -45,14 +45,16 @@ namespace vectorized { class VExprContext; class VPartitionInfo; -class VDataStreamSender final : public DataSink { +class VDataStreamSender : public DataSink { public: + VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc); + VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TDataStreamSink& sink, const std::vector& destinations, int per_channel_buffer_size, bool send_query_statistics_with_every_batch); - ~VDataStreamSender(); + virtual ~VDataStreamSender(); virtual Status init(const TDataSink& thrift_sink) override; @@ -72,7 +74,7 @@ class VDataStreamSender final : public DataSink { private: void _roll_pb_block(); -private: +protected: class Channel; Status get_partition_column_result(Block* block, int* result) const { diff --git a/be/src/vec/sink/vfile_result_writer.cpp b/be/src/vec/sink/vfile_result_writer.cpp new file mode 100644 index 00000000000000..369d1d75453002 --- /dev/null +++ b/be/src/vec/sink/vfile_result_writer.cpp @@ -0,0 +1,383 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/vfile_result_writer.h" + +#include "exec/broker_writer.h" +#include "exec/hdfs_reader_writer.h" +#include "exec/local_file_writer.h" +#include "exec/parquet_writer.h" +#include "exec/s3_writer.h" +#include "gutil/strings/substitute.h" +#include "runtime/buffer_control_block.h" +#include "service/backend_options.h" +#include "util/file_utils.h" +#include "util/url_coding.h" +#include "vec/core/materialize_block.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris { +namespace vectorized { + +VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts, + const std::vector* output_vexpr_ctxs, + RuntimeProfile* parent_profile, BufferControlBlock* sinker, + bool output_object_data) + : FileResultWriter(file_opts, nullptr, parent_profile, sinker, output_object_data) { + _output_vexpr_ctxs = output_vexpr_ctxs; +} + +VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts, + const TStorageBackendType::type storage_type, + const TUniqueId fragment_instance_id, + const std::vector* output_vexpr_ctxs, + RuntimeProfile* parent_profile, BufferControlBlock* sinker, + MutableBlock* output_batch, bool output_object_data) + : FileResultWriter(file_opts, storage_type, fragment_instance_id, nullptr, parent_profile, + sinker, nullptr, output_object_data) { + _output_vexpr_ctxs = output_vexpr_ctxs; + _mutable_block = 
output_batch; +} + +VFileResultWriter::~VFileResultWriter() { + _close_file_writer(true); +} + +Status VFileResultWriter::init(RuntimeState* state) { + _state = state; + _init_profile(); + return _create_next_file_writer(); +} + +Status VFileResultWriter::_create_file_writer(const std::string& file_name) { + if (_storage_type == TStorageBackendType::LOCAL) { + _file_writer = new doris::LocalFileWriter(file_name, 0 /* start offset */); + } else if (_storage_type == TStorageBackendType::BROKER) { + _file_writer = new doris::BrokerWriter(_state->exec_env(), _file_opts->broker_addresses, + _file_opts->broker_properties, file_name, + 0 /*start offset*/); + } else if (_storage_type == TStorageBackendType::S3) { + _file_writer = + new doris::S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */); + } else if (_storage_type == TStorageBackendType::HDFS) { + RETURN_IF_ERROR(HdfsReaderWriter::create_writer( + const_cast&>(_file_opts->broker_properties), + file_name, &_file_writer)); + } + + RETURN_IF_ERROR(_file_writer->open()); + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + // just use file writer is enough + break; + + case TFileFormatType::FORMAT_PARQUET: + _parquet_writer = new doris::ParquetWriterWrapper( + _file_writer, _output_vexpr_ctxs, _file_opts->file_properties, _file_opts->schema, + FileResultWriter::_output_object_data); + break; + + default: + return Status::InternalError( + strings::Substitute("unsupported file format: $0", _file_opts->file_format)); + } + LOG(INFO) << "create file for exporting query result. file name: " << file_name + << ". 
query id: " << print_id(_state->query_id()) + << " format:" << _file_opts->file_format; + return Status::OK(); +} + +Status VFileResultWriter::append_row_batch(const RowBatch* batch) { + return Status::NotSupported( + "Not Implemented VFileResultWriter::append_row_batch(RowBatch* batch)"); +} + +Status VFileResultWriter::append_block(Block& block) { + if (block.rows() == 0) { + return Status::OK(); + } + + SCOPED_TIMER(_append_row_batch_timer); + if (_parquet_writer != nullptr) { + RETURN_IF_ERROR(_write_parquet_file(block)); + } else { + RETURN_IF_ERROR(_write_csv_file(block)); + } + + FileResultWriter::_written_rows += block.rows(); + return Status::OK(); +} + +Status VFileResultWriter::_write_parquet_file(Block& block) { + RETURN_IF_ERROR(_parquet_writer->write(block)); + // split file if exceed limit + return _create_new_file_if_exceed_size(); +} + +Status VFileResultWriter::_write_csv_file(Block& block) { + int num_columns = _output_vexpr_ctxs->size(); + _column_ids.resize(num_columns); + for (int i = 0; i < num_columns; ++i) { + int column_id = -1; + (*_output_vexpr_ctxs)[i]->execute(&block, &column_id); + _column_ids[i] = column_id; + } + + int num_rows = block.rows(); + materialize_block_inplace(block, _column_ids.begin(), _column_ids.end()); + for (int i = 0; i < num_rows; ++i) { + RETURN_IF_ERROR(_write_one_row_as_csv(block, i)); + } + return _flush_plain_text_outstream(true); +} + +Status VFileResultWriter::_write_one_row_as_csv(Block& block, size_t row) { + SCOPED_TIMER(_convert_tuple_timer); + int num_columns = _output_vexpr_ctxs->size(); + + for (int i = 0; i < num_columns; ++i) { + auto& column_ptr = block.get_by_position(_column_ids[i]).column; + auto& type_ptr = block.get_by_position(_column_ids[i]).type; + vectorized::ColumnPtr column; + if (type_ptr->is_nullable()) { + column = assert_cast(*column_ptr) + .get_nested_column_ptr(); + if (column_ptr->is_null_at(row)) { + fmt::format_to(_insert_stmt_buffer, "{}", NULL_IN_CSV); + + if (i < num_columns 
- 1) { + fmt::format_to(_insert_stmt_buffer, "{}", _file_opts->column_separator); + } + continue; + } + } else { + column = column_ptr; + } + switch ((*_output_vexpr_ctxs)[i]->root()->result_type()) { + case TYPE_BOOLEAN: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + case TYPE_TINYINT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + case TYPE_SMALLINT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + case TYPE_INT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + case TYPE_BIGINT: { + auto& data = assert_cast(*column).get_data(); + fmt::format_to(_insert_stmt_buffer, "{}", data[row]); + break; + } + case TYPE_FLOAT: { + auto& data = assert_cast(*column).get_data(); + // To prevent loss of precision on float and double types, + char buffer[MAX_FLOAT_STR_LENGTH + 2]; + float float_value = static_cast(data[row]); + buffer[0] = '\0'; + int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value; + fmt::format_to(_insert_stmt_buffer, "{}", buffer); + break; + } + case TYPE_DOUBLE: { + auto& data = assert_cast(*column).get_data(); + char buffer[MAX_DOUBLE_STR_LENGTH + 2]; + double double_value = static_cast(data[row]); + buffer[0] = '\0'; + int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value; + fmt::format_to(_insert_stmt_buffer, "{}", buffer); + break; + } + + case TYPE_STRING: + case TYPE_CHAR: + case TYPE_VARCHAR: { + const auto& string_val = + assert_cast(*column).get_data_at(row); + DCHECK(string_val.data != nullptr); + fmt::format_to(_insert_stmt_buffer, "{}", + std::string(string_val.data, 
string_val.size)); + break; + } + case TYPE_DECIMALV2: { + DecimalV2Value value = + (DecimalV2Value) + assert_cast&>( + *column) + .get_data()[row]; + fmt::format_to(_insert_stmt_buffer, "{}", value.to_string()); + break; + } + case TYPE_DATE: + case TYPE_DATETIME: { + int64_t int_val = assert_cast(*column).get_data()[row]; + vectorized::VecDateTimeValue value = + binary_cast(int_val); + + char buf[64]; + char* pos = value.to_string(buf); + std::string str(buf, pos - buf - 1); + fmt::format_to(_insert_stmt_buffer, "{}", str); + break; + } + case TYPE_OBJECT: { + if (FileResultWriter::_output_object_data) { + auto date_column = assert_cast(*column); + BitmapValue& data = date_column.get_element(row); + std::string bitmap_str(data.getSizeInBytes(), '0'); + data.write(bitmap_str.data()); + + std::string base64_str; + base64_encode(bitmap_str, &base64_str); + fmt::format_to(_insert_stmt_buffer, "{}", base64_str); + } else { + fmt::format_to(_insert_stmt_buffer, "{}", NULL_IN_CSV); + } + break; + } + case TYPE_HLL: { + if (FileResultWriter::_output_object_data) { + const auto& data = + assert_cast(*column).get_data()[row]; + std::string hll_str(data.max_serialized_size(), '0'); + size_t actual_size = data.serialize((uint8_t*)hll_str.data()); + hll_str.resize(actual_size); + + std::string base64_str; + base64_encode(hll_str, &base64_str); + fmt::format_to(_insert_stmt_buffer, "{}", base64_str); + } else { + fmt::format_to(_insert_stmt_buffer, "{}", NULL_IN_CSV); + } + break; + } + default: { + fmt::format_to(_insert_stmt_buffer, "{}", NULL_IN_CSV); + } + } + if (i < num_columns - 1) { + fmt::format_to(_insert_stmt_buffer, "{}", _file_opts->column_separator); + } + } + + fmt::format_to(_insert_stmt_buffer, "{}", _file_opts->line_delimiter); + // write one line to file + return _flush_plain_text_outstream(false); +} + +Status VFileResultWriter::_flush_plain_text_outstream(bool eos) { + SCOPED_TIMER(_file_write_timer); + + size_t pos = _insert_stmt_buffer.size(); + if (pos 
== 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) { + return Status::OK(); + } + + const std::string& buf = to_string(_insert_stmt_buffer); + size_t written_len = 0; + RETURN_IF_ERROR(_file_writer->write(reinterpret_cast(buf.c_str()), buf.size(), + &written_len)); + COUNTER_UPDATE(_written_data_bytes, written_len); + _current_written_bytes += written_len; + + _insert_stmt_buffer.clear(); + // split file if exceed limit + return _create_new_file_if_exceed_size(); +} + +Status VFileResultWriter::_close_file_writer(bool done, bool only_close) { + if (_parquet_writer != nullptr) { + _parquet_writer->close(); + _current_written_bytes = _parquet_writer->written_len(); + COUNTER_UPDATE(_written_data_bytes, _current_written_bytes); + delete _parquet_writer; + _parquet_writer = nullptr; + delete _file_writer; + _file_writer = nullptr; + } else if (_file_writer != nullptr) { + _file_writer->close(); + delete _file_writer; + _file_writer = nullptr; + } + + if (only_close) { + return Status::OK(); + } + + if (!done) { + // not finished, create new file writer for next file + RETURN_IF_ERROR(_create_next_file_writer()); + } else { + // All data is written to file, send statistic result + if (_file_opts->success_file_name != "") { + // write success file, just need to touch an empty file + RETURN_IF_ERROR(_create_success_file()); + } + if (_mutable_block == nullptr) { + RETURN_IF_ERROR(_send_result()); + } else { + RETURN_IF_ERROR(_fill_result_batch()); + } + } + return Status::OK(); +} + +Status VFileResultWriter::_fill_result_batch() { + if (_is_result_sent) { + return Status::OK(); + } + _is_result_sent = true; + int64_t result_data[3] = {_file_idx, _written_rows_counter->value(), + _written_data_bytes->value()}; // file_num, total rows,file size + std::string file_url; + _get_file_url(&file_url); // url address + + auto& res_columns = _mutable_block->mutable_columns(); + for (int i = 0; i < 3; ++i) { + res_columns[i]->insert_data((const char*)(&result_data[i]), 0); + } + 
res_columns[3]->insert_data(file_url.c_str(), file_url.length()); + + return Status::OK(); +} + +Status VFileResultWriter::close() { + // the following 2 profile "_written_rows_counter" and "_writer_close_timer" + // must be outside the `_close_file_writer()`. + // because `_close_file_writer()` may be called in deconstructor, + // at that time, the RuntimeState may already been deconstructed, + // so does the profile in RuntimeState. + COUNTER_SET(_written_rows_counter, FileResultWriter::_written_rows); + SCOPED_TIMER(_writer_close_timer); + return _close_file_writer(true, false); +} + +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/vfile_result_writer.h b/be/src/vec/sink/vfile_result_writer.h new file mode 100644 index 00000000000000..af25c339de4d6a --- /dev/null +++ b/be/src/vec/sink/vfile_result_writer.h @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once +#include "runtime/file_result_writer.h" +#include "runtime/primitive_type.h" +#include "util/mysql_row_buffer.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/sink/result_writer.h" +namespace doris { +class BufferControlBlock; +class RowBatch; +class TFetchDataResult; +class FileResultWriter; +namespace vectorized { + +class VExprContext; + +class VFileResultWriter final : public FileResultWriter, public VResultWriter { +public: + VFileResultWriter(const ResultFileOptions* file_option, + const std::vector* output_vexpr_ctxs, + RuntimeProfile* parent_profile, BufferControlBlock* sinker, + bool output_object_data); + + VFileResultWriter(const ResultFileOptions* file_option, + const TStorageBackendType::type storage_type, + const TUniqueId fragment_instance_id, + const std::vector* output_vexpr_ctxs, + RuntimeProfile* parent_profile, BufferControlBlock* sinker, + MutableBlock* output_batch, bool output_object_data); + + virtual ~VFileResultWriter(); + + virtual Status init(RuntimeState* state) override; + + virtual Status append_row_batch(const RowBatch* batch) override; + + virtual Status append_block(Block& block) override; + + virtual Status close() override; + + // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer + // if eos, write the data even if buffer is not full. + Status _flush_plain_text_outstream(bool eos) override; + Status _create_file_writer(const std::string& file_name) override; + // save result into batch rather than send it + Status _fill_result_batch() override; + + // close file writer, and if !done, it will create new writer for next file. + // if only_close is true, this method will just close the file writer and return. 
+ Status _close_file_writer(bool done, bool only_close = false) override; + + Status _write_csv_file(Block& block); + Status _write_parquet_file(Block& block); + Status _write_one_row_as_csv(Block& block, size_t row); +private: + const std::vector* _output_vexpr_ctxs; + fmt::memory_buffer _insert_stmt_buffer; + MutableBlock* _mutable_block = nullptr; + Block* _output_block = nullptr; + MutableColumns _result_columns; + std::vector _column_ids; +}; +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp new file mode 100644 index 00000000000000..b3c32db3a739e4 --- /dev/null +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "vec/sink/vresult_file_sink.h" + +#include "common/config.h" +#include "exprs/expr.h" +#include "runtime/buffer_control_block.h" +#include "runtime/exec_env.h" +#include "runtime/file_result_writer.h" +#include "runtime/mem_tracker.h" +#include "runtime/mysql_result_writer.h" +#include "runtime/result_buffer_mgr.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" +#include "util/uid_util.h" +#include "vec/sink/vfile_result_writer.h" +#include "vec/utils/util.hpp" + +namespace doris::vectorized { + +VResultFileSink::VResultFileSink(const RowDescriptor& row_desc, + const std::vector& t_output_expr, + const TResultFileSink& sink) + : VDataStreamSender(nullptr, 0, row_desc), _t_output_expr(t_output_expr) { + CHECK(sink.__isset.file_options); + _file_opts.reset(new ResultFileOptions(sink.file_options)); + CHECK(sink.__isset.storage_backend_type); + _storage_type = sink.storage_backend_type; + _is_top_sink = true; + + _name = "VResultFileSink"; +} + +VResultFileSink::VResultFileSink(const RowDescriptor& row_desc, + const std::vector& t_output_expr, + const TResultFileSink& sink, + const std::vector& destinations, + ObjectPool* pool, int sender_id, DescriptorTbl& descs) + : VDataStreamSender(pool, sender_id, row_desc), + _t_output_expr(t_output_expr), + _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) { + CHECK(sink.__isset.file_options); + _file_opts.reset(new ResultFileOptions(sink.file_options)); + CHECK(sink.__isset.storage_backend_type); + _storage_type = sink.storage_backend_type; + _is_top_sink = false; + DCHECK_EQ(destinations.size(), 1); + _channel_shared_ptrs.emplace_back(new Channel( + this, _output_row_descriptor, destinations[0].brpc_server, + destinations[0].fragment_instance_id, sink.dest_node_id, _buf_size, true, true)); + _channels.push_back(_channel_shared_ptrs.back().get()); + + _name = "VResultFileSink"; +} + +Status VResultFileSink::prepare_exprs(RuntimeState* state) { + // From the thrift 
expressions create the real exprs. + RETURN_IF_ERROR( + VExpr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_vexpr_ctxs)); + // Prepare the exprs to run. + RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc, _expr_mem_tracker)); + return Status::OK(); +} + +Status VResultFileSink::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSink::prepare(state)); + std::string title = fmt::format("DataBufferSender: ( dst_fragment_instance_id= {}).", + print_id(state->fragment_instance_id())); + // create profile + _profile = state->obj_pool()->add(new RuntimeProfile(title)); + + // prepare output_expr + RETURN_IF_ERROR(prepare_exprs(state)); + + CHECK(_file_opts.get() != nullptr); + if (_is_top_sink) { + // create sender + RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( + state->fragment_instance_id(), _buf_size, &_sender)); + // create writer + _writer.reset(new (std::nothrow) VFileResultWriter( + _file_opts.get(), _storage_type, state->fragment_instance_id(), &_output_vexpr_ctxs, + _profile, _sender.get(), nullptr, state->return_object_data_as_binary())); + + } else { + // init channel + _profile = _pool->add(new RuntimeProfile(title)); + _state = state; + _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime"); + _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES); + _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES); + _uncompressed_bytes_counter = + ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); + _mem_tracker = MemTracker::create_tracker( + -1, "VResultFileSink:" + print_id(state->fragment_instance_id()), + state->instance_mem_tracker(), MemTrackerLevel::VERBOSE, _profile); + + _mutable_block.reset(new MutableBlock(_output_row_descriptor.tuple_descriptors())); + _writer.reset(new (std::nothrow) VFileResultWriter( + _file_opts.get(), _storage_type, state->fragment_instance_id(), &_output_vexpr_ctxs, + _profile, nullptr, 
_mutable_block.get(), state->return_object_data_as_binary())); + } + RETURN_IF_ERROR(_writer->init(state)); + for (int i = 0; i < _channels.size(); ++i) { + RETURN_IF_ERROR(_channels[i]->init(state)); + } + return Status::OK(); +} + +Status VResultFileSink::open(RuntimeState* state) { + return VExpr::open(_output_vexpr_ctxs, state); +} + +Status VResultFileSink::send(RuntimeState* state, RowBatch* batch) { + return Status::NotSupported( + "Not Implemented VResultFileSink::send(RuntimeState* state, RowBatch* batch)"); +} + +Status VResultFileSink::send(RuntimeState* state, Block* block) { + RETURN_IF_ERROR(_writer->append_block(*block)); + return Status::OK(); +} + +Status VResultFileSink::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } + + Status final_status = exec_status; + // close the writer + if (_writer) { + Status st = _writer->close(); + if (!st.ok() && exec_status.ok()) { + // close file writer failed, should return this error to client + final_status = st; + } + } + if (_is_top_sink) { + // close sender, this is normal path end + if (_sender) { + _sender->update_num_written_rows(_writer->get_written_rows()); + _sender->close(final_status); + } + state->exec_env()->result_mgr()->cancel_at_time( + time(nullptr) + config::result_buffer_cancelled_interval_time, + state->fragment_instance_id()); + } else { + if (final_status.ok()) { + auto block = _mutable_block->to_block(); + RETURN_IF_ERROR( + serialize_block(&block, _cur_pb_block, _channels.size())); + for (auto channel : _channels) { + RETURN_IF_ERROR(channel->send_block(_cur_pb_block)); + } + } + Status final_st = Status::OK(); + for (int i = 0; i < _channels.size(); ++i) { + Status st = _channels[i]->close(state); + if (!st.ok() && final_st.ok()) { + final_st = st; + } + } + // wait all channels to finish + for (int i = 0; i < _channels.size(); ++i) { + Status st = _channels[i]->close_wait(state); + if (!st.ok() && final_st.ok()) { + final_st = st; + } + } + 
_mutable_block->clear(); + } + + VExpr::close(_output_vexpr_ctxs, state); + + _closed = true; + return Status::OK(); +} + +void VResultFileSink::set_query_statistics(std::shared_ptr statistics) { + if (_is_top_sink) { + _sender->set_query_statistics(statistics); + } else { + _query_statistics = statistics; + } +} + +} // namespace doris::vectorized diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h new file mode 100644 index 00000000000000..0c52673e58851f --- /dev/null +++ b/be/src/vec/sink/vresult_file_sink.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "common/status.h" +#include "gen_cpp/PaloInternalService_types.h" +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "runtime/data_stream_sender.h" +#include "runtime/descriptors.h" +#include "vec/core/block.h" +#include "vec/sink/result_writer.h" +#include "vec/sink/vdata_stream_sender.h" + +namespace doris { + +class RowBatch; +class ObjectPool; +class RuntimeState; +class RuntimeProfile; +class BufferControlBlock; +class MemTracker; +class ResultFileOptions; + +namespace vectorized { +class VExprContext; +class Block; +class VResultFileSink : public VDataStreamSender { + +public: + VResultFileSink(const RowDescriptor& row_desc, const std::vector& select_exprs, + const TResultFileSink& sink); + + VResultFileSink(const RowDescriptor& row_desc, const std::vector& select_exprs, + const TResultFileSink& sink, + const std::vector& destinations, ObjectPool* pool, + int sender_id, DescriptorTbl& descs); + ~VResultFileSink() = default; + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + // send data in 'batch' to this backend stream mgr + // Blocks until all rows in batch are placed in the buffer + Status send(RuntimeState* state, RowBatch* batch) override; + Status send(RuntimeState* state, Block* block) override; + // Flush all buffered data and close all existing channels to destination + // hosts. Further send() calls are illegal after calling close(). + Status close(RuntimeState* state, Status exec_status) override; + + RuntimeProfile* profile() override { return _profile; } + + void set_query_statistics(std::shared_ptr statistics) override; + +private: + Status prepare_exprs(RuntimeState* state); + // set file options when sink type is FILE + std::unique_ptr _file_opts; + TStorageBackendType::type _storage_type; + + // Owned by the RuntimeState. 
+ const std::vector& _t_output_expr; + std::vector _output_vexpr_ctxs; + RowDescriptor _output_row_descriptor; + + std::shared_ptr _sender; + std::shared_ptr _writer; + std::shared_ptr _mutable_block; + Block* _output_block = nullptr; + + int _buf_size = 1024; // Allocated from _pool + bool _is_top_sink = true; +}; + +} // namespace vectorized + +} // namespace doris \ No newline at end of file