Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions docs/en/engines/table-engines/mergetree-family/partition_export.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# ALTER TABLE EXPORT PARTITION

## Overview

The `ALTER TABLE EXPORT PARTITION` command exports entire partitions from Replicated*MergeTree tables to object storage (S3, Azure Blob Storage, etc.), typically in Parquet format. This feature coordinates export part operations across all replicas using ZooKeeper.

Each MergeTree part will become a separate file with the following name convention: `<table_directory>/<partitioning>/<data_part_name>_<merge_tree_part_checksum>.<format>`. To ensure atomicity, a commit file containing the relative paths of all exported parts is also shipped. A data file should only be considered part of the dataset if a commit file references it. The commit file will be named using the following convention: `<table_directory>/commit_<partition_id>_<transaction_id>`.

The set of parts that are exported is based on the list of parts the replica that received the export command sees. The other replicas will assist in the export process if they have those parts locally. Otherwise they will ignore it.

The partition export tasks can be observed through `system.replicated_partition_exports`. Querying this table results in a query to ZooKeeper, so it must be used with care. Individual part export progress can be observed as usual through `system.exports`.

The same partition can not be exported to the same destination more than once. There are two ways to override this behavior: either by setting the `export_merge_tree_partition_force_export` setting or waiting for the task to expire.

The export task can be killed by issuing the kill command: `KILL EXPORT PARTITION <where predicate for system.replicated_partition_exports>`.

The task is persistent - it should be resumed after crashes, failures and etc.

## Syntax

```sql
ALTER TABLE [database.]table_name
EXPORT PARTITION ID 'partition_id'
TO TABLE [destination_database.]destination_table
[SETTINGS setting_name = value, ...]
```

### Parameters

- **`table_name`**: The source Replicated*MergeTree table containing the partition to export
- **`partition_id`**: The partition identifier to export (e.g., `'2020'`, `'2021'`)
- **`destination_table`**: The target table for the export (typically an S3, Azure, or other object storage table)

## Settings

### Server Settings

#### `enable_experimental_export_merge_tree_partition_feature` (Required)

- **Type**: `Bool`
- **Default**: `false`
- **Description**: Enable export replicated merge tree partition feature. It is experimental and not yet ready for production use.

### Query Settings

#### `export_merge_tree_partition_force_export` (Optional)

- **Type**: `Bool`
- **Default**: `false`
- **Description**: Ignore existing partition export and overwrite the ZooKeeper entry. Allows re-exporting a partition to the same destination before the manifest expires.

#### `export_merge_tree_partition_max_retries` (Optional)

- **Type**: `UInt64`
- **Default**: `3`
- **Description**: Maximum number of retries for exporting a merge tree part in an export partition task. If it exceeds, the entire task fails.

#### `export_merge_tree_partition_manifest_ttl` (Optional)

- **Type**: `UInt64`
- **Default**: `180` (seconds)
- **Description**: Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination. This setting does not affect or delete in-progress tasks; it only cleans up completed ones.

#### `export_merge_tree_part_file_already_exists_policy` (Optional)

- **Type**: `MergeTreePartExportFileAlreadyExistsPolicy`
- **Default**: `skip`
- **Description**: Policy for handling files that already exist during export. Possible values:
- `skip` - Skip the file if it already exists
- `error` - Throw an error if the file already exists
- `overwrite` - Overwrite the file

## Examples

### Basic Export to S3

```sql
CREATE TABLE rmt_table (id UInt64, year UInt16)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/rmt_table', 'replica1')
PARTITION BY year ORDER BY tuple();

CREATE TABLE s3_table (id UInt64, year UInt16)
ENGINE = S3(s3_conn, filename='data', format=Parquet, partition_strategy='hive')
PARTITION BY year;

INSERT INTO rmt_table VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021);

ALTER TABLE rmt_table EXPORT PARTITION ID '2020' TO TABLE s3_table;

## Killing Exports

You can cancel in-progress partition exports using the `KILL EXPORT PARTITION` command:

```sql
KILL EXPORT PARTITION
WHERE partition_id = '2020'
AND source_table = 'rmt_table'
AND destination_table = 's3_table'
```

The `WHERE` clause filters exports from the `system.replicated_partition_exports` table. You can use any columns from that table in the filter.

## Monitoring

### Active and Completed Exports

Monitor partition exports using the `system.replicated_partition_exports` table:

```sql
arthur :) select * from system.replicated_partition_exports Format Vertical;

