Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix allow cr end of line for csv #56901

Merged
merged 7 commits into from Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/Formats/FormatFactory.cpp
Expand Up @@ -292,7 +292,7 @@ InputFormatPtr FormatFactory::getInput(
// Decide whether to use ParallelParsingInputFormat.

bool parallel_parsing =
max_parsing_threads > 1 && settings.input_format_parallel_parsing && creators.file_segmentation_engine &&
max_parsing_threads > 1 && settings.input_format_parallel_parsing && creators.file_segmentation_engine_creator &&
!creators.random_access_input_creator && !need_only_count;

if (settings.max_memory_usage && settings.min_chunk_bytes_for_parallel_parsing * max_parsing_threads * 2 > settings.max_memory_usage)
Expand Down Expand Up @@ -322,7 +322,7 @@ InputFormatPtr FormatFactory::getInput(
{ return input_getter(input, sample, row_input_format_params, format_settings); };

ParallelParsingInputFormat::Params params{
buf, sample, parser_creator, creators.file_segmentation_engine, name, max_parsing_threads,
buf, sample, parser_creator, creators.file_segmentation_engine_creator, name, format_settings, max_parsing_threads,
settings.min_chunk_bytes_for_parallel_parsing, max_block_size, context->getApplicationType() == Context::ApplicationType::SERVER};

format = std::make_shared<ParallelParsingInputFormat>(params);
Expand Down Expand Up @@ -668,10 +668,22 @@ String FormatFactory::getFormatFromFileDescriptor(int fd)

/// Registers a file segmentation engine for the format `name`.
/// Throws LOGICAL_ERROR if the slot looked up through `dict[name]` is already set.
///
/// NOTE(review): this hunk is rendered without +/- markers, so removed and added lines
/// appear back to back (two `target` declarations, two assignments to `target`).
/// Presumably the `file_segmentation_engine_creator` lines are the post-change version —
/// confirm against the commit before reading this as compilable code.
void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine)
{
auto & target = dict[name].file_segmentation_engine;
auto & target = dict[name].file_segmentation_engine_creator;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: File segmentation engine {} is already registered", name);
target = std::move(file_segmentation_engine);
/// Wrap the plain engine in a creator: the FormatSettings argument is ignored and the
/// captured engine is returned as is.
auto creator = [file_segmentation_engine](const FormatSettings &)
{
return file_segmentation_engine;
};
target = std::move(creator);
}

/// Stores, for the format `name`, a factory that builds a file segmentation engine
/// from FormatSettings.
/// Throws LOGICAL_ERROR if the creator slot for this format name is already occupied.
void FormatFactory::registerFileSegmentationEngineCreator(const String & name, FileSegmentationEngineCreator file_segmentation_engine_creator)
{
    auto & creators_for_format = dict[name];

    /// Refuse double registration instead of silently overwriting the previous creator.
    if (creators_for_format.file_segmentation_engine_creator != nullptr)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: File segmentation engine creator {} is already registered", name);

    creators_for_format.file_segmentation_engine_creator = std::move(file_segmentation_engine_creator);
}

void FormatFactory::registerSchemaReader(const String & name, SchemaReaderCreator schema_reader_creator)
Expand Down
7 changes: 6 additions & 1 deletion src/Formats/FormatFactory.h
Expand Up @@ -71,6 +71,9 @@ class FormatFactory final : private boost::noncopyable
size_t min_bytes,
size_t max_rows)>;

/// Callable that takes the format settings and returns a FileSegmentationEngine.
using FileSegmentationEngineCreator = std::function<FileSegmentationEngine(
const FormatSettings & settings)>;

private:
// On the input side, there are two kinds of formats:
// * InputCreator - formats parsed sequentially, e.g. CSV. Almost all formats are like this.
Expand Down Expand Up @@ -132,7 +135,7 @@ class FormatFactory final : private boost::noncopyable
InputCreator input_creator;
RandomAccessInputCreator random_access_input_creator;
OutputCreator output_creator;
FileSegmentationEngine file_segmentation_engine;
FileSegmentationEngineCreator file_segmentation_engine_creator;
SchemaReaderCreator schema_reader_creator;
ExternalSchemaReaderCreator external_schema_reader_creator;
bool supports_parallel_formatting{false};
Expand Down Expand Up @@ -203,6 +206,8 @@ class FormatFactory final : private boost::noncopyable

