Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: able to handle concurrent region edit requests #4569

Merged
merged 2 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
Expand Down Expand Up @@ -88,7 +90,7 @@ use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner};
use crate::region::RegionUsage;
use crate::request::WorkerRequest;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::wal::entry_distributor::{
build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
};
Expand Down Expand Up @@ -196,11 +198,11 @@ impl MitoEngine {
);

let (tx, rx) = oneshot::channel();
let request = WorkerRequest::EditRegion {
let request = WorkerRequest::EditRegion(RegionEditRequest {
region_id,
edit,
tx,
};
});
self.inner
.workers
.submit_to_worker(region_id, request)
Expand Down
123 changes: 123 additions & 0 deletions src/mito2/src/engine/edit_region_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use object_store::ObjectStore;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::manifest::action::RegionEdit;
use crate::region::MitoRegionRef;
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::{CreateRequestBuilder, TestEnv};

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_edit_region_concurrently() {
const EDITS_PER_TASK: usize = 10;
let tasks_count = 10;

// A task that creates SST files and edits the region with them.
struct Task {
region: MitoRegionRef,
ssts: Vec<FileMeta>,
}

impl Task {
async fn create_ssts(&mut self, object_store: &ObjectStore) {
for _ in 0..EDITS_PER_TASK {
let file = FileMeta {
region_id: self.region.region_id,
file_id: FileId::random(),
level: 0,
..Default::default()
};
object_store
.write(
&format!("{}/{}.parquet", self.region.region_dir(), file.file_id),
b"x".as_slice(),
)
.await
.unwrap();
self.ssts.push(file);
}
}

async fn edit_region(self, engine: MitoEngine) {
for sst in self.ssts {
let edit = RegionEdit {
files_to_add: vec![sst],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
};
engine
.edit_region(self.region.region_id, edit)
.await
.unwrap();
}
}
}

let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
engine
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();

let mut tasks = Vec::with_capacity(tasks_count);
let object_store = env.get_object_store().unwrap();
for _ in 0..tasks_count {
let mut task = Task {
region: region.clone(),
ssts: Vec::new(),
};
task.create_ssts(&object_store).await;
tasks.push(task);
}

let mut join_set = JoinSet::new();
// This semaphore is used to coordinate the tasks, making them started at roughly the same time.
let s = Arc::new(Semaphore::new(0));
for task in tasks {
join_set.spawn({
let s = s.clone();
let engine = engine.clone();
async move {
let _permit = s.acquire().await.unwrap();
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved
task.edit_region(engine).await;
}
});
}
s.add_permits(tasks_count);
while join_set.join_next().await.is_some() {}
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved

assert_eq!(
region.version().ssts.levels()[0].files.len(),
tasks_count * EDITS_PER_TASK
);
}
8 changes: 8 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Region {} is busy", region_id))]
RegionBusy {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -973,6 +980,7 @@ impl ErrorExt for Error {
| FulltextFinish { source, .. }
| ApplyFulltextIndex { source, .. } => source.status_code(),
DecodeStats { .. } | StatsNotPresent { .. } => StatusCode::Internal,
RegionBusy { .. } => StatusCode::RegionBusy,
}
}

Expand Down
15 changes: 10 additions & 5 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,11 +494,7 @@ pub(crate) enum WorkerRequest {
Stop,

/// Use [RegionEdit] to edit a region directly.
EditRegion {
region_id: RegionId,
edit: RegionEdit,
tx: Sender<Result<()>>,
},
EditRegion(RegionEditRequest),
}

impl WorkerRequest {
Expand Down Expand Up @@ -762,6 +758,15 @@ pub(crate) struct RegionChangeResult {
pub(crate) result: Result<()>,
}

/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
pub(crate) region_id: RegionId,
pub(crate) edit: RegionEdit,
/// The sender to notify the result to the region engine.
pub(crate) tx: Sender<Result<()>>,
}

/// Notifies the regin the result of editing region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
Expand Down
14 changes: 7 additions & 7 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

/// Identifier for a worker.
pub(crate) type WorkerId = u32;
Expand Down Expand Up @@ -441,6 +442,7 @@ impl<S: LogStore> WorkerStarter<S> {
flush_receiver: self.flush_receiver,
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
region_count: REGION_COUNT.with_label_values(&[&id_string]),
region_edit_queues: RegionEditQueues::default(),
};
let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
Expand Down Expand Up @@ -629,6 +631,8 @@ struct RegionWorkerLoop<S> {
stalled_count: IntGauge,
/// Gauge of regions in the worker.
region_count: IntGauge,
/// Queues for region edit requests.
region_edit_queues: RegionEditQueues,
}

