Skip to content

Commit

Permalink
encryption: Fix errors related to file ingestion (tikv#7806)
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Wu <yiwu@pingcap.com>
  • Loading branch information
yiwu-arbug committed May 13, 2020
1 parent cc9ac0d commit 3adc3da
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 45 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/encryption/Cargo.toml
Expand Up @@ -44,3 +44,4 @@ toml = "0.4"
rusoto_mock = "0.43.0"
rust-ini = "0.14.0"
structopt = "0.3"
test_util = { path = "../test_util" }
4 changes: 2 additions & 2 deletions components/encryption/src/config.rs
Expand Up @@ -38,7 +38,7 @@ impl Default for EncryptionConfig {
#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct FileCofnig {
pub struct FileConfig {
pub path: String,
}

Expand Down Expand Up @@ -89,7 +89,7 @@ pub enum MasterKeyConfig {
#[serde(rename_all = "kebab-case")]
File {
#[serde(flatten)]
config: FileCofnig,
config: FileConfig,
},

#[serde(rename_all = "kebab-case")]
Expand Down
80 changes: 47 additions & 33 deletions components/encryption/src/manager/mod.rs
Expand Up @@ -172,38 +172,41 @@ impl Dicts {
"fname" => fname,
"method" => format!("{:?}", method),
"iv" => hex::encode(iv.as_slice()));
} else {
info!("new plaintext file"; "fname" => fname);
}
Ok(self.file_dict.files.get(fname).unwrap())
}

fn delete_file(&mut self, fname: &str) -> Result<()> {
let file = self.file_dict.files.remove(fname).ok_or_else(|| {
Error::Io(IoError::new(
ErrorKind::NotFound,
format!("file not found, {}", fname),
))
})?;
let file = match self.file_dict.files.remove(fname) {
Some(file_info) => file_info,
None => {
// Could be a plaintext file not tracked by file dictionary.
info!("delete untracked plaintext file"; "fname" => fname);
return Ok(());
}
};

// TOOD GC unused data keys.
self.save_file_dict()?;
if file.method != compat(EncryptionMethod::Plaintext) {
info!("delete encrypted file"; "fname" => fname);
} else {
info!("delete plaintext file"; "fname" => fname);
}
Ok(())
}

fn link_file(&mut self, src_fname: &str, dst_fname: &str) -> Result<()> {
let file = self
.file_dict
.files
.get(src_fname)
.cloned()
.ok_or_else(|| {
Error::Io(IoError::new(
ErrorKind::NotFound,
format!("file not found, {}", src_fname),
))
})?;
let file = match self.file_dict.files.get(src_fname) {
Some(file_info) => file_info.clone(),
None => {
// Could be a plaintext file not tracked by file dictionary.
info!("link untracked plaintext file"; "src" => src_fname, "dst" => dst_fname);
return Ok(());
}
};
if self.file_dict.files.get(dst_fname).is_some() {
return Err(Error::Io(IoError::new(
ErrorKind::AlreadyExists,
Expand All @@ -215,22 +218,28 @@ impl Dicts {
self.save_file_dict()?;
if method != compat(EncryptionMethod::Plaintext) {
info!("link encrypted file"; "src" => src_fname, "dst" => dst_fname);
} else {
info!("link plaintext file"; "src" => src_fname, "dst" => dst_fname);
}
Ok(())
}

fn rename_file(&mut self, src_fname: &str, dst_fname: &str) -> Result<()> {
let file = self.file_dict.files.remove(src_fname).ok_or_else(|| {
Error::Io(IoError::new(
ErrorKind::NotFound,
format!("file not found, {}", src_fname),
))
})?;
let file = match self.file_dict.files.remove(src_fname) {
Some(file_info) => file_info,
None => {
// Could be a plaintext file not tracked by file dictionary.
info!("rename untracked plaintext file"; "src" => src_fname, "dst" => dst_fname);
return Ok(());
}
};
let method = file.method;
self.file_dict.files.insert(dst_fname.to_owned(), file);
self.save_file_dict()?;
if method != compat(EncryptionMethod::Plaintext) {
info!("rename encrypted file"; "src" => src_fname, "dst" => dst_fname);
} else {
info!("rename plaintext file"; "src" => src_fname, "dst" => dst_fname);
}
Ok(())
}
Expand Down Expand Up @@ -506,7 +515,7 @@ impl EncryptionKeyManager for DataKeyManager {
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{FileCofnig, Mock};
use crate::config::{FileConfig, Mock};
use crate::master_key::tests::MockBackend;

use engine_traits::EncryptionMethod as DBEncryptionMethod;
Expand All @@ -519,6 +528,7 @@ mod tests {
};
use tempfile::TempDir;

// TODO(yiwu): use the similar method in test_util crate instead.
fn new_tmp_key_manager(
temp: Option<tempfile::TempDir>,
method: Option<EncryptionMethod>,
Expand All @@ -539,6 +549,7 @@ mod tests {
(tmp, manager)
}

// TODO(yiwu): use the similar method in test_util crate instead.
fn create_key_file(name: &str) -> (PathBuf, TempDir) {
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join(name);
Expand Down Expand Up @@ -702,8 +713,8 @@ mod tests {
let get_file = manager.get_file("foo").unwrap();
assert_eq!(new_file, get_file);
manager.delete_file("foo").unwrap();
manager.delete_file("foo").unwrap_err();
manager.delete_file("foo1").unwrap_err();
manager.delete_file("foo").unwrap();
manager.delete_file("foo1").unwrap();

// Must be plaintext if file not found.
let file = manager.get_file("foo").unwrap();
Expand Down Expand Up @@ -740,7 +751,7 @@ mod tests {
assert_eq!(file1, file);

// Source file not exists.
manager.link_file("not exists", "not exists1").unwrap_err();
manager.link_file("not exists", "not exists1").unwrap();
// Target file already exists.
manager.new_file("foo2").unwrap();
manager.link_file("foo2", "foo1").unwrap_err();
Expand All @@ -760,10 +771,13 @@ mod tests {
assert_eq!(file1, file);

// foo must not exist (should be plaintext)
manager.rename_file("foo", "foo2").unwrap_err();
let file2 = manager.get_file("foo").unwrap();
assert_ne!(file2, file);
assert_eq!(file2.method, DBEncryptionMethod::Plaintext);
manager.rename_file("foo", "foo2").unwrap();
let file_foo = manager.get_file("foo").unwrap();
assert_ne!(file_foo, file);
assert_eq!(file_foo.method, DBEncryptionMethod::Plaintext);
let file_foo2 = manager.get_file("foo2").unwrap();
assert_ne!(file_foo2, file);
assert_eq!(file_foo2.method, DBEncryptionMethod::Plaintext);
}

#[test]
Expand Down Expand Up @@ -865,7 +879,7 @@ mod tests {
fn test_key_manager_rotate_on_key_expose() {
let (key_path, _tmp_key_dir) = create_key_file("key");
let master_key = MasterKeyConfig::File {
config: FileCofnig {
config: FileConfig {
path: key_path.to_str().unwrap().to_owned(),
},
};
Expand Down Expand Up @@ -911,7 +925,7 @@ mod tests {
fn test_expose_keys_on_insecure_backend() {
let (key_path, _tmp_key_dir) = create_key_file("key");
let master_key = MasterKeyConfig::File {
config: FileCofnig {
config: FileConfig {
path: key_path.to_str().unwrap().to_owned(),
},
};
Expand Down
1 change: 1 addition & 0 deletions components/sst_importer/Cargo.toml
Expand Up @@ -49,4 +49,5 @@ uuid = { version = "0.8.1", features = ["serde", "v4"] }
[dev-dependencies]
engine_rocks = { path = "../engine_rocks" }
tempfile = "3.0"
test_util = { path = "../test_util" }
test_sst_importer = { path = "../test_sst_importer" }
53 changes: 45 additions & 8 deletions components/sst_importer/src/util.rs
Expand Up @@ -34,6 +34,9 @@ pub fn prepare_sst_for_ingestion<P: AsRef<Path>, Q: AsRef<Path>>(
let clone = clone.as_ref().to_str().unwrap();

if Path::new(clone).exists() {
if let Some(key_manager) = encryption_key_manager {
key_manager.delete_file(clone)?;
}
fs::remove_file(clone).map_err(|e| format!("remove {}: {:?}", clone, e))?;
}

Expand Down Expand Up @@ -77,18 +80,19 @@ fn copy_and_sync<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> Result<()> {
mod tests {
use super::prepare_sst_for_ingestion;

use encryption::DataKeyManager;
use engine_rocks::{
util::{new_engine, RocksCFOptions},
RocksColumnFamilyOptions, RocksDBOptions, RocksEngine, RocksIngestExternalFileOptions,
RocksSstWriterBuilder, RocksTitanDBOptions,
};
use engine_traits::{
CFHandleExt, CfName, ColumnFamilyOptions, DBOptions, ImportExt, IngestExternalFileOptions,
Peekable, SstWriter, SstWriterBuilder, TitanDBOptions,
CFHandleExt, CfName, ColumnFamilyOptions, DBOptions, EncryptionKeyManager, ImportExt,
IngestExternalFileOptions, Peekable, SstWriter, SstWriterBuilder, TitanDBOptions,
};
use std::fs;
use std::path::Path;
use std::{fs, path::Path, sync::Arc};
use tempfile::Builder;
use test_util::encryption::new_test_key_manager;
use tikv_util::file::calc_crc32;

#[cfg(unix)]
Expand Down Expand Up @@ -126,6 +130,8 @@ mod tests {
fn check_prepare_sst_for_ingestion(
db_opts: Option<RocksDBOptions>,
cf_opts: Option<Vec<RocksCFOptions>>,
key_manager: Option<&Arc<DataKeyManager>>,
was_encrypted: bool,
) {
let path = Builder::new()
.prefix("_util_rocksdb_test_prepare_sst_for_ingestion")
Expand All @@ -152,15 +158,22 @@ mod tests {
let size = fs::metadata(&sst_path).unwrap().len();
let checksum = calc_crc32(&sst_path).unwrap();

if was_encrypted {
// Add the file to key_manager to simulate an encrypted file.
if let Some(manager) = key_manager {
manager.new_file(sst_path.to_str().unwrap()).unwrap();
}
}

// The first ingestion will hard link sst_path to sst_clone.
check_hard_link(&sst_path, 1);
prepare_sst_for_ingestion(&sst_path, &sst_clone, None).unwrap();
prepare_sst_for_ingestion(&sst_path, &sst_clone, key_manager).unwrap();
db.validate_sst_for_ingestion(cf, &sst_clone, size, checksum)
.unwrap();
check_hard_link(&sst_path, 2);
check_hard_link(&sst_clone, 2);
// If we prepare again, it will use hard link too.
prepare_sst_for_ingestion(&sst_path, &sst_clone, None).unwrap();
prepare_sst_for_ingestion(&sst_path, &sst_clone, key_manager).unwrap();
db.validate_sst_for_ingestion(cf, &sst_clone, size, checksum)
.unwrap();
check_hard_link(&sst_path, 2);
Expand All @@ -169,10 +182,15 @@ mod tests {
.unwrap();
check_db_with_kvs(&db, cf_name, &kvs);
assert!(!sst_clone.exists());
// Since we are not using key_manager in db, simulate the db deleting the file from
// key_manager.
if let Some(manager) = key_manager {
manager.delete_file(sst_clone.to_str().unwrap()).unwrap();
}

// The second ingestion will copy sst_path to sst_clone.
check_hard_link(&sst_path, 2);
prepare_sst_for_ingestion(&sst_path, &sst_clone, None).unwrap();
prepare_sst_for_ingestion(&sst_path, &sst_clone, key_manager).unwrap();
db.validate_sst_for_ingestion(cf, &sst_clone, size, checksum)
.unwrap();
check_hard_link(&sst_path, 2);
Expand All @@ -185,7 +203,10 @@ mod tests {

#[test]
fn test_prepare_sst_for_ingestion() {
check_prepare_sst_for_ingestion(None, None);
check_prepare_sst_for_ingestion(
None, None, None, /*key_manager*/
false, /* was encrypted*/
);
}

#[test]
Expand All @@ -200,6 +221,22 @@ mod tests {
check_prepare_sst_for_ingestion(
Some(db_opts),
Some(vec![RocksCFOptions::new("default", cf_opts)]),
None, /*key_manager*/
false, /*was_encrypted*/
);
}

#[test]
fn test_prepare_sst_for_ingestion_with_key_manager_plaintext() {
let (_tmp_dir, key_manager) = new_test_key_manager(None, None, None, None);
let manager = Arc::new(key_manager.unwrap().unwrap());
check_prepare_sst_for_ingestion(None, None, Some(&manager), false /*was_encrypted*/);
}

#[test]
fn test_prepare_sst_for_ingestion_with_key_manager_encrypted() {
let (_tmp_dir, key_manager) = new_test_key_manager(None, None, None, None);
let manager = Arc::new(key_manager.unwrap().unwrap());
check_prepare_sst_for_ingestion(None, None, Some(&manager), true /*was_encrypted*/);
}
}
2 changes: 2 additions & 0 deletions components/test_util/Cargo.toml
Expand Up @@ -8,10 +8,12 @@ publish = false
encryption = { path = "../encryption" }
fail = "0.3"
grpcio = { version = "0.5", default-features = false, features = ["openssl-vendored"] }
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false }
rand = "0.7"
rand_isaac = "0.2"
security = { path = "../security" }
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "0e23a5baff302a9d7bccd85f8f31e43339c2f2c1" }
tempfile = "3.0"
tikv_util = { path = "../tikv_util" }
time = "0.1"

0 comments on commit 3adc3da

Please sign in to comment.