void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);

/// Registers a FileSegmentationEngineCreator for the format `name`.
void registerFileSegmentationEngineCreator(const String & name, FileSegmentationEngineCreator file_segmentation_engine_creator);

void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);

void registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker);
Expand Down
17 changes: 12 additions & 5 deletions src/Processors/Formats/Impl/CSVRowInputFormat.cpp
Expand Up @@ -167,7 +167,9 @@ void CSVFormatReader::skipRow()
else if (*pos == '\r')
{
++istr.position();
if (!istr.eof() && *pos == '\n')
if (format_settings.csv.allow_cr_end_of_line)
return;
else if (!istr.eof() && *pos == '\n')
{
++pos;
return;
Expand Down Expand Up @@ -509,7 +511,7 @@ void registerInputFormatCSV(FormatFactory & factory)
registerWithNamesAndTypes("CSV", register_func);
}

std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows)
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows, const FormatSettings & settings)
{
char * pos = in.position();
bool quotes = false;
Expand Down Expand Up @@ -561,7 +563,9 @@ std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memor
else if (*pos == '\r')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\n')
if (settings.csv.allow_cr_end_of_line)
continue;
else if (loadAtPosition(in, memory, pos) && *pos == '\n')
++pos;
else
continue;
Expand All @@ -584,9 +588,12 @@ void registerFileSegmentationEngineCSV(FormatFactory & factory)
auto register_func = [&](const String & format_name, bool, bool)
{
static constexpr size_t min_rows = 3; /// Make it 3 for header auto detection (first 3 rows must be always in the same segment).
factory.registerFileSegmentationEngine(format_name, [](ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
factory.registerFileSegmentationEngineCreator(format_name, [](const FormatSettings & settings) -> FormatFactory::FileSegmentationEngine
{
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, min_rows, max_rows);
return [settings] (ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, min_rows, max_rows, settings);
};
});
};

Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/CSVRowInputFormat.h
Expand Up @@ -116,6 +116,6 @@ class CSVSchemaReader : public FormatWithNamesAndTypesSchemaReader
DataTypes buffered_types;
};

std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows);
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows, const FormatSettings & settings);

}
9 changes: 6 additions & 3 deletions src/Processors/Formats/Impl/HiveTextRowInputFormat.cpp
Expand Up @@ -66,10 +66,13 @@ void registerInputFormatHiveText(FormatFactory & factory)

/// Registers the segmentation engine for the "HiveText" format. Both variants shown here
/// delegate to fileSegmentationEngineCSVImpl and pass 0 as its fourth argument.
///
/// NOTE(review): this hunk is rendered without +/- markers, so the removed call
/// (registerFileSegmentationEngine with a plain lambda) and the added call
/// (registerFileSegmentationEngineCreator with a lambda returning a lambda) are interleaved.
/// Presumably the `...Creator` lines are the post-change version — confirm against the commit.
void registerFileSegmentationEngineHiveText(FormatFactory & factory)
{
factory.registerFileSegmentationEngine(
factory.registerFileSegmentationEngineCreator(
"HiveText",
[](ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows) -> std::pair<bool, size_t> {
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, 0, max_rows);
[](const FormatSettings & settings) -> FormatFactory::FileSegmentationEngine {
/// The inner lambda captures `settings` by copy and forwards it to the CSV implementation.
return [settings] (ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, 0, max_rows, settings);
};
});
}

Expand Down
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
Expand Up @@ -40,6 +40,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupPtr thread
unit.segment.resize(0);

size_t segment_start = getDataOffsetMaybeCompressed(*in);
auto file_segmentation_engine = file_segmentation_engine_creator(format_settings);
auto [have_more_data, currently_read_rows] = file_segmentation_engine(*in, unit.segment, min_chunk_bytes, max_block_size);

