Skip to content

Commit

Permalink
Filter tables with wild cards (#151)
Browse files Browse the repository at this point in the history
* Filter tables with wild cards

* Docs
  • Loading branch information
evgeniy-r committed Feb 10, 2022
1 parent 1661132 commit f679e34
Show file tree
Hide file tree
Showing 8 changed files with 342 additions and 137 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]
### 🚀 Added
- Add wildcards support in the filter section [#151](https://github.com/datanymizer/datanymizer/pull/151)
([@evgeniy-r](https://github.com/evgeniy-r))

### ⚙️ Changed

Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ filter:
- public.markets
```

You can use wildcards in the `filter` section:

* `?` matches exactly one occurrence of any character;
* `*` matches arbitrarily many (including zero) occurrences of any character.

### Dump conditions and limit

You can specify conditions (SQL `WHERE` statement) and limit for dumped data per table:
Expand Down
16 changes: 8 additions & 8 deletions datanymizer_dumper/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use core::iter::Iterator;
use datanymizer_engine::{Filter, Settings};
use datanymizer_engine::Settings;
use indicatif::HumanDuration;
use solvent::DepGraph;
use std::{collections::HashMap, hash::Hash, time::Instant};
Expand Down Expand Up @@ -34,17 +34,17 @@ pub trait Dumper: 'static + Sized + Send {
/// This stage makes dump foreign keys, indices and other...
fn post_data(&mut self, _connection: &mut Self::Connection) -> Result<()>;

fn filter_table(&mut self, table: String, filter: &Option<Filter>) -> bool {
if let Some(f) = filter {
f.filter_schema(&table) && f.filter_data(&table)
} else {
true
}
fn filter_table(&self, table: String) -> bool {
self.settings()
.filter
.as_ref()
.map(|f| f.filter_table(&table))
.unwrap_or(true)
}

fn schema_inspector(&self) -> Self::SchemaInspector;

fn settings(&mut self) -> Settings;
fn settings(&self) -> &Settings;

fn write_log(&mut self, message: String) -> Result<()>;

Expand Down
100 changes: 61 additions & 39 deletions datanymizer_dumper/src/postgres/dumper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct PgDumper<W: Write + Send, I: Indicator + Send> {
dump_isolation_level: Option<IsolationLevel>,
pg_dump_location: String,
pg_dump_args: Vec<String>,
tables: Vec<PgTable>,
}

impl<W: 'static + Write + Send, I: 'static + Indicator + Send> PgDumper<W, I> {
Expand All @@ -39,6 +40,7 @@ impl<W: 'static + Write + Send, I: 'static + Indicator + Send> PgDumper<W, I> {
pg_dump_location,
schema_inspector: PgSchemaInspector {},
pg_dump_args,
tables: Vec::new(),
})
}

Expand Down Expand Up @@ -74,7 +76,6 @@ impl<W: 'static + Write + Send, I: 'static + Indicator + Send> PgDumper<W, I> {
}

fn dump_table(&mut self, table: &PgTable, qw: &mut QueryWrapper) -> Result<()> {
let settings = self.settings();
let started = Instant::now();

self.write_log(format!("Dump table: {}", &table.get_full_name()))?;
Expand All @@ -83,7 +84,7 @@ impl<W: 'static + Write + Send, I: 'static + Indicator + Send> PgDumper<W, I> {
self.dump_writer.write_all(table.query_from().as_bytes())?;
self.dump_writer.write_all(b"\n")?;

let cfg = settings.find_table(&table.get_names());
let cfg = self.engine.settings.find_table(&table.get_names());

self.indicator
.start_pb(table.count_of_query_to(cfg), &table.get_full_name());
Expand Down Expand Up @@ -141,35 +142,40 @@ impl<W: 'static + Write + Send, I: 'static + Indicator + Send> Dumper for PgDump

// Stage before dumping data. It makes dump schema with any options
fn pre_data(&mut self, connection: &mut Self::Connection) -> Result<()> {
self.debug("Fetch tables metadata...".into());
let mut tables = self.schema_inspector().ordered_tables(connection);

sort_tables(
&mut tables,
self.engine.settings.table_order.as_ref().unwrap_or(&vec![]),
);
self.tables = tables.into_iter().map(|(t, _)| t).collect();

if let Some(filter) = &mut self.engine.settings.filter {
filter.load_tables(self.tables.iter().map(|t| t.get_full_name()).collect());
}

self.debug("Prepare data scheme...".into());
self.run_pg_dump("pre-data", connection.url.as_str())
}

// This stage makes dump data only
fn data(&mut self, connection: &mut Self::Connection) -> Result<()> {
let settings = self.settings();
self.write_log("Start dumping data".into())?;
self.debug("Fetch tables metadata...".into());

let mut tables = self.schema_inspector().ordered_tables(connection);
sort_tables(
&mut tables,
settings.table_order.as_ref().unwrap_or(&vec![]),
);

let all_tables_count = tables.len();
let all_tables_count = self.tables.len();

let mut query_wrapper =
QueryWrapper::with_isolation_level(&mut connection.client, self.dump_isolation_level)?;
for (ind, (table, _weight)) in tables.iter().enumerate() {
for (ind, table) in self.tables.clone().iter().enumerate() {
self.debug(format!(
"[{} / {}] Prepare to dump table: {}",
ind + 1,
all_tables_count,
table.get_full_name(),
));

if self.filter_table(table.get_full_name(), &settings.filter) {
if self.filter_table(table.get_full_name()) {
self.dump_table(table, &mut query_wrapper)?;
} else {
self.debug(format!("[Dumping: {}] --- SKIP ---", table.get_full_name()));
Expand All @@ -190,8 +196,8 @@ impl<W: 'static + Write + Send, I: 'static + Indicator + Send> Dumper for PgDump
self.schema_inspector.clone()
}

fn settings(&mut self) -> Settings {
self.engine.settings.clone()
fn settings(&self) -> &Settings {
&self.engine.settings
}

fn write_log(&mut self, message: String) -> Result<()> {
Expand All @@ -208,15 +214,14 @@ impl<W: 'static + Write + Send, I: 'static + Indicator + Send> Dumper for PgDump
fn table_args(filter: &Option<Filter>) -> Result<Vec<String>> {
let mut args = vec![];
if let Some(f) = filter {
if let Some(list) = &f.schema {
let flag = match list {
TableList::Only(_) => "-t",
TableList::Except(_) => "-T",
};
for table in list.tables() {
args.push(String::from(flag));
args.push(PgTable::quote_table_name(table.as_str())?);
}
let list = f.schema_match_list();
let flag = match list {
TableList::Only(_) => "-t",
TableList::Except(_) => "-T",
};
for table in list.tables() {
args.push(String::from(flag));
args.push(PgTable::quote_table_name(table.as_str())?);
}
}

Expand All @@ -236,31 +241,48 @@ mod tests {

#[test]
fn test_table_args() {
let tables = vec![String::from("table1"), String::from("table2")];

let empty: Vec<String> = vec![];
assert_eq!(table_args(&None).unwrap(), empty);

let filter = Filter {
schema: Some(TableList::Except(vec![String::from("table1")])),
data: None,
};
let mut filter = Filter::new(
TableList::Except(vec![String::from("table1")]),
TableList::default(),
);
filter.load_tables(tables.clone());
assert_eq!(
table_args(&Some(filter)).unwrap(),
vec![String::from("-T"), String::from("\"table1\"")]
);

let filter = Filter {
schema: None,
data: Some(TableList::Except(vec![String::from("table1")])),
};
let mut filter = Filter::new(
TableList::default(),
TableList::Except(vec![String::from("table1")]),
);
filter.load_tables(tables.clone());
assert_eq!(table_args(&Some(filter)).unwrap(), empty);

let filter = Filter {
schema: Some(TableList::Only(vec![
String::from("table1"),
String::from("table2"),
])),
data: None,
};
let mut filter = Filter::new(
TableList::Only(vec![String::from("table1"), String::from("table2")]),
TableList::default(),
);
filter.load_tables(tables.clone());
assert_eq!(
table_args(&Some(filter)).unwrap(),
vec![
String::from("-t"),
String::from("\"table1\""),
String::from("-t"),
String::from("\"table2\"")
]
);

let mut filter = Filter::new(
TableList::Only(vec![String::from("table*")]),
TableList::default(),
);
filter.load_tables(tables);
assert_eq!(
table_args(&Some(filter)).unwrap(),
vec![
Expand Down
11 changes: 6 additions & 5 deletions datanymizer_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ edition = "2021"

[dependencies]
anyhow = "1.0"
chrono = "0.4"
config = "0.10"
csv = "1.1"
fake = { version = "2.4.1", features = ["chrono"] }
serde = { version = "1.0", features = ["derive"] }
once_cell = "1.5.2"
rand = "0.8.4"
unicode-segmentation = "1.7.0"
serde_yaml = "0.8.14"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8.14"
tera = "1.15.0"
chrono = "0.4"
once_cell = "1.5.2"
thiserror = "1.0"
unicode-segmentation = "1.7.0"
wildmatch = "2.1.0"
Loading

0 comments on commit f679e34

Please sign in to comment.