Skip to content

Commit

Permalink
Merge pull request #55944 from azat/rmt-retriable-exception-log
Browse files Browse the repository at this point in the history
Do not write retriable errors for Replicated mutate/merge into error log
  • Loading branch information
tavplubix committed Oct 25, 2023
2 parents 0d68a52 + 66c4a3b commit c0482cb
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 5 deletions.
9 changes: 9 additions & 0 deletions src/Storages/MergeTree/IExecutableTask.h
Expand Up @@ -30,7 +30,16 @@ class IExecutableTask
{
public:
using TaskResultCallback = std::function<void(bool)>;

virtual bool executeStep() = 0;

/// Sometimes exceptions from the executeStep() had been already printed to
/// the log, but with different level (see
/// ReplicatedMergeMutateTaskBase::executeStep()), but the exception should
/// be throw, since there are some sanity assertions based on the
/// std::uncaught_exceptions() (i.e. WriteBuffer::~WriteBuffer())
virtual bool printExecutionException() const { return true; }

virtual void onCompleted() = 0;
virtual StorageID getStorageID() const = 0;
virtual String getQueryId() const = 0;
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp
Expand Up @@ -281,7 +281,8 @@ void MergeTreeBackgroundExecutor<Queue>::routine(TaskRuntimeDataPtr item)
}
catch (...)
{
printExceptionWithRespectToAbort(log, query_id);
if (item->task->printExecutionException())
printExceptionWithRespectToAbort(log, query_id);
/// Release the task with exception context.
/// An exception context is needed to proper delete write buffers without finalization
release_task(std::move(item));
Expand Down
10 changes: 6 additions & 4 deletions src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp
Expand Up @@ -69,10 +69,9 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
else
tryLogCurrentException(log, __PRETTY_FUNCTION__);

/** This exception will be written to the queue element, and it can be looked up using `system.replication_queue` table.
* The thread that performs this action will sleep a few seconds after the exception.
* See `queue.processEntry` function.
*/
/// This exception will be written to the queue element, and it can be looked up using `system.replication_queue` table.
/// The thread that performs this action will sleep a few seconds after the exception.
/// See `queue.processEntry` function.
throw;
}
catch (...)
Expand Down Expand Up @@ -121,6 +120,9 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
}
}

if (retryable_error)
print_exception = false;

if (saved_exception)
std::rethrow_exception(saved_exception);

Expand Down
3 changes: 3 additions & 0 deletions src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.h
Expand Up @@ -37,6 +37,8 @@ class ReplicatedMergeMutateTaskBase : public IExecutableTask
String getQueryId() const override { return getStorageID().getShortName() + "::" + selected_entry->log_entry->new_part_name; }
bool executeStep() override;

bool printExecutionException() const override { return print_exception; }

protected:
using PartLogWriter = std::function<void(const ExecutionStatus &)>;

Expand Down Expand Up @@ -91,6 +93,7 @@ class ReplicatedMergeMutateTaskBase : public IExecutableTask
PartLogWriter part_log_writer{};
State state{State::NEED_PREPARE};
IExecutableTask::TaskResultCallback task_result_callback;
bool print_exception = true;
};

}
@@ -0,0 +1 @@
Information 1
72 changes: 72 additions & 0 deletions tests/queries/0_stateless/02903_rmt_retriable_merge_exception.sh
@@ -0,0 +1,72 @@
#!/usr/bin/env bash
# Tags: no-ordinary-database
# Tag no-ordinary-database: requires UUID

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

# Test that retriable errors during merges/mutations
# (i.e. "No active replica has part X or covering part")
# does not appears as errors (level=Error), only as info message (level=Information).

$CLICKHOUSE_CLIENT -nm -q "
drop table if exists rmt1;
drop table if exists rmt2;
create table rmt1 (key Int) engine=ReplicatedMergeTree('/clickhouse/{database}', '1') order by key settings always_fetch_merged_part=1;
create table rmt2 (key Int) engine=ReplicatedMergeTree('/clickhouse/{database}', '2') order by key settings always_fetch_merged_part=0;
insert into rmt1 values (1);
insert into rmt1 values (2);
system sync replica rmt1;
system stop pulling replication log rmt2;
optimize table rmt1 final settings alter_sync=0, optimize_throw_if_noop=1;
" || exit 1

table_uuid=$($CLICKHOUSE_CLIENT -q "select uuid from system.tables where database = currentDatabase() and table = 'rmt1'")
if [[ -z $table_uuid ]]; then
echo "Table does not have UUID" >&2
exit 1
fi

# NOTE: that part name can be different from all_0_1_1, in case of ZooKeeper retries
part_name='%'

# wait while there be at least one 'No active replica has part all_0_1_1 or covering part' in logs
for _ in {0..50}; do
no_active_repilica_messages=$($CLICKHOUSE_CLIENT -nm -q "
system flush logs;
select count()
from system.text_log
where
event_date >= yesterday() and event_time >= now() - 600 and
(
(logger_name = 'MergeTreeBackgroundExecutor' and message like '%{$table_uuid::$part_name}%No active replica has part $part_name or covering part%') or
(logger_name like '$table_uuid::$part_name (MergeFromLogEntryTask)' and message like '%No active replica has part $part_name or covering part%')
);
")
if [[ $no_active_repilica_messages -gt 0 ]]; then
break
fi
# too frequent "system flush logs" causes troubles
sleep 1
done

$CLICKHOUSE_CLIENT -nm -q "
system start pulling replication log rmt2;
system flush logs;
select
level, count() > 0
from system.text_log
where
event_date >= yesterday() and event_time >= now() - 600 and
(
(logger_name = 'MergeTreeBackgroundExecutor' and message like '%{$table_uuid::$part_name}%No active replica has part $part_name or covering part%') or
(logger_name like '$table_uuid::$part_name (MergeFromLogEntryTask)' and message like '%No active replica has part $part_name or covering part%')
)
group by level;
"

0 comments on commit c0482cb

Please sign in to comment.