unit.original_segment_size = getDataOffsetMaybeCompressed(*in) - segment_start;
Expand Down
9 changes: 6 additions & 3 deletions src/Processors/Formats/Impl/ParallelParsingInputFormat.h
Expand Up @@ -85,8 +85,9 @@ class ParallelParsingInputFormat : public IInputFormat
ReadBuffer & in;
Block header;
InternalParserCreator internal_parser_creator;
FormatFactory::FileSegmentationEngine file_segmentation_engine;
FormatFactory::FileSegmentationEngineCreator file_segmentation_engine_creator;
String format_name;
FormatSettings format_settings;
size_t max_threads;
size_t min_chunk_bytes;
size_t max_block_size;
Expand All @@ -96,8 +97,9 @@ class ParallelParsingInputFormat : public IInputFormat
explicit ParallelParsingInputFormat(Params params)
: IInputFormat(std::move(params.header), &params.in)
, internal_parser_creator(params.internal_parser_creator)
, file_segmentation_engine(params.file_segmentation_engine)
, file_segmentation_engine_creator(params.file_segmentation_engine_creator)
, format_name(params.format_name)
, format_settings(params.format_settings)
, min_chunk_bytes(params.min_chunk_bytes)
, max_block_size(params.max_block_size)
, is_server(params.is_server)
Expand Down Expand Up @@ -196,8 +198,9 @@ class ParallelParsingInputFormat : public IInputFormat

const InternalParserCreator internal_parser_creator;
/// Function to segment the file. Then "parsers" will parse that segments.
FormatFactory::FileSegmentationEngine file_segmentation_engine;
FormatFactory::FileSegmentationEngineCreator file_segmentation_engine_creator;
const String format_name;
const FormatSettings format_settings;
const size_t min_chunk_bytes;
const size_t max_block_size;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/System/StorageSystemFormats.cpp
Expand Up @@ -25,7 +25,7 @@ void StorageSystemFormats::fillData(MutableColumns & res_columns, ContextPtr, co
const auto & [format_name, creators] = pair;
UInt64 has_input_format(creators.input_creator != nullptr || creators.random_access_input_creator != nullptr);
UInt64 has_output_format(creators.output_creator != nullptr);
UInt64 supports_parallel_parsing(creators.file_segmentation_engine != nullptr || creators.random_access_input_creator != nullptr);
UInt64 supports_parallel_parsing(creators.file_segmentation_engine_creator != nullptr || creators.random_access_input_creator != nullptr);
UInt64 supports_parallel_formatting(creators.supports_parallel_formatting);

res_columns[0]->insert(format_name);
Expand Down
@@ -0,0 +1,2 @@
1010559
1010559
@@ -0,0 +1,16 @@
#!/usr/bin/env bash

# NOTE: this sh wrapper is required because of shell_config

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Discover the user_files directory: query a file that does not exist and take the path
# from the FILE_DOESNT_EXIST (Code: 107) error message, with "/nonexist.txt" stripped.
USER_FILES_PATH=$($CLICKHOUSE_CLIENT --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep -E '^Code: 107.*FILE_DOESNT_EXIST' | head -1 | awk '{gsub("/nonexist.txt","",$9); print $9}')

# "${USER_FILES_PATH:?}" aborts the script if discovery produced an empty value, so cp/rm
# never operate on "/". The quotes keep a path with spaces or glob characters intact.
cp "$CURDIR"/data_csv/1m_rows_cr_end_of_line.csv.xz "${USER_FILES_PATH:?}"/

# Count rows with CR accepted as end of line; the two queries differ only in
# optimize_count_from_files (1 vs 0).
$CLICKHOUSE_CLIENT -q "SELECT count(1) from file('1m_rows_cr_end_of_line.csv.xz') settings input_format_csv_allow_cr_end_of_line=1, optimize_count_from_files=1"
$CLICKHOUSE_CLIENT -q "SELECT count(1) from file('1m_rows_cr_end_of_line.csv.xz') settings input_format_csv_allow_cr_end_of_line=1, optimize_count_from_files=0"

rm "${USER_FILES_PATH:?}"/1m_rows_cr_end_of_line.csv.xz
Binary file not shown.