Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: Flush async insert queue first on shutdown #53547

Merged
merged 3 commits into from Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Interpreters/Context.cpp
Expand Up @@ -479,6 +479,9 @@ struct ContextSharedPart : boost::noncopyable
return;
shutdown_called = true;

/// Need to flush the async insert queue before shutting down the database catalog
async_insert_queue.reset();

/// Stop periodic reloading of the configuration files.
/// This must be done first because otherwise the reloading may pass a changed config
/// to some destroyed parts of ContextSharedPart.
Expand Down
18 changes: 16 additions & 2 deletions tests/integration/test_restart_server/test.py
Expand Up @@ -5,7 +5,7 @@
node = cluster.add_instance("node", stay_alive=True)


@pytest.fixture(scope="module")
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
Expand All @@ -14,9 +14,23 @@ def start_cluster():
cluster.shutdown()


def test_drop_memory_database(start_cluster):
def test_drop_memory_database():
node.query("CREATE DATABASE test ENGINE Memory")
node.query("CREATE TABLE test.test_table(a String) ENGINE Memory")
node.query("DROP DATABASE test")
node.restart_clickhouse(kill=True)
assert node.query("SHOW DATABASES LIKE 'test'").strip() == ""


def test_flushes_async_insert_queue():
node.query(
"""
CREATE TABLE flush_test (a String, b UInt64) ENGINE = MergeTree ORDER BY a;
SET async_insert = 1;
SET wait_for_async_insert = 0;
SET async_insert_busy_timeout_ms = 1000000;
INSERT INTO flush_test VALUES ('world', 23456);
"""
)
node.restart_clickhouse()
assert node.query("SELECT * FROM flush_test") == "world\t23456\n"