Search before asking
- I had searched in the issues and found no similar issues.
Description
1. Background & Objectives
1.1 Business Scenarios
In Doris Cloud mode, users require periodic snapshot backups of cluster metadata to support:
- Disaster Recovery: Restore FE metadata to a specific point in time when corruption occurs
- Instance Cloning: Clone read-only/writable/rollback instances from a snapshot for testing and data analysis
- Compliance Auditing: Meet data backup SLA requirements for industries such as finance and healthcare
1.2 Framework Status
Doris already provides a complete skeleton for the snapshot framework, but all business-logic methods are still stubs that report "Not implemented":
| Layer | Component | Status |
|---|---|---|
| C++ Meta-Service | SnapshotManager base class | 18 virtual methods, all returning UNDEFINED_ERR or no-op |
| C++ Meta-Service | MetaServiceImpl RPC delegation | 7 RPC handlers with parameter validation + delegation logic implemented |
| Java FE | CloudSnapshotHandler base class | submitJob / refreshAutoSnapshotJob / cloneSnapshot all throw NotImplementedException |
| Java FE | SQL Commands | AdminCreate/Set/Drop ClusterSnapshot: 3 commands completed |
| Proto | cloud.proto | All RPC message definitions ready (7 RPCs + 14 messages) |
| TxnKv Keys | keys.h | snapshot_full_key / snapshot_reference_key defined |
| RPC Channel | MetaServiceProxy / MetaServiceClient | 7 snapshot methods wrapped |
| BE Display | SchemaClusterSnapshotsScanner | information_schema.cluster_snapshots implemented |
| Config | Config.java | 6 cloud_snapshot_* / multi_part_upload_* config items defined |
| Tests | meta_service_snapshot_test.cpp | Only BeginSnapshotTest exists (DISABLED) |
1.3 Design Goals
- G-1: Implement all 18 virtual methods of SnapshotManager, covering the complete snapshot lifecycle
- G-2: Implement a CloudSnapshotHandler subclass that completes the FE-side scheduling, upload, and recovery logic
- G-3: Do not modify any existing Proto definitions or RPC signatures; build entirely on the framework's extension points
- G-4: Cover all core flows with unit tests and integration tests, with every test case enabled and executable (no DISABLED tests)
- G-5: Support S3/OSS/GCS/HDFS storage backends through the StorageVaultAccessor abstraction layer
2. Technical Infrastructure Analysis
2.1 Framework Extension Point Verification
Extension Point 1: C++ SnapshotManager (Meta-Service Side)
Verified against the source code: every business method in the base class returns UNDEFINED_ERR by default:

```cpp
// cloud/src/snapshot/snapshot_manager.cpp (L72-L77)
void SnapshotManager::begin_snapshot(std::string_view instance_id,
                                     const BeginSnapshotRequest& request,
                                     BeginSnapshotResponse* response) {
    response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR);
    response->mutable_status()->set_msg("Not implemented");
}
```

Injection point confirmed: meta_server.cpp L82-83 and recycler.cpp L594 are the only places where SnapshotManager is instantiated, so customization only requires replacing these two call sites:
```cpp
// cloud/src/meta-service/meta_server.cpp (L82-83)
auto snapshot_mgr = std::make_shared<SnapshotManager>(txn_kv_);
auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv_, rc_mgr, rate_limiter, std::move(snapshot_mgr));

// cloud/src/recycler/recycler.cpp (L594)
snapshot_manager_ = std::make_shared<SnapshotManager>(txn_kv_);
```

Extension Point 2: Java CloudSnapshotHandler (FE Scheduling Side)
The handler is loaded dynamically via reflection; its class name is controlled by Config.cloud_snapshot_handler_class:

```java
// CloudSnapshotHandler.java (L42-53), excerpt: checked-exception handling omitted
public static CloudSnapshotHandler getInstance() {
    Class<CloudSnapshotHandler> theClass = (Class<CloudSnapshotHandler>) Class.forName(
            Config.cloud_snapshot_handler_class);
    Constructor<CloudSnapshotHandler> constructor = theClass.getDeclaredConstructor();
    return constructor.newInstance();
}
```

CloudEnv calls the handler's initialize() from its own initialize(), and starts the background daemon in startMasterOnlyDaemonThreads().
2.2 TxnKv Key Format Confirmation
```
// keys.h (L110-111)
0x03 "snapshot" ${instance_id} "full" ${timestamp} -> SnapshotPB
0x03 "snapshot" ${instance_id} "reference" ${timestamp} ${instance_id} -> ${empty_value}
```
- Full Key: stores the snapshot metadata (SnapshotPB), with the Versionstamp as the timestamp suffix
- Reference Key: records the instance IDs derived from this snapshot; used for reference counting and deletion protection
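For intuition about why a versionstamp suffix yields time-ordered keys and why a prefix scan isolates one instance, here is a toy encoder. It is a sketch under stated assumptions: the tuple-style escaping (0x00 becomes 0x00 0xFF, each field ends with 0x00 0x01) and the toy_* helper names are hypothetical and do NOT reproduce the real keys.h / encode_versioned_key byte format.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical 10-byte versionstamp, stored big-endian.
using Versionstamp = std::array<uint8_t, 10>;

// Appends a string field in an order-preserving, prefix-free form (toy scheme):
// each 0x00 byte is escaped as 0x00 0xFF and the field ends with 0x00 0x01.
void append_field(std::string& out, const std::string& field) {
    for (char c : field) {
        out.push_back(c);
        if (c == '\0') out.push_back('\xFF');
    }
    out.push_back('\0');
    out.push_back('\x01');
}

// 0x03 "snapshot" ${instance_id}: shared by every snapshot key of one instance.
std::string toy_snapshot_key_prefix(const std::string& instance_id) {
    std::string key(1, '\x03');
    append_field(key, "snapshot");
    append_field(key, instance_id);
    return key;
}

// 0x03 "snapshot" ${instance_id} "full" ${versionstamp}
std::string toy_snapshot_full_key(const std::string& instance_id, const Versionstamp& vs) {
    std::string key = toy_snapshot_key_prefix(instance_id);
    append_field(key, "full");
    // Fixed-width big-endian suffix: byte order equals commit order.
    key.append(vs.begin(), vs.end());
    return key;
}

// What a range scan over a prefix would match.
bool has_prefix(const std::string& key, const std::string& prefix) {
    return key.size() >= prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
}
```

Because the instance_id field is terminated, the prefix for "inst_001" does not match keys of "inst_0012", which is the property a per-instance range scan relies on.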
Existing utility functions:
```cpp
// keys.h (L539-545)
versioned::snapshot_full_key({instance_id})
versioned::snapshot_reference_key({instance_id, versionstamp, ref_instance_id})
versioned::snapshot_reference_key_prefix(instance_id)
versioned::snapshot_key_prefix(instance_id)
```

2.3 Snapshot State Machine Confirmation
```protobuf
// cloud.proto (L821-830)
enum SnapshotStatus {
    SNAPSHOT_PREPARE = 0;   // Creating
    SNAPSHOT_NORMAL = 1;    // Ready
    SNAPSHOT_ABORTED = 2;   // Aborted
    SNAPSHOT_RECYCLED = 3;  // Marked for recycling
}
```

State transitions: PREPARE → NORMAL (success) | PREPARE → ABORTED (failure) | NORMAL → RECYCLED (deletion/expiration)
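The three legal transitions can be captured in a small validator that each write path could consult before persisting a new status. A minimal sketch: the enum is redeclared locally so the snippet compiles on its own, and is_valid_transition is a hypothetical helper name. It encodes only the transitions listed above; whether drop_snapshot on an ABORTED snapshot should be modeled as ABORTED → RECYCLED is not settled in this section, so that edge is rejected here.

```cpp
#include <cassert>

// Local mirror of SnapshotStatus from cloud.proto so the sketch is standalone.
enum SnapshotStatus {
    SNAPSHOT_PREPARE = 0,
    SNAPSHOT_NORMAL = 1,
    SNAPSHOT_ABORTED = 2,
    SNAPSHOT_RECYCLED = 3,
};

// Only PREPARE->NORMAL, PREPARE->ABORTED and NORMAL->RECYCLED are accepted.
constexpr bool is_valid_transition(SnapshotStatus from, SnapshotStatus to) {
    switch (from) {
    case SNAPSHOT_PREPARE:
        return to == SNAPSHOT_NORMAL || to == SNAPSHOT_ABORTED;
    case SNAPSHOT_NORMAL:
        return to == SNAPSHOT_RECYCLED;
    default:
        return false;  // ABORTED and RECYCLED are treated as terminal here
    }
}

static_assert(is_valid_transition(SNAPSHOT_PREPARE, SNAPSHOT_NORMAL),
              "PREPARE -> NORMAL must be legal");
```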
2.4 Existing RPC Parameter Validation Logic
MetaServiceImpl already performs the cloud_unique_id → instance_id conversion and rate limiting in one place, so subclass implementations do not need to repeat them:
```cpp
// meta_service_snapshot.cpp (L27-49), using begin_snapshot as an example
void MetaServiceImpl::begin_snapshot(...) {
    RPC_PREPROCESS(begin_snapshot, get, put, del);
    // cloud_unique_id validation
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    RPC_RATE_LIMIT(begin_snapshot);
    // Delegate to SnapshotManager
    snapshot_manager_->begin_snapshot(instance_id, *request, response);
}
```

2.5 Object Storage Access Interface Confirmation
StorageVaultAccessor provides a complete storage operation abstraction:
```cpp
// storage_vault_accessor.h (L51-97)
class StorageVaultAccessor {
public:
    virtual int delete_prefix(const std::string& path_prefix, int64_t expiration_time = 0) = 0;
    virtual int delete_directory(const std::string& dir_path) = 0;
    virtual int delete_file(const std::string& path) = 0;
    virtual int list_directory(const std::string& dir_path, std::unique_ptr<ListIterator>* res) = 0;
    virtual int put_file(const std::string& path, const std::string& content) = 0;
    virtual int exists(const std::string& path) = 0;
    virtual int abort_multipart_upload(const std::string& path, const std::string& upload_id) = 0;
};
```

The abort_multipart_upload method is built in and can be used directly to clean up multipart uploads in abort_snapshot.
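For the unit tests required by G-4, an in-memory fake of the accessor lets the recycle and abort paths run without a real bucket. This sketch declares a hypothetical three-method subset (MiniAccessor, InMemoryAccessor) instead of including storage_vault_accessor.h, and assumes the return convention 0 = success/exists, 1 = not found; verify the real header's convention before relying on it.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical subset of StorageVaultAccessor, declared locally.
// Assumed return convention: 0 = success / exists, 1 = not found.
class MiniAccessor {
public:
    virtual ~MiniAccessor() = default;
    virtual int put_file(const std::string& path, const std::string& content) = 0;
    virtual int exists(const std::string& path) = 0;
    virtual int delete_directory(const std::string& dir_path) = 0;
};

// In-memory fake for unit tests: object paths are keys of an ordered map.
class InMemoryAccessor : public MiniAccessor {
public:
    int put_file(const std::string& path, const std::string& content) override {
        files_[path] = content;
        return 0;
    }

    int exists(const std::string& path) override { return files_.count(path) ? 0 : 1; }

    // Deletes every object under dir_path. The prefix is normalized to end with '/',
    // so deleting "snapshot/abc" does not also remove "snapshot/abcd/...".
    int delete_directory(const std::string& dir_path) override {
        std::string prefix = dir_path;
        if (prefix.empty() || prefix.back() != '/') prefix.push_back('/');
        auto it = files_.lower_bound(prefix);
        while (it != files_.end() && it->first.compare(0, prefix.size(), prefix) == 0) {
            it = files_.erase(it);
        }
        return 0;
    }

    std::size_t size() const { return files_.size(); }

private:
    std::map<std::string, std::string> files_;
};
```

The trailing-slash normalization guards against a common prefix-deletion bug when snapshot IDs share leading characters.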
2.6 FE Configuration Items Status
| Config Item | Default Value | Description |
|---|---|---|
| cloud_snapshot_handler_class | "...CloudSnapshotHandler" | Handler implementation class name |
| cloud_snapshot_handler_interval_second | 3600 | Background scheduling interval (seconds) |
| cloud_snapshot_timeout_seconds | 600 | PREPARE state timeout (seconds) |
| cloud_auto_snapshot_max_reversed_num | 35 | Maximum number of retained auto-snapshots |
| cloud_auto_snapshot_min_interval_seconds | 3600 | Minimum auto-snapshot interval (seconds) |
| multi_part_upload_part_size_in_bytes | 256MB | Multipart upload chunk size |
| multi_part_upload_max_seconds | 3600 | Multipart upload timeout (seconds) |
| multi_part_upload_pool_size | 10 | Multipart upload thread pool size |
3. Technical Constraints & Design Principles
3.1 Hard Constraints
| ID | Constraint | Impact |
|---|---|---|
| C-1 | SnapshotManager base class txn_kv_ is private | Subclass must receive the TxnKv reference via its constructor, or the base class member must be changed to protected |
| C-2 | Versionstamp depends on FDB transaction commit | begin_snapshot must use atomic versionstamp operations within an FDB transaction |
| C-3 | CloudSnapshotHandler is loaded via reflection | Subclass must have a no-argument constructor |
| C-4 | MetaServiceImpl RPC preprocessing already completes parameter validation | SnapshotManager methods do not need to re-validate cloud_unique_id |
| C-5 | Proto message definitions cannot be modified | All field semantics are finalized; no fields may be added or removed |
| C-6 | MasterDaemon.runOneCycle() waits for Catalog readiness | runAfterCatalogReady() executes only after FE image loading is complete |
| C-7 | snapshot_full_key uses encode_versioned_key encoding | Range scans must use snapshot_key_prefix + range queries |
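Constraint C-1 can be satisfied without touching the base class: the subclass forwards the pointer to the base constructor and keeps its own copy. The sketch uses stand-in types; TxnKv, SnapshotManagerBase, and DorisSnapshotManagerSketch here are hypothetical local declarations, not the real headers.

```cpp
#include <cassert>
#include <memory>
#include <utility>

struct TxnKv {};  // stand-in for the real TxnKv interface

// Stand-in for the framework base class: its TxnKv handle is private (C-1).
class SnapshotManagerBase {
public:
    explicit SnapshotManagerBase(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {}
    virtual ~SnapshotManagerBase() = default;
    virtual int begin_snapshot() { return -1; }  // "Not implemented" default

private:
    std::shared_ptr<TxnKv> txn_kv_;
};

// The base is constructed first from a copy, then the subclass's own member
// takes ownership of the argument; no change to the base class is needed.
class DorisSnapshotManagerSketch : public SnapshotManagerBase {
public:
    explicit DorisSnapshotManagerSketch(std::shared_ptr<TxnKv> txn_kv)
            : SnapshotManagerBase(txn_kv), txn_kv_(std::move(txn_kv)) {}

    int begin_snapshot() override { return txn_kv_ ? 0 : -1; }

private:
    std::shared_ptr<TxnKv> txn_kv_;  // independently held reference
};
```

Holding a second shared_ptr costs one reference count and keeps the framework file untouched, which matches the "inherit via subclass" decision in 3.2.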
3.2 Design Trade-offs
| Trade-off | Decision | Rationale |
|---|---|---|
| Subclass vs. direct base class modification | Inherit via subclass | Aligns with framework design intent; does not affect other consumers |
| FE Image upload: synchronous vs. asynchronous | Asynchronous thread pool | Large Image uploads may take tens of minutes; cannot block the main thread |
| Snapshot concurrency: parallel vs. serial | Serial (single-thread pool) | Avoids consistency issues from concurrent checkpoints |
| Versionstamp generation: client vs. server | Server-side (FDB atomic op) | Ensures globally monotonic ordering; avoids clock drift |
| Auto-snapshot expiration: scheduled scan vs. event-driven | Scheduled scan | Reuses MasterDaemon scheduling framework; simple and reliable |
| Clone types: READ_ONLY only vs. all | Implement all | Proto already defines READ_ONLY/WRITABLE/ROLLBACK; full support required |
4. Core Implementation Plan
4.1 Layered Implementation Strategy
```mermaid
graph TB
    subgraph "Phase 1: C++ SnapshotManager Core Implementation"
        A1["DorisSnapshotManager extends SnapshotManager"]
        A2["begin_snapshot: validation → build SnapshotPB → FDB atomic write"]
        A3["update_snapshot: update upload_file/upload_id"]
        A4["commit_snapshot: PREPARE→NORMAL + write finish_at/sizes"]
        A5["abort_snapshot: →ABORTED + abort multipart upload"]
        A6["drop_snapshot: →RECYCLED"]
        A7["list_snapshot: range scan + SnapshotInfoPB conversion"]
        A8["clone_instance: metadata clone + reference recording"]
    end
    subgraph "Phase 2: C++ Operational Capabilities"
        B1["recycle_snapshots: TTL + max_reserved cleanup"]
        B2["recycle_snapshot_meta_and_data: object store + KV dual deletion"]
        B3["check_snapshots / inverted_check: consistency verification"]
        B4["set_multi_version_status: MVCC state management"]
        B5["migrate_to_versioned_keys: key migration"]
        B6["compact_snapshot_chains: snapshot chain compression"]
    end
    subgraph "Phase 3: Java CloudSnapshotHandler Implementation"
        C1["DorisCloudSnapshotHandler extends CloudSnapshotHandler"]
        C2["initialize(): load persisted state + init S3 client"]
        C3["runAfterCatalogReady(): auto-snapshot trigger check"]
        C4["submitJob(ttl, label): five-step workflow"]
        C5["refreshAutoSnapshotJob(): reload scheduling config"]
        C6["cloneSnapshot(file): restore FE from snapshot"]
    end
    subgraph "Phase 4: Testing + Integration"
        D1["C++ unit tests: cover all SnapshotManager methods"]
        D2["Java unit tests: mock MetaServiceProxy"]
        D3["Integration tests: end-to-end SQL command verification"]
        D4["Regression tests: regression-test/suites"]
    end
    A1 --> A2 --> A3 --> A4 --> A5 --> A6 --> A7 --> A8
    A8 --> B1 --> B2 --> B3 --> B4 --> B5 --> B6
    B6 --> C1 --> C2 --> C3 --> C4 --> C5 --> C6
    C6 --> D1 --> D2 --> D3 --> D4
```
4.2 Snapshot Lifecycle Core Workflow
```mermaid
sequenceDiagram
    participant User as User/Scheduler
    participant Handler as DorisCloudSnapshotHandler
    participant Proxy as MetaServiceProxy
    participant SnapMgr as DorisSnapshotManager
    participant FDB as FoundationDB
    participant ObjStore as Object Storage
    User->>Handler: submitJob(ttl, label)
    Handler->>Proxy: beginSnapshot(cloud_unique_id, label, timeout, ttl)
    Proxy->>SnapMgr: begin_snapshot(instance_id, req, resp)
    SnapMgr->>FDB: atomic write SnapshotPB{PREPARE} + get versionstamp
    SnapMgr-->>Handler: {snapshot_id, image_url, obj_info}
    Handler->>Handler: Catalog.checkpoint() to get last_journal_id
    Handler->>ObjStore: multipart upload FE Image to image_url
    loop Each part
        Handler->>Proxy: updateSnapshot(snapshot_id, upload_file, upload_id)
        Proxy->>SnapMgr: update_snapshot()
        SnapMgr->>FDB: update SnapshotPB.upload_file/upload_id
    end
    Handler->>Proxy: commitSnapshot(snapshot_id, image_url, journal_id, sizes)
    Proxy->>SnapMgr: commit_snapshot()
    SnapMgr->>FDB: update SnapshotPB{NORMAL, finish_at}
    Note over Handler,ObjStore: Error path
    Handler->>Proxy: abortSnapshot(snapshot_id, reason)
    Proxy->>SnapMgr: abort_snapshot()
    SnapMgr->>FDB: update SnapshotPB{ABORTED}
    SnapMgr->>ObjStore: abort_multipart_upload(path, upload_id)
```
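The error path above reduces to one rule on the FE side: once beginSnapshot has returned a snapshot_id, every later failure must end in exactly one abortSnapshot. A language-neutral sketch in C++ (the real handler is Java); SnapshotSteps and run_snapshot_job are hypothetical names, and the callbacks stand in for the MetaServiceProxy RPCs and the upload.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <string>

// Hypothetical callbacks standing in for the RPCs and the image upload.
struct SnapshotSteps {
    std::function<std::optional<std::string>()> begin;              // snapshot_id, or nullopt
    std::function<bool(const std::string&)> checkpoint_and_upload;  // checkpoint + upload + updateSnapshot
    std::function<bool(const std::string&)> commit;
    std::function<void(const std::string&, const std::string&)> abort;  // (snapshot_id, reason)
};

// Runs one snapshot job; any failure after begin succeeds triggers abort exactly once.
bool run_snapshot_job(const SnapshotSteps& s) {
    std::optional<std::string> id = s.begin();
    if (!id) return false;  // nothing was created, so there is nothing to abort
    if (!s.checkpoint_and_upload(*id)) {
        s.abort(*id, "upload failed");
        return false;
    }
    if (!s.commit(*id)) {
        s.abort(*id, "commit failed");
        return false;
    }
    return true;
}
```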
5. C++ DorisSnapshotManager Method-Level Design
5.1 Class Definition
File: cloud/src/snapshot/doris_snapshot_manager.h
```cpp
class DorisSnapshotManager : public SnapshotManager {
public:
    explicit DorisSnapshotManager(std::shared_ptr<TxnKv> txn_kv);
    ~DorisSnapshotManager() override = default;

    void begin_snapshot(std::string_view instance_id, const BeginSnapshotRequest& request,
                        BeginSnapshotResponse* response) override;
    void update_snapshot(std::string_view instance_id, const UpdateSnapshotRequest& request,
                         UpdateSnapshotResponse* response) override;
    void commit_snapshot(std::string_view instance_id, const CommitSnapshotRequest& request,
                         CommitSnapshotResponse* response) override;
    void abort_snapshot(std::string_view instance_id, const AbortSnapshotRequest& request,
                        AbortSnapshotResponse* response) override;
    void drop_snapshot(std::string_view instance_id, const DropSnapshotRequest& request,
                       DropSnapshotResponse* response) override;
    void list_snapshot(std::string_view instance_id, const ListSnapshotRequest& request,
                       ListSnapshotResponse* response) override;
    void clone_instance(const CloneInstanceRequest& request,
                        CloneInstanceResponse* response) override;
    std::pair<MetaServiceCode, std::string> set_multi_version_status(
            std::string_view instance_id, MultiVersionStatus multi_version_status) override;

    int check_snapshots(InstanceChecker* checker) override;
    int inverted_check_snapshots(InstanceChecker* checker) override;
    int check_mvcc_meta_key(InstanceChecker* checker) override;
    int inverted_check_mvcc_meta_key(InstanceChecker* checker) override;
    int check_meta(MetaChecker* meta_checker) override;
    int recycle_snapshots(InstanceRecycler* recycler) override;
    int recycle_snapshot_meta_and_data(std::string_view instance_id,
                                       std::string_view resource_id,
                                       StorageVaultAccessor* accessor,
                                       Versionstamp snapshot_version,
                                       const SnapshotPB& snapshot_pb) override;
    int migrate_to_versioned_keys(InstanceDataMigrator* migrator) override;
    int compact_snapshot_chains(InstanceChainCompactor* compactor) override;

private:
    std::shared_ptr<TxnKv> txn_kv_;  // Independently held reference (base member is private, see C-1)

    // Internal helper methods
    int read_snapshot_pb(std::string_view instance_id, const Versionstamp& vs, SnapshotPB* pb);
    int write_snapshot_pb(std::string_view instance_id, const Versionstamp& vs, const SnapshotPB& pb);
    int read_instance_info(std::string_view instance_id, InstanceInfoPB* info);
    int get_storage_vault_accessor(const InstanceInfoPB& instance, const std::string& resource_id,
                                   std::unique_ptr<StorageVaultAccessor>* accessor);
    bool validate_ip_address(const std::string& ip);
    std::string build_image_url(const InstanceInfoPB& instance, const std::string& snapshot_id);
};
```

5.2 Method Implementation Specifications
| Method | Key Input Fields | Core Implementation Logic | Key Return Fields | Error Codes |
|---|---|---|---|---|
| begin_snapshot | snapshot_label, timeout_seconds, ttl_seconds, auto_snapshot, request_ip | 1. Validate parameters (timeout>0, ttl>0, label non-empty); 2. Optionally validate IP format (IPv4/IPv6); 3. Read InstanceInfoPB to obtain obj_info or storage_vault config; 4. Use FDB atomic versionstamp operation to write SnapshotPB{PREPARE, create_at=now()}; 5. Construct image_url = `<prefix>/snapshot/<snapshot_id>/image/` | snapshot_id, image_url, obj_info | INVALID_ARGUMENT / ALREADY_EXISTED |
| update_snapshot | snapshot_id, upload_file, upload_id | 1. Parse snapshot_id to Versionstamp; 2. Read SnapshotPB, verify status==PREPARE; 3. Update upload_file / upload_id; 4. Write back to TxnKv | status only | INVALID_ARGUMENT / SNAPSHOT_NOT_FOUND |
| commit_snapshot | snapshot_id, image_url, last_journal_id, snapshot_meta_image_size, snapshot_logical_data_size | 1. Read SnapshotPB, verify status==PREPARE; 2. Timeout check now < create_at + timeout_seconds; 3. Update status to NORMAL, write finish_at, last_journal_id, sizes; 4. Write back to TxnKv | status only | SNAPSHOT_EXPIRED / SNAPSHOT_NOT_FOUND |
| abort_snapshot | snapshot_id, reason | 1. Read SnapshotPB; 2. If upload_file/upload_id exist, abort multipart upload via StorageVaultAccessor; 3. Status→ABORTED, write reason; 4. Write back to TxnKv | status only | SNAPSHOT_NOT_FOUND |
| drop_snapshot | snapshot_id | 1. Read SnapshotPB; 2. Only allow deletion of NORMAL/ABORTED snapshots; 3. Check if reference keys exist (if derived instances exist, only mark RECYCLED; otherwise can clean immediately) | status only | INVALID_ARGUMENT / SNAPSHOT_NOT_FOUND |
| list_snapshot | required_snapshot_id, include_aborted, instance_id | 1. Range scan TxnKv with snapshot_key_prefix(instance_id) as prefix; 2. Deserialize each SnapshotPB to SnapshotInfoPB; 3. Query reference keys to populate derived_instance_ids; 4. Filter by include_aborted; 5. Sort by creation time descending | repeated SnapshotInfoPB | N/A |
| clone_instance | from_snapshot_id, from_instance_id, new_instance_id, clone_type | 1. Read source snapshot SnapshotPB (verify NORMAL); 2. Read source instance InstanceInfoPB; 3. Construct new instance metadata by clone_type; 4. Write snapshot_reference_key to record derivation; 5. Write new instance InstanceInfoPB | obj_info, image_url | INVALID_ARGUMENT / SNAPSHOT_NOT_FOUND |
| recycle_snapshots | InstanceRecycler* | Scan all SnapshotPBs: a) TTL expired (now > create_at + ttl_seconds) → mark RECYCLED; b) status==RECYCLED → call recycle_snapshot_meta_and_data(); c) NORMAL count > max_reserved → mark oldest as RECYCLED | 0/non-zero | N/A |
| recycle_snapshot_meta_and_data | instance_id, resource_id, StorageVaultAccessor*, Versionstamp, SnapshotPB | 1. accessor->delete_directory(image_url) to delete object storage files; 2. Delete snapshot_full_key from TxnKv; 3. Delete all associated snapshot_reference_keys | 0/non-zero | N/A |
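The commit_snapshot checks can be isolated into a pure function, which makes the timeout boundary easy to unit-test. A minimal sketch: SnapshotMeta is a hypothetical subset of SnapshotPB, and INVALID_STATE is an assumed error code for a non-PREPARE snapshot, since the table does not name one.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

enum class Status { PREPARE, NORMAL, ABORTED, RECYCLED };
// SNAPSHOT_NOT_FOUND / SNAPSHOT_EXPIRED come from the table; INVALID_STATE is hypothetical.
enum class Code { OK, SNAPSHOT_NOT_FOUND, SNAPSHOT_EXPIRED, INVALID_STATE };

struct SnapshotMeta {  // hypothetical subset of SnapshotPB used by commit_snapshot
    Status status;
    int64_t create_at;  // UNIX seconds
    int64_t timeout_seconds;
};

// Pre-commit checks: the snapshot exists, is still PREPARE, and now < create_at + timeout_seconds.
Code check_commit(const std::optional<SnapshotMeta>& snap, int64_t now) {
    if (!snap) return Code::SNAPSHOT_NOT_FOUND;
    if (snap->status != Status::PREPARE) return Code::INVALID_STATE;
    if (now >= snap->create_at + snap->timeout_seconds) return Code::SNAPSHOT_EXPIRED;
    return Code::OK;
}
```

Note that the boundary is exclusive: a commit arriving exactly at create_at + timeout_seconds is already expired.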
5.3 begin_snapshot Key Implementation (Simulated Walkthrough)
```
Input: instance_id="inst_001", label="daily_backup", timeout=3600, ttl=86400, auto=true

Step 1: Validation
  - timeout_seconds=3600 > 0 ✓
  - ttl_seconds=86400 > 0 ✓
  - label="daily_backup" non-empty ✓

Step 2: Read Instance Info
  - key: instance_key({instance_id}) → InstanceInfoPB
  - Obtain obj_info[0] = {bucket="my-bucket", prefix="doris-data", endpoint="s3.cn-north.amazonaws.com"}
  - resource_id = obj_info[0].id

Step 3: FDB Atomic Write
  - snapshot_full_key = versioned::snapshot_full_key({instance_id})
  - Construct SnapshotPB:
      status = SNAPSHOT_PREPARE
      type = SNAPSHOT_REFERENCE
      instance_id = "inst_001"
      create_at = 1742313600 (current UNIX timestamp)
      timeout_seconds = 3600
      ttl_seconds = 86400
      auto = true
      label = "daily_backup"
      resource_id = obj_info[0].id
  - txn->atomic_set_versionstamped_key(snapshot_full_key, SnapshotPB)
  - txn->commit() → obtain versionstamp

Step 4: Construct Response
  - snapshot_id = serialize_snapshot_id(versionstamp) → "0a1b2c3d4e5f6789a0b1"
  - image_url = "doris-data/snapshot/0a1b2c3d4e5f6789a0b1/image/"
  - obj_info = instance's ObjectStoreInfoPB

Result: response = {snapshot_id, image_url, obj_info} ✓
```
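The walkthrough's snapshot_id is 20 hex characters, which matches a 10-byte versionstamp. Assuming serialize_snapshot_id is plain lowercase hex (consistent with the example but not confirmed by the source), the round trip and the image_url layout could look like this; to_snapshot_id, parse_snapshot_id, and this free-function build_image_url are hypothetical.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

using Versionstamp = std::array<uint8_t, 10>;  // 10 bytes -> 20 hex characters

// Hypothetical stand-in for serialize_snapshot_id(): lowercase hex.
std::string to_snapshot_id(const Versionstamp& vs) {
    static const char kHex[] = "0123456789abcdef";
    std::string out;
    out.reserve(vs.size() * 2);
    for (uint8_t b : vs) {
        out.push_back(kHex[b >> 4]);
        out.push_back(kHex[b & 0x0F]);
    }
    return out;
}

// Inverse, as update/commit/abort would need it; rejects malformed IDs.
std::optional<Versionstamp> parse_snapshot_id(const std::string& id) {
    if (id.size() != 20) return std::nullopt;
    auto nibble = [](char c) -> int {
        if (c >= '0' && c <= '9') return c - '0';
        if (c >= 'a' && c <= 'f') return c - 'a' + 10;
        return -1;
    };
    Versionstamp vs{};
    for (std::size_t i = 0; i < vs.size(); ++i) {
        int hi = nibble(id[2 * i]);
        int lo = nibble(id[2 * i + 1]);
        if (hi < 0 || lo < 0) return std::nullopt;
        vs[i] = static_cast<uint8_t>((hi << 4) | lo);
    }
    return vs;
}

// image_url = <prefix>/snapshot/<snapshot_id>/image/
std::string build_image_url(const std::string& prefix, const std::string& snapshot_id) {
    return prefix + "/snapshot/" + snapshot_id + "/image/";
}
```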
5.4 clone_instance Three Clone Types Implementation
| Field | READ_ONLY | WRITABLE | ROLLBACK |
|---|---|---|---|
| ready_only | true | false | false |
| source_instance_id | Source instance ID | Source instance ID | Source instance ID |
| source_snapshot_id | Snapshot ID | Snapshot ID | Snapshot ID |
| original_instance_id | N/A | N/A | Earliest source instance ID |
| successor_instance_id | N/A | N/A | Previous inherited instance ID |
| obj_info | Inherited from source | Uses new obj_info from request | Inherited from source |
| storage_vault | Inherited from source | Uses new vault from request | Inherited from source |
| Data visibility | Read-only source data | Independent copy | Rolled back to snapshot point |
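The field rules in the table can be expressed as one function per clone request. A simplified sketch: InstanceFields is a hypothetical stand-in for InstanceInfoPB, storage stands in for both obj_info and storage_vault, and successor_instance_id is left out because its exact derivation is not pinned down above.

```cpp
#include <cassert>
#include <string>

enum class CloneType { READ_ONLY, WRITABLE, ROLLBACK };

// Hypothetical stand-in for the InstanceInfoPB fields in the table.
struct InstanceFields {
    bool ready_only = false;
    std::string source_instance_id;
    std::string source_snapshot_id;
    std::string original_instance_id;  // set for ROLLBACK only
    std::string storage;               // stands in for obj_info / storage_vault
};

InstanceFields build_clone_fields(CloneType type, const InstanceFields& source,
                                  const std::string& source_instance_id,
                                  const std::string& snapshot_id,
                                  const std::string& requested_storage) {
    InstanceFields out;
    out.ready_only = (type == CloneType::READ_ONLY);
    out.source_instance_id = source_instance_id;
    out.source_snapshot_id = snapshot_id;
    // WRITABLE clones use the storage from the request; the other types inherit it.
    out.storage = (type == CloneType::WRITABLE) ? requested_storage : source.storage;
    if (type == CloneType::ROLLBACK) {
        // Earliest ancestor: keep the source's original_instance_id if it already has one.
        out.original_instance_id = source.original_instance_id.empty()
                                           ? source_instance_id
                                           : source.original_instance_id;
    }
    return out;
}
```

Propagating original_instance_id this way keeps it pointing at the first instance even after repeated rollbacks.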
6. Java DorisCloudSnapshotHandler Detailed Design
6.1 Class Definition
File: fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/DorisCloudSnapshotHandler.java
```java
package org.apache.doris.cloud.snapshot;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DorisCloudSnapshotHandler extends CloudSnapshotHandler {
    private ExecutorService snapshotExecutor; // Single-thread serial execution
    private volatile SnapshotState currentSnapshot; // Currently in-progress snapshot

    @Override
    public void initialize() {
        snapshotExecutor = Executors.newSingleThreadExecutor(
                new ThreadFactoryBuilder().setNameFormat("snapshot-worker-%d").setDaemon(true).build());
        // Load FE persisted snapshot state (if any)
    }

    @Override
    protected void runAfterCatalogReady() { /* Auto-snapshot trigger logic */ }

    @Override
    public void submitJob(long ttl, String label) throws Exception { /* Manual snapshot submission */ }

    @Override
    public synchronized void refreshAutoSnapshotJob() throws Exception { /* Reload auto-snapshot config */ }

    @Override
    public void cloneSnapshot(String clusterSnapshotFile) throws Exception { /* Restore from snapshot */ }
}
```

6.2 Method Implementation Specifications
| Method | Core Implementation Logic |
|---|---|
| initialize() | 1. Create single-thread snapshotExecutor; 2. Initialize object storage client (based on AK/SK/Endpoint from obj_info); 3. Load FE persisted SnapshotState (if an in-progress snapshot exists, check whether abort is needed) |
| runAfterCatalogReady() | 1. Execute only on Master (Env.isMaster() check); 2. Call MetaServiceProxy.getInstance().getSnapshotProperties() to get switch_status / max_reserved / interval; 3. Skip if switch_status != SNAPSHOT_SWITCH_ON; 4. Call listSnapshot(false) to get NORMAL list; 5. Trigger if now - latest snapshot time >= snapshot_interval_seconds; 6. Abort PREPARE snapshots exceeding cloud_snapshot_timeout_seconds |
| submitJob(ttl, label) | 1. Verify no in-progress PREPARE snapshot; 2. Submit async task to snapshotExecutor; 3. Async task executes five-step workflow: beginSnapshot → checkpoint → multipartUpload → updateSnapshot → commitSnapshot; 4. Call abortSnapshot on any step failure |
| refreshAutoSnapshotJob() | 1. Read latest auto-snapshot configuration from MetaService; 2. If scheduling interval changed, call setInterval() to update MasterDaemon.intervalMs |
| cloneSnapshot(file) | 1. Parse clusterSnapshotFile to obtain snapshot_id; 2. Call MetaServiceProxy.cloneInstance(); 3. Download FE Image locally from obj_info + image_url; 4. FE framework automatically rebuilds metadata from the Image |
6.3 FE Image Upload Workflow
```
Step 1: beginSnapshot(cloud_unique_id, label, timeout=600, ttl)
        → Obtain snapshot_id, image_url, obj_info
Step 2: Catalog.checkpoint()
        → Obtain last_journal_id, image file path
Step 3: S3 Multipart Upload Initialization
        → Create S3Client based on obj_info
        → initiateMultipartUpload(image_url + "image.xxx")
        → Obtain upload_id
Step 4: updateSnapshot(snapshot_id, upload_file, upload_id)
        → Record upload info for fault recovery
Step 5: Chunked Upload
        → Split by multi_part_upload_part_size_in_bytes (256MB)
        → Parallel upload (pool size = multi_part_upload_pool_size)
        → completeMultipartUpload
Step 6: commitSnapshot(snapshot_id, image_url, last_journal_id, meta_size, data_size)
        → Snapshot complete
```
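Step 5's chunking can be computed up front so each part can be handed to the upload pool independently. A sketch assuming "256MB" means 256 × 1024 × 1024 bytes; split_into_parts and PartRange are hypothetical names. Part numbers start at 1, as the S3 UploadPart API expects.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct PartRange {
    int part_number;  // 1-based part number
    uint64_t offset;  // byte offset into the image file
    uint64_t length;  // bytes in this part (the last part may be shorter)
};

// Splits a file of file_size bytes into consecutive parts of at most part_size bytes.
std::vector<PartRange> split_into_parts(uint64_t file_size, uint64_t part_size) {
    assert(part_size > 0);
    std::vector<PartRange> parts;
    int number = 1;
    for (uint64_t offset = 0; offset < file_size; offset += part_size, ++number) {
        parts.push_back({number, offset, std::min(part_size, file_size - offset)});
    }
    return parts;
}
```

Amazon S3 allows at most 10,000 parts per multipart upload, so 256 MiB parts cap a single image at 2,500 GiB; other backends may impose different limits. A zero-byte file yields no parts and would need separate handling.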
6.4 Auto-Snapshot Trigger Decision Flow
```mermaid
flowchart TD
    A["runAfterCatalogReady() triggered every interval seconds"] --> B{"Env.isMaster?"}
    B -- No --> Z["Skip"]
    B -- Yes --> C["getSnapshotProperties()"]
    C --> D{"switch_status == ON?"}
    D -- No --> Z
    D -- Yes --> E["listSnapshot(false) get NORMAL list"]
    E --> F{"NORMAL list empty?"}
    F -- Yes --> G["Trigger snapshot immediately"]
    F -- No --> H{"now - latest.finish_at >= interval?"}
    H -- No --> I["Check PREPARE timeout"]
    H -- Yes --> J{"NORMAL count >= max_reserved?"}
    J -- No --> G
    J -- Yes --> K["Recycle oldest NORMAL snapshot"]
    K --> G
    I --> L{"PREPARE timed out?"}
    L -- Yes --> M["abortSnapshot + trigger new snapshot"]
    L -- No --> Z
    G --> N["submitJob(auto_ttl, auto_label)"]
```
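The flowchart maps directly onto a pure decision function, which keeps runAfterCatalogReady() testable without a running MetaService. A sketch with hypothetical names (Action, AutoSnapshotInput, decide); the default values mirror the config table but are illustrative only.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

enum class Action { SKIP, TRIGGER, RECYCLE_OLDEST_THEN_TRIGGER, ABORT_PREPARE_THEN_TRIGGER };

// Inputs gathered by runAfterCatalogReady(); field names are illustrative.
struct AutoSnapshotInput {
    bool is_master = false;
    bool switch_on = false;                    // switch_status == SNAPSHOT_SWITCH_ON
    std::vector<int64_t> normal_finish_at;     // finish_at of each NORMAL snapshot
    int64_t interval_seconds = 3600;
    int64_t max_reserved = 35;
    std::optional<int64_t> prepare_create_at;  // create_at of an in-progress PREPARE snapshot
    int64_t prepare_timeout_seconds = 600;
    int64_t now = 0;
};

// Follows the flowchart branch by branch.
Action decide(const AutoSnapshotInput& in) {
    if (!in.is_master || !in.switch_on) return Action::SKIP;
    if (in.normal_finish_at.empty()) return Action::TRIGGER;
    int64_t latest = *std::max_element(in.normal_finish_at.begin(), in.normal_finish_at.end());
    if (in.now - latest >= in.interval_seconds) {
        bool full = static_cast<int64_t>(in.normal_finish_at.size()) >= in.max_reserved;
        return full ? Action::RECYCLE_OLDEST_THEN_TRIGGER : Action::TRIGGER;
    }
    bool timed_out = in.prepare_create_at &&
                     in.now > *in.prepare_create_at + in.prepare_timeout_seconds;
    return timed_out ? Action::ABORT_PREPARE_THEN_TRIGGER : Action::SKIP;
}
```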
6.5 Configuration Injection
On the FE side, update the default value of cloud_snapshot_handler_class in Config.java, or set it in fe.conf:

```
# fe.conf
cloud_snapshot_handler_class = org.apache.doris.cloud.snapshot.DorisCloudSnapshotHandler
```

7. Recycling & Cleanup Mechanism
7.1 Recycling Trigger Conditions
| Condition | Judgment Logic | Action |
|---|---|---|
| TTL expired (manual snapshot) | now > snapshot.create_at + snapshot.ttl_seconds | Mark RECYCLED |
| Exceeds max retention (auto snapshot) | NORMAL count > instance.max_reserved_snapshot | Mark oldest NORMAL as RECYCLED |
| Already marked RECYCLED | snapshot.status == SNAPSHOT_RECYCLED | Execute actual cleanup |
| ABORTED state | snapshot.status == SNAPSHOT_ABORTED | Clean up residual uploads + delete KV |
| PREPARE timeout | now > snapshot.create_at + snapshot.timeout_seconds | Mark ABORTED |
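Per snapshot, every row except max retention reduces to a status-plus-clock check; the max-retention row needs the whole NORMAL list and is handled separately. A sketch with hypothetical names (RecycleAction, SnapshotMeta, classify); applying the TTL to every NORMAL snapshot, not only manual ones, is an assumption.

```cpp
#include <cassert>
#include <cstdint>

enum class Status { PREPARE, NORMAL, ABORTED, RECYCLED };
enum class RecycleAction { KEEP, MARK_RECYCLED, MARK_ABORTED, CLEANUP };

struct SnapshotMeta {  // hypothetical subset of SnapshotPB
    Status status;
    int64_t create_at;  // UNIX seconds
    int64_t ttl_seconds;
    int64_t timeout_seconds;
};

// Maps one snapshot to the action from the table, evaluated at time `now`.
RecycleAction classify(const SnapshotMeta& s, int64_t now) {
    switch (s.status) {
    case Status::RECYCLED:
    case Status::ABORTED:
        return RecycleAction::CLEANUP;
    case Status::PREPARE:
        return now > s.create_at + s.timeout_seconds ? RecycleAction::MARK_ABORTED
                                                     : RecycleAction::KEEP;
    case Status::NORMAL:
        return now > s.create_at + s.ttl_seconds ? RecycleAction::MARK_RECYCLED
                                                 : RecycleAction::KEEP;
    }
    return RecycleAction::KEEP;
}
```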
7.2 Cleanup Workflow
```mermaid
flowchart TD
    A["Recycler background instance scan"] --> B["InstanceRecycler.recycle()"]
    B --> C["DorisSnapshotManager.recycle_snapshots(recycler)"]
    C --> D["Range scan TxnKv: snapshot_key_prefix(instance_id)"]
    D --> E{"Iterate each SnapshotPB"}
    E --> F{"TTL expired or PREPARE timeout?"}
    F -- Yes --> G["Mark status as RECYCLED/ABORTED"]
    F -- No --> H{"Status == RECYCLED or ABORTED?"}
    H -- Yes --> I["recycle_snapshot_meta_and_data()"]
    I --> J["accessor->delete_directory(image_url)"]
    I --> K["Delete snapshot_full_key"]
    I --> L["Delete associated snapshot_reference_keys"]
    H -- No --> M{"NORMAL count > max_reserved?"}
    M -- Yes --> N["Mark oldest NORMAL as RECYCLED"]
    N --> I
    M -- No --> O["Skip"]
    E --> P{"Continue to next?"}
    P -- Yes --> E
    P -- No --> Q["Done"]
```
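For the max_reserved branch, the range scan already returns keys in versionstamp order, so the snapshots to recycle are simply the oldest excess entries. A sketch assuming the caller passes NORMAL snapshot_ids in scan order; select_over_retention is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// normal_ids_oldest_first: NORMAL snapshot_ids in range-scan (versionstamp) order.
// Returns the oldest entries that must be marked RECYCLED so at most max_reserved remain.
std::vector<std::string> select_over_retention(const std::vector<std::string>& normal_ids_oldest_first,
                                               std::size_t max_reserved) {
    if (normal_ids_oldest_first.size() <= max_reserved) return {};
    auto excess = static_cast<std::ptrdiff_t>(normal_ids_oldest_first.size() - max_reserved);
    return std::vector<std::string>(normal_ids_oldest_first.begin(),
                                    normal_ids_oldest_first.begin() + excess);
}
```

The recycler trims when count > max_reserved, while the FE flowchart in 6.4 recycles at count >= max_reserved before creating a new snapshot; both paths end with at most max_reserved NORMAL snapshots.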
8. Consistency Verification Design
8.1 Forward Verification (check_snapshots)
Goal: Ensure all NORMAL-state SnapshotPBs in TxnKv have corresponding files in object storage.
```
for each SnapshotPB where status == NORMAL:
    if accessor->exists(snapshot.image_url) != 0:
        LOG(ERROR) << "Snapshot " << snapshot_id << " image not found at " << image_url
        error_count++
```
8.2 Inverse Verification (inverted_check_snapshots)
Goal: Ensure all files under the snapshot path in object storage have corresponding SnapshotPB metadata.
```
accessor->list_directory("snapshot/")
for each file in listing:
    extract snapshot_id from path
    if read_snapshot_pb(instance_id, parse_versionstamp(snapshot_id)) fails:
        LOG(ERROR) << "Orphan snapshot file: " << file.path
        orphan_count++
```
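Both checks are set differences between the snapshot IDs known to TxnKv and the IDs found in object storage. A sketch assuming object paths follow the `.../snapshot/<snapshot_id>/image/<file>` layout from 5.2; extract_snapshot_id, CheckResult, and cross_check are hypothetical names. The inverse check compares against snapshots in any status, so files of a PREPARE snapshot still being uploaded are not reported as orphans.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <set>
#include <string>
#include <vector>

// Extracts <snapshot_id> from ".../snapshot/<snapshot_id>/image/<file>"; returns "" otherwise.
std::string extract_snapshot_id(const std::string& path) {
    const std::string marker = "snapshot/";
    std::size_t pos = path.rfind(marker);
    if (pos == std::string::npos) return "";
    std::size_t start = pos + marker.size();
    std::size_t end = path.find('/', start);
    if (end == std::string::npos || end == start) return "";
    return path.substr(start, end - start);
}

struct CheckResult {
    std::set<std::string> missing_images;  // forward: NORMAL in KV, no file in storage
    std::set<std::string> orphan_ids;      // inverse: file in storage, no SnapshotPB in KV
};

CheckResult cross_check(const std::set<std::string>& normal_ids_in_kv,
                        const std::set<std::string>& all_ids_in_kv,
                        const std::vector<std::string>& object_paths) {
    std::set<std::string> ids_in_storage;
    for (const auto& path : object_paths) {
        std::string id = extract_snapshot_id(path);
        if (!id.empty()) ids_in_storage.insert(id);
    }
    CheckResult r;
    std::set_difference(normal_ids_in_kv.begin(), normal_ids_in_kv.end(),
                        ids_in_storage.begin(), ids_in_storage.end(),
                        std::inserter(r.missing_images, r.missing_images.end()));
    std::set_difference(ids_in_storage.begin(), ids_in_storage.end(),
                        all_ids_in_kv.begin(), all_ids_in_kv.end(),
                        std::inserter(r.orphan_ids, r.orphan_ids.end()));
    return r;
}
```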
8.3 MVCC Metadata Verification (check_mvcc_meta_key / inverted_check_mvcc_meta_key)
Verifies consistency of multi-version keys (0x03 keyspace) for partition/tablet/rowset metadata across the snapshot chain.
9. Injection Point Modification Checklist
9.1 meta_server.cpp Modification
```cpp
// Before:
auto snapshot_mgr = std::make_shared<SnapshotManager>(txn_kv_);
// After (plus an #include of doris_snapshot_manager.h):
auto snapshot_mgr = std::make_shared<DorisSnapshotManager>(txn_kv_);
```

9.2 recycler.cpp Modification
```cpp
// Before:
snapshot_manager_ = std::make_shared<SnapshotManager>(txn_kv_);
// After (plus an #include of doris_snapshot_manager.h):
snapshot_manager_ = std::make_shared<DorisSnapshotManager>(txn_kv_);
```

9.3 CMakeLists.txt Modification
Add the new source file to cloud/CMakeLists.txt:

```cmake
set(SNAPSHOT_SRCS
    src/snapshot/snapshot_manager.cpp
    src/snapshot/doris_snapshot_manager.cpp  # New
)
```

10. File Change Manifest
10.1 New C++ Files
| File | Description |
|---|---|
| cloud/src/snapshot/doris_snapshot_manager.h | DorisSnapshotManager class definition |
| cloud/src/snapshot/doris_snapshot_manager.cpp | All 18 method implementations |
| cloud/test/doris_snapshot_manager_test.cpp | Complete unit tests |
10.2 Modified C++ Files
| File | Modification |
|---|---|
| cloud/src/meta-service/meta_server.cpp | L82: SnapshotManager → DorisSnapshotManager |
| cloud/src/recycler/recycler.cpp | L594: SnapshotManager → DorisSnapshotManager |
| cloud/CMakeLists.txt | Add doris_snapshot_manager.cpp to build list |
10.3 New Java Files
| File | Description |
|---|---|
| fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/DorisCloudSnapshotHandler.java | Handler implementation |
| fe/fe-core/src/test/java/org/apache/doris/cloud/snapshot/DorisCloudSnapshotHandlerTest.java | Unit tests |
10.4 Modified Java Files
| File | Modification |
|---|---|
| fe/fe-common/src/main/java/org/apache/doris/common/Config.java | Update cloud_snapshot_handler_class default value |
10.5 Test Files
| File | Description |
|---|---|
| cloud/test/doris_snapshot_manager_test.cpp | C++ unit tests (all methods) |
| cloud/test/meta_service_snapshot_test.cpp | Enable and extend existing DISABLED tests |
| regression-test/suites/cloud_p0/snapshot/ | Groovy integration tests |
Use case
No response
Related issues
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct