Skip to content

Commit

Permalink
feat: prevent exporting physical table data
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 17, 2024
1 parent 3477fde commit 8b094c9
Showing 1 changed file with 25 additions and 17 deletions.
42 changes: 25 additions & 17 deletions src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,12 @@ impl Export {
}

/// Return a list of [`TableReference`] to be exported.
/// Includes all tables under the given `catalog` and `schema`
async fn get_table_list(&self, catalog: &str, schema: &str) -> Result<Vec<TableReference>> {
/// Includes all tables under the given `catalog` and `schema`.
async fn get_table_list(
&self,
catalog: &str,
schema: &str,
) -> Result<(Vec<TableReference>, Vec<TableReference>)> {
// Puts all metric table first
let sql = format!(
"select table_catalog, table_schema, table_name from \
Expand Down Expand Up @@ -232,11 +236,11 @@ impl Export {
remaining_tables.push(table);
}
}
let mut tables = Vec::with_capacity(metric_physical_tables.len() + remaining_tables.len());
tables.extend(metric_physical_tables.into_iter());
tables.extend(remaining_tables);

Ok(tables)
Ok((
metric_physical_tables.into_iter().collect(),
remaining_tables,
))
}

async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result<String> {
Expand Down Expand Up @@ -265,15 +269,16 @@ impl Export {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let table_list = self.get_table_list(&catalog, &schema).await?;
let (metric_physical_tables, remaining_tables) =
self.get_table_list(&catalog, &schema).await?;
let table_count = table_list.len();
tokio::fs::create_dir_all(&self.output_dir)
.await
.context(FileIoSnafu)?;
let output_file =
Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql"));
let mut file = File::create(output_file).await.context(FileIoSnafu)?;
for (c, s, t) in table_list {
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
match self.show_create_table(&c, &s, &t).await {
Err(e) => {
error!(e; r#"Failed to export table "{}"."{}"."{}""#, c, s, t)
Expand Down Expand Up @@ -322,15 +327,18 @@ impl Export {
.await
.context(FileIoSnafu)?;
let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));

// copy database to
let sql = format!(
"copy database {} to '{}' with (format='parquet');",
schema,
output_dir.to_str().unwrap()
);
self.sql(&sql).await?;
info!("finished exporting {catalog}.{schema} data");
// Ignores metric physical tables
let (_, table_lists) = self.get_table_list(&catalog, &schema).await?;
for (_, _, table_name) in table_list {
// copy table to
let sql = format!(
"copy table {} to '{}' with (format='parquet');",
schema,
format!("{}{}.parquet",output_dir.to_str().unwrap(),);
);
self.sql(&sql).await?;
info!("finished exporting {catalog}.{schema} data");
}

// export copy from sql
let dir_filenames = match output_dir.read_dir() {
Expand Down

0 comments on commit 8b094c9

Please sign in to comment.