Skip to content

Commit

Permalink
sst_importer: Fix MvccProperties didn't generated during restor… (#6378)
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta authored and AndreMouche committed Jan 2, 2020
1 parent 1225a64 commit ea7de98
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 60 deletions.
13 changes: 13 additions & 0 deletions components/engine/src/cf.rs
Expand Up @@ -9,3 +9,16 @@ pub const CF_RAFT: CfName = "raft";
pub const LARGE_CFS: &[CfName] = &[CF_DEFAULT, CF_WRITE];
pub const ALL_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE, CF_RAFT];
pub const DATA_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE];

pub fn name_to_cf(name: &str) -> Option<CfName> {
if name.is_empty() {
return Some(CF_DEFAULT);
}
for c in ALL_CFS {
if name == *c {
return Some(c);
}
}

None
}
189 changes: 129 additions & 60 deletions src/import/sst_importer.rs
Expand Up @@ -578,13 +578,77 @@ mod tests {

use crate::storage::mvcc::WriteType;
use crate::storage::Value;
use engine::rocks::util::new_engine;
use engine::rocks::SstWriterBuilder;
use engine::rocks::util::{new_engine, CFOptions};
use engine::rocks::{
ColumnFamilyOptions, DBEntryType, SstWriterBuilder, TablePropertiesCollector,
TablePropertiesCollectorFactory,
};
use engine::util::get_range_properties_cf;
use engine::CF_DEFAULT;
use engine::{name_to_cf, CF_DEFAULT, CF_WRITE, DATA_CFS};
use tempdir::TempDir;
use tikv_util::file::calc_crc32_bytes;

use std::collections::HashMap;

const PROP_TEST_MARKER_CF_NAME: &[u8] = b"tikv.test_marker_cf_name";

#[derive(Default)]
struct TestPropertiesCollectorFactory {
cf: String,
}

impl TestPropertiesCollectorFactory {
pub fn new(cf: impl Into<String>) -> Self {
Self { cf: cf.into() }
}
}

impl TablePropertiesCollectorFactory for TestPropertiesCollectorFactory {
fn create_table_properties_collector(
&mut self,
_: u32,
) -> Box<dyn TablePropertiesCollector> {
Box::new(TestPropertiesCollector::new(self.cf.clone()))
}
}

struct TestPropertiesCollector {
cf: String,
}

impl TestPropertiesCollector {
pub fn new(cf: String) -> Self {
Self { cf }
}
}

impl TablePropertiesCollector for TestPropertiesCollector {
fn add(&mut self, _: &[u8], _: &[u8], _: DBEntryType, _: u64, _: u64) {}

fn finish(&mut self) -> HashMap<Vec<u8>, Vec<u8>> {
std::iter::once((
PROP_TEST_MARKER_CF_NAME.to_owned(),
self.cf.as_bytes().to_owned(),
))
.collect()
}
}

fn new_engine_with_properties(path: &str, cfs: &[&str]) -> DB {
let cf_opts = cfs
.iter()
.map(|cf| {
let mut opt = ColumnFamilyOptions::new();
opt.add_table_properties_collector_factory(
"tikv.test_properties",
Box::new(TestPropertiesCollectorFactory::new(*cf)),
);
CFOptions::new(*cf, opt)
})
.collect();
new_engine(path, None, cfs, Some(cf_opts)).expect("rocks test engine")
}

#[test]
fn test_import_dir() {
let temp_dir = TempDir::new("test_import_dir").unwrap();
Expand Down Expand Up @@ -840,10 +904,11 @@ mod tests {
fn create_sst_writer_with_db(importer: &SSTImporter, meta: &SSTMeta) -> Result<SstWriter> {
let temp_dir = TempDir::new("test_import_dir").unwrap();
let db_path = temp_dir.path().join("db");
let db = new_engine(db_path.to_str().unwrap(), None, &["default"], None).unwrap();
let db = new_engine_with_properties(db_path.to_str().unwrap(), DATA_CFS);

let sst_writer = SstWriterBuilder::new()
.set_db(Arc::new(db))
.set_cf(name_to_cf(meta.get_cf_name()).unwrap())
.build(importer.get_path(meta).to_str().unwrap())
.unwrap();
Ok(sst_writer)
Expand Down Expand Up @@ -1045,63 +1110,67 @@ mod tests {

#[test]
fn test_download_sst_then_ingest() {
// creates a sample SST file.
let (_ext_sst_dir, backend, mut meta) = create_sample_external_sst_file().unwrap();

// performs the download.
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();
let sst_writer = create_sst_writer_with_db(&importer, &meta).unwrap();

let range = importer
.download(
&meta,
&backend,
"sample.sst",
&new_rewrite_rule(b"t123", b"t9102", 0),
None,
sst_writer,
)
.unwrap()
.unwrap();

assert_eq!(range.get_start(), b"t9102_r01");
assert_eq!(range.get_end(), b"t9102_r13");

// performs the ingest
let ingest_dir = TempDir::new("ingest_dir").unwrap();
let db = new_engine(
ingest_dir.path().to_str().unwrap(),
None,
&[CF_DEFAULT],
None,
)
.unwrap();

meta.set_length(0); // disable validation.
meta.set_crc32(0);
importer.ingest(&meta, &db).unwrap();

// verifies the DB content is correct.
let mut iter = db.iter();
iter.seek(SeekKey::Start).unwrap();
assert_eq!(
iter.collect::<Vec<_>>(),
vec![
(b"zt9102_r01".to_vec(), b"abc".to_vec()),
(b"zt9102_r04".to_vec(), b"xyz".to_vec()),
(b"zt9102_r07".to_vec(), b"pqrst".to_vec()),
(b"zt9102_r13".to_vec(), b"www".to_vec()),
]
);
for cf in &[CF_DEFAULT, CF_WRITE] {
// creates a sample SST file.
let (_ext_sst_dir, backend, mut meta) = create_sample_external_sst_file().unwrap();
meta.set_cf_name(cf.to_string());

// performs the download.
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();
let sst_writer = create_sst_writer_with_db(&importer, &meta).unwrap();

let range = importer
.download(
&meta,
&backend,
"sample.sst",
&new_rewrite_rule(b"t123", b"t9102", 0),
None,
sst_writer,
)
.unwrap()
.unwrap();

// check properties
let start = keys::data_key(b"");
let end = keys::data_end_key(b"");
let collection = get_range_properties_cf(&db, "default", &start, &end).unwrap();
assert!(!collection.is_empty());
for (_, v) in &*collection {
assert!(!v.user_collected_properties().is_empty());
assert_eq!(range.get_start(), b"t9102_r01");
assert_eq!(range.get_end(), b"t9102_r13");

// performs the ingest
let ingest_dir = TempDir::new("ingest_dir").unwrap();
let db = new_engine_with_properties(ingest_dir.path().to_str().unwrap(), DATA_CFS);

meta.set_length(0); // disable validation.
meta.set_crc32(0);
importer.ingest(&meta, &db).unwrap();

// verifies the DB content is correct.
let cf_handle = get_cf_handle(&db, cf).unwrap();
let mut iter = db.iter_cf(cf_handle);
iter.seek(SeekKey::Start).unwrap();
assert_eq!(
iter.collect::<Vec<_>>(),
vec![
(b"zt9102_r01".to_vec(), b"abc".to_vec()),
(b"zt9102_r04".to_vec(), b"xyz".to_vec()),
(b"zt9102_r07".to_vec(), b"pqrst".to_vec()),
(b"zt9102_r13".to_vec(), b"www".to_vec()),
]
);

// check properties
let start = keys::data_key(b"");
let end = keys::data_end_key(b"");
let collection = get_range_properties_cf(&db, cf, &start, &end).unwrap();
assert!(!collection.is_empty());
for (_, v) in &*collection {
assert!(!v.user_collected_properties().is_empty());
assert_eq!(
v.user_collected_properties()
.get(PROP_TEST_MARKER_CF_NAME)
.unwrap(),
cf.as_bytes()
);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/import/sst_service.rs
Expand Up @@ -2,6 +2,7 @@

use std::sync::{Arc, Mutex};

use engine::name_to_cf;
use engine::rocks::util::{compact_files_in_range, io_limiter::IOLimiter};
use engine::rocks::{SstWriterBuilder, DB};
use futures::sync::mpsc;
Expand Down Expand Up @@ -153,6 +154,7 @@ impl<Router: RaftStoreRouter> ImportSst for ImportSSTService<Router> {
let limiter = self.limiter.clone();
let sst_writer = SstWriterBuilder::new()
.set_db(self.engine.clone())
.set_cf(name_to_cf(req.get_sst().get_cf_name()).unwrap())
.build(self.importer.get_path(req.get_sst()).to_str().unwrap())
.unwrap();

Expand Down

0 comments on commit ea7de98

Please sign in to comment.