Skip to content

Commit

Permalink
sst_importer: add metrics (tikv#6395) (tikv#6404)
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer authored and AndreMouche committed Jan 3, 2020
1 parent abee7ec commit e5568d4
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 4 deletions.
20 changes: 20 additions & 0 deletions src/import/errors.rs
Expand Up @@ -16,6 +16,26 @@ use crate::raftstore::errors::Error as RaftStoreError;
use crate::storage::mvcc::Error as MvccError;
use tikv_util::codec::Error as CodecError;

use super::metrics::*;

pub fn error_inc(err: &Error) {
let label = match err {
Error::Io(..) => "io",
Error::Grpc(..) => "grpc",
Error::Uuid(..) => "uuid",
Error::RocksDB(..) => "rocksdb",
Error::ParseIntError(..) => "parse_int",
Error::FileExists(..) => "file_exists",
Error::FileCorrupted(..) => "file_corrupt",
Error::InvalidSSTPath(..) => "invalid_sst",
Error::Engine(..) => "engine",
Error::CannotReadExternalStorage(..) => "read_external_storage",
Error::WrongKeyPrefix(..) => "wrong_prefix",
_ => return,
};
IMPORTER_ERROR_VEC.with_label_values(&[label]).inc();
}

quick_error! {
#[derive(Debug)]
pub enum Error {
Expand Down
2 changes: 1 addition & 1 deletion src/import/kv_service.rs
Expand Up @@ -17,7 +17,7 @@ use tikv_util::time::Instant;
use super::client::*;
use super::metrics::*;
use super::service::*;
use super::{Config, Error, KVImporter};
use super::{Config, Error, KVImporter, error_inc};

#[derive(Clone)]
pub struct ImportKVService {
Expand Down
34 changes: 33 additions & 1 deletion src/import/metrics.rs
Expand Up @@ -88,5 +88,37 @@ lazy_static! {
"Counter of wait store available",
&["store_id"]
)
.unwrap();
.unwrap();
pub static ref IMPORTER_DOWNLOAD_DURATION: HistogramVec = register_histogram_vec!(
"tikv_import_download_duration",
"Bucketed histogram of importer download duration",
&["type"],
exponential_buckets(0.001, 2.0, 20).unwrap()
)
.unwrap();
pub static ref IMPORTER_DOWNLOAD_BYTES: Histogram = register_histogram!(
"tikv_import_download_bytes",
"Bucketed histogram of importer download bytes",
exponential_buckets(1024.0, 2.0, 20).unwrap()
)
.unwrap();
pub static ref IMPORTER_INGEST_DURATION: HistogramVec = register_histogram_vec!(
"tikv_import_ingest_duration",
"Bucketed histogram of importer ingest duration",
&["type"],
exponential_buckets(0.001, 2.0, 20).unwrap()
)
.unwrap();
pub static ref IMPORTER_INGEST_BYTES: Histogram = register_histogram!(
"tikv_import_ingest_bytes",
"Bucketed histogram of importer ingest bytes",
exponential_buckets(1024.0, 2.0, 20).unwrap()
)
.unwrap();
pub static ref IMPORTER_ERROR_VEC: IntCounterVec = register_int_counter_vec!(
"tikv_import_error_counter",
"Total number of importer errors",
&["error"]
)
.unwrap();
}
2 changes: 1 addition & 1 deletion src/import/mod.rs
Expand Up @@ -39,7 +39,7 @@ mod sst_service;
pub mod test_helpers;

pub use self::config::Config;
pub use self::errors::{Error, Result};
pub use self::errors::{Error, Result, error_inc};
pub use self::kv_importer::KVImporter;
pub use self::kv_server::ImportKVServer;
pub use self::kv_service::ImportKVService;
Expand Down
1 change: 1 addition & 0 deletions src/import/service.rs
Expand Up @@ -32,6 +32,7 @@ macro_rules! send_rpc_response {
IMPORT_RPC_DURATION
.with_label_values(&[$label, "error"])
.observe($timer.elapsed_secs());
error_inc(&e);
$sink.fail(make_rpc_error(e))
}
};
Expand Down
18 changes: 18 additions & 0 deletions src/import/sst_importer.rs
Expand Up @@ -7,6 +7,7 @@ use std::io::Write as _;
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;

use kvproto::backup::StorageBackend;
use kvproto::import_sstpb::*;
Expand All @@ -21,6 +22,7 @@ use engine::CF_WRITE;
use external_storage::{create_storage, url_of_backend};

use super::{Error, Result};
use super::metrics::*;
use crate::raftstore::store::keys;

/// SSTImporter manages SST files that are waiting for ingesting.
Expand Down Expand Up @@ -132,6 +134,7 @@ impl SSTImporter {
speed_limiter: Option<Arc<IOLimiter>>,
mut sst_writer: SstWriter,
) -> Result<Option<Range>> {
let start = Instant::now();
let path = self.dir.join(meta)?;
let url = url_of_backend(backend);

Expand All @@ -150,6 +153,7 @@ impl SSTImporter {
let reason = format!("length {}, expect {}", file_length, meta.length);
return Err(Error::FileCorrupted(path.temp, reason));
}
IMPORTER_DOWNLOAD_BYTES.observe(file_length as _);
file_writer.sync_data()?;
}

Expand Down Expand Up @@ -228,6 +232,10 @@ impl SSTImporter {
if let Some(range) = direct_retval {
// TODO: what about encrypted SSTs?
fs::rename(&path.temp, &path.save)?;
let duration = start.elapsed();
IMPORTER_DOWNLOAD_DURATION
.with_label_values(&["rename"])
.observe(duration.as_secs_f64());
return Ok(Some(range));
}

Expand Down Expand Up @@ -279,6 +287,11 @@ impl SSTImporter {

let _ = fs::remove_file(&path.temp);

let duration = start.elapsed();
IMPORTER_DOWNLOAD_DURATION
.with_label_values(&["rewrite"])
.observe(duration.as_secs_f64());

if let Some(start_key) = first_key {
sst_writer.finish()?;
let mut final_range = Range::default();
Expand Down Expand Up @@ -368,6 +381,7 @@ impl ImportDir {
}

fn ingest(&self, meta: &SSTMeta, db: &DB) -> Result<()> {
let start = Instant::now();
let path = self.join(meta)?;
let cf = meta.get_cf_name();
prepare_sst_for_ingestion(&path.save, &path.clone)?;
Expand All @@ -376,6 +390,7 @@ impl ImportDir {
if length != 0 || crc32 != 0 {
// we only validate if the length and CRC32 are explicitly provided.
validate_sst_for_ingestion(db, cf, &path.clone, length, crc32)?;
IMPORTER_INGEST_BYTES.observe(length as _)
} else {
debug!("skipping SST validation since length and crc32 are both 0");
}
Expand All @@ -384,6 +399,9 @@ impl ImportDir {
let mut opts = IngestExternalFileOptions::new();
opts.move_files(true);
db.ingest_external_file_cf(handle, &opts, &[path.clone.to_str().unwrap()])?;
IMPORTER_INGEST_DURATION
.with_label_values(&["ingest"])
.observe(start.elapsed().as_secs_f64());
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/import/sst_service.rs
Expand Up @@ -21,7 +21,7 @@ use tikv_util::time::Instant;
use super::import_mode::*;
use super::metrics::*;
use super::service::*;
use super::{Config, Error, SSTImporter};
use super::{Config, Error, SSTImporter, error_inc};

/// ImportSSTService provides tikv-server with the ability to ingest SST files.
///
Expand Down

0 comments on commit e5568d4

Please sign in to comment.