Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,13 +462,24 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
size_t rows = 0;

bool success = false;
bool is_remove_bom = false;
if (_push_down_agg_type == TPushAggOp::type::COUNT) {
while (rows < batch_size && !_line_reader_eof) {
const uint8_t* ptr = nullptr;
size_t size = 0;
RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));

// _skip_lines == 0 means this line is the actual data beginning line for the entire file
// is_remove_bom means _remove_bom should only execute once
if (_skip_lines == 0 && !is_remove_bom) {
ptr = _remove_bom(ptr, size);
is_remove_bom = true;
}

// _skip_lines > 0 means we do not need to remove bom
if (_skip_lines > 0) {
_skip_lines--;
is_remove_bom = true;
continue;
}
if (size == 0) {
Expand All @@ -490,8 +501,18 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
const uint8_t* ptr = nullptr;
size_t size = 0;
RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));

// _skip_lines == 0 means this line is the actual data beginning line for the entire file
// is_remove_bom means _remove_bom should only execute once
if (!is_remove_bom && _skip_lines == 0) {
ptr = _remove_bom(ptr, size);
is_remove_bom = true;
}

// _skip_lines > 0 means we do not remove bom
if (_skip_lines > 0) {
_skip_lines--;
is_remove_bom = true;
continue;
}
if (size == 0) {
Expand Down Expand Up @@ -538,10 +559,11 @@ Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
} else { // parse csv file with names
RETURN_IF_ERROR(_parse_col_names(col_names));
}

