Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,15 @@ pub enum DatabaseTablesCommands {
schema: String,

/// Path to a local parquet file to upload and load
#[arg(long, conflicts_with = "upload_id")]
#[arg(long, conflicts_with_all = ["upload_id", "url"])]
file: Option<String>,

/// URL of a remote parquet file to download and load
#[arg(long, conflicts_with_all = ["file", "upload_id"])]
url: Option<String>,

/// Use a previously staged upload ID from `POST /v1/files` instead of uploading
#[arg(long)]
#[arg(long, conflicts_with_all = ["file", "url"])]
upload_id: Option<String>,
},

Expand Down
99 changes: 73 additions & 26 deletions src/databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,32 @@ fn table_rows_for_database(db_name: &str, tables: Vec<InfoTable>) -> Vec<TableRo
.collect()
}

fn finish_upload(api: &ApiClient, reader: impl std::io::Read + Send + 'static, size: Option<u64>, pb: &ProgressBar) -> String {
let (status, resp_body) = api.post_body("/files", "application/octet-stream", reader, size);
pb.finish_and_clear();

if !status.is_success() {
use crossterm::style::Stylize;
eprintln!("{}", crate::util::api_error(resp_body).red());
std::process::exit(1);
}

let body: serde_json::Value = match serde_json::from_str(&resp_body) {
Ok(v) => v,
Err(e) => {
eprintln!("error parsing upload response: {e}");
std::process::exit(1);
}
};
match body["id"].as_str() {
Some(id) => id.to_string(),
None => {
eprintln!("error: upload response missing id");
std::process::exit(1);
}
}
}

fn upload_parquet_file(api: &ApiClient, path: &str) -> String {
if !is_parquet_path(path) {
eprintln!(
Expand Down Expand Up @@ -205,35 +231,54 @@ fn upload_parquet_file(api: &ApiClient, path: &str) -> String {
.progress_chars("=>-"),
);
let reader = pb.wrap_read(f);
finish_upload(api, reader, Some(file_size), &pb)
}

let (status, resp_body) = api.post_body(
"/files",
"application/octet-stream",
reader,
Some(file_size),
);
pb.finish_and_clear();

if !status.is_success() {
use crossterm::style::Stylize;
eprintln!("{}", crate::util::api_error(resp_body).red());
fn upload_parquet_url(api: &ApiClient, url: &str) -> String {
if !is_parquet_path(url) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is_parquet_path checks ends_with(".parquet") against the raw URL, so it rejects URLs with query strings or fragments — e.g. S3/GCS presigned URLs like https://bucket.s3.amazonaws.com/file.parquet?X-Amz-Signature=... would be turned away even though they point at a valid parquet file. Consider parsing the URL and validating only the path component (or stripping ?/# before the extension check) so signed URLs work. (not blocking)

eprintln!(
"error: managed table loads require a parquet URL ending in .parquet (got '{url}')."
);
std::process::exit(1);
}

let body: serde_json::Value = match serde_json::from_str(&resp_body) {
Ok(v) => v,
let resp = match reqwest::blocking::get(url) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: reqwest::blocking::get uses a default client with no timeout, so a remote that accepts the TCP connection but never responds will hang the CLI indefinitely (only Ctrl-C will get out). Consider building a reqwest::blocking::Client with connect_timeout / timeout and using it here. (not blocking)

Ok(r) => r,
Err(e) => {
eprintln!("error parsing upload response: {e}");
eprintln!("error fetching '{url}': {e}");
std::process::exit(1);
}
};
match body["id"].as_str() {
Some(id) => id.to_string(),

if !resp.status().is_success() {
eprintln!("error: remote server returned {} for '{url}'", resp.status());
std::process::exit(1);
}

let content_length = resp.content_length();
let pb = match content_length {
Some(len) => {
let pb = ProgressBar::new(len);
pb.set_style(
ProgressStyle::with_template(
"{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})",
)
.unwrap()
.progress_chars("=>-"),
);
pb
}
None => {
eprintln!("error: upload response missing id");
std::process::exit(1);
let pb = ProgressBar::new_spinner();
pb.set_style(
ProgressStyle::with_template("{spinner:.green} {bytes} downloaded ({elapsed})")
.unwrap(),
);
pb
}
}
};
let reader = pb.wrap_read(resp);
finish_upload(api, reader, content_length, &pb)
}

fn collect_tables(api: &ApiClient, connection_id: &str, schema: Option<&str>) -> Vec<InfoTable> {
Expand Down Expand Up @@ -433,6 +478,7 @@ pub fn tables_load(
table: &str,
schema: Option<&str>,
file: Option<&str>,
url: Option<&str>,
upload_id: Option<&str>,
) {
use crossterm::style::Stylize;
Expand All @@ -441,15 +487,16 @@ pub fn tables_load(
let db = resolve_database(&api, database);
let schema = schema_name(schema);

// clap rejects `--file` and `--upload-id` together; the `(Some, Some)` arm is unreachable.
let upload_id = match (upload_id, file) {
(Some(id), None) => id.to_string(),
(None, Some(path)) => upload_parquet_file(&api, path),
(None, None) => {
eprintln!("error: --file <path> or --upload-id <id> is required");
// clap enforces mutual exclusion; only one of these is ever Some.
let upload_id = match (upload_id, file, url) {
(Some(id), None, None) => id.to_string(),
(None, Some(path), None) => upload_parquet_file(&api, path),
(None, None, Some(u)) => upload_parquet_url(&api, u),
(None, None, None) => {
eprintln!("error: --file <path>, --url <url>, or --upload-id <id> is required");
std::process::exit(1);
}
(Some(_), Some(_)) => unreachable!(),
_ => unreachable!(),
};

let path = managed_table_load_path(&db.id, schema, table);
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,15 @@ fn main() {
table,
schema,
file,
url,
upload_id,
} => databases::tables_load(
&workspace_id,
&database,
&table,
Some(schema.as_str()),
file.as_deref(),
url.as_deref(),
upload_id.as_deref(),
),
DatabaseTablesCommands::Delete {
Expand Down
Loading