From 4a01ee4a7b71f611969ee260a56f0738e06f9ef2 Mon Sep 17 00:00:00 2001 From: tonyxuqqi Date: Mon, 19 Feb 2024 19:49:56 -0800 Subject: [PATCH] add disk protection in import service (#16487) close tikv/tikv#16454 add disk protection in import service's write and upload API Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Signed-off-by: dbsid --- components/error_code/src/sst_importer.rs | 3 +- components/sst_importer/src/errors.rs | 4 ++ src/import/sst_service.rs | 15 ++++++- tests/integrations/import/test_sst_service.rs | 39 ++++++++++++++++--- 4 files changed, 54 insertions(+), 7 deletions(-) diff --git a/components/error_code/src/sst_importer.rs b/components/error_code/src/sst_importer.rs index 9e568ee00c1..b092796d467 100644 --- a/components/error_code/src/sst_importer.rs +++ b/components/error_code/src/sst_importer.rs @@ -27,5 +27,6 @@ define_error_codes!( "this request has been suspended.", "Probably there are some export tools don't support exporting data inserted by `ingest`(say, snapshot backup). Check the user manual and stop them."), REQUEST_TOO_NEW => ("RequestTooNew", "", ""), - REQUEST_TOO_OLD => ("RequestTooOld", "", "") + REQUEST_TOO_OLD => ("RequestTooOld", "", ""), + DISK_SPACE_NOT_ENOUGH => ("DiskSpaceNotEnough", "", "") ); diff --git a/components/sst_importer/src/errors.rs b/components/sst_importer/src/errors.rs index e5e235e9761..c79bd2db9f8 100644 --- a/components/sst_importer/src/errors.rs +++ b/components/sst_importer/src/errors.rs @@ -136,6 +136,9 @@ pub enum Error { #[error("imports are suspended for {time_to_lease_expire:?}")] Suspended { time_to_lease_expire: Duration }, + + #[error("TiKV disk space is not enough.")] + DiskSpaceNotEnough, } impl Error { @@ -222,6 +225,7 @@ impl ErrorCodeExt for Error { Error::Suspended { .. } => error_code::sst_importer::SUSPENDED, Error::RequestTooNew(_) => error_code::sst_importer::REQUEST_TOO_NEW, Error::RequestTooOld(_) => error_code::sst_importer::REQUEST_TOO_OLD, + Error::DiskSpaceNotEnough => error_code::sst_importer::DISK_SPACE_NOT_ENOUGH, } } } diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs index bd12053031f..9501698b75d 100644 --- a/src/import/sst_service.rs +++ b/src/import/sst_service.rs @@ -46,7 +46,10 @@ use tikv_kv::{ use tikv_util::{ config::ReadableSize, future::{create_stream_with_buffer, paired_future_callback}, - sys::thread::ThreadBuildWrapper, + sys::{ + disk::{get_disk_status, DiskUsage}, + thread::ThreadBuildWrapper, + }, time::{Instant, Limiter}, HandyRwLock, }; @@ -817,6 +820,11 @@ macro_rules! impl_write { .try_fold( (writer, resource_limiter), |(mut writer, limiter), req| async move { + if get_disk_status(0) != DiskUsage::Normal { + warn!("Upload failed due to not enough disk space"); + return Err(Error::DiskSpaceNotEnough); + } + let batch = match req.chunk { Some($chunk_ty::Batch(b)) => b, _ => return Err(Error::InvalidChunk), @@ -961,6 +969,11 @@ impl ImportSst for ImportSstService { let file = import.create(meta)?; let mut file = rx .try_fold(file, |mut file, chunk| async move { + if get_disk_status(0) != DiskUsage::Normal { + warn!("Upload failed due to not enough disk space"); + return Err(Error::DiskSpaceNotEnough); + } + let start = Instant::now_coarse(); let data = chunk.get_data(); if data.is_empty() { diff --git a/tests/integrations/import/test_sst_service.rs b/tests/integrations/import/test_sst_service.rs index f1b2e23014c..49cfba1f722 100644 --- a/tests/integrations/import/test_sst_service.rs +++ b/tests/integrations/import/test_sst_service.rs @@ -8,7 +8,10 @@ use pd_client::PdClient; use tempfile::Builder; use test_sst_importer::*; use tikv::config::TikvConfig; -use tikv_util::config::ReadableSize; +use tikv_util::{ + config::ReadableSize, + sys::disk::{set_disk_status, DiskUsage}, +}; use super::util::*; @@ -36,6 +39,14 @@ fn test_upload_sst() { let meta = new_sst_meta(0, length); assert_to_string_contains!(send_upload_sst(&import, &meta, &data).unwrap_err(), "crc32"); + // diskfull + set_disk_status(DiskUsage::AlmostFull); + assert_to_string_contains!( + send_upload_sst(&import, &meta, &data).unwrap_err(), + "DiskSpaceNotEnough" + ); + set_disk_status(DiskUsage::Normal); + let mut meta = new_sst_meta(crc32, length); meta.set_region_id(ctx.get_region_id()); meta.set_region_epoch(ctx.get_region_epoch().clone()); @@ -48,7 +59,12 @@ fn test_upload_sst() { ); } -fn run_test_write_sst(ctx: Context, tikv: TikvClient, import: ImportSstClient) { +fn run_test_write_sst( + ctx: Context, + tikv: TikvClient, + import: ImportSstClient, + expected_error: &str, +) { let mut meta = new_sst_meta(0, 0); meta.set_region_id(ctx.get_region_id()); meta.set_region_epoch(ctx.get_region_epoch().clone()); @@ -60,8 +76,13 @@ fn run_test_write_sst(ctx: Context, tikv: TikvClient, import: ImportSstClient) { keys.push(vec![i]); values.push(vec![i]); } - let resp = send_write_sst(&import, &meta, keys, values, 1).unwrap(); + let resp = send_write_sst(&import, &meta, keys, values, 1); + if !expected_error.is_empty() { + assert_to_string_contains!(resp.unwrap_err(), expected_error); + return; + } + let resp = resp.unwrap(); for m in resp.metas.into_iter() { let mut ingest = IngestRequest::default(); ingest.set_context(ctx.clone()); @@ -76,13 +97,21 @@ fn run_test_write_sst(ctx: Context, tikv: TikvClient, import: ImportSstClient) { fn test_write_sst() { let (_cluster, ctx, tikv, import) = new_cluster_and_tikv_import_client(); - run_test_write_sst(ctx, tikv, import); + run_test_write_sst(ctx, tikv, import, ""); +} + +#[test] +fn test_write_sst_when_disk_full() { + set_disk_status(DiskUsage::AlmostFull); + let (_cluster, ctx, tikv, import) = new_cluster_and_tikv_import_client(); + run_test_write_sst(ctx, tikv, import, "DiskSpaceNotEnough"); + set_disk_status(DiskUsage::Normal); } #[test] fn test_write_and_ingest_with_tde() { let (_tmp_dir, _cluster, ctx, tikv, import) = new_cluster_and_tikv_import_client_tde(); - run_test_write_sst(ctx, tikv, import); + run_test_write_sst(ctx, tikv, import, ""); } #[test]