Skip to content

Commit

Permalink
Merge pull request #56901 from KevinyhZou/Fix_allow_cr_end_of_csv_line
Browse files Browse the repository at this point in the history
Fix allow cr end of line for csv
  • Loading branch information
Avogar committed Nov 29, 2023
2 parents b7c961d + 619b2f2 commit c6fecfb
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 18 deletions.
20 changes: 16 additions & 4 deletions src/Formats/FormatFactory.cpp
Expand Up @@ -293,7 +293,7 @@ InputFormatPtr FormatFactory::getInput(
// Decide whether to use ParallelParsingInputFormat.

bool parallel_parsing =
max_parsing_threads > 1 && settings.input_format_parallel_parsing && creators.file_segmentation_engine &&
max_parsing_threads > 1 && settings.input_format_parallel_parsing && creators.file_segmentation_engine_creator &&
!creators.random_access_input_creator && !need_only_count;

if (settings.max_memory_usage && settings.min_chunk_bytes_for_parallel_parsing * max_parsing_threads * 2 > settings.max_memory_usage)
Expand Down Expand Up @@ -323,7 +323,7 @@ InputFormatPtr FormatFactory::getInput(
{ return input_getter(input, sample, row_input_format_params, format_settings); };

ParallelParsingInputFormat::Params params{
buf, sample, parser_creator, creators.file_segmentation_engine, name, max_parsing_threads,
buf, sample, parser_creator, creators.file_segmentation_engine_creator, name, format_settings, max_parsing_threads,
settings.min_chunk_bytes_for_parallel_parsing, max_block_size, context->getApplicationType() == Context::ApplicationType::SERVER};

format = std::make_shared<ParallelParsingInputFormat>(params);
Expand Down Expand Up @@ -669,10 +669,22 @@ String FormatFactory::getFormatFromFileDescriptor(int fd)

// NOTE(review): this hunk is rendered without +/- markers, so each pre-change line
// appears directly above the line(s) that replace it. The lines that refer to the
// `file_segmentation_engine` member are presumably the removed ones.
/// Registers a file segmentation engine for format `name`.
/// The engine is stored wrapped in a creator that ignores its FormatSettings argument
/// and always returns a copy of the engine passed here.
/// Throws LOGICAL_ERROR if the slot for `name` is already occupied.
void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine)
{
// Pre-change lookup of the slot.
auto & target = dict[name].file_segmentation_engine;
// Post-change lookup: the slot now holds a creator instead of the engine itself.
auto & target = dict[name].file_segmentation_engine_creator;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: File segmentation engine {} is already registered", name);
// Pre-change assignment of the engine.
target = std::move(file_segmentation_engine);
// Post-change: adapt the settings-independent engine to the creator interface.
auto creator = [file_segmentation_engine](const FormatSettings &)
{
return file_segmentation_engine;
};
target = std::move(creator);
}

/// Registers a creator that builds the file segmentation engine for format `name`
/// from FormatSettings. Throws LOGICAL_ERROR if the slot for `name` is already occupied.
void FormatFactory::registerFileSegmentationEngineCreator(const String & name, FileSegmentationEngineCreator file_segmentation_engine_creator)
{
    auto & registered_creator = dict[name].file_segmentation_engine_creator;

    /// A non-empty slot means something has already been registered under this name.
    if (registered_creator)
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: File segmentation engine creator {} is already registered", name);
    }

    registered_creator = std::move(file_segmentation_engine_creator);
}

void FormatFactory::registerSchemaReader(const String & name, SchemaReaderCreator schema_reader_creator)
Expand Down
7 changes: 6 additions & 1 deletion src/Formats/FormatFactory.h
Expand Up @@ -71,6 +71,9 @@ class FormatFactory final : private boost::noncopyable
size_t min_bytes,
size_t max_rows)>;

/// Builds a FileSegmentationEngine from the given format settings,
/// which lets the segmentation logic depend on those settings.
using FileSegmentationEngineCreator = std::function<FileSegmentationEngine(
const FormatSettings & settings)>;

