Skip to content

Commit

Permalink
Merge pull request #629 from EspressoSystems/abdul/drop-write-lock
Browse files Browse the repository at this point in the history
sleep and drop write lock after each batch delete
  • Loading branch information
imabdulbasit committed Jun 6, 2024
2 parents 0b633b1 + cddf8c6 commit 0515f4a
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 52 deletions.
48 changes: 26 additions & 22 deletions src/data_source/fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,33 +358,37 @@ where
};
};

let task = {
BackgroundTask::spawn("pruner", async move {
for i in 1.. {
tracing::warn!("starting pruner run {i} ");
{
let mut storage = fetcher.storage.write().await;

match storage.storage.prune().await {
Ok(Some(height)) => {
storage.pruned_height = Some(height);
tracing::warn!(
"pruner run {i} succeeded. Pruned to height {height}"
);
}
Ok(None) => (),
Err(e) => {
tracing::error!("pruner run {i} failed: {e:?}");
storage.revert().await;
}
let future = async move {
for i in 1.. {
tracing::warn!("starting pruner run {i} ");
// We loop until the whole run pruner run is complete
// This allows write lock to be released after each batch delete
loop {
let mut storage = fetcher.storage.write().await;

match storage.storage.prune().await {
Ok(Some(height)) => {
storage.pruned_height = Some(height);
tracing::warn!("Pruned to height {height}");
}
Ok(None) => {
tracing::warn!("pruner run {i} complete.");
break;
}
Err(e) => {
tracing::error!("pruner run {i} failed: {e:?}");
storage.revert().await;
break;
}
}

sleep(cfg.interval()).await;
}
})

sleep(cfg.interval()).await;
}
};

let task = BackgroundTask::spawn("pruner", future);

Self {
handle: Some(task),
_types: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion src/data_source/storage/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl Default for PrunerCfg {
minimum_retention: Duration::from_secs(24 * 3600),
// 7 days
target_retention: Duration::from_secs(7 * 24 * 3600),
batch_size: 1000,
batch_size: 30000,
// 80%
max_usage: 8000,
// 1.5 hour
Expand Down
115 changes: 86 additions & 29 deletions src/data_source/storage/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,37 @@ impl Config {
pub struct SqlStorage {
client: Arc<Client>,
tx_in_progress: bool,
pruner_cfg: Option<PrunerCfg>,
pruner: Pruner,
_connection: BackgroundTask,
}

#[derive(Debug, Default)]
pub struct Pruner {
enabled: bool,
config: PrunerCfg,
pruned_height: Option<u64>,
target_height: Option<u64>,
minimum_retention_height: Option<u64>,
}

impl Pruner {
fn new(config: PrunerCfg) -> Self {
Pruner {
enabled: true,
config,
pruned_height: None,
target_height: None,
minimum_retention_height: None,
}
}

fn reset(&mut self) {
self.pruned_height = None;
self.target_height = None;
self.minimum_retention_height = None;
}
}

impl SqlStorage {
/// Connect to a remote database.
pub async fn connect(mut config: Config) -> Result<Self, Error> {
Expand Down Expand Up @@ -430,10 +457,12 @@ impl SqlStorage {
}
}

let pruner = config.pruner_cfg.map(Pruner::new).unwrap_or_default();

Ok(Self {
client: Arc::new(client),
tx_in_progress: false,
pruner_cfg: config.pruner_cfg,
pruner,
_connection: connection,
})
}
Expand All @@ -456,11 +485,16 @@ impl SqlStorage {

impl PrunerConfig for SqlStorage {
fn set_pruning_config(&mut self, cfg: PrunerCfg) {
self.pruner_cfg = Some(cfg);
self.pruner.enabled = true;
self.pruner.config = cfg;
}

fn get_pruning_config(&self) -> Option<PrunerCfg> {
self.pruner_cfg.clone()
if !self.pruner.enabled {
return None;
}

Some(self.pruner.config.clone())
}
}

Expand Down Expand Up @@ -583,39 +617,55 @@ impl PruneStorage for SqlStorage {
let cfg = self.get_pruning_config().ok_or(QueryError::Error {
message: "Pruning config not found".to_string(),
})?;
let batch_size = cfg.batch_size();
let max_usage = cfg.max_usage();

// If a pruner run was already in progress, some variables may already be set,
// depending on whether a batch was deleted and which batch it was (target or minimum retention).
// This enables us to resume the pruner run from the exact heights.
// If any of these values are not set, they can be loaded from the database if necessary.
let mut minimum_retention_height = self.pruner.minimum_retention_height;
let mut target_height = self.pruner.target_height;
let mut height = match self.pruner.pruned_height {
Some(h) => h,
None => {
let Some(height) = self.get_minimum_height().await? else {
tracing::info!("database is empty, nothing to prune");
return Ok(None);
};

let Some(mut height) = self.get_minimum_height().await? else {
tracing::info!("database is empty, nothing to prune");
return Ok(None);
height
}
};

let batch_size = cfg.batch_size();
let max_usage = cfg.max_usage();
let mut pruned_height = None;
// Prune data exceeding target retention in batches
let target_height = self
.get_height_by_timestamp(
Utc::now().timestamp() - (cfg.target_retention().as_secs()) as i64,
)
.await?;
if self.pruner.target_height.is_none() {
let th = self
.get_height_by_timestamp(
Utc::now().timestamp() - (cfg.target_retention().as_secs()) as i64,
)
.await?;
target_height = th;
self.pruner.target_height = target_height;
};

if let Some(target_height) = target_height {
while height < target_height {
if height < target_height {
height = min(height + batch_size, target_height);
self.delete_batch(height).await?;
self.commit().await.map_err(|e| QueryError::Error {
message: format!("failed to commit {e}"),
})?;
pruned_height = Some(height);

tracing::warn!("Pruned data up to height {height}");
self.pruner.pruned_height = Some(height);
return Ok(Some(height));
}
}

// If threshold is set, prune data exceeding minimum retention in batches
// This parameter is needed for SQL storage as there is no direct way to get free space.
if let Some(threshold) = cfg.pruning_threshold() {
let mut usage = self.get_disk_usage().await?;
let usage = self.get_disk_usage().await?;

// Prune data exceeding minimum retention in batches starting from minimum height
// until usage is below threshold
Expand All @@ -624,31 +674,38 @@ impl PruneStorage for SqlStorage {
"Disk usage {usage} exceeds pruning threshold {:?}",
cfg.pruning_threshold()
);
let minimum_retention_height = self
.get_height_by_timestamp(
Utc::now().timestamp() - (cfg.minimum_retention().as_secs()) as i64,
)
.await?;

if minimum_retention_height.is_none() {
minimum_retention_height = self
.get_height_by_timestamp(
Utc::now().timestamp() - (cfg.minimum_retention().as_secs()) as i64,
)
.await?;

self.pruner.minimum_retention_height = minimum_retention_height;
}

if let Some(min_retention_height) = minimum_retention_height {
while (usage as f64 / threshold as f64) > (f64::from(max_usage) / 10000.0)
if (usage as f64 / threshold as f64) > (f64::from(max_usage) / 10000.0)
&& height < min_retention_height
{
height = min(height + batch_size, min_retention_height);
self.delete_batch(height).await?;
self.commit().await.map_err(|e| QueryError::Error {
message: format!("failed to commit {e}"),
})?;
pruned_height = Some(height);
tracing::warn!("Pruned data up to height {height}");

usage = self.get_disk_usage().await?;
self.pruner.pruned_height = Some(height);

return Ok(Some(height));
}
}
}
}

Ok(pruned_height)
self.pruner.reset();

Ok(None)
}
}

Expand Down

0 comments on commit 0515f4a

Please sign in to comment.