diff --git a/src/Common/SystemLogBase.cpp b/src/Common/SystemLogBase.cpp
index 86adcbbd31b2..5e9ee9a1e04a 100644
--- a/src/Common/SystemLogBase.cpp
+++ b/src/Common/SystemLogBase.cpp
@@ -136,14 +136,40 @@ void SystemLogBase<LogElement>::add(const LogElement & element)
 
 template <typename LogElement>
 void SystemLogBase<LogElement>::flush(bool force)
+{
+    uint64_t this_thread_requested_offset = notifyFlushImpl(force);
+    if (this_thread_requested_offset == uint64_t(-1))
+        return;
+
+    // Use an arbitrary timeout to avoid endless waiting. 60s proved to be
+    // too fast for our parallel functional tests, probably because they
+    // heavily load the disk.
+    const int timeout_seconds = 180;
+    std::unique_lock lock(mutex);
+    bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), [&]
+    {
+        return flushed_up_to >= this_thread_requested_offset && !is_force_prepare_tables;
+    });
+
+    if (!result)
+    {
+        throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout exceeded ({} s) while flushing system log '{}'.",
+            toString(timeout_seconds), demangle(typeid(*this).name()));
+    }
+}
+
+template <typename LogElement>
+void SystemLogBase<LogElement>::notifyFlush(bool force) { notifyFlushImpl(force); }
+
+template <typename LogElement>
+uint64_t SystemLogBase<LogElement>::notifyFlushImpl(bool force)
 {
     uint64_t this_thread_requested_offset;
 
     {
         std::lock_guard lock(mutex);
-
         if (is_shutdown)
-            return;
+            return uint64_t(-1);
 
         this_thread_requested_offset = queue_front_index + queue.size();
 
@@ -156,22 +182,7 @@ void SystemLogBase<LogElement>::flush(bool force)
     }
 
     LOG_DEBUG(log, "Requested flush up to offset {}", this_thread_requested_offset);
-
-    // Use an arbitrary timeout to avoid endless waiting. 60s proved to be
-    // too fast for our parallel functional tests, probably because they
-    // heavily load the disk.
-    const int timeout_seconds = 180;
-    std::unique_lock lock(mutex);
-    bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), [&]
-    {
-        return flushed_up_to >= this_thread_requested_offset && !is_force_prepare_tables;
-    });
-
-    if (!result)
-    {
-        throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout exceeded ({} s) while flushing system log '{}'.",
-            toString(timeout_seconds), demangle(typeid(*this).name()));
-    }
+    return this_thread_requested_offset;
 }
 
 #define INSTANTIATE_SYSTEM_LOG_BASE(ELEMENT) template class SystemLogBase<ELEMENT>;
diff --git a/src/Common/SystemLogBase.h b/src/Common/SystemLogBase.h
index f8febd8b1595..92409028c22f 100644
--- a/src/Common/SystemLogBase.h
+++ b/src/Common/SystemLogBase.h
@@ -87,9 +87,12 @@ class SystemLogBase : public ISystemLog
       */
     void add(const LogElement & element);
 
-    /// Flush data in the buffer to disk
+    /// Flush data in the buffer to disk. Blocks the thread until the data is stored on disk.
     void flush(bool force) override;
 
+    /// Non-blocking flush of the data in the buffer to disk.
+    void notifyFlush(bool force);
+
     String getName() const override { return LogElement::name(); }
 
     static const char * getDefaultOrderBy() { return "event_date, event_time"; }
@@ -112,6 +115,10 @@ class SystemLogBase : public ISystemLog
     uint64_t flushed_up_to = 0;
     // Logged overflow message at this queue front index
     uint64_t logged_queue_full_at_index = -1;
+
+private:
+    uint64_t notifyFlushImpl(bool force);
+
 };
 
 }
diff --git a/src/Daemon/BaseDaemon.cpp b/src/Daemon/BaseDaemon.cpp
index 319d2bc8b5ba..3852ec5ada52 100644
--- a/src/Daemon/BaseDaemon.cpp
+++ b/src/Daemon/BaseDaemon.cpp
@@ -173,6 +173,9 @@ static void signalHandler(int sig, siginfo_t * info, void * context)
             /// This coarse method of synchronization is perfectly ok for fatal signals.
             sleepForSeconds(1);
         }
+
+        /// Wait for all log flush operations
+        sleepForSeconds(3);
 
         call_default_signal_handler(sig);
     }
diff --git a/src/Interpreters/CrashLog.cpp b/src/Interpreters/CrashLog.cpp
index 08c08ffecd10..379c9122cc81 100644
--- a/src/Interpreters/CrashLog.cpp
+++ b/src/Interpreters/CrashLog.cpp
@@ -84,5 +84,8 @@ void collectCrashLog(Int32 signal, UInt64 thread_id, const String & query_id, const StackTrace & stack_trace)
         CrashLogElement element{static_cast<time_t>(time / 1000000000), time, signal, thread_id, query_id, trace, trace_full};
 
         crash_log_owned->add(element);
+        /// Notify savingThreadFunction to start flushing the crash log.
+        /// The crash log is stored in parallel with the signal processing thread.
+        crash_log_owned->notifyFlush(true);
     }
 }
diff --git a/tests/integration/test_crash_log/__init__.py b/tests/integration/test_crash_log/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tests/integration/test_crash_log/test.py b/tests/integration/test_crash_log/test.py
new file mode 100644
index 000000000000..9f6eca794b1f
--- /dev/null
+++ b/tests/integration/test_crash_log/test.py
@@ -0,0 +1,57 @@
+import os
+import time
+import pytest
+
+import helpers.cluster
+import helpers.test_tools
+
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+
+
+@pytest.fixture(scope="module")
+def started_node():
+    cluster = helpers.cluster.ClickHouseCluster(__file__)
+    try:
+        node = cluster.add_instance("node", stay_alive=True)
+
+        cluster.start()
+        yield node
+    finally:
+        cluster.shutdown()
+
+
+def send_signal(started_node, signal):
+    started_node.exec_in_container(
+        ["bash", "-c", f"pkill -{signal} clickhouse"], user="root"
+    )
+
+
+def wait_for_clickhouse_stop(started_node):
+    result = None
+    for attempt in range(60):
+        time.sleep(1)
+        pid = started_node.get_process_pid("clickhouse")
+        if pid is None:
+            result = "OK"
+            break
+    assert result == "OK", "ClickHouse process is still running"
+
+
+def test_pkill(started_node):
+    if (
+        started_node.is_built_with_thread_sanitizer()
+        or started_node.is_built_with_address_sanitizer()
+        or started_node.is_built_with_memory_sanitizer()
+    ):
+        pytest.skip("doesn't fit in timeouts for stacktrace generation")
+
+    crashes_count = 0
+    for signal in ["SEGV", "4"]:
+        send_signal(started_node, signal)
+        wait_for_clickhouse_stop(started_node)
+        started_node.restart_clickhouse()
+        crashes_count += 1
+        assert (
+            started_node.query("SELECT COUNT(*) FROM system.crash_log")
+            == f"{crashes_count}\n"
+        )
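
For context on the refactoring above: flush() is now the blocking half (it calls notifyFlushImpl() and then waits on flush_event until flushed_up_to reaches the requested offset), while notifyFlush() exposes only the non-blocking half, which is what collectCrashLog() calls on the fatal-signal path; the signal handler in BaseDaemon.cpp then sleeps a few extra seconds instead of blocking on a flush. The sketch below is a minimal, self-contained model of that request-offset-then-wait pattern; all names (MiniLog, savingThread, and the reuse of the 180 s timeout) are illustrative assumptions, not ClickHouse code, and it assumes C++17 for the deduced lock guards.

// Sketch of the "blocking flush vs. non-blocking notify" pattern (hypothetical
// names, not ClickHouse code).
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

class MiniLog
{
public:
    void add(int element)
    {
        std::lock_guard lock(mutex);
        queue.push_back(element);
    }

    /// Blocking flush: request a flush, then wait until the saving thread
    /// reports that everything queued so far has been stored.
    void flush()
    {
        uint64_t requested = notifyFlushImpl();
        std::unique_lock lock(mutex);
        flush_event.wait_for(lock, std::chrono::seconds(180),
            [&] { return flushed_up_to >= requested; });
    }

    /// Non-blocking flush: only wake the saving thread. A caller that must not
    /// wait (e.g. a crash path) can afford this.
    void notifyFlush() { notifyFlushImpl(); }

    /// Body of the background saving thread (savingThreadFunction analogue).
    void savingThread()
    {
        while (true)
        {
            std::unique_lock lock(mutex);
            flush_event.wait(lock, [&] { return !queue.empty() || stopped; });
            if (stopped)
                return;
            /// ... write queue contents to durable storage here ...
            flushed_up_to = queue_front_index + queue.size();
            queue_front_index = flushed_up_to;
            queue.clear();
            flush_event.notify_all();   /// wake any thread blocked in flush()
        }
    }

    void stop()
    {
        {
            std::lock_guard lock(mutex);
            stopped = true;
        }
        flush_event.notify_all();
    }

private:
    /// Shared half of flush()/notifyFlush(): compute the offset the caller
    /// wants flushed and wake the saving thread.
    uint64_t notifyFlushImpl()
    {
        std::lock_guard lock(mutex);
        uint64_t requested = queue_front_index + queue.size();
        flush_event.notify_all();
        return requested;
    }

    std::mutex mutex;
    std::condition_variable flush_event;
    std::vector<int> queue;
    uint64_t queue_front_index = 0;
    uint64_t flushed_up_to = 0;
    bool stopped = false;
};

int main()
{
    MiniLog log;
    std::thread saver(&MiniLog::savingThread, &log);

    log.add(1);
    log.notifyFlush();   /// crash-path style: request a flush without blocking
    log.add(2);
    log.flush();         /// normal style: block until the data is stored

    log.stop();
    saver.join();
    std::cout << "flushed everything\n";
}

In the real patch the crash path cannot even block on a condition variable inside the signal handler, which is why BaseDaemon.cpp adds an unconditional sleepForSeconds(3) instead of calling the blocking flush(); the sketch only illustrates why the notify-only entry point is needed at all.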