diff --git a/docs/en/engines/table-engines/mergetree-family/partition_export.md b/docs/en/engines/table-engines/mergetree-family/partition_export.md new file mode 100644 index 000000000000..1b91cf9bdeb9 --- /dev/null +++ b/docs/en/engines/table-engines/mergetree-family/partition_export.md @@ -0,0 +1,170 @@ +# ALTER TABLE EXPORT PARTITION + +## Overview + +The `ALTER TABLE EXPORT PARTITION` command exports entire partitions from Replicated*MergeTree tables to object storage (S3, Azure Blob Storage, etc.), typically in Parquet format. This feature coordinates export part operations across all replicas using ZooKeeper. + +Each MergeTree part will become a separate file with the following name convention: `//_.`. To ensure atomicity, a commit file containing the relative paths of all exported parts is also shipped. A data file should only be considered part of the dataset if a commit file references it. The commit file will be named using the following convention: `/commit__`. + +The set of parts that are exported is based on the list of parts the replica that received the export command sees. The other replicas will assist in the export process if they have those parts locally. Otherwise they will ignore it. + +The partition export tasks can be observed through `system.replicated_partition_exports`. Querying this table results in a query to ZooKeeper, so it must be used with care. Individual part export progress can be observed as usual through `system.exports`. + +The same partition can not be exported to the same destination more than once. There are two ways to override this behavior: either by setting the `export_merge_tree_partition_force_export` setting or waiting for the task to expire. + +The export task can be killed by issuing the kill command: `KILL EXPORT PARTITION `. + +The task is persistent - it should be resumed after crashes, failures and etc. + +## Syntax + +```sql +ALTER TABLE [database.]table_name +EXPORT PARTITION ID 'partition_id' +TO TABLE [destination_database.]destination_table +[SETTINGS setting_name = value, ...] +``` + +### Parameters + +- **`table_name`**: The source Replicated*MergeTree table containing the partition to export +- **`partition_id`**: The partition identifier to export (e.g., `'2020'`, `'2021'`) +- **`destination_table`**: The target table for the export (typically an S3, Azure, or other object storage table) + +## Settings + +### Server Settings + +#### `enable_experimental_export_merge_tree_partition_feature` (Required) + +- **Type**: `Bool` +- **Default**: `false` +- **Description**: Enable export replicated merge tree partition feature. It is experimental and not yet ready for production use. + +### Query Settings + +#### `export_merge_tree_partition_force_export` (Optional) + +- **Type**: `Bool` +- **Default**: `false` +- **Description**: Ignore existing partition export and overwrite the ZooKeeper entry. Allows re-exporting a partition to the same destination before the manifest expires. + +#### `export_merge_tree_partition_max_retries` (Optional) + +- **Type**: `UInt64` +- **Default**: `3` +- **Description**: Maximum number of retries for exporting a merge tree part in an export partition task. If it exceeds, the entire task fails. + +#### `export_merge_tree_partition_manifest_ttl` (Optional) + +- **Type**: `UInt64` +- **Default**: `180` (seconds) +- **Description**: Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination. This setting does not affect or delete in-progress tasks; it only cleans up completed ones. + +#### `export_merge_tree_part_file_already_exists_policy` (Optional) + +- **Type**: `MergeTreePartExportFileAlreadyExistsPolicy` +- **Default**: `skip` +- **Description**: Policy for handling files that already exist during export. Possible values: + - `skip` - Skip the file if it already exists + - `error` - Throw an error if the file already exists + - `overwrite` - Overwrite the file + +## Examples + +### Basic Export to S3 + +```sql +CREATE TABLE rmt_table (id UInt64, year UInt16) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/rmt_table', 'replica1') +PARTITION BY year ORDER BY tuple(); + +CREATE TABLE s3_table (id UInt64, year UInt16) +ENGINE = S3(s3_conn, filename='data', format=Parquet, partition_strategy='hive') +PARTITION BY year; + +INSERT INTO rmt_table VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021); + +ALTER TABLE rmt_table EXPORT PARTITION ID '2020' TO TABLE s3_table; + +## Killing Exports + +You can cancel in-progress partition exports using the `KILL EXPORT PARTITION` command: + +```sql +KILL EXPORT PARTITION +WHERE partition_id = '2020' + AND source_table = 'rmt_table' + AND destination_table = 's3_table' +``` + +The `WHERE` clause filters exports from the `system.replicated_partition_exports` table. You can use any columns from that table in the filter. + +## Monitoring + +### Active and Completed Exports + +Monitor partition exports using the `system.replicated_partition_exports` table: + +```sql +arthur :) select * from system.replicated_partition_exports Format Vertical; + +SELECT * +FROM system.replicated_partition_exports +FORMAT Vertical + +Query id: 9efc271a-a501-44d1-834f-bc4d20156164 + +Row 1: +────── +source_database: default +source_table: replicated_source +destination_database: default +destination_table: replicated_destination +create_time: 2025-11-21 18:21:51 +partition_id: 2022 +transaction_id: 7397746091717128192 +source_replica: r1 +parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0'] +parts_count: 3 +parts_to_do: 0 +status: COMPLETED +exception_replica: +last_exception: +exception_part: +exception_count: 0 + +Row 2: +────── +source_database: default +source_table: replicated_source +destination_database: default +destination_table: replicated_destination +create_time: 2025-11-21 18:20:35 +partition_id: 2021 +transaction_id: 7397745772618674176 +source_replica: r1 +parts: ['2021_0_0_0'] +parts_count: 1 +parts_to_do: 0 +status: COMPLETED +exception_replica: +last_exception: +exception_part: +exception_count: 0 + +2 rows in set. Elapsed: 0.019 sec. + +arthur :) +``` + +Status values include: +- `PENDING` - Export is queued / in progress +- `COMPLETED` - Export finished successfully +- `FAILED` - Export failed +- `KILLED` - Export was cancelled + +## Related Features + +- [ALTER TABLE EXPORT PART](/docs/en/engines/table-engines/mergetree-family/part_export.md) - Export individual parts (non-replicated) + diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h index c360e723641a..025f84bdd4f9 100644 --- a/src/Interpreters/ClientInfo.h +++ b/src/Interpreters/ClientInfo.h @@ -139,6 +139,7 @@ class ClientInfo NOT_A_BACKGROUND_OPERATION = 0, MERGE = 1, MUTATION = 2, + EXPORT_PART = 3, }; /// It's ClientInfo and context created for background operation (not real query) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index c9f6e9ad84b6..6323ee3da084 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -3136,6 +3136,13 @@ void Context::makeQueryContextForMutate(const MergeTreeSettings & merge_tree_set = merge_tree_settings[MergeTreeSetting::mutation_workload].value.empty() ? getMutationWorkload() : merge_tree_settings[MergeTreeSetting::mutation_workload]; } +void Context::makeQueryContextForExportPart() +{ + makeQueryContext(); + classifier.reset(); // It is assumed that there are no active queries running using this classifier, otherwise this will lead to crashes + // Export part operations don't have a specific workload setting, so we leave the default workload +} + void Context::makeSessionContext() { session_context = shared_from_this(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 583dd6509ebe..28a228fdf843 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1135,6 +1135,7 @@ class Context: public ContextData, public std::enable_shared_from_this void makeQueryContext(); void makeQueryContextForMerge(const MergeTreeSettings & merge_tree_settings); void makeQueryContextForMutate(const MergeTreeSettings & merge_tree_settings); + void makeQueryContextForExportPart(); void makeSessionContext(); void makeGlobalContext(); diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index cdb61350e376..a43c45d0edaf 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -47,7 +47,7 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo bool ExportPartTask::executeStep() { - const auto & metadata_snapshot = manifest.storage_snapshot->metadata; + const auto & metadata_snapshot = manifest.metadata_snapshot; Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); @@ -142,9 +142,13 @@ bool ExportPartTask::executeStep() bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk(); bool prefetch = false; - const auto & snapshot_data = assert_cast(*manifest.storage_snapshot->data); - auto mutations_snapshot = snapshot_data.mutations_snapshot; + MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params + { + .metadata_version = metadata_snapshot->getMetadataVersion(), + .min_part_metadata_version = manifest.data_part->getMetadataVersion() + }; + auto mutations_snapshot = storage.getMutationsSnapshot(mutations_snapshot_params); auto alter_conversions = MergeTreeData::getAlterConversionsForPart( manifest.data_part, mutations_snapshot, @@ -156,7 +160,7 @@ bool ExportPartTask::executeStep() read_type, plan_for_part, storage, - manifest.storage_snapshot, + storage.getStorageSnapshot(metadata_snapshot, local_context), RangesInDataPart(manifest.data_part), alter_conversions, nullptr, diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index 528841e21188..ab3a8ce361c7 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -22,6 +22,8 @@ namespace ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest) { auto context_copy = Context::createCopy(context); + context_copy->makeQueryContextForExportPart(); + context_copy->setCurrentQueryId(manifest.transaction_id); context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding); context_copy->setSetting("max_threads", manifest.max_threads); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 0bc0851746e6..6f1c59239b93 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6262,7 +6262,7 @@ void MergeTreeData::exportPartToTable( transaction_id, query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value, format_settings, - getStorageSnapshot(source_metadata_ptr, query_context), + source_metadata_ptr, completion_callback); std::lock_guard lock(export_manifests_mutex); @@ -9132,7 +9132,12 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee) continue; } - auto task = std::make_shared(*this, manifest, getContext()); + auto context_copy = Context::createCopy(getContext()); + context_copy->makeQueryContextForExportPart(); + context_copy->setCurrentQueryId(manifest.transaction_id); + context_copy->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART); + + auto task = std::make_shared(*this, manifest, context_copy); manifest.in_progress = assignee.scheduleMoveTask(task); diff --git a/src/Storages/MergeTree/MergeTreePartExportManifest.h b/src/Storages/MergeTree/MergeTreePartExportManifest.h index f7d1bf1f1623..533eeb6decdd 100644 --- a/src/Storages/MergeTree/MergeTreePartExportManifest.h +++ b/src/Storages/MergeTree/MergeTreePartExportManifest.h @@ -47,14 +47,14 @@ struct MergeTreePartExportManifest const String & transaction_id_, FileAlreadyExistsPolicy file_already_exists_policy_, const FormatSettings & format_settings_, - const StorageSnapshotPtr & storage_snapshot_, + const StorageMetadataPtr & metadata_snapshot_, std::function completion_callback_ = {}) : destination_storage_id(destination_storage_id_), data_part(data_part_), transaction_id(transaction_id_), file_already_exists_policy(file_already_exists_policy_), format_settings(format_settings_), - storage_snapshot(storage_snapshot_), + metadata_snapshot(metadata_snapshot_), completion_callback(completion_callback_), create_time(time(nullptr)) {} @@ -65,9 +65,9 @@ struct MergeTreePartExportManifest FileAlreadyExistsPolicy file_already_exists_policy; FormatSettings format_settings; - /// Storage snapshot captured at the time of query validation to prevent race conditions with mutations + /// Metadata snapshot captured at the time of query validation to prevent race conditions with mutations /// Otherwise the export could fail if the schema changes between validation and execution - StorageSnapshotPtr storage_snapshot; + StorageMetadataPtr metadata_snapshot; std::function completion_callback;