Skip to content

Commit

Permalink
add disk protection in import service (tikv#16487)
Browse files Browse the repository at this point in the history
close tikv#16454

add disk protection in import service's write and upload API

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Signed-off-by: dbsid <chenhuansheng@pingcap.com>
  • Loading branch information
2 people authored and dbsid committed Mar 24, 2024
1 parent d19e9c7 commit 4a01ee4
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 7 deletions.
3 changes: 2 additions & 1 deletion components/error_code/src/sst_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ define_error_codes!(
"this request has been suspended.",
"Probably there are some export tools don't support exporting data inserted by `ingest`(say, snapshot backup). Check the user manual and stop them."),
REQUEST_TOO_NEW => ("RequestTooNew", "", ""),
REQUEST_TOO_OLD => ("RequestTooOld", "", "")
REQUEST_TOO_OLD => ("RequestTooOld", "", ""),
DISK_SPACE_NOT_ENOUGH => ("DiskSpaceNotEnough", "", "")
);
4 changes: 4 additions & 0 deletions components/sst_importer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ pub enum Error {

#[error("imports are suspended for {time_to_lease_expire:?}")]
Suspended { time_to_lease_expire: Duration },

#[error("TiKV disk space is not enough.")]
DiskSpaceNotEnough,
}

impl Error {
Expand Down Expand Up @@ -222,6 +225,7 @@ impl ErrorCodeExt for Error {
Error::Suspended { .. } => error_code::sst_importer::SUSPENDED,
Error::RequestTooNew(_) => error_code::sst_importer::REQUEST_TOO_NEW,
Error::RequestTooOld(_) => error_code::sst_importer::REQUEST_TOO_OLD,
Error::DiskSpaceNotEnough => error_code::sst_importer::DISK_SPACE_NOT_ENOUGH,
}
}
}
15 changes: 14 additions & 1 deletion src/import/sst_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ use tikv_kv::{
use tikv_util::{
config::ReadableSize,
future::{create_stream_with_buffer, paired_future_callback},
sys::thread::ThreadBuildWrapper,
sys::{
disk::{get_disk_status, DiskUsage},
thread::ThreadBuildWrapper,
},
time::{Instant, Limiter},
HandyRwLock,
};
Expand Down Expand Up @@ -817,6 +820,11 @@ macro_rules! impl_write {
.try_fold(
(writer, resource_limiter),
|(mut writer, limiter), req| async move {
if get_disk_status(0) != DiskUsage::Normal {
warn!("Upload failed due to not enough disk space");
return Err(Error::DiskSpaceNotEnough);
}

let batch = match req.chunk {
Some($chunk_ty::Batch(b)) => b,
_ => return Err(Error::InvalidChunk),
Expand Down Expand Up @@ -961,6 +969,11 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
let file = import.create(meta)?;
let mut file = rx
.try_fold(file, |mut file, chunk| async move {
if get_disk_status(0) != DiskUsage::Normal {
warn!("Upload failed due to not enough disk space");
return Err(Error::DiskSpaceNotEnough);
}

let start = Instant::now_coarse();
let data = chunk.get_data();
if data.is_empty() {
Expand Down
39 changes: 34 additions & 5 deletions tests/integrations/import/test_sst_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use pd_client::PdClient;
use tempfile::Builder;
use test_sst_importer::*;
use tikv::config::TikvConfig;
use tikv_util::config::ReadableSize;
use tikv_util::{
config::ReadableSize,
sys::disk::{set_disk_status, DiskUsage},
};

use super::util::*;

Expand Down Expand Up @@ -36,6 +39,14 @@ fn test_upload_sst() {
let meta = new_sst_meta(0, length);
assert_to_string_contains!(send_upload_sst(&import, &meta, &data).unwrap_err(), "crc32");

// diskfull
set_disk_status(DiskUsage::AlmostFull);
assert_to_string_contains!(
send_upload_sst(&import, &meta, &data).unwrap_err(),
"DiskSpaceNotEnough"
);
set_disk_status(DiskUsage::Normal);

let mut meta = new_sst_meta(crc32, length);
meta.set_region_id(ctx.get_region_id());
meta.set_region_epoch(ctx.get_region_epoch().clone());
Expand All @@ -48,7 +59,12 @@ fn test_upload_sst() {
);
}

fn run_test_write_sst(ctx: Context, tikv: TikvClient, import: ImportSstClient) {
fn run_test_write_sst(
ctx: Context,
tikv: TikvClient,
import: ImportSstClient,
expected_error: &str,
) {
let mut meta = new_sst_meta(0, 0);
meta.set_region_id(ctx.get_region_id());
meta.set_region_epoch(ctx.get_region_epoch().clone());
Expand All @@ -60,8 +76,13 @@ fn run_test_write_sst(ctx: Context, tikv: TikvClient, import: ImportSstClient) {
keys.push(vec![i]);
values.push(vec![i]);
}
let resp = send_write_sst(&import, &meta, keys, values, 1).unwrap();
let resp = send_write_sst(&import, &meta, keys, values, 1);
if !expected_error.is_empty() {
assert_to_string_contains!(resp.unwrap_err(), expected_error);
return;
}

let resp = resp.unwrap();
for m in resp.metas.into_iter() {
let mut ingest = IngestRequest::default();
ingest.set_context(ctx.clone());
Expand All @@ -76,13 +97,21 @@ fn run_test_write_sst(ctx: Context, tikv: TikvClient, import: ImportSstClient) {
fn test_write_sst() {
let (_cluster, ctx, tikv, import) = new_cluster_and_tikv_import_client();

run_test_write_sst(ctx, tikv, import);
run_test_write_sst(ctx, tikv, import, "");
}

#[test]
fn test_write_sst_when_disk_full() {
set_disk_status(DiskUsage::AlmostFull);
let (_cluster, ctx, tikv, import) = new_cluster_and_tikv_import_client();
run_test_write_sst(ctx, tikv, import, "DiskSpaceNotEnough");
set_disk_status(DiskUsage::Normal);
}

#[test]
fn test_write_and_ingest_with_tde() {
let (_tmp_dir, _cluster, ctx, tikv, import) = new_cluster_and_tikv_import_client_tde();
run_test_write_sst(ctx, tikv, import);
run_test_write_sst(ctx, tikv, import, "");
}

#[test]
Expand Down

0 comments on commit 4a01ee4

Please sign in to comment.