private:
// On the input side, there are two kinds of formats:
// * InputCreator - formats parsed sequentially, e.g. CSV. Almost all formats are like this.
Expand Down Expand Up @@ -132,7 +135,7 @@ class FormatFactory final : private boost::noncopyable
InputCreator input_creator;
RandomAccessInputCreator random_access_input_creator;
OutputCreator output_creator;
FileSegmentationEngine file_segmentation_engine;
FileSegmentationEngineCreator file_segmentation_engine_creator;
SchemaReaderCreator schema_reader_creator;
ExternalSchemaReaderCreator external_schema_reader_creator;
bool supports_parallel_formatting{false};
Expand Down Expand Up @@ -203,6 +206,8 @@ class FormatFactory final : private boost::noncopyable

void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);

/// Registers a creator that builds the file segmentation engine for format `name` from FormatSettings.
/// Throws LOGICAL_ERROR if the slot for `name` is already occupied.
void registerFileSegmentationEngineCreator(const String & name, FileSegmentationEngineCreator file_segmentation_engine_creator);

void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);

void registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker);
Expand Down
17 changes: 12 additions & 5 deletions src/Processors/Formats/Impl/CSVRowInputFormat.cpp
Expand Up @@ -167,7 +167,9 @@ void CSVFormatReader::skipRow()
else if (*pos == '\r')
{
++istr.position();
if (!istr.eof() && *pos == '\n')
if (format_settings.csv.allow_cr_end_of_line)
return;
else if (!istr.eof() && *pos == '\n')
{
++pos;
return;
Expand Down Expand Up @@ -509,7 +511,7 @@ void registerInputFormatCSV(FormatFactory & factory)
registerWithNamesAndTypes("CSV", register_func);
}

std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows)
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows, const FormatSettings & settings)
{
char * pos = in.position();
bool quotes = false;
Expand Down Expand Up @@ -561,7 +563,9 @@ std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memor
else if (*pos == '\r')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\n')
if (settings.csv.allow_cr_end_of_line)
continue;
else if (loadAtPosition(in, memory, pos) && *pos == '\n')
++pos;
else
continue;
Expand All @@ -584,9 +588,12 @@ void registerFileSegmentationEngineCSV(FormatFactory & factory)
auto register_func = [&](const String & format_name, bool, bool)
{
static constexpr size_t min_rows = 3; /// Make it 3 for header auto detection (first 3 rows must be always in the same segment).
factory.registerFileSegmentationEngine(format_name, [](ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
factory.registerFileSegmentationEngineCreator(format_name, [](const FormatSettings & settings) -> FormatFactory::FileSegmentationEngine
{
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, min_rows, max_rows);
return [settings] (ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, min_rows, max_rows, settings);
};
});
};

Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/CSVRowInputFormat.h
Expand Up @@ -116,6 +116,6 @@ class CSVSchemaReader : public FormatWithNamesAndTypesSchemaReader
DataTypes buffered_types;
};

std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows);
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows, const FormatSettings & settings);

}
9 changes: 6 additions & 3 deletions src/Processors/Formats/Impl/HiveTextRowInputFormat.cpp
Expand Up @@ -66,10 +66,13 @@ void registerInputFormatHiveText(FormatFactory & factory)

// NOTE(review): this hunk is rendered without +/- markers, so pre-change lines appear
// directly above their replacements. The `registerFileSegmentationEngine` call and the
// lambda that calls fileSegmentationEngineCSVImpl without `settings` are presumably the removed ones.
/// Registers the file segmentation engine for the "HiveText" format.
/// Segmentation is delegated to fileSegmentationEngineCSVImpl with min_rows = 0.
/// In the post-change form the format settings are captured by value and passed on.
void registerFileSegmentationEngineHiveText(FormatFactory & factory)
{
factory.registerFileSegmentationEngine(
factory.registerFileSegmentationEngineCreator(
"HiveText",
[](ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows) -> std::pair<bool, size_t> {
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, 0, max_rows);
[](const FormatSettings & settings) -> FormatFactory::FileSegmentationEngine {
// The returned engine holds its own copy of the settings.
return [settings] (ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, 0, max_rows, settings);
};
});
}

Expand Down
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
Expand Up @@ -40,6 +40,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupPtr thread
unit.segment.resize(0);

size_t segment_start = getDataOffsetMaybeCompressed(*in);
auto file_segmentation_engine = file_segmentation_engine_creator(format_settings);
auto [have_more_data, currently_read_rows] = file_segmentation_engine(*in, unit.segment, min_chunk_bytes, max_block_size);

