Backport #12315 to 20.5: Fix race condition in ReplicatedMergeTreeQueue (#12385)

* Update SortDescription.h

* Update AggregateDescription.cpp

* add parseDateTimeBestEffortUS function (#12028)

* add function parseDateTimeBestEffortUS
* add test
* add doc

Co-authored-by: alexey-milovidov <milovidov@yandex-team.ru>

* Cleanup changelog (half done). Now it is acceptable #12104

* Cleanup changelog (half done). Now it is more acceptable #12104

* Cleanup changelog (half done). Now it is more acceptable #12104

* Cleanup changelog (half done). Additions requested by @filimonov #12104

* Remove underscore as word-break character. This partially reverts #11975

* Fix bad test number

* Added a test

* Change exception code from LOGICAL_ERROR to BAD_ARGUMENTS when the name of remote table is empty

* fix segfault with -StateResample combinators

* Fix bad code in redundant ORDER BY optimization #10067

* Add a test

* Support KILL QUERY [connection_id] for MySQL

* Fix error

* Autocomplete does not have to work in "Unbundled" build

* Fix transform query for external databases in presence of aliases #12032

* Whitespace

* Add MySQL to ClickHouse query replacement mapping table

* Style

* update ya.make

* Added function "hasThreadFuzzer"

* Fix test under ThreadFuzzer

* Test for issue #9088

ALTER DELETE unexpectedly deletes NULL rows

* Remove unused potentially dangerous function

* Fix mutations interpreter #9088

* Added a test

* Added another test just in case

* Added yet another test just in case

* Update 01358_mutation_delete_null_rows.sql

* Fix test

* Fix flaky test

* Attempt to fix flaky test 00721_force_by_identical_result_after_merge_zookeeper

* [anchore] more detailed Dockerfile scan reports (#12159)

* Update Dockerfile

* Update array-functions.md (#12130)

Fix encoding error

* Add integration test for mysql replacement query

* Update array-functions.md (#12129)

Fix encoding error

* fix ubsan final

* [docs] improve redirects destination

* Update SECURITY.md (#12161)

* Update Dockerfile

* Update questdb_sum_double.xml

* Place common docker compose files to integration docker container

* done

* add cluster() and clusterAllReplicas() table functions description, add signatures to remoteSecure() table function (#12156)

* add cluster() and clusterAllReplicas() table functions description, add signatures to remoteSecure() table function

Signed-off-by: Slach <bloodjazman@gmail.com>

* small russian fixes

Signed-off-by: Slach <bloodjazman@gmail.com>

* Update docs/en/sql-reference/table-functions/cluster.md

Co-authored-by: Ivan Blinkov <github@blinkov.ru>

* Update docs/en/sql-reference/table-functions/cluster.md

Co-authored-by: Ivan Blinkov <github@blinkov.ru>

* Update docs/en/sql-reference/table-functions/cluster.md

Co-authored-by: Ivan Blinkov <github@blinkov.ru>

* Update docs/en/sql-reference/table-functions/cluster.md

Co-authored-by: Ivan Blinkov <github@blinkov.ru>

* Update docs/en/sql-reference/table-functions/cluster.md

Co-authored-by: Ivan Blinkov <github@blinkov.ru>

* Update docs/en/sql-reference/table-functions/cluster.md

Co-authored-by: Ivan Blinkov <github@blinkov.ru>

Co-authored-by: Ivan Blinkov <github@blinkov.ru>

* Remove -v from ninja

* [docs] introduction for integration table engines (#12167)

* [docs] introduction for integration table engines

* Update jdbc.md

* Update odbc.md

* Update mysql.md

* Update kafka.md

* Update hdfs.md

* Fix #10437, CR fixes

* fix style

* add select final to test

* Same change for Kafka - just in case, and to make it conform.

* Fix dictGet with bad arguments during GROUP BY injective functions elimination

* Fix dictGet arguments check during GROUP BY injective functions elimination

This patch changes the place where the dictionary will be loaded (during
syntax analysis), but I guess this is fine, it will be loaded anyway.

Fixes: #10342

* [docs] add redirect from an introduction index page (#12176)

* [website] add apple-touch-icon (#12164)

* Use ENABLE_LIBRARIES option for AMQP-CPP

This is a tiny fix; there are more problems than just this small
bit.

* Set GOOGLETEST_VERSION for googletest

Otherwise cmake reports:

    CMake Warning at contrib/googletest/googletest/CMakeLists.txt:54 (project):
      VERSION keyword not followed by a value or was followed by a value that
      expanded to nothing.

(since GOOGLETEST_VERSION is set in contrib/googletest/CMakeLists.txt)

* Bump googletest to master (to fix gcc10 builds)

gcc10 reports:

    FAILED: src/CMakeFiles/unit_tests_dbms.dir/Columns/tests/gtest_column_unique.cpp.o
    <snip>
    ../contrib/googletest/googletest/include/gtest/gtest-printers.h:287:7: error: use of deleted function ‘std::basic_ostream<char, _Traits>& std::operator<<(std::basic_ostream<char, _Traits>&, char8_t) [with _Traits = std::char_traits<char>]’
      287 |   *os << value;
          |   ~~~~^~~~~~~~
    In file included from ../base/common/../common/StringRef.h:6,
                     from ../src/Columns/IColumn.h:7,
                     from ../src/Columns/IColumnUnique.h:2,
                     from ../src/Columns/ColumnUnique.h:2,
                     from ../src/Columns/tests/gtest_column_unique.cpp:1:
    /usr/include/c++/10.1.0/ostream:544:5: note: declared here
      544 |     operator<<(basic_ostream<char, _Traits>&, char8_t) = delete;
          |

* gtest_compressionCodec: is_trivial+is_standard_layout over deprecated is_pod

* gtest_compressionCodec: use fmt over boost::format

boost::format is not compiled under gcc10:

                     from ../src/Compression/tests/gtest_compressionCodec.cpp:14:
    /usr/include/boost/format/alt_sstream_impl.hpp: In instantiation of ‘boost::io::basic_altstringbuf<Ch, Tr, Alloc>::int_type boost::io::basic_altstringbuf<Ch, Tr, Alloc>::overflow(boost::io::basic_altstringbuf<Ch, Tr, Alloc>::int_type) [with Ch = char; Tr = std::char_traits<char>; Alloc = std::allocator<char>; boost::io::basic_altstringbuf<Ch, Tr, Alloc>::int_type = int]’:
    /usr/include/boost/format/alt_sstream_impl.hpp:227:9:   required from here
    /usr/include/boost/format/alt_sstream_impl.hpp:261:45: error: no matching function for call to ‘std::allocator<char>::allocate(std::size_t&, char*)’
      261 |                     newptr = alloc_.allocate(new_size, is_allocated_? oldptr : 0);

(although this is system-wide boost, it is pretty recent - 1.72)

* gtest_compressionCodec: fix lack of operator<< for char8_t

* gtest_weak_hash_32: fix lack of operator<< for char8_t

* Force CMP0022 for googletest (to avoid using LINK_INTERFACE_LIBRARIES(_<CONFIG>)?)

Otherwise cmake reports:

    -- Configuring done
    CMake Warning (dev) in contrib/googletest/googletest/CMakeLists.txt:
      Policy CMP0022 is not set: INTERFACE_LINK_LIBRARIES defines the link
      interface.  Run "cmake --help-policy CMP0022" for policy details.  Use the
      cmake_policy command to set the policy and suppress this warning.

      Target "gtest" has an INTERFACE_LINK_LIBRARIES property which differs from
      its LINK_INTERFACE_LIBRARIES_DEBUG properties.

      INTERFACE_LINK_LIBRARIES:

        global-group;Threads::Threads

      LINK_INTERFACE_LIBRARIES_DEBUG:

        Threads::Threads

    This warning is for project developers.  Use -Wno-dev to suppress it.

    -- Generating done
    -- Build files have been written to: /src/ch/clickhouse/.cmake-tmp

* Set CMP0077 for re2

cmake reports:

    CMake Warning (dev) at contrib/re2/CMakeLists.txt:15 (option):
      Policy CMP0077 is not set: option() honors normal variables.  Run "cmake
      --help-policy CMP0077" for policy details.  Use the cmake_policy command to
      set the policy and suppress this warning.

      For compatibility with older versions of CMake, option is clearing the
      normal variable 'BUILD_SHARED_LIBS'.
    This warning is for project developers.  Use -Wno-dev to suppress it.

* Better assert

* Don't split dictionary source's table name into schema and table name itself
if ODBC driver doesn't support schema.

* Sync reference file with changes in sql file

* [docs] introduction for third-party interfaces (#12175)

* [docs] introduction for third-party interfaces

* Update index.md

* Update index.md

* [docs] introduction for special table engines (#12170)

* [docs] introduction for integration table engines

* Update jdbc.md

* Update odbc.md

* Update mysql.md

* Update kafka.md

* Update hdfs.md

* [docs] introduction for special table engines

* Update index.md

* Update index.md

* Cap max_memory_usage* limits to the process resident memory

There are still some issues with memory tracking, but now with per-user
tracking:

    executeQuery: Code: 241, e.displayText() = DB::Exception: Memory limit (for user) exceeded: would use 437.72 GiB (attempt to allocate chunk of 4200926 bytes), maximum: 437.72 GiB (version 20.6.1.1) (from 10.7.140.7:31318)

Although the server is mostly idle:

    SELECT formatReadableSize(memory_usage)
    FROM system.processes

    ┌─formatReadableSize(memory_usage)─┐
    │ 289.28 MiB                       │
    │ 155.75 MiB                       │
    │ 0.00 B                           │
    └──────────────────────────────────┘

Refs: https://github.com/ClickHouse/ClickHouse/pull/10496/files#r450206865
Cc: @alexey-milovidov

* gtest_compressionCodec: suppress non instantiated gtest warning

gtest reports:

    [ RUN      ] GoogleTestVerification.UninstantiatedParameterizedTestSuite<CodecTestPerformance>
    ../src/Compression/tests/gtest_compressionCodec.cpp:590: Failure
    Parameterized test suite CodecTestPerformance is defined via TEST_P, but never instantiated. None of the test cases will run. Either no INSTANTIATE_TEST_SUITE_P is provided or the only ones provided expand to nothing.

    Ideally, TEST_P definitions should only ever be included as part of binaries that intend to use them. (As opposed to, for example, being placed in a library that may be linked in to get other utilities.)

    To suppress this error for this test suite, insert the following line (in a non-header) in the namespace it is defined in:

    GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(CodecTestPerformance);
    [  FAILED  ] GoogleTestVerification.UninstantiatedParameterizedTestSuite<CodecTestPerformance> (0 ms)

* Fix tests.

* split

* Do not try to adjust memory tracker amount if it is not larger than in total

* A test for UInt8 as bool

* Included const uint8 values in test

* changelog fixes

* remove questionable functionality

* Tests for fixed issues #10846 and #7347

* Simple (and fast) inplace fix for UInt8 -> bool

* style fix for #12152

* Added test for #3767

* Update zh kafka.md title (#12192)

* Update index.md (#12191)

Fix merge link broken

* changelog fixes

* done

* DOCSUP-1348 Russian translation for new functions (#133) (#12194)

* Russian translation for new functions

* Apply suggestions from code review

Co-authored-by: BayoNet <da-daos@yandex.ru>

* Minor updates to russian text.

Co-authored-by: Olga Revyakina <revolg@yandex-team.ru>
Co-authored-by: BayoNet <da-daos@yandex.ru>

Co-authored-by: Sergei Shtykov <bayonet@yandex-team.ru>
Co-authored-by: olgarev <56617294+olgarev@users.noreply.github.com>
Co-authored-by: Olga Revyakina <revolg@yandex-team.ru>

* Add runner for testflows

* fixes

* No color

* throw exception on redirect limit in S3 request

* fix test

* [docs] add introduction for commercial page (#12187)

* DOCS-647: toStartOfSecond (#12190)

* DOCSUP-1120 Documentation for the toStartOfSecond function (#131)

* Doc toStartOfSecond function

* Update docs/en/sql-reference/functions/date-time-functions.md

Co-authored-by: BayoNet <da-daos@yandex.ru>

* Update docs/en/sql-reference/functions/date-time-functions.md

Co-authored-by: BayoNet <da-daos@yandex.ru>

* Minor update for english text, russian translation added.

Co-authored-by: Olga Revyakina <revolg@yandex-team.ru>
Co-authored-by: BayoNet <da-daos@yandex.ru>

* CLICKHOUSEDOCS-647: Minor text edits.

* Update docs/en/sql-reference/functions/date-time-functions.md

* Update docs/en/sql-reference/functions/date-time-functions.md

Co-authored-by: olgarev <56617294+olgarev@users.noreply.github.com>
Co-authored-by: Olga Revyakina <revolg@yandex-team.ru>
Co-authored-by: Sergei Shtykov <bayonet@yandex-team.ru>
Co-authored-by: Ivan Blinkov <github@blinkov.ru>

* [docs] refactor Domains overview (#12186)

* Add to images.json

* Sanitize LINK_LIBRARIES property for the directories (#12160)

When you try to link a target with a directory (that exists), cmake will
skip it without an error; only the following warning is reported:

    target_link_libraries(main /tmp)

    WARNING: Target "main" requests linking to directory "/tmp".  Targets may link only to libraries.  CMake is dropping the item.

And there is no cmake policy that controls this.
(I guess the reason that it is allowed is because of FRAMEWORK for OSX).

So to avoid error-prone cmake rules, this can be sanitized.
There are the following ways:
- overwrite target_link_libraries()/link_libraries() and check *before*
  calling real macro, but this requires duplicate all supported syntax
  -- too complex
- overwrite target_link_libraries() and check LINK_LIBRARIES property, this
  works great
  -- but cannot be used with link_libraries()
- use BUILDSYSTEM_TARGETS property to get list of all targets and sanitize
  -- this will work.

I also tested it with the following patch:

    $ git di
    diff --git a/base/daemon/CMakeLists.txt b/base/daemon/CMakeLists.txt
    index 26d59a57e7..35e6ff6432 100644
    --- a/base/daemon/CMakeLists.txt
    +++ b/base/daemon/CMakeLists.txt
    @@ -9,4 +9,5 @@ target_link_libraries (daemon PUBLIC loggers PRIVATE clickhouse_common_io clickh

     if (USE_SENTRY)
         target_link_libraries (daemon PRIVATE ${SENTRY_LIBRARY})
    +    target_link_libraries (daemon PRIVATE /tmp)
     endif ()

And it works:

    CMake Error at cmake/sanitize_target_link_libraries.cmake:48 (message):
       daemon requested to link with directory: /tmp
    Call Stack (most recent call first):
      cmake/sanitize_target_link_libraries.cmake:55 (sanitize_link_libraries)
      CMakeLists.txt:425 (include)

Refs: #12041

* cleanup

* cleanup

* Revert "Run perf tests with memory sampling (for allocations >1M)"

* too slow

* style

* [docs] add introduction for statements page (#12189)

* [docs] add introduction for statements page

* Update index.md

* Update index.md

* Update README.md

* Update README.md

* Update README.md

* AMQP requires libuv

Otherwise fails:

    FAILED: src/CMakeFiles/dbms.dir/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp.o
    ...
    In file included from ../src/Storages/RabbitMQ/RabbitMQHandler.h:9,
                     from ../src/Storages/RabbitMQ/StorageRabbitMQ.h:11,
                     from ../src/Storages/RabbitMQ/RabbitMQBlockInputStream.h:5,
                     from ../src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp:3:
    ../contrib/AMQP-CPP/include/amqpcpp/libuv.h:22:10: fatal error: uv.h: No such file or directory

* Rename ident to indent.

* Bump arrow to 0.17 (and flatbuffers to v1.12, required by arrow)

MOTIVATION:
- remove double-conversion external dependency
- remove flatc (but flatbuffers is still required, arrow just shipped
  with generated files and that's it)

CHANGED:
- remove pre-generated headers, it is shipped with the arrow
- remove flatc (see above)

NOTES (see tests changes):
- and snappy error is reported as unsupported compression.

* Fix minor issues after #12196

* Reset CurrentMetrics::MemoryTracking periodically to the process RSS

* Allow isInjective() with empty block (is function injective with any arguments)

Most of the time the function ignores it anyway, and creating
arguments just to check whether the function is injective is overkill

* Implement getLeastSuperType for LowCardinality #8212

* Implement supertype for LowCardinality

* Whitespace

* Update test

* Added a test for PaddedPODArray just in case

* Add documentation for arrayFill

* Fix ugly ugliness

* Added a test

* Non-significant changes

* Update README.md

* Fix skip lists for old branches

* Update int_parsing.xml

* Update build.sh

* Fixes

* Update packager

* Hide nonzero error code on testflows runner

* fix builds

* Show error after TrieDictionary failed to load.

* try remove strange logic in DuplicateOrderByVisitor (#12267)

* Update README.md

* Fix typo in setting name

* Update README.md

* Fix typo in setting name

* Add tests to skip list

* Update README.md

* Update README.md

* Rerun tests

* Ignore testflows exit code

* More skip checks

* get hostname without mutex

* Add more tests to skip

* Bump CI (after non-restartable inner CI issue)

This reverts commit d199961.

* Added a test

* Fix strange code CC @Enmk. Proof: g++ -xc++ -include vector - <<< 'int main() { return std::vector<const char>{1, 2, 3}.size(); }'

* fix TTL after renaming column

* Fix ORC build (#12258)

* first try

* change submodule

* Update .gitmodules

* include build directory

* Update .gitmodules

Co-authored-by: Nikita Mikhailov <jakalletti@jakalletti-build.sas.yp-c.yandex.net>

* Fix filtering by virtual columns #12166

* Added a test

* fix order of columns in WITH FILL modifier

* Fix "Arcadia" build

* Log sanitizer trap messages from separate thread

* Revert strange file

* Tested with "trap" function

* Support MySQL 'SELECT DATABASE()' query replacement

* Fix race condition in ReplicatedMergeTreeQueue

* Set CMAKE_POLICY_DEFAULT_CMP0022/CMAKE_POLICY_DEFAULT_CMP0077 globally

This will fix CMAKE_POLICY_DEFAULT_CMP0077 for snappy:

    CMake Warning (dev) at contrib/snappy/CMakeLists.txt:11 (option):
      Policy CMP0077 is not set: option() honors normal variables.  Run
    "cmake
      --help-policy CMP0077" for policy details.  Use the cmake_policy
    command to
      set the policy and suppress this warning.

      For compatibility with older versions of CMake, option is clearing the
      normal variable 'BUILD_SHARED_LIBS'.
    This warning is for project developers.  Use -Wno-dev to suppress it.

* Update config.h for arrow

Yes ARROW_FULL_SO_VERSION/ARROW_SO_VERSION is empty right now, like
other version variables (ARROW_VERSION_*)

Refs: #12181

* Fix jemalloc enabled detection (should go after contrib inclusion)

* Warn if jemalloc is not enabled for non-linux too

Refs: #11897 (osx)
Refs: #11774 (freebsd)

* DOCS-679: netloc function (#12321)

* DOCSUP-1377 (netloc function) (#135)

* add EN description

* changes in query

* changes after review

* add RU description

* CLICKHOUSEDOCS-679: Text fixes.

Co-authored-by: Sergei Shtykov <bayonet@yandex-team.ru>
Co-authored-by: emironyuk <em@don.ru>
Co-authored-by: Evgenia Sudarikova <56156889+otrazhenia@users.noreply.github.com>

* add docker image for fuzzer

* fixes in fuzzer docker image

* [docs] engine family introduction refactoring (#12268)

* base refactoring

* adjust links

* Update index.md

* Implemented single part uploads for DiskS3 (#12026)

* Implemented single part uploads for DiskS3.
* Added `min_multi_part_upload_size` to disk configuration.

* Rename test 01378

* Add integration test for mysql replacement query

* Fix obvious race condition in test

* [docs] split various kinds of CREATE queries into separate articles (#12328)

* normalize

* split & adjust links

* re-normalize

* adjust ru links

* adjust ja/tr links

* partially apply e0d19d2

* reset contribs

* Update skip_list.json

* fuzzer container fix

* fuzzer docker image

* Fix error

* Miscellaneous

Co-authored-by: alexey-milovidov <milovidov@yandex-team.ru>
Co-authored-by: flynn <fenglv15@mails.ucas.ac.cn>
Co-authored-by: Anton Popov <pad11rus@gmail.com>
Co-authored-by: BohuTANG <overred.shuttler@gmail.com>
Co-authored-by: Guillaume Tassery <gtassery@partners.accedian.com>
Co-authored-by: Mikhail Filimonov <mfilimonov@altinity.com>
Co-authored-by: Yatsishin Ilya <2159081+qoega@users.noreply.github.com>
Co-authored-by: Ivan Blinkov <github@blinkov.ru>
Co-authored-by: yhgcn <yhg_cn@163.com>
Co-authored-by: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com>
Co-authored-by: Nikita Mikhaylov <mikhaylovnikitka@gmail.com>
Co-authored-by: alesapin <alesapin@gmail.com>
Co-authored-by: Eugene Klimov <bloodjazman@gmail.com>
Co-authored-by: Nikita Mikhailov <jakalletti@jakalletti-build.sas.yp-c.yandex.net>
Co-authored-by: Azat Khuzhin <a3at.mail@gmail.com>
Co-authored-by: Vitaly Baranov <vitbar@yandex-team.ru>
Co-authored-by: Alexander Kazakov <Akazz@users.noreply.github.com>
Co-authored-by: Nikolai Kochetov <nik-kochetov@yandex-team.ru>
Co-authored-by: tavplubix <avtokmakov@yandex-team.ru>
Co-authored-by: Alexander Kuzmenkov <akuzm@yandex-team.ru>
Co-authored-by: BayoNet <da-daos@yandex.ru>
Co-authored-by: Sergei Shtykov <bayonet@yandex-team.ru>
Co-authored-by: olgarev <56617294+olgarev@users.noreply.github.com>
Co-authored-by: Olga Revyakina <revolg@yandex-team.ru>
Co-authored-by: Anton Ivashkin <iantonspb@yandex-team.ru>
Co-authored-by: Artem Zuikov <chertus@gmail.com>
Co-authored-by: emironyuk <em@don.ru>
Co-authored-by: Evgenia Sudarikova <56156889+otrazhenia@users.noreply.github.com>
Co-authored-by: Vladimir Chebotarev <vladimir.chebotarev@gmail.com>
1 parent 4dcb467 commit f953b5d
Showing 3 changed files with 31 additions and 31 deletions.
26 changes: 12 additions & 14 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
@@ -25,7 +25,18 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree &
, format_version(storage.format_version)
, current_parts(format_version)
, virtual_parts(format_version)
{}
{
zookeeper_path = storage.zookeeper_path;
replica_path = storage.replica_path;
logger_name = storage.getStorageID().getFullTableName() + " (ReplicatedMergeTreeQueue)";
log = &Poco::Logger::get(logger_name);
}


void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts)
{
addVirtualParts(parts);
}


void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts)
@@ -109,19 +120,6 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
}


void ReplicatedMergeTreeQueue::initialize(
const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
const MergeTreeData::DataParts & parts)
{
zookeeper_path = zookeeper_path_;
replica_path = replica_path_;
logger_name = logger_name_;
log = &Poco::Logger::get(logger_name);

addVirtualParts(parts);
}


void ReplicatedMergeTreeQueue::insertUnlocked(
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
std::lock_guard<std::mutex> & state_lock)
7 changes: 3 additions & 4 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
@@ -166,7 +166,8 @@ class ReplicatedMergeTreeQueue
void notifySubscribers(size_t new_queue_size);

/// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it
bool checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const;
bool checkReplaceRangeCanBeRemoved(
const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const;

/// Ensures that only one thread is simultaneously updating mutations.
std::mutex update_mutations_mutex;
@@ -251,12 +252,10 @@ class ReplicatedMergeTreeQueue

public:
ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_);

~ReplicatedMergeTreeQueue();


void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
const MergeTreeData::DataParts & parts);
void initialize(const MergeTreeData::DataParts & parts);

/** Inserts an action to the end of the queue.
* To restore broken parts during operation.
29 changes: 16 additions & 13 deletions src/Storages/StorageReplicatedMergeTree.cpp
@@ -153,6 +153,18 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() const
}


static std::string normalizeZooKeeperPath(std::string zookeeper_path)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
/// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
zookeeper_path = "/" + zookeeper_path;

return zookeeper_path;
}
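
The helper above can be exercised on its own. The following is a minimal standalone sketch that copies its body and checks a few inputs; the sample paths and the `checkNormalizeExamples` function are illustrative assumptions, not part of the commit. Note that only one trailing slash is stripped, so an input ending in `//` would keep one of them.

```cpp
#include <cassert>
#include <string>

// Standalone copy of the helper from the diff: strip a single trailing '/',
// then make sure a non-empty path starts with '/'.
static std::string normalizeZooKeeperPath(std::string zookeeper_path)
{
    if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
        zookeeper_path.resize(zookeeper_path.size() - 1);
    /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
    if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
        zookeeper_path = "/" + zookeeper_path;

    return zookeeper_path;
}

// Hypothetical checks with made-up sample paths.
void checkNormalizeExamples()
{
    // Missing leading slash and extra trailing slash are both fixed.
    assert(normalizeZooKeeperPath("clickhouse/tables/01/hits/") == "/clickhouse/tables/01/hits");
    // An already normalized path is returned unchanged.
    assert(normalizeZooKeeperPath("/clickhouse/tables/01/hits") == "/clickhouse/tables/01/hits");
    // Empty input stays empty; a lone "/" collapses to empty as well.
    assert(normalizeZooKeeperPath("").empty());
    assert(normalizeZooKeeperPath("/").empty());
}
```

Taking the argument by value lets the function be called directly inside a constructor's member initializer list, which is how the diff uses it for `zookeeper_path`.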


StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
@@ -175,8 +187,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
true, /// require_part_metadata
attach,
[this] (const std::string & name) { enqueuePartForCheck(name); })
, zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, table_id_.database_name, table_id_.table_name))
, zookeeper_path(normalizeZooKeeperPath(global_context.getMacros()->expand(zookeeper_path_, table_id_.database_name, table_id_.table_name)))
, replica_name(global_context.getMacros()->expand(replica_name_, table_id_.database_name, table_id_.table_name))
, replica_path(zookeeper_path + "/replicas/" + replica_name)
, reader(*this)
, writer(*this)
, merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())
@@ -186,13 +199,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, part_check_thread(*this)
, restarting_thread(*this)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
/// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
zookeeper_path = "/" + zookeeper_path;
replica_path = zookeeper_path + "/replicas/" + replica_name;

queue_updating_task = global_context.getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); });

@@ -201,6 +207,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(

merge_selecting_task = global_context.getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); });

/// Will be activated if we win leader election.
merge_selecting_task->deactivate();

@@ -1416,7 +1423,6 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
}
}


MergeTreePartInfo new_part_info = MergeTreePartInfo::fromPartName(
entry.new_part_name, format_version);
MutationCommands commands = queue.getMutationCommands(source_part, new_part_info.mutation);
@@ -3219,10 +3225,7 @@ void StorageReplicatedMergeTree::startup()

try
{
queue.initialize(
zookeeper_path, replica_path,
getStorageID().getFullTableName() + " (ReplicatedMergeTreeQueue)",
getDataParts());
queue.initialize(getDataParts());

data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(*this);
global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint);
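
The commit message does not spell out the race itself. One plausible reading of the diff is that the queue's paths and logger used to be filled in only when `startup()` called `initialize(...)`, so any code reaching the queue earlier could see empty values; the change moves that state into the constructors. The sketch below contrasts the two shapes under that assumption. All class names, the `/queue` suffix, and the sample paths are hypothetical and are not ClickHouse code.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the storage object; not ClickHouse code.
struct StorageSketch
{
    std::string zookeeper_path;
    std::string replica_path;
};

// Two-phase shape (like the removed code): replica_path stays empty until
// initialize() runs, so an early caller can observe a half-built object.
class TwoPhaseQueueSketch
{
public:
    void initialize(const std::string & replica_path_) { replica_path = replica_path_; }
    std::string queuePath() const { return replica_path + "/queue"; }

private:
    std::string replica_path;
};

// Constructor-initialized shape (like the new code): the path is set before
// anyone can hold a reference to the object, and it never changes afterwards.
class ConstructedQueueSketch
{
public:
    explicit ConstructedQueueSketch(const StorageSketch & storage)
        : replica_path(storage.replica_path)
    {
    }

    std::string queuePath() const { return replica_path + "/queue"; }

private:
    const std::string replica_path;
};

// Hypothetical checks with made-up sample paths.
void checkQueueSketch()
{
    TwoPhaseQueueSketch early;
    // Before initialize(), the computed path silently lacks its prefix.
    assert(early.queuePath() == "/queue");

    StorageSketch storage{"/clickhouse/tables/01/hits", "/clickhouse/tables/01/hits/replicas/r1"};
    ConstructedQueueSketch queue(storage);
    assert(queue.queuePath() == "/clickhouse/tables/01/hits/replicas/r1/queue");
}
```

In the real diff the same idea relies on `zookeeper_path` and `replica_path` being computed in the storage's member initializer list, which in C++ runs in member declaration order, so they must be declared before the members that read them.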