Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracing #153

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 57 additions & 24 deletions datanymizer_dumper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub trait SchemaInspector: 'static + Sized + Send + Clone {
type Connection;
type Table: Table<Self::Type>;
type Column: ColumnData<Self::Type>;
type ForeignKey;

/// Get all tables in the database
fn get_tables(&self, connection: &mut Self::Connection) -> Result<Vec<Self::Table>>;
Expand All @@ -64,37 +65,67 @@ pub trait SchemaInspector: 'static + Sized + Send + Clone {
fn get_table_size(&self, connection: &mut Self::Connection, table: &Self::Table)
-> Result<i64>;

/// Get all dependencies (by FK) for `table` in database
fn get_dependencies(
/// Get foreign keys for table
fn get_foreign_keys(
&self,
connection: &mut Self::Connection,
table: &Self::Table,
) -> Result<Vec<Self::Table>>;

fn ordered_tables(&self, connection: &mut Self::Connection) -> Vec<(Self::Table, i32)> {
let mut res: HashMap<Self::Table, i32> = HashMap::new();
let mut depgraph: DepGraph<Self::Table> = DepGraph::new();
if let Ok(tables) = self.get_tables(connection) {
for table in tables.iter() {
let deps: Vec<Self::Table> = self
.get_dependencies(connection, table)
.unwrap_or_default()
.into_iter()
.collect();
depgraph.register_dependencies(table.clone(), deps);
}
) -> Result<Vec<Self::ForeignKey>>;

fn ordered_tables(&self, connection: &mut Self::Connection) -> Result<Vec<(Self::Table, i32)>> {
let mut depgraph: DepGraph<String> = DepGraph::new();

let started = Instant::now();
println!("Inspecting tables ...");

for table in tables.iter() {
let _ = res.entry(table.clone()).or_insert(0);
if let Ok(nodes) = depgraph.dependencies_of(table) {
for node in nodes.flatten() {
let counter = res.entry(node.clone()).or_insert(0);
*counter += 1;
}
let tables = self.get_tables(connection)?;
let mut weight_map: HashMap<String, i32> = HashMap::with_capacity(tables.len());

println!(
"Inspecting completed in {}",
HumanDuration(started.elapsed())
);

println!("Inspecting table dependencies...");
let started = Instant::now();
for table in tables.iter() {
depgraph.register_dependencies(table.get_full_name(), table.get_dep_table_names());
}

println!(
"Inspecting completed in {}",
HumanDuration(started.elapsed())
);

println!("Processing table dependencies...");
let started = Instant::now();

for table in tables.iter() {
let name = table.get_full_name();
weight_map.entry(name.clone()).or_insert(0);
if let Ok(dep_names) = depgraph.dependencies_of(&name) {
for dep_name in dep_names.flatten() {
let weight = weight_map.entry(dep_name.clone()).or_insert(0);
*weight += 1;
}
}
}
res.iter().map(|(k, b)| (k.clone(), *b)).collect()

println!(
"Processing completed in {}",
HumanDuration(started.elapsed())
);

Ok(tables
.into_iter()
.map(|t| {
let name = t.get_full_name();
(
t,
weight_map.get(name.as_str()).copied().unwrap_or_default(),
)
})
.collect())
}

/// Get columns for table
Expand Down Expand Up @@ -124,6 +155,8 @@ pub trait Table<T>: Sized + Send + Clone + Eq + Hash {
fn get_size(&self) -> i64;
/// Get column name - index map
fn get_column_indexes(&self) -> &HashMap<String, usize>;
/// Get the names of the tables this table depends on
fn get_dep_table_names(&self) -> Vec<String>;
}

pub trait ColumnData<T> {
Expand Down
16 changes: 14 additions & 2 deletions datanymizer_dumper/src/postgres/dumper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::{
use crate::{indicator::Indicator, Dumper, SchemaInspector, Table};
use anyhow::Result;
use datanymizer_engine::{Engine, Filter, Settings, TableList};
use indicatif::HumanDuration;
use postgres::IsolationLevel;
use std::{
io::{self, prelude::*},
Expand Down Expand Up @@ -142,14 +143,25 @@ impl<W: 'static + Write + Send, I: 'static + Indicator + Send> Dumper for PgDump

// Stage before dumping data. It makes dump schema with any options
fn pre_data(&mut self, connection: &mut Self::Connection) -> Result<()> {
self.debug("Fetch tables metadata...".into());
let mut tables = self.schema_inspector().ordered_tables(connection);
let started = Instant::now();
self.debug("Fetching tables metadata...".into());
let mut tables = self.schema_inspector().ordered_tables(connection)?;
self.debug(format!(
"Fetching completed in {}",
HumanDuration(started.elapsed())
));

let started = Instant::now();
self.debug("Sorting tables...".into());
sort_tables(
&mut tables,
self.engine.settings.table_order.as_ref().unwrap_or(&vec![]),
);
self.tables = tables.into_iter().map(|(t, _)| t).collect();
self.debug(format!(
"Sorting completed in {}",
HumanDuration(started.elapsed())
));

if let Some(filter) = &mut self.engine.settings.filter {
filter.load_tables(self.tables.iter().map(|t| t.get_full_name()).collect());
Expand Down
6 changes: 3 additions & 3 deletions datanymizer_dumper/src/postgres/foreign_key.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use postgres::Row as PostgresRow;

#[derive(Debug)]
pub struct ForeignKey {
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PgForeignKey {
// Source
pub table_schema: String,
pub table_name: String,
Expand All @@ -15,7 +15,7 @@ pub struct ForeignKey {
pub foreign_column_name: String,
}

impl From<PostgresRow> for ForeignKey {
impl From<PostgresRow> for PgForeignKey {
fn from(row: PostgresRow) -> Self {
Self {
table_schema: row.get("table_schema"),
Expand Down
68 changes: 32 additions & 36 deletions datanymizer_dumper/src/postgres/schema_inspector.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use super::{
column::PgColumn, connector, foreign_key::ForeignKey, sequence::PgSequence, table::PgTable,
column::PgColumn, connector, foreign_key::PgForeignKey, sequence::PgSequence, table::PgTable,
SchemaInspector,
};
use crate::Table;
use anyhow::Result;
use postgres::types::Type;

Expand All @@ -27,7 +26,7 @@ const TABLE_FOREIGN_KEYS: &str = "SELECT
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name = $1";
WHERE tc.constraint_type = 'FOREIGN KEY'"; // AND tc.table_schema = $1 AND tc.table_name = $2

const TABLE_COLUMNS_QUERY: &str = "SELECT cc.column_name, cc.ordinal_position, cc.data_type, pt.oid
FROM information_schema.columns as cc
Expand All @@ -54,10 +53,16 @@ impl SchemaInspector for PgSchemaInspector {
type Connection = connector::Connection;
type Table = PgTable;
type Column = PgColumn;
type ForeignKey = PgForeignKey;

// Get all tables in the database
fn get_tables(&self, connection: &mut Self::Connection) -> Result<Vec<Self::Table>> {
let mut counter = 0;
let foreign_keys: Vec<PgForeignKey> = connection
.client
.query(TABLE_FOREIGN_KEYS, &[])?
.into_iter()
.map(|row| row.into())
.collect();
let items: Vec<Self::Table> = connection
.client
.query(PG_CATALOG_SCHEMA, &[])?
Expand All @@ -70,14 +75,29 @@ impl SchemaInspector for PgSchemaInspector {
if let Ok(sequences) = self.get_sequences(connection, &table) {
table.set_sequences(sequences);
};
// if let Ok(foreign_keys) = self.get_foreign_keys(connection, &table) {
// table.set_foreign_keys(foreign_keys);
// };
table.set_foreign_keys(
foreign_keys
.iter()
.filter_map(|fk| {
if fk.table_schema == table.schemaname
&& fk.table_name == table.tablename
{
Some(fk.clone())
} else {
None
}
})
.collect(),
);

match self.get_table_size(connection, &table) {
Ok(size) => table.size = size as i64,
Err(e) => panic!("ERR: {}", e),
}

counter += 1;

table
})
.collect();
Expand All @@ -97,41 +117,17 @@ impl SchemaInspector for PgSchemaInspector {
Ok(size)
}

// Get all dependencies (by FK) for `table` in database
fn get_dependencies(
fn get_foreign_keys(
&self,
connection: &mut Self::Connection,
table: &Self::Table,
) -> Result<Vec<Self::Table>> {
let fkeys_iterator = connection
) -> Result<Vec<Self::ForeignKey>> {
Ok(connection
.client
.query(TABLE_FOREIGN_KEYS, &[&table.get_name()])?
.query(TABLE_FOREIGN_KEYS, &[&table.schemaname, &table.tablename])?
.into_iter()
.map(|row| row.into());

let tables: Vec<Self::Table> = fkeys_iterator
// Table from foreign key
.map(|fkey: ForeignKey| {
PgTable::new(fkey.foreign_table_name, fkey.foreign_table_schema)
})
// Columns for table
.map(|mut table| {
if let Ok(columns) = self.get_columns(connection, &table) {
table.set_columns(columns);
};
if let Ok(sequences) = self.get_sequences(connection, &table) {
table.set_sequences(sequences);
};

match self.get_table_size(connection, &table) {
Ok(size) => table.size = size as i64,
Err(e) => println!("ERR: {}", e),
}

table
})
.collect();
Ok(tables)
.map(|row| row.into())
.collect())
}

/// Get columns for table
Expand Down
15 changes: 14 additions & 1 deletion datanymizer_dumper/src/postgres/table.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{column::PgColumn, row::PgRow, sequence::PgSequence};
use super::{column::PgColumn, foreign_key::PgForeignKey, row::PgRow, sequence::PgSequence};
use crate::Table;
use anyhow::{anyhow, Result};
use datanymizer_engine::{Query as QueryCfg, Table as TableCfg};
Expand All @@ -14,6 +14,7 @@ pub struct PgTable {
pub schemaname: String,
pub columns: Vec<PgColumn>,
pub sequences: Vec<PgSequence>,
pub foreign_keys: Vec<PgForeignKey>,
column_indexes: HashMap<String, usize>,
pub size: i64,
}
Expand Down Expand Up @@ -64,6 +65,13 @@ impl Table<Type> for PgTable {
/// Borrow the column name -> index map held by this table.
///
/// Returns a reference to the `column_indexes` field; nothing is cloned.
fn get_column_indexes(&self) -> &HashMap<String, usize> {
&self.column_indexes
}

/// Build the names of the tables referenced by this table's foreign keys.
///
/// Each entry is formatted as `<foreign schema>.<foreign table>`. One entry is
/// produced per stored foreign key, in storage order, so the same name may
/// appear more than once.
fn get_dep_table_names(&self) -> Vec<String> {
    let mut names = Vec::with_capacity(self.foreign_keys.len());
    for key in &self.foreign_keys {
        names.push(format!(
            "{}.{}",
            key.foreign_table_schema, key.foreign_table_name
        ));
    }
    names
}
}

impl PgTable {
Expand All @@ -73,6 +81,7 @@ impl PgTable {
schemaname,
columns: vec![],
sequences: vec![],
foreign_keys: vec![],
column_indexes: HashMap::new(),
size: 0,
}
Expand Down Expand Up @@ -107,6 +116,10 @@ impl PgTable {
self.sequences = sequences;
}

/// Replace the foreign keys stored on this table with `foreign_keys`.
///
/// The vector is moved in and overwrites whatever was stored before.
pub fn set_foreign_keys(&mut self, foreign_keys: Vec<PgForeignKey>) {
self.foreign_keys = foreign_keys;
}

pub fn transformed_query_to(
&self,
cfg: Option<&TableCfg>,
Expand Down