for (size_t j = 0; j < col_names->size(); ++j) {
col_types->emplace_back(TypeDescriptor::create_string_type());
}
} else { // parse csv file without names and types
} else { // parse csv file with names and types
RETURN_IF_ERROR(_parse_col_names(col_names));
RETURN_IF_ERROR(_parse_col_types(col_names->size(), col_types));
}
Expand Down Expand Up @@ -929,6 +951,7 @@ Status CsvReader::_parse_col_nums(size_t* col_nums) {
if (!validate_utf8(const_cast<char*>(reinterpret_cast<const char*>(ptr)), size)) {
return Status::InternalError<false>("Only support csv data in utf8 codec");
}
ptr = _remove_bom(ptr, size);
_split_line(Slice(ptr, size));
*col_nums = _split_values.size();
return Status::OK();
Expand All @@ -945,6 +968,7 @@ Status CsvReader::_parse_col_names(std::vector<std::string>* col_names) {
if (!validate_utf8(const_cast<char*>(reinterpret_cast<const char*>(ptr)), size)) {
return Status::InternalError<false>("Only support csv data in utf8 codec");
}
ptr = _remove_bom(ptr, size);
_split_line(Slice(ptr, size));
for (size_t idx = 0; idx < _split_values.size(); ++idx) {
col_names->emplace_back(_split_values[idx].to_string());
Expand All @@ -968,6 +992,15 @@ Status CsvReader::_parse_col_types(size_t col_nums, std::vector<TypeDescriptor>*
return Status::OK();
}

// Strip a leading UTF-8 byte order mark from the buffer, if present.
// The UTF-8 BOM is the 3-byte sequence 0xEF 0xBB 0xBF; when found at the
// start of `ptr`, the returned pointer is advanced past it and `size` is
// reduced by 3. Otherwise the input is returned unchanged.
const uint8_t* CsvReader::_remove_bom(const uint8_t* ptr, size_t& size) {
    const bool has_utf8_bom =
            size >= 3 && ptr[0] == 0xEF && ptr[1] == 0xBB && ptr[2] == 0xBF;
    if (!has_utf8_bom) {
        return ptr;
    }
    LOG(INFO) << "remove bom";
    size -= 3;
    return ptr + 3;
}

Status CsvReader::close() {
if (_line_reader) {
_line_reader->close();
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/exec/format/csv/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ class CsvReader : public GenericReader {
// and the line is skipped as unqualified row, and the process should continue.
Status _validate_line(const Slice& line, bool* success);

// If the CSV file is an UTF8 encoding with BOM,
// then remove the first 3 bytes at the beginning of this file
// and set size = size - 3.
const uint8_t* _remove_bom(const uint8_t* ptr, size_t& size);

RuntimeState* _state = nullptr;
RuntimeProfile* _profile = nullptr;
ScannerCounter* _counter = nullptr;
Expand Down
11 changes: 9 additions & 2 deletions be/src/vec/runtime/vcsv_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,18 @@

namespace doris::vectorized {

static const unsigned char bom[] = {0xEF, 0xBB, 0xBF};

VCSVTransformer::VCSVTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
bool output_object_data, std::string_view header_type,
std::string_view header, std::string_view column_separator,
std::string_view line_delimiter)
std::string_view line_delimiter, bool with_bom)
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
_column_separator(column_separator),
_line_delimiter(line_delimiter),
_file_writer(file_writer) {
_file_writer(file_writer),
_with_bom(with_bom) {
if (header.size() > 0) {
_csv_header = header;
if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
Expand All @@ -74,6 +77,10 @@ VCSVTransformer::VCSVTransformer(RuntimeState* state, doris::io::FileWriter* fil
}

Status VCSVTransformer::open() {
if (_with_bom) {
RETURN_IF_ERROR(
_file_writer->append(Slice(reinterpret_cast<const char*>(bom), sizeof(bom))));
}
if (!_csv_header.empty()) {
return _file_writer->append(Slice(_csv_header.data(), _csv_header.size()));
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/runtime/vcsv_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class VCSVTransformer final : public VFileFormatTransformer {
VCSVTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
const VExprContextSPtrs& output_vexpr_ctxs, bool output_object_data,
std::string_view header_type, std::string_view header,
std::string_view column_separator, std::string_view line_delimiter);
std::string_view column_separator, std::string_view line_delimiter,
bool with_bom);

~VCSVTransformer() = default;

Expand Down Expand Up @@ -71,6 +72,8 @@ class VCSVTransformer final : public VFileFormatTransformer {
// For example: bitmap_to_string() may return large volume of data.
// And the speed is relative low, in my test, is about 6.5MB/s.
fmt::memory_buffer _outstream_buffer;

bool _with_bom = false;
};

} // namespace doris::vectorized
3 changes: 3 additions & 0 deletions be/src/vec/sink/vresult_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ struct ResultFileOptions {

bool delete_existing_files = false;
std::string file_suffix;
//Bring BOM when exporting to CSV format
bool with_bom = false;

ResultFileOptions(const TResultFileSinkOptions& t_opt) {
file_path = t_opt.file_path;
Expand All @@ -81,6 +83,7 @@ struct ResultFileOptions {
delete_existing_files =
t_opt.__isset.delete_existing_files ? t_opt.delete_existing_files : false;
file_suffix = t_opt.file_suffix;
with_bom = t_opt.with_bom;

is_local_file = true;
if (t_opt.__isset.broker_addresses) {
Expand Down
7 changes: 4 additions & 3 deletions be/src/vec/sink/writer/vfile_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,10 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
_file_writer_impl));
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
_vfile_writer.reset(new VCSVTransformer(
_state, _file_writer_impl.get(), _vec_output_expr_ctxs, _output_object_data,
_header_type, _header, _file_opts->column_separator, _file_opts->line_delimiter));
_vfile_writer.reset(new VCSVTransformer(_state, _file_writer_impl.get(),
_vec_output_expr_ctxs, _output_object_data,
_header_type, _header, _file_opts->column_separator,
_file_opts->line_delimiter, _file_opts->with_bom));
break;
case TFileFormatType::FORMAT_PARQUET:
_vfile_writer.reset(new VParquetTransformer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,25 @@ The bottom layer of the `Export` statement actually executes the `select...outfi
The following parameters can be specified:

- `label`: This parameter is optional, specifies the label of the export task. If this parameter is not specified, the system randomly assigns a label to the export task.

- `column_separator`: Specifies the exported column separator, default is `\t`, multi-byte separators are supported. This parameter is only used for the `CSV` file format.

- `line_delimiter`: Specifies the line delimiter for export, the default is `\n`, multi-byte delimiters are supported. This parameter is only used for the `CSV` file format.

- `timeout`: The timeout period of the export job, the default is 2 hours, the unit is seconds.

- `columns`: Specifies certain columns of the export job table

- `format`: Specifies the file format, support: parquet, orc, csv, csv_with_names, csv_with_names_and_types. The default is csv format.

- `parallelism`: The concurrency degree of the `export` job, the default is `1`. The export job will be divided into `select..outfile..` statements of the number of `parallelism` to execute concurrently. (If the value of `parallelism` is greater than the number of tablets in the table, the system will automatically set `parallelism` to the number of tablets, that is, each `select..outfile..` statement is responsible for one tablet)

- `delete_existing_files`: default `false`. If it is specified as true, all files in the directory specified by `file_path` will first be deleted, and then the data will be exported to that directory. For example: "file_path" = "/user/tmp" will delete all files and directories under "/user/"; "file_path" = "/user/tmp/" will delete all files and directories under "/user/tmp/".

- `max_file_size`: it is the limit for the size of a single file in the export job. If the result file exceeds this value, it will be split into multiple files. The valid range for `max_file_size` is [5MB, 2GB], with a default value of 1GB. (When exporting to the ORC file format, the actual size of the split files will be multiples of 64MB, for example, if max_file_size is specified as 5MB, the actual split size will be 64MB; if max_file_size is specified as 65MB, the actual split size will be 128MB.)

- `with_bom`: The default is false. If it is set to true, the exported file is encoded in UTF8 with BOM (valid only for CSV-related file format).

- `timeout`: This is the timeout parameter of the export job, the default timeout is 2 hours, and the unit is seconds.

> Note that to use the `delete_existing_files` parameter, you also need to add the configuration `enable_delete_existing_files = true` to the fe.conf file and restart the FE. Only then will the `delete_existing_files` parameter take effect. Setting `delete_existing_files = true` is a dangerous operation and it is recommended to only use it in a testing environment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ EXPORT

- `delete_existing_files`: 默认为false,若指定为true,则会先删除`export_path`所指定目录下的所有文件,然后导出数据到该目录下。例如:"export_path" = "/user/tmp", 则会删除"/user/"下所有文件及目录;"file_path" = "/user/tmp/", 则会删除"/user/tmp/"下所有文件及目录。

- `with_bom`: 默认为false,若指定为true,则导出的文件编码为带有BOM的UTF8编码(只对csv相关的文件格式生效)。

- `timeout`:导出作业的超时时间,默认为2小时,单位是秒。

> 注意:要使用delete_existing_files参数,还需要在fe.conf中添加配置`enable_delete_existing_files = true`并重启fe,此时delete_existing_files才会生效。delete_existing_files = true 是一个危险的操作,建议只在测试环境中使用。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class ExportStmt extends StatementBase {

private String maxFileSize;
private String deleteExistingFiles;
private String withBom;
private SessionVariable sessionVariables;

private String qualifiedUser;
Expand Down Expand Up @@ -228,6 +229,7 @@ private void setJob() throws UserException {
exportJob.setParallelism(this.parallelism);
exportJob.setMaxFileSize(this.maxFileSize);
exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
exportJob.setWithBom(this.withBom);

if (columns != null) {
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
Expand Down Expand Up @@ -354,6 +356,9 @@ private void checkProperties(Map<String, String> properties) throws UserExceptio
// generate a random label
this.label = "export_" + UUID.randomUUID();
}

// with bom
this.withBom = properties.getOrDefault(OutFileClause.PROP_WITH_BOM, "false");
}

private void checkColumns() throws DdlException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public class OutFileClause {
private static final String PROP_SUCCESS_FILE_NAME = "success_file_name";
public static final String PROP_DELETE_EXISTING_FILES = "delete_existing_files";
public static final String PROP_FILE_SUFFIX = "file_suffix";
public static final String PROP_WITH_BOM = "with_bom";

private static final String PARQUET_PROP_PREFIX = "parquet.";
private static final String SCHEMA = "schema";
Expand All @@ -155,6 +156,7 @@ public class OutFileClause {
private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES;
private boolean deleteExistingFiles = false;
private String fileSuffix = "";
private boolean withBom = false;
private BrokerDesc brokerDesc = null;
// True if result is written to local disk.
// If set to true, the brokerDesc must be null.
Expand Down Expand Up @@ -566,6 +568,11 @@ private void analyzeProperties() throws UserException {
processedPropKeys.add(PROP_FILE_SUFFIX);
}

if (properties.containsKey(PROP_WITH_BOM)) {
withBom = Boolean.valueOf(properties.get(PROP_WITH_BOM)).booleanValue();
processedPropKeys.add(PROP_WITH_BOM);
}

if (properties.containsKey(PROP_SUCCESS_FILE_NAME)) {
successFileName = properties.get(PROP_SUCCESS_FILE_NAME);
FeNameFormat.checkOutfileSuccessFileName("file name", successFileName);
Expand Down Expand Up @@ -805,6 +812,7 @@ public TResultFileSinkOptions toSinkOptions() {
sinkOptions.setMaxFileSizeBytes(maxFileSizeBytes);
sinkOptions.setDeleteExistingFiles(deleteExistingFiles);
sinkOptions.setFileSuffix(fileSuffix);
sinkOptions.setWithBom(withBom);

if (brokerDesc != null) {
sinkOptions.setBrokerProperties(brokerDesc.getProperties());
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ public class ExportJob implements Writable {

@SerializedName("tabletsNum")
private Integer tabletsNum;
@SerializedName("withBom")
private String withBom;

private TableRef tableRef;

Expand Down Expand Up @@ -219,6 +221,7 @@ public ExportJob() {
this.columnSeparator = "\t";
this.lineDelimiter = "\n";
this.columns = "";
this.withBom = "false";
}

public ExportJob(long jobId) {
Expand Down Expand Up @@ -554,6 +557,7 @@ private Map<String, String> convertOutfileProperties() {
if (!deleteExistingFiles.isEmpty()) {
outfileProperties.put(OutFileClause.PROP_DELETE_EXISTING_FILES, deleteExistingFiles);
}
outfileProperties.put(OutFileClause.PROP_WITH_BOM, withBom);

// broker properties
// outfile clause's broker properties need 'broker.' prefix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ private List<Comparable> composeExportJobInfo(ExportJob job) {
infoMap.put("broker", job.getBrokerDesc().getName());
infoMap.put("column_separator", job.getColumnSeparator());
infoMap.put("format", job.getFormat());
infoMap.put("with_bom", job.getWithBom());
infoMap.put("line_delimiter", job.getLineDelimiter());
infoMap.put("columns", job.getColumns());
infoMap.put("tablet_num", job.getTabletsNum());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Lists;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
Expand All @@ -56,7 +55,6 @@ public class ExportTaskExecutor implements TransientTaskExecutor {

ExportJob exportJob;

@Setter
Long taskId;

private StmtExecutor stmtExecutor;
Expand Down Expand Up @@ -205,4 +203,8 @@ private Optional<UnboundRelation> findUnboundRelation(LogicalPlan plan) {
}
return Optional.empty();
}

// Explicit setter for the transient-task id (replaces the former lombok @Setter);
// assigned when this executor is registered with the task scheduler.
public void setTaskId(Long taskId) {
    this.taskId = taskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class ExportCommand extends Command implements ForwardWithSync {
.add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
.add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
.add("format")
.add(OutFileClause.PROP_WITH_BOM)
.build();

private final List<String> nameParts;
Expand Down Expand Up @@ -267,6 +268,9 @@ private ExportJob generateExportJob(ConnectContext ctx, Map<String, String> file
exportJob.setFormat(fileProperties.getOrDefault(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE, "csv")
.toLowerCase());

// set withBom
exportJob.setWithBom(fileProperties.getOrDefault(OutFileClause.PROP_WITH_BOM, "false"));

// set parallelism
int parallelism;
try {
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/DataSinks.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ struct TResultFileSinkOptions {

16: optional bool delete_existing_files;
17: optional string file_suffix;
18: optional bool with_bom;
}

struct TMemoryScratchSink {
Expand Down
Loading