Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turn off parallel parsing when memory limit is small. #16721

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/DataStreams/ParallelParsingBlockInputStream.cpp
Expand Up @@ -17,7 +17,7 @@ ParallelParsingBlockInputStream::ParallelParsingBlockInputStream(const Params &
// Subtract one thread that we use for segmentation and one for
// reading. After that, must have at least two threads left for
// parsing. See the assertion below.
pool(std::max(2, params.max_threads - 2)),
pool(std::max(2, static_cast<int>(params.max_threads) - 2)),
file_segmentation_engine(params.file_segmentation_engine)
{
// See comment above.
Expand Down
2 changes: 1 addition & 1 deletion src/DataStreams/ParallelParsingBlockInputStream.h
Expand Up @@ -69,7 +69,7 @@ class ParallelParsingBlockInputStream : public IBlockInputStream
const InputProcessorCreator & input_processor_creator;
const InputCreatorParams & input_creator_params;
FormatFactory::FileSegmentationEngine file_segmentation_engine;
int max_threads;
size_t max_threads;
size_t min_chunk_bytes;
};

Expand Down
6 changes: 4 additions & 2 deletions src/Formats/FormatFactory.cpp
Expand Up @@ -160,6 +160,9 @@ BlockInputStreamPtr FormatFactory::getInput(
// (segmentator + two parsers + reader).
bool parallel_parsing = settings.input_format_parallel_parsing && file_segmentation_engine && settings.max_threads >= 4;

if (settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like max_memory_usage_for_user should also be checked.

BTW can't this be done by lowering the number of threads dynamically instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW can't this be done by lowering the number of threads dynamically instead?

I don't know. I think the logic would be complex because of corner cases. The chosen number of threads must be > 4, otherwise it will be ineffective, and if settings.max_memory_usage < settings.min_chunk_bytes_for_parallel_parsing, we have to turn it off...

parallel_parsing = false;

if (parallel_parsing && name == "JSONEachRow")
{
/// FIXME ParallelParsingBlockInputStream doesn't support formats with non-trivial readPrefix() and readSuffix()
Expand Down Expand Up @@ -188,7 +191,7 @@ BlockInputStreamPtr FormatFactory::getInput(
row_input_format_params, format_settings};
ParallelParsingBlockInputStream::Params params{buf, input_getter,
input_creator_params, file_segmentation_engine,
static_cast<int>(settings.max_threads),
settings.max_threads,
settings.min_chunk_bytes_for_parallel_parsing};
return std::make_shared<ParallelParsingBlockInputStream>(params);
}
Expand Down Expand Up @@ -334,7 +337,6 @@ void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegm
target = std::move(file_segmentation_engine);
}


FormatFactory & FormatFactory::instance()
{
static FormatFactory ret;
Expand Down
2 changes: 0 additions & 2 deletions src/Formats/registerFormats.cpp
Expand Up @@ -15,7 +15,6 @@ void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
void registerFileSegmentationEngineRegexp(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsString(FormatFactory & factory);
void registerFileSegmentationEngineLineAsString(FormatFactory & factory);

/// Formats for both input/output.

Expand Down Expand Up @@ -90,7 +89,6 @@ void registerFormats()
registerFileSegmentationEngineJSONEachRow(factory);
registerFileSegmentationEngineRegexp(factory);
registerFileSegmentationEngineJSONAsString(factory);
registerFileSegmentationEngineLineAsString(factory);

registerInputFormatNative(factory);
registerOutputFormatNative(factory);
Expand Down
70 changes: 19 additions & 51 deletions src/Processors/Formats/Impl/LineAsStringRowInputFormat.cpp
Expand Up @@ -12,7 +12,7 @@ namespace ErrorCodes
}

LineAsStringRowInputFormat::LineAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) :
IRowInputFormat(header_, in_, std::move(params_)), buf(in)
IRowInputFormat(header_, in_, std::move(params_))
{
if (header_.columns() > 1 || header_.getDataTypes()[0]->getTypeId() != TypeIndex::String)
{
Expand All @@ -23,42 +23,37 @@ LineAsStringRowInputFormat::LineAsStringRowInputFormat(const Block & header_, Re
/// Resets the parser: first delegates to IRowInputFormat::resetParser(),
/// then calls reset() on this format's own `buf` member.
void LineAsStringRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
}

void LineAsStringRowInputFormat::readLineObject(IColumn & column)
{
PeekableReadBufferCheckpoint checkpoint{buf};
bool newline = true;
bool over = false;
DB::Memory<> object;

char * pos;
char * pos = in.position();
bool need_more_data = true;

while (newline)
while (loadAtPosition(in, object, pos) && need_more_data)
{
pos = find_first_symbols<'\n'>(buf.position(), buf.buffer().end());
buf.position() = pos;
if (buf.position() == buf.buffer().end())
{
over = true;
break;
}
else if (*buf.position() == '\n')
{
newline = false;
}
pos = find_first_symbols<'\n'>(pos, in.buffer().end());
if (pos == in.buffer().end())
continue;

if (*pos == '\n')
need_more_data = false;

++pos;
}

buf.makeContinuousMemoryFromCheckpointToPos();
char * end = over ? buf.position(): ++buf.position();
buf.rollbackToCheckpoint();
column.insertData(buf.position(), end - (over ? 0 : 1) - buf.position());
buf.position() = end;
saveUpToPosition(in, object, pos);
loadAtPosition(in, object, pos);

/// Last character is always \n.
column.insertData(object.data(), object.size() - 1);
}

bool LineAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (buf.eof())
if (in.eof())
return false;

readLineObject(*columns[0]);
Expand All @@ -77,31 +72,4 @@ void registerInputFormatProcessorLineAsString(FormatFactory & factory)
return std::make_shared<LineAsStringRowInputFormat>(sample, buf, params);
});
}

/// File segmentation engine for line-oriented input: cuts the stream into chunks
/// that end right after a '\n' once at least `min_chunk_size` bytes were gathered.
///
/// `in`             - buffer the data is read from.
/// `memory`         - accumulator handed to loadAtPosition()/saveUpToPosition().
/// `min_chunk_size` - size threshold (in bytes) after which the chunk is closed.
///
/// Returns whatever the final loadAtPosition() call returns
/// (presumably "more data is available" - verify against that helper).
static bool fileSegmentationEngineLineAsStringpImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
    char * cursor = in.position();
    bool chunk_incomplete = true;

    /// loadAtPosition() is deliberately evaluated before the flag, so it also
    /// runs once after the chunk has been completed.
    while (loadAtPosition(in, memory, cursor) && chunk_incomplete)
    {
        cursor = find_first_symbols<'\n'>(cursor, in.buffer().end());

        /// A newline was found inside the current buffer window.
        if (cursor != in.buffer().end())
        {
            /// Size so far = bytes already in `memory` + bytes consumed from the window.
            const size_t gathered = memory.size() + static_cast<size_t>(cursor - in.position());
            chunk_incomplete = gathered < min_chunk_size;

            /// Step over the '\n' so that it stays inside the current chunk.
            ++cursor;
        }
    }

    saveUpToPosition(in, memory, cursor);
    return loadAtPosition(in, memory, cursor);
}

/// Registers fileSegmentationEngineLineAsStringpImpl in `factory` as the
/// file segmentation engine for the "LineAsString" format name.
void registerFileSegmentationEngineLineAsString(FormatFactory & factory)
{
    auto * const engine = &fileSegmentationEngineLineAsStringpImpl;
    factory.registerFileSegmentationEngine("LineAsString", engine);
}

}
3 changes: 0 additions & 3 deletions src/Processors/Formats/Impl/LineAsStringRowInputFormat.h
Expand Up @@ -2,7 +2,6 @@

#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <IO/PeekableReadBuffer.h>

namespace DB
{
Expand All @@ -24,8 +23,6 @@ class LineAsStringRowInputFormat : public IRowInputFormat

private:
void readLineObject(IColumn & column);

PeekableReadBuffer buf;
};

}
@@ -0,0 +1 @@
19884108
@@ -0,0 +1,8 @@
#!/usr/bin/env bash

# Runs clickhouse-local over a 1 GiB generated file with parallel parsing
# requested and max_memory_usage set to 100Mi, printing the resulting row count.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh

# Produce exactly 1 GiB of repeated URL lines (head -c cuts the last line short).
yes http://foobarfoobarfoobarfoobarfoobarfoobarfoobar.com | head -c1G > 1g.csv

$CLICKHOUSE_LOCAL --stacktrace --input_format_parallel_parsing=1 --max_memory_usage=100Mi -q "select count() from file('1g.csv', 'TSV', 'URL String')"
status=$?

# Do not leave the 1 GiB fixture behind; report the query's own exit status.
rm -f 1g.csv
exit "$status"