unit.original_segment_size = getDataOffsetMaybeCompressed(*in) - segment_start;
Expand Down
9 changes: 6 additions & 3 deletions src/Processors/Formats/Impl/ParallelParsingInputFormat.h
Expand Up @@ -86,8 +86,9 @@ class ParallelParsingInputFormat : public IInputFormat
ReadBuffer & in;
Block header;
InternalParserCreator internal_parser_creator;
FormatFactory::FileSegmentationEngine file_segmentation_engine;
FormatFactory::FileSegmentationEngineCreator file_segmentation_engine_creator;
String format_name;
FormatSettings format_settings;
size_t max_threads;
size_t min_chunk_bytes;
size_t max_block_size;
Expand All @@ -97,8 +98,9 @@ class ParallelParsingInputFormat : public IInputFormat
explicit ParallelParsingInputFormat(Params params)
: IInputFormat(std::move(params.header), &params.in)
, internal_parser_creator(params.internal_parser_creator)
, file_segmentation_engine(params.file_segmentation_engine)
, file_segmentation_engine_creator(params.file_segmentation_engine_creator)
, format_name(params.format_name)
, format_settings(params.format_settings)
, min_chunk_bytes(params.min_chunk_bytes)
, max_block_size(params.max_block_size)
, is_server(params.is_server)
Expand Down Expand Up @@ -197,8 +199,9 @@ class ParallelParsingInputFormat : public IInputFormat

const InternalParserCreator internal_parser_creator;
/// Function to segment the file. Then "parsers" will parse that segments.
FormatFactory::FileSegmentationEngine file_segmentation_engine;
FormatFactory::FileSegmentationEngineCreator file_segmentation_engine_creator;
const String format_name;
const FormatSettings format_settings;
const size_t min_chunk_bytes;
const size_t max_block_size;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/System/StorageSystemFormats.cpp
Expand Up @@ -25,7 +25,7 @@ void StorageSystemFormats::fillData(MutableColumns & res_columns, ContextPtr, co
const auto & [format_name, creators] = pair;
UInt64 has_input_format(creators.input_creator != nullptr || creators.random_access_input_creator != nullptr);
UInt64 has_output_format(creators.output_creator != nullptr);
UInt64 supports_parallel_parsing(creators.file_segmentation_engine != nullptr || creators.random_access_input_creator != nullptr);
UInt64 supports_parallel_parsing(creators.file_segmentation_engine_creator != nullptr || creators.random_access_input_creator != nullptr);
UInt64 supports_parallel_formatting(creators.supports_parallel_formatting);

res_columns[0]->insert(format_name);
Expand Down
@@ -0,0 +1,2 @@
1010559
1010559
@@ -0,0 +1,16 @@
#!/usr/bin/env bash

# Counts the rows of a compressed CSV fixture with input_format_csv_allow_cr_end_of_line=1,
# once with optimize_count_from_files=1 and once with optimize_count_from_files=0.

# NOTE: this sh wrapper is required because of shell_config

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Query a file that does not exist, take the first "Code: 107 ... FILE_DOESNT_EXIST" line of the
# error output, and use its 9th field with the "/nonexist.txt" suffix removed as the target directory.
USER_FILES_PATH=$($CLICKHOUSE_CLIENT --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep -E '^Code: 107.*FILE_DOESNT_EXIST' | head -1 | awk '{gsub("/nonexist.txt","",$9); print $9}')

# With an empty path the cp/rm below would operate on the filesystem root, so stop early.
if [ -z "$USER_FILES_PATH" ]; then
    echo "Cannot determine the user_files path" >&2
    exit 1
fi

# The path is quoted so that spaces or glob characters in it cannot split or expand the argument.
cp "$CURDIR"/data_csv/1m_rows_cr_end_of_line.csv.xz "$USER_FILES_PATH"/

$CLICKHOUSE_CLIENT -q "SELECT count(1) from file('1m_rows_cr_end_of_line.csv.xz') settings input_format_csv_allow_cr_end_of_line=1, optimize_count_from_files=1"
$CLICKHOUSE_CLIENT -q "SELECT count(1) from file('1m_rows_cr_end_of_line.csv.xz') settings input_format_csv_allow_cr_end_of_line=1, optimize_count_from_files=0"

rm "$USER_FILES_PATH"/1m_rows_cr_end_of_line.csv.xz
Binary file not shown.

0 comments on commit c6fecfb

Please sign in to comment.