Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 6 additions & 84 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,6 @@ pub struct TableFlushOptions {
///
/// Default is false.
pub block_on_write_thread: bool,
/// Flush policy
pub policy: TableFlushPolicy,
}

impl Default for TableFlushOptions {
Expand All @@ -171,7 +169,6 @@ impl Default for TableFlushOptions {
res_sender: None,
compact_after_flush: true,
block_on_write_thread: false,
policy: TableFlushPolicy::Dump,
}
}
}
Expand All @@ -184,23 +181,6 @@ pub struct TableFlushRequest {
pub max_sequence: SequenceNumber,
}

/// Policy of how to perform flush operation.
#[derive(Default, Debug, Clone, Copy)]
pub enum TableFlushPolicy {
/// Unknown policy, this is the default value and operation will report
/// error for it. Others except `RoleTable` should set policy to this
/// variant.
Unknown,
/// Dump memtable to sst file.
// todo: the default value should be [Unknown].
#[default]
Dump,
// TODO: use this policy and remove "allow(dead_code)"
/// Drop memtables.
#[allow(dead_code)]
Purge,
}

impl Instance {
/// Flush this table.
pub async fn flush_table(
Expand Down Expand Up @@ -348,7 +328,7 @@ impl Instance {
let table = table_data.name.clone();

let instance = self.clone();
let flush_job = async move { instance.flush_memtables(&flush_req, opts.policy).await };
let flush_job = async move { instance.flush_memtables(&flush_req).await };

let compact_req = TableCompactionRequest::no_waiter(
table_data.clone(),
Expand Down Expand Up @@ -389,11 +369,7 @@ impl Instance {
}

/// Each table can only have one running flush job.
async fn flush_memtables(
&self,
flush_req: &TableFlushRequest,
policy: TableFlushPolicy,
) -> Result<()> {
async fn flush_memtables(&self, flush_req: &TableFlushRequest) -> Result<()> {
let TableFlushRequest {
table_data,
max_sequence,
Expand All @@ -408,28 +384,16 @@ impl Instance {

let request_id = RequestId::next_id();
info!(
"Instance try to flush memtables, table:{}, table_id:{}, request_id:{}, mems_to_flush:{:?}, policy:{:?}",
table_data.name, table_data.id, request_id, mems_to_flush, policy,
"Instance try to flush memtables, table:{}, table_id:{}, request_id:{}, mems_to_flush:{:?}",
table_data.name, table_data.id, request_id, mems_to_flush
);

// Start flush duration timer.
let local_metrics = table_data.metrics.local_flush_metrics();
local_metrics.observe_memtables_num(mems_to_flush.len());
let _timer = local_metrics.flush_duration_histogram.start_timer();

match policy {
TableFlushPolicy::Unknown => {
return UnknownPolicy {}.fail();
}
TableFlushPolicy::Dump => {
self.dump_memtables(table_data, request_id, &mems_to_flush)
.await?
}
TableFlushPolicy::Purge => {
self.purge_memtables(table_data, request_id, &mems_to_flush)
.await?
}
}
self.dump_memtables(table_data, request_id, &mems_to_flush)
.await?;

table_data.set_last_flush_time(time::current_time_millis());

Expand All @@ -441,48 +405,6 @@ impl Instance {
Ok(())
}

/// Flush action for [TableFlushPolicy::Purge].
///
/// Purge is simply removing all selected memtables.
async fn purge_memtables(
&self,
table_data: &TableData,
request_id: RequestId,
mems_to_flush: &FlushableMemTables,
) -> Result<()> {
// calculate largest sequence number purged
let mut last_sequence_purged = SequenceNumber::MIN;
if let Some(sampling_mem) = &mems_to_flush.sampling_mem {
last_sequence_purged = last_sequence_purged.max(sampling_mem.last_sequence());
}
for mem in &mems_to_flush.memtables {
last_sequence_purged = last_sequence_purged.max(mem.last_sequence());
}

// remove these memtables
let mems_to_remove = mems_to_flush.ids();
let edit = VersionEdit {
flushed_sequence: last_sequence_purged,
mems_to_remove,
files_to_add: vec![],
files_to_delete: vec![],
};
table_data.current_version().apply_edit(edit);

info!(
"Instance purged memtables, table:{}, table_id:{}, request_id:{}, mems_to_flush:{:?}, last_sequence_purged:{}",
table_data.name,
table_data.id,
request_id,
mems_to_flush,
last_sequence_purged
);

Ok(())
}

/// Flush action for [TableFlushPolicy::Dump].
///
/// This will write picked memtables [FlushableMemTables] to level 0 sst
/// files. Sampling memtable may be dumped into multiple sst file according
/// to the sampled segment duration.
Expand Down
3 changes: 1 addition & 2 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
ApplyMemTable, FlushTable, OperateByWriteWorker, ReadMetaUpdate, ReadWal,
RecoverTableData, Result,
},
flush_compaction::{TableFlushOptions, TableFlushPolicy},
flush_compaction::TableFlushOptions,
mem_collector::MemUsageCollector,
write_worker,
write_worker::{RecoverTableCommand, WorkerLocal, WriteGroup},
Expand Down Expand Up @@ -423,7 +423,6 @@ impl Instance {
res_sender: None,
compact_after_flush: false,
block_on_write_thread: false,
policy: TableFlushPolicy::Dump,
};
self.flush_table_in_worker(worker_local, table_data, opts)
.await
Expand Down
6 changes: 1 addition & 5 deletions analytic_engine/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ use tokio::sync::oneshot;

use self::data::TableDataRef;
use crate::{
instance::{
flush_compaction::{TableFlushOptions, TableFlushPolicy},
Instance, InstanceRef,
},
instance::{flush_compaction::TableFlushOptions, Instance, InstanceRef},
space::{SpaceAndTable, SpaceId},
};

Expand Down Expand Up @@ -263,7 +260,6 @@ impl Table for TableImpl {
} else {
None
},
policy: TableFlushPolicy::Dump,
};

Instance::flush_table(self.space_table.table_data().clone(), flush_opts)
Expand Down