[RFC] Distributed over distributed (v2) #9923

Merged
merged 8 commits into from
Apr 1, 2020

azat
Collaborator

@azat azat commented Mar 29, 2020

Fixes: #8640

Changelog category (leave one):

  • New Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Add ability to query Distributed over Distributed (w/o distributed_group_by_no_merge)

P.S. Not sure that it is that easy (but it looks like everything works, and performance should not be worse, since previously an exception was thrown anyway), hence the RFC.

P.P.S. I asked the author of #8640 (@Enmk) and he wasn't against my taking a look.

@alexey-milovidov
Member

Do I understand correctly that it forwards data when WithMergeableState -> WithMergeableState is requested and possibly applies additional pre-sorting/distinct/limit (but for what purpose)?

@alexey-milovidov
Member

We can also test queries with GLOBAL IN/JOIN but don't worry - even if they don't work in Distributed over Distributed, it's still acceptable.

@azat
Collaborator Author

azat commented Mar 29, 2020

Do I understand correctly that it forwards data when WithMergeableState -> WithMergeableState is requested and possibly applies additional pre-sorting/distinct/limit (but for what purpose)?

AFAIU it is not pre-sorting: it sorts the results received so far from the shards. Otherwise this step would be skipped and only the final WithMergeableState -> Complete stage would sort, and that stage does not do a complete sort but only merges already-sorted results.

@azat
Collaborator Author

azat commented Mar 29, 2020

We can also test queries with GLOBAL IN/JOIN

JOIN works, but GLOBAL IN does not work even now (i.e. with distributed_group_by_no_merge); it fails with LOGICAL_ERROR 'Table expression is undefined, Method: ExpressionAnalyzer::interpretSubquery.', which then aborts in a debug build.

@azat
Collaborator Author

azat commented Mar 29, 2020

Oh, I see, it looks like the failing tests also fail on upstream/master.

@alexey-milovidov
Member

AFAIU it is not pre-sorting: it sorts the results received so far from the shards. Otherwise this step would be skipped and only the final WithMergeableState -> Complete stage would sort, and that stage does not do a complete sort but only merges already-sorted results.

If we have three servers:

A (local) -> B (distributed, intermediate) -> C (distributed, final)

I expect that it will work this way:
A will do pre-sorting and pre-limit.
B will do nothing except forwarding blocks.
C will do final sorting and limit.

In the same way as without B.

@alexey-milovidov
Member

PS. We could re-merge intermediate data on intermediate servers, but that is out of scope for this task.

@alexey-milovidov
Member

alexey-milovidov commented Mar 29, 2020

B will do nothing except forwarding blocks.

As I understand, the main challenge is that it cannot just forward blocks from different servers as a single stream... because it will interfere with merge sorting. Do I understand correctly that this is the reason why you are doing additional intermediate sorting?

@alexey-milovidov
Member

We should also pay attention to two-level GROUP BY and MergingAggregatedMemoryEfficient.
In this mode, every block has a bucket_num assigned and data is merged per bucket.
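
To illustrate what merging per bucket means, here is a minimal Python sketch. It is not ClickHouse code: the `Block` alias and `merge_per_bucket` are hypothetical names, partial aggregate states are modeled as plain dicts of sums, and each source is assumed to emit its blocks in ascending bucket_num order. Under that assumption only one bucket needs to be held in memory at a time.

```python
import heapq
from itertools import groupby
from operator import itemgetter
from typing import Dict, Iterable, Iterator, Tuple

# Hypothetical model of a block: (bucket_num, partial sums per key).
Block = Tuple[int, Dict[str, int]]


def merge_per_bucket(sources: Iterable[Iterable[Block]]) -> Iterator[Block]:
    """Yield one merged block per bucket, holding a single bucket at a time.

    Assumes every source emits its blocks in ascending bucket_num order.
    """
    # Interleave all sources by bucket number without materializing them.
    ordered = heapq.merge(*sources, key=itemgetter(0))
    for bucket_num, blocks in groupby(ordered, key=itemgetter(0)):
        merged: Dict[str, int] = {}
        for _, sums in blocks:
            for key, value in sums.items():
                merged[key] = merged.get(key, 0) + value
        yield bucket_num, merged


server1 = [(0, {"a": 1}), (2, {"c": 4})]
server2 = [(0, {"a": 10}), (1, {"b": 2})]
result = list(merge_per_bucket([server1, server2]))
```

Here `result` contains exactly one block per bucket number, with the partial sums for bucket 0 combined from both servers.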

@azat
Collaborator Author

azat commented Mar 29, 2020

We should also pay attention to two-level GROUP BY and MergingAggregatedMemoryEfficient.
In this mode, every block has a bucket_num assigned and data is merged per bucket.

Looks ok (test updated)

@azat
Collaborator Author

azat commented Mar 29, 2020

As I understand, the main challenge is that it cannot just concatenate blocks from different servers to a single stream... because it will interfere with merge sorting. Do I understand correctly that this is the reason why you are doing additional intermediate sorting?

Yes, the problem is that the intermediate server needs to sort the data; otherwise the final server will receive unsorted data and its merge sort will not work.
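
A small Python sketch of the problem, under simplifying assumptions: rows are plain integers, each shard returns them already sorted, and `forward_only` / `merge_sorted` are hypothetical helpers, not ClickHouse functions. It compares an intermediate server that only concatenates shard streams with one that merges them before the final server runs its own merge.

```python
import heapq
from itertools import chain

# Each shard returns its rows already sorted.
shard1 = [1, 4, 7]
shard2 = [2, 3, 9]
shard3 = [5, 6, 8]


def forward_only(streams):
    """Intermediate server concatenates shard streams without sorting."""
    return list(chain.from_iterable(streams))


def merge_sorted(streams):
    """k-way merge; the output is sorted only if every input is sorted."""
    return list(heapq.merge(*streams))


# Intermediate server B1 reads shard1 and shard2, B2 reads shard3.
b1_forwarded = forward_only([shard1, shard2])  # [1, 4, 7, 2, 3, 9]
b1_merged = merge_sorted([shard1, shard2])     # [1, 2, 3, 4, 7, 9]
b2 = forward_only([shard3])                    # a single stream stays sorted

# The final server only merges what it receives.
final_broken = merge_sorted([b1_forwarded, b2])
final_correct = merge_sorted([b1_merged, b2])
```

With the concatenated input the final merge yields `[1, 4, 5, 6, 7, 2, 3, 8, 9]`, which is not sorted; with the merged input it yields `1` through `9` in order.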

@qoega added the doc-alert and pr-feature labels Mar 30, 2020
@azat
Collaborator Author

azat commented Mar 30, 2020

Functional stateless tests (release, streams) — fail: 38, passed: 74
Functional stateless tests (thread) — fail: 20, passed: 344, skipped: 5

No errors in clickhouse-server.log; maybe OOM or a reboot?
(The other tests fail on upstream/master too, and the performance results look unrelated.)

@azat
Collaborator Author

azat commented Mar 30, 2020

but GLOBAL IN does not work even now (i.e. with distributed_group_by_no_merge); it fails with LOGICAL_ERROR 'Table expression is undefined, Method: ExpressionAnalyzer::interpretSubquery.', which then aborts in a debug build.

Added a test into bugs

@alexey-milovidov
Member

Ok. But it also means that memory-efficient aggregation won't work without remerging.
You can test the case when servers send data by buckets...

The stacktrace looks like:

    [ 1393930 ] {61e2952b-5039-497a-8f8c-923644541261} <Debug> executeQuery: (from [::1]:28770) SELECT DISTINCT * FROM dist_01224 WHERE key GLOBAL IN (1) SETTINGS distributed_group_by_no_merge = 1
    [ 1393930 ] {61e2952b-5039-497a-8f8c-923644541261} <Error> : Logical error: 'Table expression is undefined, Method: ExpressionAnalyzer::interpretSubquery.'.
    src/Common/Exception.cpp:37: DB::Exception::Exception(const string&, int): Assertion `false' failed.
    [ 1393906 ] {} <Trace> BaseDaemon: Received signal 6
    [ 1421018 ] {} <Fatal> BaseDaemon: ########################################
    [ 1421018 ] {} <Fatal> BaseDaemon: (version 20.4.1.1) (from thread 1393930) (query_id: 61e2952b-5039-497a-8f8c-923644541261) Received signal Aborted (6).
    [ 1421018 ] {} <Fatal> BaseDaemon:
    [ 1421018 ] {} <Fatal> BaseDaemon: Stack trace: 0x7ffff6f22ce5 0x7ffff6f0c857 0x7ffff6f0c727 0x7ffff6f1b426 0x7ffff7bc5094 0x7fffeae131e6 0x7fffeae12f8b 0x7fffea7697a1 0x7fffea769f26 0x7fffea769e1f 0x7fffea76bc21 0x7fffea76e612 0x7fffea76bc0b 0x7fffea76078b 0x7fffea75fa5f 0x7fffea88742a 0x7fffea89ffa4 0x7fffea872162 0x7fffea8740e1 0x7fffea8715ef 0x7fffea8fcd4d 0x7fffea8facd1 0x7fffea8414b9 0x7fffea83f8a9 0x7fffeae04914 0x7fffeae05f39 0x7ffff7f68506 0x7ffff7f6f4d9
    [ 1421018 ] {} <Fatal> BaseDaemon: 4. gsignal @ 0x3bce5 in /usr/lib/libc-2.31.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 5. abort @ 0x25857 in /usr/lib/libc-2.31.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 6. _nl_load_domain.cold @ 0x25727 in /usr/lib/libc-2.31.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 7. ? @ 0x34426 in /usr/lib/libc-2.31.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 8. /ch-cmake/../dbms/src/Common/Exception.cpp:37: DB::Exception::Exception(...) @ 0x1c1094 in /ch-cmake/dbms/libclickhouse_common_iod.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 9. /ch-cmake/../dbms/src/Interpreters/interpretSubquery.cpp:52: DB::interpretSubquery() [clone .localalias] @ 0x1d831e6 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 10. /ch-cmake/../dbms/src/Interpreters/interpretSubquery.cpp:28: DB::interpretSubquery(std::shared_ptr<DB::IAST> const&, DB::Context const&, unsigned long, std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&) @ 0x1d82f8b in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 11. /ch-cmake/../dbms/src/Interpreters/GlobalSubqueriesVisitor.h:100: DB::GlobalSubqueriesMatcher::Data::addExternalStorage(std::shared_ptr<DB::IAST>&, bool) @ 0x16d97a1 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 12. /ch-cmake/../dbms/src/Interpreters/GlobalSubqueriesVisitor.h:170: DB::GlobalSubqueriesMatcher::visit(DB::ASTFunction&, std::shared_ptr<DB::IAST>&, DB::GlobalSubqueriesMatcher::Data&) @ 0x16d9f26 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 13. /ch-cmake/../dbms/src/Interpreters/GlobalSubqueriesVisitor.h:151: DB::GlobalSubqueriesMatcher::visit(std::shared_ptr<DB::IAST>&, DB::GlobalSubqueriesMatcher::Data&) @ 0x16d9e1f in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 14. /ch-cmake/../dbms/src/Interpreters/InDepthNodeVisitor.h:27: DB::InDepthNodeVisitor<DB::GlobalSubqueriesMatcher, false, std::shared_ptr<DB::IAST> >::visit(std::shared_ptr<DB::IAST>&) @ 0x16dbc21 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 15. /ch-cmake/../dbms/src/Interpreters/InDepthNodeVisitor.h:45: DB::InDepthNodeVisitor<DB::GlobalSubqueriesMatcher, false, std::shared_ptr<DB::IAST> >::visitChildren(std::shared_ptr<DB::IAST>&) @ 0x16de612 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 16. /ch-cmake/../dbms/src/Interpreters/InDepthNodeVisitor.h:32: DB::InDepthNodeVisitor<DB::GlobalSubqueriesMatcher, false, std::shared_ptr<DB::IAST> >::visit(std::shared_ptr<DB::IAST>&) @ 0x16dbc0b in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 17. /ch-cmake/../dbms/src/Interpreters/ExpressionAnalyzer.cpp:265: DB::ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool) @ 0x16d078b in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 18. /ch-cmake/../dbms/src/Interpreters/ExpressionAnalyzer.cpp:140: DB::ExpressionAnalyzer::ExpressionAnalyzer(std::shared_ptr<DB::IAST> const&, std::shared_ptr<DB::SyntaxAnalyzerResult const> const&, DB::Context const&, unsigned long, bool) @ 0x16cfa5f in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 19. /ch-cmake/../dbms/src/Interpreters/ExpressionAnalyzer.h:235: DB::SelectQueryExpressionAnalyzer::SelectQueryExpressionAnalyzer() @ 0x17f742a in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 20. /usr/include/c++/9.3.0/bits/unique_ptr.h:857: std::_MakeUniq<DB::SelectQueryExpressionAnalyzer>::__single_object std::make_unique<..>(std::shared_ptr<DB::IAST>&, std::shared_ptr<DB::SyntaxAnalyzerResult const>&, DB::Context&, std::unordered_set<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::hash<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::equal_to<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >&&, bool&&, DB::SelectQueryOptions&) @ 0x180ffa4 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 21. /ch-cmake/../dbms/src/Interpreters/InterpreterSelectQuery.cpp:320: DB::InterpreterSelectQuery::InterpreterSelectQuery(std::shared_ptr<DB::IAST> const&, DB::Context const&, std::shared_ptr<DB::IBlockInputStream> const&, std::optional<DB::Pipe>, std::shared_ptr<DB::IStorage> const&, DB::SelectQueryOptions const&, std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&)::{}::operator()(bool) const @ 0x17e2162 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 22. /ch-cmake/../dbms/src/Interpreters/InterpreterSelectQuery.cpp:383: DB::InterpreterSelectQuery::InterpreterSelectQuery(...) @ 0x17e40e1 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 23. /ch-cmake/../dbms/src/Interpreters/InterpreterSelectQuery.cpp:167: DB::InterpreterSelectQuery::InterpreterSelectQuery(...) @ 0x17e15ef in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 24. /usr/include/c++/9.3.0/bits/unique_ptr.h:857: std::_MakeUniq<DB::InterpreterSelectQuery>::__single_object std::make_unique<...>(...) @ 0x186cd4d in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 25. /ch-cmake/../dbms/src/Interpreters/InterpreterSelectWithUnionQuery.cpp:94: DB::InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(...) @ 0x186acd1 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 26. /usr/include/c++/9.3.0/bits/unique_ptr.h:857: std::_MakeUniq<DB::InterpreterSelectWithUnionQuery>::__single_object std::make_unique<...>(std::shared_ptr<DB::IAST>&, DB::Context&, DB::SelectQueryOptions&&) @ 0x17b14b9 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 27. /ch-cmake/../dbms/src/Interpreters/InterpreterFactory.cpp:100: DB::InterpreterFactory::get(std::shared_ptr<DB::IAST>&, DB::Context&, DB::QueryProcessingStage::Enum) @ 0x17af8a9 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 28. /ch-cmake/../dbms/src/Interpreters/executeQuery.cpp:310: DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*, bool) @ 0x1d74914 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 29. /ch-cmake/../dbms/src/Interpreters/executeQuery.cpp:578: DB::executeQuery(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, bool) @ 0x1d75f39 in /ch-cmake/dbms/libclickhouse_interpretersd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 30. /ch-cmake/../dbms/programs/server/TCPHandler.cpp:249: DB::TCPHandler::runImpl() @ 0x23a506 in /ch-cmake/dbms/programs/server/libclickhouse-server-libd.so
    [ 1421018 ] {} <Fatal> BaseDaemon: 31. /ch-cmake/../dbms/programs/server/TCPHandler.cpp:1239: DB::TCPHandler::run() @ 0x2414d9 in /ch-cmake/dbms/programs/server/libclickhouse-server-libd.so
    Aborted (core dumped)
@azat
Collaborator Author

azat commented Mar 30, 2020

memory-efficient aggregation won't work without remerging.

Nice catch! Fixed and covered!

…ributed

Error before this patch:
  Logical error: 'SortingAggregatedTransform already got bucket with number 115'.
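
A hedged Python sketch of how such a failure can arise. It is a toy model, not the actual transform or the actual patch: `FinalSink`, `remerge`, and `feed` are hypothetical names, and aggregate states are modeled as dicts of sums. The sink rejects a bucket number it has already seen, so an intermediate server that forwards blocks from several shards unchanged trips the check, while one that re-merges blocks sharing a bucket number does not.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical model of a block: (bucket_num, partial sums per key).
Bucket = Tuple[int, Dict[str, int]]


class FinalSink:
    """Toy consumer that rejects a bucket number it has already received."""

    def __init__(self) -> None:
        self.buckets: Dict[int, Dict[str, int]] = {}

    def push(self, bucket: Bucket) -> None:
        num, sums = bucket
        if num in self.buckets:
            raise RuntimeError(f"already got bucket with number {num}")
        self.buckets[num] = sums


def remerge(blocks: Iterable[Bucket]) -> List[Bucket]:
    """Combine blocks that share a bucket number before forwarding them."""
    merged: Dict[int, Dict[str, int]] = {}
    for num, sums in blocks:
        target = merged.setdefault(num, {})
        for key, value in sums.items():
            target[key] = target.get(key, 0) + value
    return sorted(merged.items(), key=lambda item: item[0])


def feed(blocks: Iterable[Bucket]):
    """Push blocks into a fresh sink; return its buckets or the error text."""
    sink = FinalSink()
    try:
        for block in blocks:
            sink.push(block)
    except RuntimeError as error:
        return str(error)
    return sink.buckets


# Two shards both produce bucket 115; the intermediate server sees both.
from_shards: List[Bucket] = [(115, {"a": 1}), (115, {"a": 2}), (116, {"b": 5})]

forwarded_as_is = feed(from_shards)
remerged = feed(remerge(from_shards))
```

In this model `forwarded_as_is` is the error text for bucket 115, while `remerged` holds one combined entry per bucket.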
@azat
Collaborator Author

azat commented Mar 31, 2020

Test failures look unrelated:

test_settings_constraints
01098_temporary_and_external_tables

@azat
Collaborator Author

azat commented Mar 31, 2020

And it seems that the performance tests are unstable too (they show "Performance 3 faster, 35 unstable, 5 errors" on upstream/master); besides, this PR should not affect them, since it does not change existing code paths.

@alexey-milovidov
Member

I'm only afraid that some settings are ineffective when specified in the SETTINGS clause (when they are applied before the SETTINGS clause is interpreted). Did you check that two-level memory-efficient aggregation is active?

@alexey-milovidov
Member

alexey-milovidov commented Apr 1, 2020

Yes, it is in effect (I checked server logs).

@alexey-milovidov
Member

Now it looks like all cases are covered :)

@alexey-milovidov alexey-milovidov merged commit c7afc51 into ClickHouse:master Apr 1, 2020
@alexey-milovidov
Member

@azat Maybe also grab integration tests from https://github.com/ClickHouse/ClickHouse/pull/8640/files just in case?

@azat azat deleted the dist-on-dist branch April 1, 2020 06:19
@azat
Collaborator Author

azat commented Apr 1, 2020

@azat Maybe also grab integration tests from https://github.com/ClickHouse/ClickHouse/pull/8640/files just in case?

Sure, why not, I will cherry-pick them.

Now it looks like all cases are covered :)

Just found one issue left (the case where StorageDistributed::getQueryProcessingStage() returns Complete when there is only one shard to query); will get back soon.

Labels
pr-feature Pull request with new product feature

3 participants