Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mutations in tables with columns of type Object #37266

Merged
merged 1 commit into from May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/Interpreters/MutationsInterpreter.cpp
Expand Up @@ -758,7 +758,9 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)

ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> & prepared_stages, bool dry_run)
{
NamesAndTypesList all_columns = metadata_snapshot->getColumns().getAllPhysical();
auto storage_snapshot = storage->getStorageSnapshot(metadata_snapshot, context);
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto all_columns = storage_snapshot->getColumns(options);

/// Next, for each stage calculate columns changed by this and previous stages.
for (size_t i = 0; i < prepared_stages.size(); ++i)
Expand Down Expand Up @@ -802,7 +804,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
/// e.g. ALTER referencing the same table in scalar subquery
bool execute_scalar_subqueries = !dry_run;
auto syntax_result = TreeRewriter(context).analyze(
all_asts, all_columns, storage, storage->getStorageSnapshot(metadata_snapshot, context),
all_asts, all_columns, storage, storage_snapshot,
false, true, execute_scalar_subqueries);

if (execute_scalar_subqueries && context->hasQueryContext())
Expand Down
1 change: 0 additions & 1 deletion src/Storages/MergeTree/MergeTask.cpp
Expand Up @@ -149,7 +149,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
global_ctx->merging_columns,
global_ctx->merging_column_names);


auto local_single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + global_ctx->future_part->name, ctx->disk, 0);
global_ctx->new_data_part = global_ctx->data->createPart(
global_ctx->future_part->name,
Expand Down
15 changes: 10 additions & 5 deletions src/Storages/MergeTree/MutateTask.cpp
Expand Up @@ -624,7 +624,9 @@ struct MutationContext
FutureMergedMutatedPartPtr future_part;
MergeTreeData::DataPartPtr source_part;

StoragePtr storage_from_source_part;
StorageMetadataPtr metadata_snapshot;

MutationCommandsConstPtr commands;
time_t time_of_mutation;
ContextPtr context;
Expand Down Expand Up @@ -1367,6 +1369,11 @@ MutateTask::MutateTask(
ctx->space_reservation = space_reservation_;
ctx->storage_columns = metadata_snapshot_->getColumns().getAllPhysical();
ctx->txn = txn;
ctx->source_part = ctx->future_part->parts[0];
ctx->storage_from_source_part = std::make_shared<StorageFromMergeTreeDataPart>(ctx->source_part);

auto storage_snapshot = ctx->storage_from_source_part->getStorageSnapshot(ctx->metadata_snapshot, context_);
extendObjectColumns(ctx->storage_columns, storage_snapshot->object_columns, /*with_subcolumns=*/ false);
}


Expand Down Expand Up @@ -1405,8 +1412,6 @@ bool MutateTask::prepare()
"This is a bug.", toString(ctx->future_part->parts.size()));

ctx->num_mutations = std::make_unique<CurrentMetrics::Increment>(CurrentMetrics::PartMutation);
ctx->source_part = ctx->future_part->parts[0];
auto storage_from_source_part = std::make_shared<StorageFromMergeTreeDataPart>(ctx->source_part);

auto context_for_reading = Context::createCopy(ctx->context);
context_for_reading->setSetting("max_streams_to_max_threads_ratio", 1);
Expand All @@ -1417,13 +1422,13 @@ bool MutateTask::prepare()

for (const auto & command : *ctx->commands)
{
if (command.partition == nullptr || ctx->future_part->parts[0]->info.partition_id == ctx->data->getPartitionIDFromQuery(
if (command.partition == nullptr || ctx->source_part->info.partition_id == ctx->data->getPartitionIDFromQuery(
command.partition, context_for_reading))
ctx->commands_for_part.emplace_back(command);
}

if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations(
storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
{
LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {}", ctx->source_part->name, ctx->future_part->part_info.mutation);
promise.set_value(ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false));
Expand All @@ -1441,7 +1446,7 @@ bool MutateTask::prepare()
if (!ctx->for_interpreter.empty())
{
ctx->interpreter = std::make_unique<MutationsInterpreter>(
storage_from_source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true);
ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true);
ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();
ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();
ctx->mutation_kind = ctx->interpreter->getMutationKind();
Expand Down
17 changes: 17 additions & 0 deletions src/Storages/MergeTree/StorageFromMergeTreeDataPart.h
Expand Up @@ -3,6 +3,7 @@
#include <Storages/IStorage.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <DataTypes/ObjectUtils.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
Expand Down Expand Up @@ -36,6 +37,20 @@ class StorageFromMergeTreeDataPart final : public IStorage

String getName() const override { return "FromMergeTreeDataPart"; }

StorageSnapshotPtr getStorageSnapshot(
const StorageMetadataPtr & metadata_snapshot, ContextPtr /*query_context*/) const override
{
const auto & storage_columns = metadata_snapshot->getColumns();
if (!hasObjectColumns(storage_columns))
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot);

auto object_columns = getObjectColumns(
parts.begin(), parts.end(),
storage_columns, [](const auto & part) -> const auto & { return part->getColumns(); });

return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, object_columns);
}

Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
Expand Down Expand Up @@ -65,6 +80,8 @@ class StorageFromMergeTreeDataPart final : public IStorage

bool supportsIndexForIn() const override { return true; }

bool supportsDynamicSubcolumns() const override { return true; }

bool mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot) const override
{
Expand Down
7 changes: 7 additions & 0 deletions tests/queries/0_stateless/01825_type_json_mutations.reference
@@ -0,0 +1,7 @@
1 q (1,2,[('aaa'),('bbb')])
2 w (3,4,[('ccc')])
3 e (5,6,[])
1 q (1,2,[('aaa'),('bbb')])
3 e (5,6,[])
1 foo
3 foo
21 changes: 21 additions & 0 deletions tests/queries/0_stateless/01825_type_json_mutations.sql
@@ -0,0 +1,21 @@
-- Tags: no-fasttest

DROP TABLE IF EXISTS t_json_mutations;

SET allow_experimental_object_type = 1;
SET output_format_json_named_tuples_as_objects = 1;
SET mutations_sync = 2;

CREATE TABLE t_json_mutations(id UInt32, s String, obj JSON) ENGINE = MergeTree ORDER BY id;

INSERT INTO t_json_mutations VALUES (1, 'q', '{"k1": 1, "k2": 2, "k3": [{"k4": "aaa"}, {"k4": "bbb"}]}');
INSERT INTO t_json_mutations VALUES (2, 'w', '{"k1": 3, "k2": 4, "k3": [{"k4": "ccc"}]}');
INSERT INTO t_json_mutations VALUES (3, 'e', '{"k1": 5, "k2": 6}');

SELECT * FROM t_json_mutations ORDER BY id;
ALTER TABLE t_json_mutations DELETE WHERE id = 2;
SELECT * FROM t_json_mutations ORDER BY id;
ALTER TABLE t_json_mutations DROP COLUMN s, DROP COLUMN obj, ADD COLUMN t String DEFAULT 'foo';
SELECT * FROM t_json_mutations ORDER BY id;

DROP TABLE t_json_mutations;