Merge pull request #51720 from arenadata/ADQM-970
Added a crash log flush to disk after an unexpected crash.
hanfei1991 committed Jul 13, 2023
2 parents 9df928e + 7b4e7cd commit f3684f7
Showing 6 changed files with 100 additions and 19 deletions.
47 changes: 29 additions & 18 deletions src/Common/SystemLogBase.cpp
@@ -136,14 +136,40 @@ void SystemLogBase<LogElement>::add(const LogElement & element)
 
 template <typename LogElement>
 void SystemLogBase<LogElement>::flush(bool force)
 {
+    uint64_t this_thread_requested_offset = notifyFlushImpl(force);
+    if (this_thread_requested_offset == uint64_t(-1))
+        return;
+
+    // Use an arbitrary timeout to avoid endless waiting. 60s proved to be
+    // too fast for our parallel functional tests, probably because they
+    // heavily load the disk.
+    const int timeout_seconds = 180;
+    std::unique_lock lock(mutex);
+    bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), [&]
+    {
+        return flushed_up_to >= this_thread_requested_offset && !is_force_prepare_tables;
+    });
+
+    if (!result)
+    {
+        throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout exceeded ({} s) while flushing system log '{}'.",
+            toString(timeout_seconds), demangle(typeid(*this).name()));
+    }
+}
+
+template <typename LogElement>
+void SystemLogBase<LogElement>::notifyFlush(bool force) { notifyFlushImpl(force); }
+
+template <typename LogElement>
+uint64_t SystemLogBase<LogElement>::notifyFlushImpl(bool force)
+{
     uint64_t this_thread_requested_offset;
 
     {
         std::lock_guard lock(mutex);
 
         if (is_shutdown)
-            return;
+            return uint64_t(-1);
 
         this_thread_requested_offset = queue_front_index + queue.size();
 
@@ -156,22 +182,7 @@ void SystemLogBase<LogElement>::flush(bool force)
     }
 
     LOG_DEBUG(log, "Requested flush up to offset {}", this_thread_requested_offset);
-
-    // Use an arbitrary timeout to avoid endless waiting. 60s proved to be
-    // too fast for our parallel functional tests, probably because they
-    // heavily load the disk.
-    const int timeout_seconds = 180;
-    std::unique_lock lock(mutex);
-    bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), [&]
-    {
-        return flushed_up_to >= this_thread_requested_offset && !is_force_prepare_tables;
-    });
-
-    if (!result)
-    {
-        throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout exceeded ({} s) while flushing system log '{}'.",
-            toString(timeout_seconds), demangle(typeid(*this).name()));
-    }
+    return this_thread_requested_offset;
 }
 
 #define INSTANTIATE_SYSTEM_LOG_BASE(ELEMENT) template class SystemLogBase<ELEMENT>;
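
The shape of the change is easier to see outside the diff: the old blocking flush() is split into a notify step (record the requested offset and wake the saving thread) and a wait step (block on a condition variable until that offset has been persisted). Below is a minimal, self-contained sketch of the same pattern; the class and member names (DemoLog, markFlushed, …) are illustrative only, and the timeout and force-prepare-tables handling of the real code are omitted.

// Sketch of the blocking-flush / non-blocking-notify split (not ClickHouse code).
#include <algorithm>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <vector>

class DemoLog
{
public:
    /// Blocking: request a flush and wait until the background writer reports
    /// that everything queued so far has been written out.
    void flush()
    {
        uint64_t requested = notifyFlushImpl();
        std::unique_lock lock(mutex);
        flush_event.wait(lock, [&] { return flushed_up_to >= requested; });
    }

    /// Non-blocking: only wake the background writer and return immediately.
    void notifyFlush() { notifyFlushImpl(); }

    void add(int element)
    {
        std::lock_guard lock(mutex);
        queue.push_back(element);
    }

    /// Called by the background writer after it persists entries up to `offset`.
    void markFlushed(uint64_t offset)
    {
        std::lock_guard lock(mutex);
        flushed_up_to = offset;
        flush_event.notify_all();
    }

private:
    /// Record how far this caller wants the log flushed and wake the writer.
    uint64_t notifyFlushImpl()
    {
        std::lock_guard lock(mutex);
        uint64_t requested = queue_front_index + queue.size();
        requested_flush_up_to = std::max(requested_flush_up_to, requested);
        flush_event.notify_all();
        return requested;
    }

    std::mutex mutex;
    std::condition_variable flush_event;
    std::vector<int> queue;
    uint64_t queue_front_index = 0;
    uint64_t flushed_up_to = 0;
    uint64_t requested_flush_up_to = 0;
};
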
9 changes: 8 additions & 1 deletion src/Common/SystemLogBase.h
@@ -87,9 +87,12 @@ class SystemLogBase : public ISystemLog
       */
     void add(const LogElement & element);
 
-    /// Flush data in the buffer to disk
+    /// Flush data in the buffer to disk. Block the thread until the data is stored on disk.
     void flush(bool force) override;
 
+    /// Non-blocking flush of data in the buffer to disk.
+    void notifyFlush(bool force);
+
     String getName() const override { return LogElement::name(); }
 
     static const char * getDefaultOrderBy() { return "event_date, event_time"; }
@@ -112,6 +115,10 @@ class SystemLogBase : public ISystemLog
     uint64_t flushed_up_to = 0;
     // Logged overflow message at this queue front index
     uint64_t logged_queue_full_at_index = -1;
+
+private:
+    uint64_t notifyFlushImpl(bool force);
+
 };
 
 }
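
As a usage note, an illustration of which entry point a caller would pick (hypothetical helpers, not code from this PR; LogLike stands for any type exposing the same pair of member functions, such as a SystemLogBase<...> subclass):

template <typename LogLike>
void persistOnOrdinaryPath(LogLike & log)
{
    /// Safe to block here until the data is actually on disk (may throw on timeout).
    log.flush(/* force = */ true);
}

template <typename LogLike>
void persistOnCrashPath(LogLike & log)
{
    /// Must not wait on the saving thread, so only wake it up.
    log.notifyFlush(/* force = */ true);
}
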
3 changes: 3 additions & 0 deletions src/Daemon/BaseDaemon.cpp
@@ -173,6 +173,9 @@ static void signalHandler(int sig, siginfo_t * info, void * context)
             /// This coarse method of synchronization is perfectly ok for fatal signals.
             sleepForSeconds(1);
         }
+
+        /// Wait for all log flush operations
+        sleepForSeconds(3);
         call_default_signal_handler(sig);
     }
 
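
The added sleepForSeconds(3) implements a simple grace period: the handler gives background threads a fixed amount of time to persist crash information before the default signal disposition terminates the process. A standalone sketch of that idea, assuming POSIX and using only async-signal-safe calls (this is not the ClickHouse handler):

#include <csignal>
#include <unistd.h>

static void fatalSignalHandler(int sig)
{
    // Grace period: let other threads write out crash information.
    // sleep(), std::signal() and std::raise() are async-signal-safe.
    sleep(3);

    // Restore the default disposition and re-deliver the signal so the
    // process still dies with the expected status / core dump.
    std::signal(sig, SIG_DFL);
    std::raise(sig);
}

int main()
{
    std::signal(SIGSEGV, fatalSignalHandler);
    // ... a real program would install handlers via sigaction and run its work here.
    return 0;
}
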
3 changes: 3 additions & 0 deletions src/Interpreters/CrashLog.cpp
@@ -84,5 +84,8 @@ void collectCrashLog(Int32 signal, UInt64 thread_id, const String & query_id, co
 
         CrashLogElement element{static_cast<time_t>(time / 1000000000), time, signal, thread_id, query_id, trace, trace_full};
         crash_log_owned->add(element);
+        /// Notify savingThreadFunction to start flushing the crash log.
+        /// The crash log is stored in parallel with the signal processing thread.
+        crash_log_owned->notifyFlush(true);
     }
 }
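
Putting the two pieces together: the crash path only enqueues the element and wakes the saving thread, and the grace period in the signal handler is what gives that thread time to write. A self-contained sketch of this interplay (illustrative names; a console write stands in for the disk write):

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

std::mutex mutex;
std::condition_variable flush_event;
std::vector<std::string> queue;
bool flush_requested = false;

void savingThreadFunction()
{
    std::unique_lock lock(mutex);
    flush_event.wait(lock, [] { return flush_requested; });
    for (const auto & entry : queue)
        std::cout << "persisted: " << entry << "\n";  // stands in for the real disk write
}

void collectCrashLogDemo(const std::string & what)
{
    {
        std::lock_guard lock(mutex);
        queue.push_back(what);
        flush_requested = true;
    }
    flush_event.notify_all();  // non-blocking: the writer thread does the work
}

int main()
{
    std::thread writer(savingThreadFunction);
    collectCrashLogDemo("signal 11 in query thread");
    // In the real crash path nothing can be joined; the 3 s sleep in the signal
    // handler is the only thing giving the writer time. Here we sleep and then
    // join just to end the demo cleanly.
    std::this_thread::sleep_for(std::chrono::seconds(3));
    writer.join();
    return 0;
}
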
Empty file.
57 changes: 57 additions & 0 deletions tests/integration/test_crash_log/test.py
@@ -0,0 +1,57 @@
import os
import time
import pytest

import helpers.cluster
import helpers.test_tools

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))


@pytest.fixture(scope="module")
def started_node():
    cluster = helpers.cluster.ClickHouseCluster(__file__)
    try:
        node = cluster.add_instance("node", stay_alive=True)

        cluster.start()
        yield node
    finally:
        cluster.shutdown()


def send_signal(started_node, signal):
    started_node.exec_in_container(
        ["bash", "-c", f"pkill -{signal} clickhouse"], user="root"
    )


def wait_for_clickhouse_stop(started_node):
    result = None
    for attempt in range(60):
        time.sleep(1)
        pid = started_node.get_process_pid("clickhouse")
        if pid is None:
            result = "OK"
            break
    assert result == "OK", "ClickHouse process is still running"


def test_pkill(started_node):
    if (
        started_node.is_built_with_thread_sanitizer()
        or started_node.is_built_with_address_sanitizer()
        or started_node.is_built_with_memory_sanitizer()
    ):
        pytest.skip("doesn't fit in timeouts for stacktrace generation")

    crashes_count = 0
    for signal in ["SEGV", "4"]:
        send_signal(started_node, signal)
        wait_for_clickhouse_stop(started_node)
        started_node.restart_clickhouse()
        crashes_count += 1
        assert (
            started_node.query("SELECT COUNT(*) FROM system.crash_log")
            == f"{crashes_count}\n"
        )