SELECT *
FROM system.replicated_partition_exports
FORMAT Vertical

Query id: 9efc271a-a501-44d1-834f-bc4d20156164

Row 1:
──────
source_database: default
source_table: replicated_source
destination_database: default
destination_table: replicated_destination
create_time: 2025-11-21 18:21:51
partition_id: 2022
transaction_id: 7397746091717128192
source_replica: r1
parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0']
parts_count: 3
parts_to_do: 0
status: COMPLETED
exception_replica:
last_exception:
exception_part:
exception_count: 0

Row 2:
──────
source_database: default
source_table: replicated_source
destination_database: default
destination_table: replicated_destination
create_time: 2025-11-21 18:20:35
partition_id: 2021
transaction_id: 7397745772618674176
source_replica: r1
parts: ['2021_0_0_0']
parts_count: 1
parts_to_do: 0
status: COMPLETED
exception_replica:
last_exception:
exception_part:
exception_count: 0

2 rows in set. Elapsed: 0.019 sec.

arthur :)
```

Status values include:
- `PENDING` - Export is queued / in progress
- `COMPLETED` - Export finished successfully
- `FAILED` - Export failed
- `KILLED` - Export was cancelled

## Related Features

- [ALTER TABLE EXPORT PART](/docs/en/engines/table-engines/mergetree-family/part_export.md) - Export individual parts (non-replicated)

1 change: 1 addition & 0 deletions src/Interpreters/ClientInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class ClientInfo
NOT_A_BACKGROUND_OPERATION = 0,
MERGE = 1,
MUTATION = 2,
EXPORT_PART = 3,
};

/// It's ClientInfo and context created for background operation (not real query)
Expand Down
7 changes: 7 additions & 0 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3136,6 +3136,13 @@ void Context::makeQueryContextForMutate(const MergeTreeSettings & merge_tree_set
= merge_tree_settings[MergeTreeSetting::mutation_workload].value.empty() ? getMutationWorkload() : merge_tree_settings[MergeTreeSetting::mutation_workload];
}

void Context::makeQueryContextForExportPart()
{
makeQueryContext();
classifier.reset(); // It is assumed that there are no active queries running using this classifier, otherwise this will lead to crashes
// Export part operations don't have a specific workload setting, so we leave the default workload
}

void Context::makeSessionContext()
{
session_context = shared_from_this();
Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
void makeQueryContext();
void makeQueryContextForMerge(const MergeTreeSettings & merge_tree_settings);
void makeQueryContextForMutate(const MergeTreeSettings & merge_tree_settings);
void makeQueryContextForExportPart();
void makeSessionContext();
void makeGlobalContext();

Expand Down
12 changes: 8 additions & 4 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo

bool ExportPartTask::executeStep()
{
const auto & metadata_snapshot = manifest.storage_snapshot->metadata;
const auto & metadata_snapshot = manifest.metadata_snapshot;

Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();

Expand Down Expand Up @@ -142,9 +142,13 @@ bool ExportPartTask::executeStep()
bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk();
bool prefetch = false;

const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*manifest.storage_snapshot->data);
auto mutations_snapshot = snapshot_data.mutations_snapshot;
MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params
{
.metadata_version = metadata_snapshot->getMetadataVersion(),
.min_part_metadata_version = manifest.data_part->getMetadataVersion()
};

auto mutations_snapshot = storage.getMutationsSnapshot(mutations_snapshot_params);
Comment on lines +147 to +151

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve query mutation settings when exporting parts

The export task now reconstructs the mutations snapshot with only metadata_version and min_part_metadata_version, relying on the storage context defaults. User settings such as apply_mutations_on_fly (needed to apply unfinished mutations or lightweight deletes during export) are no longer propagated from the original EXPORT PART query, so getMutationsSnapshot returns an empty data/alter mutation set and the exported part can miss pending mutations even when the query explicitly requested them. Previously the snapshot carried the query-context settings via storage_snapshot, so this is a regression for exports that expect on-the-fly mutation application.

Useful? React with 👍 / 👎.

Comment on lines +147 to +151

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep data/alter mutations in export part snapshot

The new mutations snapshot built in executeStep only sets metadata_version and min_part_metadata_version, leaving need_data_mutations, need_alter_mutations, and patch parameters at their defaults. getMutationsSnapshot therefore returns an empty snapshot whenever metadata versions match, so DELETE/UPDATE mutations or patch parts in progress are no longer applied during export. Compared to the previous use of manifest.storage_snapshot->data->mutations_snapshot (which honored apply_mutations_on_fly/apply_patch_parts), exporting a part while mutations are pending will emit stale data that does not match normal query results.

Useful? React with 👍 / 👎.

auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
manifest.data_part,
mutations_snapshot,
Expand All @@ -156,7 +160,7 @@ bool ExportPartTask::executeStep()
read_type,
plan_for_part,
storage,
manifest.storage_snapshot,
storage.getStorageSnapshot(metadata_snapshot, local_context),
RangesInDataPart(manifest.data_part),
alter_conversions,
nullptr,
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace
ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest)
{
auto context_copy = Context::createCopy(context);
context_copy->makeQueryContextForExportPart();
context_copy->setCurrentQueryId(manifest.transaction_id);
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
context_copy->setSetting("max_threads", manifest.max_threads);
Expand Down
9 changes: 7 additions & 2 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6262,7 +6262,7 @@ void MergeTreeData::exportPartToTable(
transaction_id,
query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
format_settings,
getStorageSnapshot(source_metadata_ptr, query_context),
source_metadata_ptr,
completion_callback);

std::lock_guard lock(export_manifests_mutex);
Expand Down Expand Up @@ -9132,7 +9132,12 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee)
continue;
}

auto task = std::make_shared<ExportPartTask>(*this, manifest, getContext());
auto context_copy = Context::createCopy(getContext());
context_copy->makeQueryContextForExportPart();
context_copy->setCurrentQueryId(manifest.transaction_id);
context_copy->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART);

auto task = std::make_shared<ExportPartTask>(*this, manifest, context_copy);

manifest.in_progress = assignee.scheduleMoveTask(task);

Expand Down
8 changes: 4 additions & 4 deletions src/Storages/MergeTree/MergeTreePartExportManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ struct MergeTreePartExportManifest
const String & transaction_id_,
FileAlreadyExistsPolicy file_already_exists_policy_,
const FormatSettings & format_settings_,
const StorageSnapshotPtr & storage_snapshot_,
const StorageMetadataPtr & metadata_snapshot_,
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
: destination_storage_id(destination_storage_id_),
data_part(data_part_),
transaction_id(transaction_id_),
file_already_exists_policy(file_already_exists_policy_),
format_settings(format_settings_),
storage_snapshot(storage_snapshot_),
metadata_snapshot(metadata_snapshot_),
completion_callback(completion_callback_),
create_time(time(nullptr)) {}

Expand All @@ -65,9 +65,9 @@ struct MergeTreePartExportManifest
FileAlreadyExistsPolicy file_already_exists_policy;
FormatSettings format_settings;

/// Storage snapshot captured at the time of query validation to prevent race conditions with mutations
/// Metadata snapshot captured at the time of query validation to prevent race conditions with mutations
/// Otherwise the export could fail if the schema changes between validation and execution
StorageSnapshotPtr storage_snapshot;
StorageMetadataPtr metadata_snapshot;

std::function<void(CompletionCallbackResult)> completion_callback;

Expand Down
Loading