impl<S: LogStore> RegionWorkerLoop<S> {
Expand Down Expand Up @@ -727,12 +731,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
WorkerRequest::SetReadonlyGracefully { region_id, sender } => {
self.set_readonly_gracefully(region_id, sender).await;
}
WorkerRequest::EditRegion {
region_id,
edit,
tx,
} => {
self.handle_region_edit(region_id, edit, tx).await;
WorkerRequest::EditRegion(request) => {
self.handle_region_edit(request).await;
}
// We receive a stop signal, but we still want to process remaining
// requests. The worker thread will then check the running flag and
Expand Down Expand Up @@ -824,7 +824,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
BackgroundNotify::RegionChange(req) => self.handle_manifest_region_change_result(req),
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req),
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
}
}

Expand Down
91 changes: 74 additions & 17 deletions src/mito2/src/worker/handle_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,89 @@
//!
//! It updates the manifest and applies the changes to the region in background.

use std::collections::{HashMap, VecDeque};

use common_telemetry::{info, warn};
use snafu::ensure;
use store_api::storage::RegionId;
use tokio::sync::oneshot::Sender;

use crate::error::{InvalidRequestSnafu, RegionNotFoundSnafu, Result};
use crate::error::{InvalidRequestSnafu, RegionBusySnafu, RegionNotFoundSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
};
use crate::region::{MitoRegionRef, RegionState};
use crate::request::{
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditResult, TruncateResult,
WorkerRequest,
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
TruncateResult, WorkerRequest,
};
use crate::worker::RegionWorkerLoop;

pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;

/// A queue for temporary store region edit requests, if the region is in the "Editing" state.
/// When the current region edit request is completed, the next (if there exists) request in the
/// queue will be processed.
/// Everything is done in the region worker loop.
pub(crate) struct RegionEditQueue {
region_id: RegionId,
requests: VecDeque<RegionEditRequest>,
}

impl RegionEditQueue {
const QUEUE_MAX_LEN: usize = 128;

fn new(region_id: RegionId) -> Self {
Self {
region_id,
requests: VecDeque::new(),
}
}

fn enqueue(&mut self, request: RegionEditRequest) {
if self.requests.len() > Self::QUEUE_MAX_LEN {
let _ = request.tx.send(
RegionBusySnafu {
region_id: self.region_id,
}
.fail(),
);
return;
};
self.requests.push_back(request);
}

fn dequeue(&mut self) -> Option<RegionEditRequest> {
self.requests.pop_front()
}
}

impl<S> RegionWorkerLoop<S> {
/// Handles region edit request.
pub(crate) async fn handle_region_edit(
&self,
region_id: RegionId,
edit: RegionEdit,
sender: Sender<Result<()>>,
) {
let region = match self.regions.writable_region(region_id) {
Ok(region) => region,
Err(e) => {
let _ = sender.send(Err(e));
return;
}
pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
let region_id = request.region_id;
let Some(region) = self.regions.get_region(region_id) else {
let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
return;
};

if !region.is_writable() {
if region.state() == RegionState::Editing {
self.region_edit_queues
.entry(region_id)
.or_insert_with(|| RegionEditQueue::new(region_id))
.enqueue(request);
} else {
let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
}
return;
}

let RegionEditRequest {
region_id: _,
edit,
tx: sender,
} = request;

// Marks the region as editing.
if let Err(e) = region.set_editing() {
let _ = sender.send(Err(e));
Expand Down Expand Up @@ -79,7 +130,7 @@ impl<S> RegionWorkerLoop<S> {
}

/// Handles region edit result.
pub(crate) fn handle_region_edit_result(&self, edit_result: RegionEditResult) {
pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
let region = match self.regions.get_region(edit_result.region_id) {
Some(region) => region,
None => {
Expand All @@ -104,6 +155,12 @@ impl<S> RegionWorkerLoop<S> {
region.switch_state_to_writable(RegionState::Editing);

let _ = edit_result.sender.send(edit_result.result);

if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) {
if let Some(request) = edit_queue.dequeue() {
self.handle_region_edit(request).await;
}
}
}

/// Writes truncate action to the manifest and then applies it to the region in background.
Expand Down