db-backup: state snapshot backup pipelined (#13587)
msmouse committed Jun 6, 2024
1 parent b3c676a commit acb6c89
Showing 5 changed files with 207 additions and 68 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions consensus/consensus-types/src/wrapped_ledger_info.rs
@@ -6,7 +6,7 @@ use crate::{quorum_cert::QuorumCert, vote_data::VoteData};
use anyhow::{ensure, Context};
use aptos_crypto::hash::CryptoHash;
use aptos_types::{
aggregate_signature, block_info::BlockInfo, ledger_info::LedgerInfoWithSignatures,
block_info::BlockInfo, ledger_info::LedgerInfoWithSignatures,
validator_verifier::ValidatorVerifier,
};
use serde::{Deserialize, Serialize};
@@ -50,7 +50,7 @@ impl WrappedLedgerInfo {
vote_data: VoteData::dummy(),
signed_ledger_info: LedgerInfoWithSignatures::new(
aptos_types::ledger_info::LedgerInfo::dummy(),
aggregate_signature::AggregateSignature::empty(),
aptos_types::aggregate_signature::AggregateSignature::empty(),
),
}
}
1 change: 1 addition & 0 deletions storage/backup/backup-cli/Cargo.toml
@@ -24,6 +24,7 @@ aptos-executor-types = { workspace = true }
aptos-infallible = { workspace = true }
aptos-jellyfish-merkle = { workspace = true }
aptos-logger = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-proptest-helpers = { workspace = true }
aptos-push-metrics = { workspace = true }
aptos-storage-interface = { workspace = true }
255 changes: 190 additions & 65 deletions storage/backup/backup-cli/src/backup_types/state_snapshot/backup.rs
@@ -5,26 +5,29 @@
use crate::{
backup_types::state_snapshot::manifest::{StateSnapshotBackup, StateSnapshotChunk},
metadata::Metadata,
metrics::backup::BACKUP_TIMER,
storage::{BackupHandleRef, BackupStorage, FileHandle, ShellSafeName},
utils::{
backup_service_client::BackupServiceClient, read_record_bytes::ReadRecordBytes,
should_cut_chunk, storage_ext::BackupStorageExt, GlobalBackupOpt,
should_cut_chunk, storage_ext::BackupStorageExt, stream::TryStreamX, GlobalBackupOpt,
},
};
use anyhow::{anyhow, Result};
use anyhow::{anyhow, ensure, Result};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_logger::prelude::*;
use aptos_metrics_core::TimerHelper;
use aptos_types::{
ledger_info::LedgerInfoWithSignatures,
proof::TransactionInfoWithProof,
state_store::{state_key::StateKey, state_value::StateValue},
transaction::Version,
};
use bytes::Bytes;
use bytes::{BufMut, Bytes, BytesMut};
use clap::Parser;
use futures::TryStreamExt;
use once_cell::sync::Lazy;
use std::{convert::TryInto, str::FromStr, sync::Arc};
use tokio::{io::AsyncWriteExt, time::Instant};
use std::{convert::TryInto, str::FromStr, sync::Arc, time::Instant};
use tokio::io::{AsyncRead, AsyncWriteExt};

#[derive(Parser)]
pub struct StateSnapshotBackupOpt {
@@ -35,6 +38,159 @@ pub struct StateSnapshotBackupOpt {
pub epoch: u64,
}

struct Chunk {
bytes: Bytes,
first_key: HashValue,
first_idx: usize,
last_key: HashValue,
last_idx: usize,
}

struct ChunkerState<R> {
state_snapshot_file: Option<R>,
buf: BytesMut,
chunk_first_key: HashValue,
prev_record_len: usize,
current_idx: usize,
chunk_first_idx: usize,
max_chunk_size: usize,
}

impl<R: AsyncRead + Send + Unpin> ChunkerState<R> {
async fn new(mut state_snapshot_file: R, max_chunk_size: usize) -> Result<Self> {
let first_record = state_snapshot_file
.read_record_bytes()
.await?
.ok_or_else(|| anyhow!("State is empty."))?;

let chunk_first_key = Self::parse_key(&first_record)?;
let prev_record_len = first_record.len();

let mut buf = BytesMut::new();
buf.put_slice(&(first_record.len() as u32).to_be_bytes());
buf.extend(first_record);

Ok(Self {
state_snapshot_file: Some(state_snapshot_file),
buf,
chunk_first_key,
prev_record_len,
current_idx: 0,
chunk_first_idx: 0,
max_chunk_size,
})
}

async fn next_full_chunk(&mut self) -> Result<Option<Chunk>> {
let _timer = BACKUP_TIMER.timer_with(&["state_snapshot_next_full_chunk"]);

let input = self
.state_snapshot_file
.as_mut()
.expect("get_next_full_chunk after EOF.");

while let Some(record_bytes) = input.read_record_bytes().await? {
let _timer = BACKUP_TIMER.timer_with(&["state_snapshot_process_records"]);

// If buf + current_record exceeds max_chunk_size, dump current buf to a new chunk
let chunk_cut_opt = should_cut_chunk(&self.buf, &record_bytes, self.max_chunk_size)
.then(|| {
let bytes = self.buf.split().freeze();
let last_key = Self::parse_key(&bytes[bytes.len() - self.prev_record_len..])?;

let chunk = Chunk {
bytes,
first_key: self.chunk_first_key,
first_idx: self.chunk_first_idx,
last_key,
last_idx: self.current_idx,
};

self.chunk_first_idx = self.current_idx + 1;
self.chunk_first_key = Self::parse_key(&record_bytes)?;

Result::<_>::Ok(chunk)
})
.transpose()?;

// Append record to buf
self.prev_record_len = record_bytes.len();
self.buf
.put_slice(&(record_bytes.len() as u32).to_be_bytes());
self.buf.extend(record_bytes);
self.current_idx += 1;

// Return the full chunk if found
if let Some(chunk) = chunk_cut_opt {
// FIXME(aldenhu): add logging, maybe not here
return Ok(Some(chunk));
}
}

// Input file ended, full chunk not found.
// The call site will call last_chunk(), which consumes the ChunkerState
let _ = self.state_snapshot_file.take();
Ok(None)
}

async fn last_chunk(self) -> Result<Chunk> {
let Self {
state_snapshot_file,
buf,
chunk_first_key,
prev_record_len,
current_idx,
chunk_first_idx,
max_chunk_size: _,
} = self;
ensure!(
state_snapshot_file.is_none(),
"get_last_chunk called before EOF"
);
ensure!(!buf.is_empty(), "Last chunk can't be empty");

let bytes = buf.freeze();
let last_key = Self::parse_key(&bytes[bytes.len() - prev_record_len..])?;

Ok(Chunk {
bytes,
first_key: chunk_first_key,
first_idx: chunk_first_idx,
last_key,
last_idx: current_idx,
})
}

fn parse_key(record: &[u8]) -> Result<HashValue> {
let (key, _): (StateKey, StateValue) = bcs::from_bytes(record)?;
Ok(key.hash())
}
}
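
// Editorial sketch, not part of this commit: the chunker frames every buffered
// record with a 4-byte big-endian length prefix (the
// put_slice(&(len as u32).to_be_bytes()) calls above), and presumably
// read_record_bytes, which is not shown in this diff, consumes the same
// framing. A minimal, hypothetical reader for one such record could look like:
async fn read_one_record<R: tokio::io::AsyncRead + Unpin>(
    reader: &mut R,
) -> anyhow::Result<Option<bytes::Bytes>> {
    use tokio::io::AsyncReadExt;

    let mut len_buf = [0u8; 4];
    // A real implementation would distinguish clean EOF from a short read;
    // treating any failure on the length prefix as EOF keeps the sketch small.
    if reader.read_exact(&mut len_buf).await.is_err() {
        return Ok(None);
    }
    let len = u32::from_be_bytes(len_buf) as usize;
    let mut record = vec![0u8; len];
    reader.read_exact(&mut record).await?;
    Ok(Some(record.into()))
}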

struct Chunker<R> {
state: Option<ChunkerState<R>>,
}

impl<R: AsyncRead + Send + Unpin> Chunker<R> {
async fn new(state_snapshot_file: R, max_chunk_size: usize) -> Result<Self> {
Ok(Self {
state: Some(ChunkerState::new(state_snapshot_file, max_chunk_size).await?),
})
}

async fn next_chunk(&mut self) -> Result<Option<Chunk>> {
let ret = match self.state.as_mut() {
None => None,
Some(state) => match state.next_full_chunk().await? {
Some(chunk) => Some(chunk),
None => Some(self.state.take().unwrap().last_chunk().await?),
},
};

Ok(ret)
}
}
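
// Editorial sketch, not part of this commit: Chunker::next_chunk yields each
// full chunk as it is cut, and then the final partial chunk exactly once after
// EOF. The simplest way to drive it is a sequential loop like the hypothetical
// helper below; the controller further down instead wraps it in a stream so
// that chunk uploads can overlap with chunking.
async fn drain_chunks<R: tokio::io::AsyncRead + Send + Unpin>(
    file: R,
    max_chunk_size: usize,
) -> anyhow::Result<Vec<Chunk>> {
    let mut chunker = Chunker::new(file, max_chunk_size).await?;
    let mut chunks = vec![];
    while let Some(chunk) = chunker.next_chunk().await? {
        // Each chunk covers records [first_idx, last_idx], keyed from
        // first_key to last_key.
        chunks.push(chunk);
    }
    Ok(chunks)
}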

pub struct StateSnapshotBackupController {
epoch: u64,
version: Option<Version>, // initialize before using
@@ -76,63 +232,31 @@ impl StateSnapshotBackupController {
.create_backup_with_random_suffix(&self.backup_name())
.await?;

let mut chunks = vec![];

let mut state_snapshot_file = self.client.get_state_snapshot(self.version()).await?;
let mut prev_record_bytes = state_snapshot_file
.read_record_bytes()
.await?
.ok_or_else(|| anyhow!("State is empty."))?;
let mut chunk_bytes = (prev_record_bytes.len() as u32).to_be_bytes().to_vec();
chunk_bytes.extend(&prev_record_bytes);
let mut chunk_first_key = Self::parse_key(&prev_record_bytes)?;
let mut current_idx: usize = 0;
let mut chunk_first_idx: usize = 0;
let state_snapshot_file = self.client.get_state_snapshot(self.version()).await?;
let chunker = Chunker::new(state_snapshot_file, self.max_chunk_size).await?;

let start = Instant::now();
while let Some(record_bytes) = state_snapshot_file.read_record_bytes().await? {
if should_cut_chunk(&chunk_bytes, &record_bytes, self.max_chunk_size) {
let chunk = self
.write_chunk(
&backup_handle,
&chunk_bytes,
chunk_first_idx,
current_idx,
chunk_first_key,
Self::parse_key(&prev_record_bytes)?,
)
.await?;
chunks.push(chunk);
chunk_bytes = vec![];
chunk_first_idx = current_idx + 1;
chunk_first_key = Self::parse_key(&record_bytes)?;
let chunk_stream = futures::stream::try_unfold(chunker, |mut chunker| async {
Ok(chunker.next_chunk().await?.map(|chunk| (chunk, chunker)))
});

let chunk_manifest_fut_stream =
chunk_stream.map_ok(|chunk| self.write_chunk(&backup_handle, chunk));

let chunks: Vec<_> = chunk_manifest_fut_stream
.try_buffered_x(8, 4) // 4 concurrently, at most 8 results in buffer.
.map_ok(|chunk_manifest| {
let last_idx = chunk_manifest.last_idx;
info!(
last_idx = current_idx,
last_idx = last_idx,
values_per_second =
((current_idx + 1) as f64 / start.elapsed().as_secs_f64()) as u64,
((last_idx + 1) as f64 / start.elapsed().as_secs_f64()) as u64,
"Chunk written."
);
}

current_idx += 1;
chunk_bytes.extend((record_bytes.len() as u32).to_be_bytes());
chunk_bytes.extend(&record_bytes);
prev_record_bytes = record_bytes;
}

assert!(!chunk_bytes.is_empty());
let chunk = self
.write_chunk(
&backup_handle,
&chunk_bytes,
chunk_first_idx,
current_idx,
chunk_first_key,
Self::parse_key(&prev_record_bytes)?,
)
chunk_manifest
})
.try_collect()
.await?;
chunks.push(chunk);

self.write_manifest(&backup_handle, chunks).await
}
@@ -169,11 +293,6 @@ impl StateSnapshotBackupController {
.unwrap()
}

fn parse_key(record: &Bytes) -> Result<HashValue> {
let (key, _): (StateKey, StateValue) = bcs::from_bytes(record)?;
Ok(key.hash())
}

async fn get_version_for_epoch_ending(&self, epoch: u64) -> Result<u64> {
let ledger_info: LedgerInfoWithSignatures = bcs::from_bytes(
self.client
@@ -192,17 +311,23 @@
async fn write_chunk(
&self,
backup_handle: &BackupHandleRef,
chunk_bytes: &[u8],
first_idx: usize,
last_idx: usize,
first_key: HashValue,
last_key: HashValue,
chunk: Chunk,
) -> Result<StateSnapshotChunk> {
let _timer = BACKUP_TIMER.timer_with(&["state_snapshot_write_chunk"]);

let Chunk {
bytes,
first_idx,
last_idx,
first_key,
last_key,
} = chunk;

let (chunk_handle, mut chunk_file) = self
.storage
.create_for_write(backup_handle, &Self::chunk_name(first_idx))
.await?;
chunk_file.write_all(chunk_bytes).await?;
chunk_file.write_all(&bytes).await?;
chunk_file.shutdown().await?;
let (proof_handle, mut proof_file) = self
.storage
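
A note on the concurrency primitive: the try_buffered_x(8, 4) call in backup() comes from the stream::TryStreamX extension added to the imports; its implementation is not part of this diff. Assuming it behaves like futures::TryStreamExt::try_buffered plus a cap on how many finished results may sit in the buffer, the shape of the pipeline (map each chunk to a fallible write future, run a few concurrently, collect results in order) can be reproduced with the standard combinator alone. The toy below is a self-contained sketch of that shape; the names and numbers are illustrative, not from the commit.

use futures::{stream, TryStreamExt};

// Up to 4 fallible futures run concurrently; results are yielded in input
// order, which is what lets the backup keep its chunk manifests ordered while
// uploads overlap with chunking.
async fn demo() -> anyhow::Result<Vec<u64>> {
    stream::iter((0u64..16).map(anyhow::Ok))
        .map_ok(|i| async move { anyhow::Ok(i * i) })
        .try_buffered(4)
        .try_collect()
        .await
}
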
14 changes: 13 additions & 1 deletion storage/backup/backup-cli/src/metrics/backup.rs
@@ -2,7 +2,9 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use aptos_push_metrics::{register_int_gauge, IntGauge};
use aptos_push_metrics::{
exponential_buckets, register_histogram_vec, register_int_gauge, HistogramVec, IntGauge,
};
use once_cell::sync::Lazy;

pub static HEARTBEAT_TS: Lazy<IntGauge> = Lazy::new(|| {
@@ -44,3 +46,13 @@ pub static COMPACTED_TXN_VERSION: Lazy<IntGauge> = Lazy::new(|| {
)
.unwrap()
});

pub static BACKUP_TIMER: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_db_backup_timers_seconds",
"Various timers for performance analysis.",
&["name"],
exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 32).unwrap(),
)
.unwrap()
});
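
For a sense of the range these buckets cover: with prometheus-style semantics, exponential_buckets(start, factor, count) produces count boundaries of the form start * factor^i, so this timer's buckets run from 1 microsecond up to roughly 2147 seconds (about 36 minutes). A standalone check, assuming those semantics (the snippet is illustrative, not part of the commit):

// Assuming prometheus-style semantics: bucket i = start * factor^i for i in 0..count.
fn main() {
    let (start, factor, count) = (1e-6_f64, 2.0_f64, 32i32);
    let largest = start * factor.powi(count - 1);
    println!("buckets span {:e} s ..= {:.1} s", start, largest); // ~1e-6 s ..= 2147.5 s
}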
