Merging parallel format parsing #7780

Merged: 54 commits, Nov 21, 2019

Commits
d47d4cd  parallel parsing (nikitamikhaylov, Oct 1, 2019)
d9c12f7  lost files (nikitamikhaylov, Oct 1, 2019)
5a34e39  small changes after review (nikitamikhaylov, Oct 2, 2019)
34d8ade  Merge branch 'master' of github.com:ClickHouse/ClickHouse into parall… (nikitamikhaylov, Oct 22, 2019)
905a65b  some changes after review (nikitamikhaylov, Oct 22, 2019)
31b6928  fix data race on vector<bool> (nikitamikhaylov, Oct 23, 2019)
c867d7a  fix max_insert_block_size (nikitamikhaylov, Oct 23, 2019)
8b17ce4  remove unused mutex (nikitamikhaylov, Oct 23, 2019)
c7084ff  better (nikitamikhaylov, Oct 24, 2019)
342f259  comments (nikitamikhaylov, Oct 24, 2019)
76e64d1  empty commit (nikitamikhaylov, Oct 24, 2019)
78b6322  empty commit 2 (nikitamikhaylov, Oct 24, 2019)
682b9df  remove SharedReadBuffer + remove lambdas (nikitamikhaylov, Oct 25, 2019)
60ee52c  remove unrelated changes in tests (nikitamikhaylov, Oct 25, 2019)
980528e  docs (nikitamikhaylov, Oct 25, 2019)
4fe0dda  add cancel test (nikitamikhaylov, Oct 25, 2019)
206800b  add stderr configs to kafka test (nikitamikhaylov, Oct 28, 2019)
756a107  Merge branch 'master' of github.com:ClickHouse/ClickHouse into parall… (nikitamikhaylov, Oct 28, 2019)
2bd7932  test kafka limit (nikitamikhaylov, Oct 28, 2019)
c5085b8  max_threads limit (nikitamikhaylov, Oct 28, 2019)
08f1930  Merge branch 'master' of github.com:ClickHouse/ClickHouse into parall… (nikitamikhaylov, Oct 29, 2019)
5bcfee1  empty + master (nikitamikhaylov, Oct 29, 2019)
b691cc7  comments and checks (nikitamikhaylov, Oct 30, 2019)
874f88e  disable feature to test (nikitamikhaylov, Oct 30, 2019)
cb48ee4  enable feature (nikitamikhaylov, Oct 30, 2019)
9d8bbeb  bad fix for PrettyCompact (nikitamikhaylov, Oct 31, 2019)
0e04d14  cancel (nikitamikhaylov, Nov 1, 2019)
3c57b8e  test (nikitamikhaylov, Nov 1, 2019)
817b523  Merge branch 'master' of github.com:ClickHouse/ClickHouse into parall… (nikitamikhaylov, Nov 5, 2019)
0d3a05c  remove logging + fix for PrettySpace + overrided readSuffix and readP… (nikitamikhaylov, Nov 5, 2019)
5416914  remove logging (nikitamikhaylov, Nov 5, 2019)
220ccca  executed in destructor (nikitamikhaylov, Nov 6, 2019)
8358b64  comment (nikitamikhaylov, Nov 11, 2019)
57663c3  processing unit added (nikitamikhaylov, Nov 11, 2019)
7dda8a9  fix stupid mistake (nikitamikhaylov, Nov 11, 2019)
c335c4f  empty (nikitamikhaylov, Nov 11, 2019)
5e789e4  Some renames & remove weird logic from cancel() (akuzm, Nov 14, 2019)
db81aae  Cleaup (akuzm, Nov 14, 2019)
7f34c00  Merge remote-tracking branch 'origin/master' into HEAD (akuzm, Nov 14, 2019)
ce92615  remove unused variable (akuzm, Nov 14, 2019)
8f3bd8f  Fix typo in Memory: reuse buffer if capacity allows. (akuzm, Nov 15, 2019)
5d5882d  wip: a saner segmentation function for TSV (akuzm, Nov 15, 2019)
83030b9  remove MemoryExt<> (akuzm, Nov 18, 2019)
4ab7ac1  Remove all segmentation engines except TSV (akuzm, Nov 18, 2019)
bb98328  Make parser and read buffer local to parser thread (akuzm, Nov 18, 2019)
17b4565  Cleanup (akuzm, Nov 18, 2019)
168e15b  chunk size -> chunk bytes (akuzm, Nov 18, 2019)
f8f6de8  remove separate control for threads (akuzm, Nov 18, 2019)
0d1933c  Calculate the number of parsing theads correctly (akuzm, Nov 19, 2019)
7e5731d  Merge remote-tracking branch 'origin/master' into HEAD (akuzm, Nov 19, 2019)
9a2b864  Cleanup (akuzm, Nov 19, 2019)
c913155  Fix TSV segmentation. (akuzm, Nov 20, 2019)
31cb692  small fix (nikitamikhaylov, Nov 21, 2019)
1d74aec  fix typo in asserts (nikitamikhaylov, Nov 21, 2019)
Files changed
7 changes: 7 additions & 0 deletions dbms/programs/client/Client.cpp
@@ -1112,7 +1112,14 @@ class Client : public Poco::Util::Application
/// Check if the server sent an Exception packet
auto packet_type = connection->checkPacket();
if (packet_type && *packet_type == Protocol::Server::Exception)
{
    /*
     * We're exiting with an error, so it makes sense to kill the
     * input stream without waiting for it to complete.
     */
    async_block_input->cancel(true);
    return;
}

connection->sendData(block);
processed_rows += block.rows();
3 changes: 3 additions & 0 deletions dbms/src/Core/Settings.h
@@ -111,6 +111,9 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.", 0) \
M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
\
M(SettingBool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \
M(SettingUInt64, min_chunk_bytes_for_parallel_parsing, (1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
\
M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.", 0) \
M(SettingUInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.", 0) \
M(SettingUInt64, merge_tree_min_rows_for_seek, 0, "You can skip reading more than that number of rows at the price of one seek per file.", 0) \
203 changes: 203 additions & 0 deletions dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp
@@ -0,0 +1,203 @@
#include <DataStreams/ParallelParsingBlockInputStream.h>
#include "ParallelParsingBlockInputStream.h"

namespace DB
{

void ParallelParsingBlockInputStream::segmentatorThreadFunction()
{
    setThreadName("Segmentator");
    try
    {
        while (!finished)
        {
            const auto current_unit_number = segmentator_ticket_number % processing_units.size();
            auto & unit = processing_units[current_unit_number];

            {
                std::unique_lock lock(mutex);
                segmentator_condvar.wait(lock,
                    [&]{ return unit.status == READY_TO_INSERT || finished; });
            }

            if (finished)
            {
                break;
            }

            assert(unit.status == READY_TO_INSERT);

            // Segmenting the original input.
            unit.segment.resize(0);

            const bool have_more_data = file_segmentation_engine(original_buffer,
                unit.segment, min_chunk_bytes);

            unit.is_last = !have_more_data;
            unit.status = READY_TO_PARSE;
            scheduleParserThreadForUnitWithNumber(current_unit_number);
            ++segmentator_ticket_number;

            if (!have_more_data)
            {
                break;
            }
        }
    }
    catch (...)
    {
        onBackgroundException();
    }
}

void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_number)
{
    try
    {
        setThreadName("ChunkParser");

        auto & unit = processing_units[current_unit_number];

        /*
         * This is kind of suspicious -- the input_process_creator contract with
         * respect to multithreaded use is not clear, but we hope that it is
         * just a 'normal' factory class that doesn't have any state, and so we
         * can use it from multiple threads simultaneously.
         */
        ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
        auto parser = std::make_unique<InputStreamFromInputFormat>(
            input_processor_creator(read_buffer, header, context,
                row_input_format_params, format_settings));

        unit.block_ext.block.clear();
        unit.block_ext.block_missing_values.clear();

        // We don't know in advance how many blocks there will be, so we have to
        // read them all until an empty block occurs.
        Block block;
        while (!finished && (block = parser->read()) != Block())
        {
            unit.block_ext.block.emplace_back(block);
            unit.block_ext.block_missing_values.emplace_back(parser->getMissingValues());
        }

        // We expect to get at least some blocks for a non-empty buffer,
        // except at the end of file. Also see the matching assert in readImpl().
        assert(unit.is_last || unit.block_ext.block.size() > 0);

        std::unique_lock lock(mutex);
        unit.status = READY_TO_READ;
        reader_condvar.notify_all();
    }
    catch (...)
    {
        onBackgroundException();
    }
}

void ParallelParsingBlockInputStream::onBackgroundException()
{
    tryLogCurrentException(__PRETTY_FUNCTION__);

    std::unique_lock lock(mutex);
    if (!background_exception)
    {
        background_exception = std::current_exception();
    }
    finished = true;
    reader_condvar.notify_all();
    segmentator_condvar.notify_all();
}

Block ParallelParsingBlockInputStream::readImpl()
{
    if (isCancelledOrThrowIfKilled() || finished)
    {
        /**
         * Check for a background exception and rethrow it before we return.
         */
        std::unique_lock lock(mutex);
        if (background_exception)
        {
            lock.unlock();
            cancel(false);
            std::rethrow_exception(background_exception);
        }

        return Block{};
    }

    const auto current_unit_number = reader_ticket_number % processing_units.size();
    auto & unit = processing_units[current_unit_number];

    if (!next_block_in_current_unit.has_value())
    {
        // We have read out all the Blocks from the previous Processing Unit,
        // wait for the current one to become ready.
        std::unique_lock lock(mutex);
        reader_condvar.wait(lock, [&](){ return unit.status == READY_TO_READ || finished; });

        if (finished)
        {
            /**
             * Check for a background exception and rethrow it before we return.
             */
            if (background_exception)
            {
                lock.unlock();
                cancel(false);
                std::rethrow_exception(background_exception);
            }

            return Block{};
        }

        assert(unit.status == READY_TO_READ);
        next_block_in_current_unit = 0;
    }

    if (unit.block_ext.block.size() == 0)
    {
        /*
         * Can we get zero blocks for an entire segment, when the format parser
         * skips its entire content and does not create any blocks? Probably not,
         * but if we ever do, we should add a loop around the above if, to skip
         * these. Also see the matching assert in the parser thread.
         */
        assert(unit.is_last);
        finished = true;
        return Block{};
    }

    assert(next_block_in_current_unit.value() < unit.block_ext.block.size());

    Block res = std::move(unit.block_ext.block.at(*next_block_in_current_unit));
    last_block_missing_values = std::move(unit.block_ext.block_missing_values[*next_block_in_current_unit]);

    next_block_in_current_unit.value() += 1;

    if (*next_block_in_current_unit == unit.block_ext.block.size())
    {
        // Finished reading this Processing Unit, move to the next one.
        next_block_in_current_unit.reset();
        ++reader_ticket_number;

        if (unit.is_last)
        {
            // If it was the last unit, we're finished.
            finished = true;
        }
        else
        {
            // Pass the unit back to the segmentator.
            std::unique_lock lock(mutex);
            unit.status = READY_TO_INSERT;
            segmentator_condvar.notify_all();
        }
    }

    return res;
}


}
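For context on the segmentator loop above: file_segmentation_engine is expected to cut the raw input into independently parseable chunks of at least min_chunk_bytes, always ending a chunk on a row boundary so that no parser thread ever sees half a row. The sketch below is a simplified, hypothetical illustration of that contract for tab-separated data; it uses plain std::istream instead of ClickHouse's ReadBuffer, and the function name and signature are invented for this example, not taken from the PR.

```cpp
#include <iostream>
#include <istream>
#include <sstream>
#include <string>

// Hypothetical stand-in for the segmentation contract: append whole rows from
// `in` to `segment` until at least `min_chunk_bytes` bytes have been taken and
// the chunk ends right after an unescaped newline. Returns true if more input
// may remain. In TSV, a literal newline inside a value is backslash-escaped,
// so an unescaped '\n' always terminates a row.
static bool segmentTsvChunk(std::istream & in, std::string & segment, size_t min_chunk_bytes)
{
    bool escaped = false;  // true if the previous character was an unconsumed backslash
    char c;
    while (in.get(c))
    {
        segment.push_back(c);
        if (escaped)
            escaped = false;    // this character is escaped, whatever it is
        else if (c == '\\')
            escaped = true;     // the next character is escaped
        else if (c == '\n' && segment.size() >= min_chunk_bytes)
            return true;        // chunk ends exactly on a row boundary
    }
    return false;  // end of input reached
}

int main()
{
    std::istringstream input("a\t1\nb\t2\nc\t3\n");
    std::string chunk;
    while (true)
    {
        chunk.clear();
        const bool more = segmentTsvChunk(input, chunk, /* min_chunk_bytes = */ 4);
        if (!chunk.empty())
            std::cout << "chunk: " << chunk;  // each chunk contains whole rows only
        if (!more)
            break;
    }
}
```

With the tiny min_chunk_bytes used here, each chunk holds one row; with the PR's default of 1 MiB, a chunk would hold many rows, which is what makes per-chunk parsing worthwhile.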
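The onBackgroundException() / readImpl() pair follows a common pattern for surfacing worker-thread failures: the first exception is captured as a std::exception_ptr under the mutex, the finished flag and condition variables wake everyone up, and the reader rethrows on its own thread. Below is a minimal, self-contained sketch of that pattern with invented names, shown in isolation rather than as the PR's actual class; it compiles with -std=c++17.

```cpp
#include <condition_variable>
#include <exception>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>

// Capture-and-rethrow across threads: the worker stores the first exception it
// hits, flips `finished`, and wakes the consumer; the consumer rethrows on its
// own thread, where the caller can handle it normally.
struct BackgroundWork
{
    std::mutex mutex;
    std::condition_variable condvar;
    bool finished = false;
    std::exception_ptr background_exception;

    void workerThread()
    {
        try
        {
            throw std::runtime_error("parse error in worker");  // simulate a failure
        }
        catch (...)
        {
            std::lock_guard lock(mutex);
            if (!background_exception)
                background_exception = std::current_exception();
            finished = true;
            condvar.notify_all();
        }
    }

    void waitAndRethrow()
    {
        std::unique_lock lock(mutex);
        condvar.wait(lock, [&] { return finished; });
        if (background_exception)
            std::rethrow_exception(background_exception);
    }
};

int main()
{
    BackgroundWork work;
    std::thread worker(&BackgroundWork::workerThread, &work);
    try
    {
        work.waitAndRethrow();
    }
    catch (const std::exception & e)
    {
        std::cout << "caught on reader thread: " << e.what() << '\n';
    }
    worker.join();
}
```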