Follow up to #60452 (#61954)

Merged: 5 commits, Mar 28, 2024
src/Interpreters/MutationsInterpreter.cpp (3 additions, 1 deletion)
@@ -409,12 +409,13 @@ MutationsInterpreter::MutationsInterpreter(
, available_columns(std::move(available_columns_))
, settings(std::move(settings_))
, select_limits(SelectQueryOptions().analyze(!settings.can_execute).ignoreLimits())
, logger(getLogger("MutationsInterpreter"))
{
auto new_context = Context::createCopy(context_);
if (new_context->getSettingsRef().allow_experimental_analyzer)
{
new_context->setSetting("allow_experimental_analyzer", false);
LOG_DEBUG(&Poco::Logger::get("MutationsInterpreter"), "Will use old analyzer to prepare mutation");
LOG_DEBUG(logger, "Will use old analyzer to prepare mutation");
}
context = std::move(new_context);

@@ -997,6 +998,7 @@ void MutationsInterpreter::prepare(bool dry_run)
/// Always rebuild broken projections.
if (source.hasBrokenProjection(projection.name))
{
LOG_DEBUG(logger, "Will rebuild broken projection {}", projection.name);
materialized_projections.insert(projection.name);
continue;
}
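The MutationsInterpreter changes above replace per-call `Poco::Logger::get("MutationsInterpreter")` lookups with a `logger` member that is resolved once in the constructor via `getLogger(...)` and reused at every log site. Below is a minimal, self-contained sketch of that member-cached-logger pattern; the `Logger` struct and registry are stand-ins for illustration, not ClickHouse's real `LoggerPtr`/`LOG_DEBUG` machinery.

```cpp
// Sketch only: a stand-in logger registry illustrating the member-cached
// logger pattern; ClickHouse's real LoggerPtr/getLogger live in src/Common.
#include <iostream>
#include <map>
#include <memory>
#include <string>

struct Logger
{
    std::string name;
    void debug(const std::string & msg) const { std::cout << name << ": " << msg << '\n'; }
};

using LoggerPtr = std::shared_ptr<Logger>;

LoggerPtr getLogger(const std::string & name)
{
    // One shared instance per name, created on first use.
    static std::map<std::string, LoggerPtr> registry;
    auto & entry = registry[name];
    if (!entry)
        entry = std::make_shared<Logger>(Logger{name});
    return entry;
}

class MutationsInterpreterSketch
{
public:
    MutationsInterpreterSketch()
        : logger(getLogger("MutationsInterpreter"))  // resolved once, reused by every log call
    {
    }

    void prepare()
    {
        logger->debug("Will use old analyzer to prepare mutation");
    }

private:
    LoggerPtr logger;
};

int main()
{
    MutationsInterpreterSketch interpreter;
    interpreter.prepare();
}
```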
src/Interpreters/MutationsInterpreter.h (2 additions, 0 deletions)
@@ -175,6 +175,8 @@ class MutationsInterpreter
Settings settings;
SelectQueryOptions select_limits;

LoggerPtr logger;

/// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several
/// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the
/// previous commands in the same stage, but at the end of each stage intermediate columns are thrown away
src/Storages/MergeTree/IMergeTreeDataPart.cpp (2 additions, 2 deletions)
@@ -805,8 +805,8 @@ void IMergeTreeDataPart::loadProjections(bool require_columns_checksums, bool ch
throw;

auto message = getCurrentExceptionMessage(true);
LOG_ERROR(&Poco::Logger::get("IMergeTreeDataPart"),
"Cannot load projection {}, will consider it broken. Reason: {}", projection.name, message);
LOG_WARNING(storage.log, "Cannot load projection {}, "
"will consider it broken. Reason: {}", projection.name, message);

has_broken_projection = true;
part->setBrokenReason(message, getCurrentExceptionCode());
src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp (2 additions, 2 deletions)
@@ -386,12 +386,12 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
throw;

PreformattedMessage message;
if (is_broken_projection)
if (is_broken_projection && throw_on_broken_projection)
{
WriteBufferFromOwnString wb;
message = PreformattedMessage::create(
"Part {} has a broken projections. It will be ignored. Broken projections info: {}",
part_name, getCurrentExceptionMessage(false));
part_name, getCurrentExceptionMessage(true));
LOG_DEBUG(log, message);
result.action = ReplicatedCheckResult::DoNothing;
}
src/Storages/MergeTree/checkDataPart.cpp (11 additions, 12 deletions)
@@ -285,11 +285,6 @@ static IMergeTreeDataPart::Checksums checkDataPart(
return {};

auto projection_file = name + ".proj";
if (!throw_on_broken_projection && projection->is_broken)
{
projections_on_disk.erase(projection_file);
checksums_txt.remove(projection_file);
}

IMergeTreeDataPart::Checksums projection_checksums;
try
@@ -306,26 +301,30 @@
if (isRetryableException(std::current_exception()))
throw;

is_broken_projection = true;
projections_on_disk.erase(projection_file);
checksums_txt.remove(projection_file);

const auto exception = getCurrentExceptionMessage(true);

if (!projection->is_broken)
{
LOG_TEST(log, "Marking projection {} as broken ({})", name, projection_file);
projection->setBrokenReason(getCurrentExceptionMessage(false), getCurrentExceptionCode());
LOG_WARNING(log, "Marking projection {} as broken ({}). Reason: {}",
name, projection_file, exception);
projection->setBrokenReason(exception, getCurrentExceptionCode());
}

is_broken_projection = true;
if (throw_on_broken_projection)
{
if (!broken_projections_message.empty())
broken_projections_message += "\n";

broken_projections_message += fmt::format(
"Part {} has a broken projection {} (error: {})",
data_part->name, name, getCurrentExceptionMessage(false));
continue;
data_part->name, name, exception);
}

projections_on_disk.erase(projection_file);
checksums_txt.remove(projection_file);
continue;
}

checksums_data.files[projection_file] = IMergeTreeDataPart::Checksums::Checksum(
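Read flat, the checkDataPart.cpp hunk above is easy to misparse, so here is the reordered control flow in one piece: format the exception message once, record it as the projection's broken reason (if not already marked), accumulate the per-part error text only when `throw_on_broken_projection` is set, and in every case drop the projection from the on-disk set and the checksums before continuing. The sketch below illustrates that flow with simplified stand-in types; it is not the real ClickHouse code.

```cpp
// Sketch of the new broken-projection handling in checkDataPart();
// Projection and the two sets are simplified stand-ins, not ClickHouse types.
#include <iostream>
#include <set>
#include <string>
#include <vector>

struct Projection
{
    std::string name;
    bool is_broken = false;
    std::string broken_reason;
    bool load_fails = false;  // simulates a checksum/IO failure while checking

    void setBrokenReason(const std::string & reason) { is_broken = true; broken_reason = reason; }
};

int main()
{
    std::vector<Projection> projections = {{"proj1"}, {"proj2", false, "", true}};
    std::set<std::string> projections_on_disk = {"proj1.proj", "proj2.proj"};
    std::set<std::string> checksums_txt = {"proj1.proj", "proj2.proj"};

    const bool throw_on_broken_projection = true;
    bool is_broken_projection = false;
    std::string broken_projections_message;

    for (auto & projection : projections)
    {
        const auto projection_file = projection.name + ".proj";
        if (!projection.load_fails)
            continue;  // checksums verified, nothing else to do in this sketch

        const std::string exception = "NO_FILE_IN_DATA_PART";

        // Record the reason only the first time the projection is seen broken.
        if (!projection.is_broken)
            projection.setBrokenReason(exception);

        is_broken_projection = true;
        if (throw_on_broken_projection)
        {
            if (!broken_projections_message.empty())
                broken_projections_message += "\n";
            broken_projections_message += "Part all_2_2_0 has a broken projection " + projection.name
                + " (error: " + exception + ")";
        }

        // The projection is dropped from both sets in either branch
        // before moving on to the next projection.
        projections_on_disk.erase(projection_file);
        checksums_txt.erase(projection_file);
        continue;
    }

    if (is_broken_projection)
        std::cout << broken_projections_message << '\n';
}
```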
tests/integration/test_broken_projections/test.py (183 additions, 8 deletions)
@@ -148,7 +148,7 @@ def break_part(node, table, part):
bash(node, f"rm '{part_path}/columns.txt'")


def get_broken_projections_info(node, table):
def get_broken_projections_info(node, table, active=True):
return node.query(
f"""
SELECT parent_name, name, errors.name FROM
@@ -158,6 +158,7 @@ def get_broken_projections_info(node, table):
WHERE table='{table}'
AND database=currentDatabase()
AND is_broken = 1
AND active = {active}
) AS parts_info
INNER JOIN system.errors AS errors
ON parts_info.exception_code = errors.code
@@ -214,14 +215,21 @@ def random_str(length=6):
return "".join(random.SystemRandom().choice(alphabet) for _ in range(length))


def check(node, table, check_result, expect_broken_part="", expected_error=""):
def check(
node,
table,
check_result,
expect_broken_part="",
expected_error="",
do_check_command=True,
):
if expect_broken_part == "proj1":
assert expected_error in node.query_and_get_error(
f"SELECT c FROM '{table}' WHERE d == 12 ORDER BY c"
f"SELECT c FROM '{table}' WHERE d == 12 ORDER BY c SETTINGS force_optimize_projection_name = 'proj1'"
)
else:
query_id = node.query(
f"SELECT queryID() FROM (SELECT c FROM '{table}' WHERE d == 12 ORDER BY c)"
f"SELECT queryID() FROM (SELECT c FROM '{table}' WHERE d == 12 ORDER BY c SETTINGS force_optimize_projection_name = 'proj1')"
).strip()
node.query("SYSTEM FLUSH LOGS")
res = node.query(
@@ -244,11 +252,11 @@ def check(node, table, check_result, expect_broken_part="", expected_error=""):

if expect_broken_part == "proj2":
assert expected_error in node.query_and_get_error(
f"SELECT d FROM '{table}' WHERE c == 12 ORDER BY d"
f"SELECT d FROM '{table}' WHERE c == 12 ORDER BY d SETTINGS force_optimize_projection_name = 'proj2'"
)
else:
query_id = node.query(
f"SELECT queryID() FROM (SELECT d FROM '{table}' WHERE c == 12 ORDER BY d)"
f"SELECT queryID() FROM (SELECT d FROM '{table}' WHERE c == 12 ORDER BY d SETTINGS force_optimize_projection_name = 'proj2')"
).strip()
node.query("SYSTEM FLUSH LOGS")
res = node.query(
@@ -269,7 +277,8 @@ def check(node, table, check_result, expect_broken_part="", expected_error=""):
assert False
assert "proj2" in res

assert check_result == int(node.query(f"CHECK TABLE {table}"))
if do_check_command:
assert check_result == int(node.query(f"CHECK TABLE {table}"))


def test_broken_ignored(cluster):
@@ -352,7 +361,14 @@ def test_broken_ignored(cluster):
# """)
# )

assert "all_3_3_0" in get_broken_projections_info(node, table_name)
assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_5_1"] == get_parts(
node, table_name
)

assert "all_3_3_0" in get_broken_projections_info(node, table_name, active=False)
assert "all_2_2_0" in get_broken_projections_info(node, table_name, active=False)

# 0 because of all_2_2_0
check(node, table_name, 0)


@@ -574,3 +590,162 @@ def test_broken_projections_in_backups_3(cluster):
assert "all_1_1_0\tproj1\tNO_FILE_IN_DATA_PART" == get_broken_projections_info(
node, table_name
)


def test_check_part_thread(cluster):
node = cluster.instances["node"]

table_name = "check_part_thread_test1"
create_table(node, table_name, 1)

insert(node, table_name, 0, 5)
insert(node, table_name, 5, 5)
insert(node, table_name, 10, 5)
insert(node, table_name, 15, 5)

assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
node, table_name
)

# Break data file of projection 'proj2' for part all_2_2_0
break_projection(node, table_name, "proj2", "all_2_2_0", "data")

# It will not yet appear in broken projections info.
assert "proj2" not in get_broken_projections_info(node, table_name)

# Select now fails with error "File doesn't exist"
check(node, table_name, 0, "proj2", "FILE_DOESNT_EXIST", do_check_command=False)

good = False
for _ in range(10):
# We marked projection as broken, checkPartThread must not complain about the part.
good = node.contains_in_log(
f"{table_name} (ReplicatedMergeTreePartCheckThread): Part all_2_2_0 looks good"
)
if good:
break
time.sleep(1)

assert good


def test_broken_on_start(cluster):
node = cluster.instances["node"]

table_name = "test1"
create_table(node, table_name, 1)

insert(node, table_name, 0, 5)
insert(node, table_name, 5, 5)
insert(node, table_name, 10, 5)
insert(node, table_name, 15, 5)

assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
node, table_name
)

# Break data file of projection 'proj2' for part all_2_2_0
break_projection(node, table_name, "proj2", "all_2_2_0", "data")

# It will not yet appear in broken projections info.
assert "proj2" not in get_broken_projections_info(node, table_name)

# Select now fails with error "File doesn't exist"
# We will mark projection as broken.
check(node, table_name, 0, "proj2", "FILE_DOESNT_EXIST")

# Projection 'proj2' from part all_2_2_0 will now appear in broken parts info.
assert "all_2_2_0\tproj2\tNO_FILE_IN_DATA_PART" in get_broken_projections_info(
node, table_name
)

# Second select works, because projection is now marked as broken.
check(node, table_name, 0)

node.restart_clickhouse()

# The projection is still marked as broken after the restart.
assert "proj2" in get_broken_projections_info(node, table_name)

# Select works
check(node, table_name, 0)


def test_mutation_with_broken_projection(cluster):
node = cluster.instances["node"]

table_name = "test1"
create_table(node, table_name, 1)

insert(node, table_name, 0, 5)
insert(node, table_name, 5, 5)
insert(node, table_name, 10, 5)
insert(node, table_name, 15, 5)

assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
node, table_name
)

check(node, table_name, 1)

node.query(
f"ALTER TABLE {table_name} DELETE WHERE c == 11 SETTINGS mutations_sync = 1"
)

assert ["all_0_0_0_4", "all_1_1_0_4", "all_2_2_0_4", "all_3_3_0_4"] == get_parts(
node, table_name
)

assert "" == get_broken_projections_info(node, table_name)

check(node, table_name, 1)

# Break data file of projection 'proj2' for part all_2_2_0_4
break_projection(node, table_name, "proj2", "all_2_2_0_4", "data")

# It will not yet appear in broken projections info.
assert "proj2" not in get_broken_projections_info(node, table_name)

# Select now fails with error "File doesn't exist"
# We will mark projection as broken.
check(node, table_name, 0, "proj2", "FILE_DOESNT_EXIST")

# Projection 'proj2' from part all_2_2_0_4 will now appear in broken parts info.
assert "all_2_2_0_4\tproj2\tNO_FILE_IN_DATA_PART" in get_broken_projections_info(
node, table_name
)

# Second select works, because projection is now marked as broken.
check(node, table_name, 0)

assert "all_2_2_0_4" in get_broken_projections_info(node, table_name)

node.query(
f"ALTER TABLE {table_name} DELETE WHERE _part == 'all_0_0_0_4' SETTINGS mutations_sync = 1"
)

# All parts change because this is how alter delete works,
# but all parts apart from the first have only hardlinks to files in previous part.
assert ["all_0_0_0_5", "all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts(
node, table_name
)

# Still broken because it was hardlinked.
assert "all_2_2_0_5" in get_broken_projections_info(node, table_name)

check(node, table_name, 0)

node.query(
f"ALTER TABLE {table_name} DELETE WHERE c == 13 SETTINGS mutations_sync = 1"
)

assert ["all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts(
node, table_name
) or ["all_0_0_0_6", "all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts(
node, table_name
)

# Not broken anymore.
assert "" == get_broken_projections_info(node, table_name)

check(node, table_name, 1)