Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: mistakely removes compaction inputs on failure #3635

Merged
merged 3 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,11 @@ impl CompactionTask for TwcsCompactionTask {
let notify = match self.handle_compaction().await {
Ok((added, deleted)) => {
info!(
"Compacted SST files, input: {:?}, output: {:?}, window: {:?}",
deleted, added, self.compaction_time_window
"Compacted SST files, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}",
deleted,
added,
self.compaction_time_window,
self.waiters.len(),
);

BackgroundNotify::CompactionFinished(CompactionFinished {
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ impl MitoEngine {
}),
})
}

/// Returns the purge scheduler.
pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
self.inner.workers.purge_scheduler()
}
}

#[cfg(test)]
Expand Down
73 changes: 73 additions & 0 deletions src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::ops::Range;
use std::sync::Arc;

use api::v1::{ColumnSchema, Rows};
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
Expand All @@ -23,8 +24,10 @@ use store_api::region_request::{
RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::Notify;

use crate::config::MitoConfig;
use crate::engine::listener::CompactionListener;
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
Expand Down Expand Up @@ -145,3 +148,73 @@ async fn test_compaction_region() {
let vec = collect_stream_ts(stream).await;
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}

// For issue https://github.com/GreptimeTeam/greptimedb/issues/3633
#[tokio::test]
async fn test_readonly_during_compaction() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let listener = Arc::new(CompactionListener::default());
let engine = env
.create_engine_with(
MitoConfig {
// Ensure there is only one background worker for purge task.
max_background_jobs: 1,
..Default::default()
},
None,
Some(listener.clone()),
)
.await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_files", "1")
.build();

let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;

// Waits until the engine receives compaction finished request.
listener.wait_handle_finished().await;

// Sets the region to read only mode.
engine.set_writable(region_id, false).unwrap();
// Wakes up the listener.
listener.wake();

let notify = Arc::new(Notify::new());
// We already sets max background jobs to 1, so we can submit a task to the
// purge scheduler to ensure all purge tasks are finished.
let job_notify = notify.clone();
engine
.purge_scheduler()
.schedule(Box::pin(async move {
job_notify.notify_one();
}))
.unwrap();
notify.notified().await;

let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(
2,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
);
let stream = scanner.scan().await.unwrap();

let vec = collect_stream_ts(stream).await;
assert_eq!((0..20).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
63 changes: 44 additions & 19 deletions src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ use tokio::sync::Notify;
#[async_trait]
pub trait EventListener: Send + Sync {
/// Notifies the listener that a region is flushed successfully.
fn on_flush_success(&self, region_id: RegionId);
fn on_flush_success(&self, region_id: RegionId) {
let _ = region_id;
}

/// Notifies the listener that the engine is stalled.
fn on_write_stall(&self);
fn on_write_stall(&self) {}

/// Notifies the listener that the region starts to do flush.
async fn on_flush_begin(&self, region_id: RegionId);
async fn on_flush_begin(&self, region_id: RegionId) {
let _ = region_id;
}

/// Notifies the listener that the later drop task starts running.
/// Returns the gc interval if we want to override the default one.
Expand All @@ -46,6 +50,12 @@ pub trait EventListener: Send + Sync {
let _ = region_id;
let _ = removed;
}

/// Notifies the listener that the region is going to handle the compaction
/// finished request.
async fn on_handle_compaction_finished(&self, region_id: RegionId) {
let _ = region_id;
}
}

pub type EventListenerRef = Arc<dyn EventListener>;
Expand All @@ -70,10 +80,6 @@ impl EventListener for FlushListener {

self.notify.notify_one()
}

fn on_write_stall(&self) {}

async fn on_flush_begin(&self, _region_id: RegionId) {}
}

/// Listener to watch stall events.
Expand All @@ -98,8 +104,6 @@ impl EventListener for StallListener {

self.notify.notify_one();
}

async fn on_flush_begin(&self, _region_id: RegionId) {}
}

/// Listener to watch begin flush events.
Expand Down Expand Up @@ -130,10 +134,6 @@ impl FlushTruncateListener {

#[async_trait]
impl EventListener for FlushTruncateListener {
fn on_flush_success(&self, _region_id: RegionId) {}

fn on_write_stall(&self) {}

/// Calling this function will block the thread!
/// Notify the listener to perform a truncate region and block the flush region job.
async fn on_flush_begin(&self, region_id: RegionId) {
Expand Down Expand Up @@ -169,12 +169,6 @@ impl DropListener {

#[async_trait]
impl EventListener for DropListener {
fn on_flush_success(&self, _region_id: RegionId) {}

fn on_write_stall(&self) {}

async fn on_flush_begin(&self, _region_id: RegionId) {}

fn on_later_drop_begin(&self, _region_id: RegionId) -> Option<Duration> {
Some(self.gc_duration)
}
Expand All @@ -185,3 +179,34 @@ impl EventListener for DropListener {
self.notify.notify_one();
}
}

/// Listener on handling compaction requests.
#[derive(Default)]
pub struct CompactionListener {
handle_finished_notify: Notify,
blocker: Notify,
}

impl CompactionListener {
/// Waits for handling compaction finished request.
pub async fn wait_handle_finished(&self) {
self.handle_finished_notify.notified().await;
}

/// Wakes up the listener.
pub fn wake(&self) {
self.blocker.notify_one();
}
}

#[async_trait]
impl EventListener for CompactionListener {
async fn on_handle_compaction_finished(&self, region_id: RegionId) {
info!("Handle compaction finished request, region {region_id}");

self.handle_finished_notify.notify_one();

// Blocks current task.
self.blocker.notified().await;
}
}
2 changes: 1 addition & 1 deletion src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ impl OnFailure for CompactionFinished {
region_id: self.region_id,
}));
}
for file in &self.compacted_files {
for file in &self.compaction_outputs {
warn!(
"Cleaning region {} compaction output file: {}",
self.region_id, file.file_id
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl TestEnv {
.unwrap()
}

/// Creates a new engine with specific config and manager/listener under this env.
/// Creates a new engine with specific config and manager/listener/purge_scheduler under this env.
pub async fn create_engine_with(
&mut self,
config: MitoConfig,
Expand Down
14 changes: 14 additions & 0 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ impl WorkerGroup {
cache_manager,
})
}

/// Returns the purge scheduler.
pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
&self.purge_scheduler
}
}

fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
Expand Down Expand Up @@ -819,6 +824,15 @@ impl WorkerListener {
let _ = region_id;
let _ = removed;
}

pub(crate) async fn on_handle_compaction_finished(&self, region_id: RegionId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_handle_compaction_finished(region_id).await;
}
// Avoid compiler warning.
let _ = region_id;
}
}

#[cfg(test)]
Expand Down
8 changes: 7 additions & 1 deletion src/mito2/src/worker/handle_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_telemetry::{error, info};
use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

Expand Down Expand Up @@ -55,7 +55,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: RegionId,
mut request: CompactionFinished,
) {
self.listener.on_handle_compaction_finished(region_id).await;

let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
warn!(
"Unable to finish the compaction task for a read only region {}",
region_id
);
return;
};

Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
mut request: FlushFinished,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
warn!(
"Unable to finish the flush task for a read only region {}",
region_id
);
return;
};

Expand Down