Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sst_importer: Fix MvccProperties didn't generated during restoring (#… #6378

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions components/engine/src/cf.rs
Expand Up @@ -9,3 +9,16 @@ pub const CF_RAFT: CfName = "raft";
pub const LARGE_CFS: &[CfName] = &[CF_DEFAULT, CF_WRITE];
pub const ALL_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE, CF_RAFT];
pub const DATA_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE];

pub fn name_to_cf(name: &str) -> Option<CfName> {
if name.is_empty() {
return Some(CF_DEFAULT);
}
for c in ALL_CFS {
if name == *c {
return Some(c);
}
}

None
}
188 changes: 129 additions & 59 deletions src/import/sst_importer.rs
Expand Up @@ -556,12 +556,77 @@ mod tests {
use super::*;
use crate::import::test_helpers::*;

use engine::rocks::util::new_engine;
use engine::rocks::SstWriterBuilder;
use engine::rocks::util::{new_engine, CFOptions};
use engine::rocks::{
ColumnFamilyOptions, DBEntryType, SstWriterBuilder, TablePropertiesCollector,
TablePropertiesCollectorFactory,
};
use engine::util::get_range_properties_cf;
use engine::{name_to_cf, CF_DEFAULT, CF_WRITE, DATA_CFS};
use tempdir::TempDir;
use tikv_util::file::calc_crc32_bytes;

use std::collections::HashMap;

const PROP_TEST_MARKER_CF_NAME: &[u8] = b"tikv.test_marker_cf_name";

#[derive(Default)]
struct TestPropertiesCollectorFactory {
cf: String,
}

impl TestPropertiesCollectorFactory {
pub fn new(cf: impl Into<String>) -> Self {
Self { cf: cf.into() }
}
}

impl TablePropertiesCollectorFactory for TestPropertiesCollectorFactory {
fn create_table_properties_collector(
&mut self,
_: u32,
) -> Box<dyn TablePropertiesCollector> {
Box::new(TestPropertiesCollector::new(self.cf.clone()))
}
}

struct TestPropertiesCollector {
cf: String,
}

impl TestPropertiesCollector {
pub fn new(cf: String) -> Self {
Self { cf }
}
}

impl TablePropertiesCollector for TestPropertiesCollector {
fn add(&mut self, _: &[u8], _: &[u8], _: DBEntryType, _: u64, _: u64) {}

fn finish(&mut self) -> HashMap<Vec<u8>, Vec<u8>> {
std::iter::once((
PROP_TEST_MARKER_CF_NAME.to_owned(),
self.cf.as_bytes().to_owned(),
))
.collect()
}
}

fn new_engine_with_properties(path: &str, cfs: &[&str]) -> DB {
let cf_opts = cfs
.iter()
.map(|cf| {
let mut opt = ColumnFamilyOptions::new();
opt.add_table_properties_collector_factory(
"tikv.test_properties",
Box::new(TestPropertiesCollectorFactory::new(*cf)),
);
CFOptions::new(*cf, opt)
})
.collect();
new_engine(path, None, cfs, Some(cf_opts)).expect("rocks test engine")
}

#[test]
fn test_import_dir() {
let temp_dir = TempDir::new("test_import_dir").unwrap();
Expand Down Expand Up @@ -726,10 +791,11 @@ mod tests {
fn create_sst_writer_with_db(importer: &SSTImporter, meta: &SSTMeta) -> Result<SstWriter> {
let temp_dir = TempDir::new("test_import_dir").unwrap();
let db_path = temp_dir.path().join("db");
let db = new_engine(db_path.to_str().unwrap(), None, &["default"], None).unwrap();
let db = new_engine_with_properties(db_path.to_str().unwrap(), DATA_CFS);

let sst_writer = SstWriterBuilder::new()
.set_db(Arc::new(db))
.set_cf(name_to_cf(meta.get_cf_name()).unwrap())
.build(importer.get_path(meta).to_str().unwrap())
.unwrap();
Ok(sst_writer)
Expand Down Expand Up @@ -832,63 +898,67 @@ mod tests {

#[test]
fn test_download_sst_then_ingest() {
// creates a sample SST file.
let (_ext_sst_dir, backend, mut meta) = create_sample_external_sst_file().unwrap();

// performs the download.
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();
let sst_writer = create_sst_writer_with_db(&importer, &meta).unwrap();

let range = importer
.download(
&meta,
&backend,
"sample.sst",
&new_rewrite_rule(b"t123", b"t9102"),
None,
sst_writer,
)
.unwrap()
.unwrap();

assert_eq!(range.get_start(), b"t9102_r01");
assert_eq!(range.get_end(), b"t9102_r13");

// performs the ingest
let ingest_dir = TempDir::new("ingest_dir").unwrap();
let db = new_engine(
ingest_dir.path().to_str().unwrap(),
None,
&["default"],
None,
)
.unwrap();

meta.set_length(0); // disable validation.
meta.set_crc32(0);
importer.ingest(&meta, &db).unwrap();

// verifies the DB content is correct.
let mut iter = db.iter();
iter.seek(SeekKey::Start).unwrap();
assert_eq!(
iter.collect::<Vec<_>>(),
vec![
(b"zt9102_r01".to_vec(), b"abc".to_vec()),
(b"zt9102_r04".to_vec(), b"xyz".to_vec()),
(b"zt9102_r07".to_vec(), b"pqrst".to_vec()),
(b"zt9102_r13".to_vec(), b"www".to_vec()),
]
);
for cf in &[CF_DEFAULT, CF_WRITE] {
// creates a sample SST file.
let (_ext_sst_dir, backend, mut meta) = create_sample_external_sst_file().unwrap();
meta.set_cf_name(cf.to_string());

// performs the download.
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();
let sst_writer = create_sst_writer_with_db(&importer, &meta).unwrap();

let range = importer
.download(
&meta,
&backend,
"sample.sst",
&new_rewrite_rule(b"t123", b"t9102"),
None,
sst_writer,
)
.unwrap()
.unwrap();

// check properties
let start = keys::data_key(b"");
let end = keys::data_end_key(b"");
let collection = get_range_properties_cf(&db, "default", &start, &end).unwrap();
assert!(!collection.is_empty());
for (_, v) in &*collection {
assert!(!v.user_collected_properties().is_empty());
assert_eq!(range.get_start(), b"t9102_r01");
assert_eq!(range.get_end(), b"t9102_r13");

// performs the ingest
let ingest_dir = TempDir::new("ingest_dir").unwrap();
let db = new_engine_with_properties(ingest_dir.path().to_str().unwrap(), DATA_CFS);

meta.set_length(0); // disable validation.
meta.set_crc32(0);
importer.ingest(&meta, &db).unwrap();

// verifies the DB content is correct.
let cf_handle = get_cf_handle(&db, cf).unwrap();
let mut iter = db.iter_cf(cf_handle);
iter.seek(SeekKey::Start).unwrap();
assert_eq!(
iter.collect::<Vec<_>>(),
vec![
(b"zt9102_r01".to_vec(), b"abc".to_vec()),
(b"zt9102_r04".to_vec(), b"xyz".to_vec()),
(b"zt9102_r07".to_vec(), b"pqrst".to_vec()),
(b"zt9102_r13".to_vec(), b"www".to_vec()),
]
);

// check properties
let start = keys::data_key(b"");
let end = keys::data_end_key(b"");
let collection = get_range_properties_cf(&db, cf, &start, &end).unwrap();
assert!(!collection.is_empty());
for (_, v) in &*collection {
assert!(!v.user_collected_properties().is_empty());
assert_eq!(
v.user_collected_properties()
.get(PROP_TEST_MARKER_CF_NAME)
.unwrap(),
cf.as_bytes()
);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/import/sst_service.rs
Expand Up @@ -4,6 +4,7 @@ use std::sync::{Arc, Mutex};

use engine::rocks::util::{compact_files_in_range, io_limiter::IOLimiter};
use engine::rocks::{SstWriterBuilder, DB};
use engine::name_to_cf;
use futures::sync::mpsc;
use futures::{future, Future, Stream};
use futures_cpupool::{Builder, CpuPool};
Expand Down Expand Up @@ -153,6 +154,7 @@ impl<Router: RaftStoreRouter> ImportSst for ImportSSTService<Router> {
let limiter = self.limiter.clone();
let sst_writer = SstWriterBuilder::new()
.set_db(self.engine.clone())
.set_cf(name_to_cf(req.get_sst().get_cf_name()).unwrap())
.build(self.importer.get_path(req.get_sst()).to_str().unwrap())
.unwrap();

Expand Down