Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Patch] dump ddl #1782

Merged
merged 11 commits into from Nov 21, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions client/src/command.rs
Expand Up @@ -31,6 +31,7 @@ pub enum Command {
QuietMode(Option<bool>),
OutputFormat(Option<String>),
WriteLineProtocol(String),
ChangeTenant(String),
}

pub enum OutputFormat {
Expand Down Expand Up @@ -96,6 +97,10 @@ impl Command {
let result_set = ResultSet::Bytes((vec![], 0));
print_options.print_batches(&result_set, now)
}
Self::ChangeTenant(tenant) => {
ctx.set_tenant(tenant.to_owned());
Ok(())
}
}
}

Expand All @@ -112,6 +117,7 @@ impl Command {
Self::QuietMode(_) => ("\\quiet (true|false)?", "print or set quiet mode"),
Self::OutputFormat(_) => ("\\pset [NAME [VALUE]]", "set table output option\n(format)"),
Self::WriteLineProtocol(_) => ("\\w path", "line protocol"),
Self::ChangeTenant(_) => ("\\change_tenant <TenantName>", "change tenant."),
}
}
}
Expand Down
56 changes: 55 additions & 1 deletion client/src/ctx.rs
Expand Up @@ -7,7 +7,7 @@ use fly_accept_encoding::Encoding;
use http_protocol::encoding::EncodingExt;
use http_protocol::header::{ACCEPT, PRIVATE_KEY};
use http_protocol::http_client::HttpClient;
use http_protocol::parameter::{SqlParam, WriteParam};
use http_protocol::parameter::{DumpParam, SqlParam, WriteParam};
use http_protocol::status_code::OK;
use reqwest::header::{ACCEPT_ENCODING, CONTENT_ENCODING};

Expand All @@ -22,9 +22,12 @@ pub const DEFAULT_PRECISION: &str = "NS";
pub const DEFAULT_USE_SSL: bool = false;
pub const DEFAULT_USE_UNSAFE_SSL: bool = false;
pub const DEFAULT_CHUNKED: bool = false;
pub const DEFAULT_PROCESS_CLI_COMMAND: bool = false;
pub const DEFAULT_ERROR_STOP: bool = false;

pub const API_V1_SQL_PATH: &str = "/api/v1/sql";
pub const API_V1_WRITE_PATH: &str = "/api/v1/write";
pub const API_V1_DUMP_SQL_DDL_PATH: &str = "/api/v1/dump/sql/ddl";

pub struct SessionConfig {
pub user_info: UserInfo,
Expand All @@ -41,6 +44,8 @@ pub struct SessionConfig {
pub use_ssl: bool,
pub use_unsafe_ssl: bool,
pub chunked: bool,
pub process_cli_command: bool,
pub error_stop: bool,
}

impl SessionConfig {
Expand All @@ -63,6 +68,8 @@ impl SessionConfig {
use_ssl: DEFAULT_USE_SSL,
use_unsafe_ssl: DEFAULT_USE_UNSAFE_SSL,
chunked: DEFAULT_CHUNKED,
process_cli_command: DEFAULT_PROCESS_CLI_COMMAND,
error_stop: DEFAULT_ERROR_STOP,
}
}

Expand Down Expand Up @@ -164,6 +171,18 @@ impl SessionConfig {

self
}

pub fn with_process_cli_command(mut self, process_cli_command: bool) -> Self {
self.process_cli_command = process_cli_command;

self
}

pub fn with_error_stop(mut self, error_stop: bool) -> Self {
self.error_stop = error_stop;

self
}
}

pub struct UserInfo {
Expand Down Expand Up @@ -221,10 +240,22 @@ impl SessionContext {
self.session_config.database = name.to_string();
}

pub fn set_tenant(&mut self, tenant: String) {
self.session_config.tenant = tenant
}

pub fn get_database(&self) -> &str {
self.session_config.database.as_str()
}

pub fn get_session_config(&self) -> &SessionConfig {
&self.session_config
}

pub fn get_mut_session_config(&mut self) -> &mut SessionConfig {
&mut self.session_config
}

pub async fn sql(&self, sql: String) -> Result<ResultSet> {
let mut sql = sql.into_bytes();
let user_info = &self.session_config.user_info;
Expand Down Expand Up @@ -365,6 +396,29 @@ impl SessionContext {
}
Ok(())
}

pub async fn dump(&self, tenant: Option<String>) -> Result<String> {
let user_info = &self.session_config.user_info;

let param = DumpParam { tenant };

let mut builder = self
.http_client
.get(API_V1_DUMP_SQL_DDL_PATH)
.basic_auth::<&str, &str>(&user_info.user, user_info.password.as_deref());
builder = if let Some(key) = &user_info.private_key {
let key = base64::encode(key);
builder.header(PRIVATE_KEY, key)
} else {
builder
}
.query(&param);

let resp = builder.send().await?;

let res = resp.text().await?;
Ok(res)
}
}

pub enum ResultSet {
Expand Down
53 changes: 44 additions & 9 deletions client/src/exec.rs
Expand Up @@ -5,6 +5,7 @@ use std::io::prelude::*;
use std::io::BufReader;
use std::time::Instant;

use anyhow::bail;
use rustyline::error::ReadlineError;
use rustyline::Editor;

Expand All @@ -19,21 +20,45 @@ pub async fn exec_from_lines(
ctx: &mut SessionContext,
reader: &mut BufReader<File>,
print_options: &PrintOptions,
) {
) -> Result<()> {
let mut query = "".to_owned();

for line in reader.lines() {
match line {
Ok(line) if line.starts_with("--") => {
continue;
}
Ok(line) if line.trim().starts_with("\\change_tenant") => {
let tenant = line.trim().trim_start_matches("\\change_tenant").trim();
if ctx.get_session_config().process_cli_command {
ctx.set_tenant(tenant.to_string());
} else {
bail!("Can't process \"\\change_tenant {}\", please add arg --process_cli_command", tenant)
}
}
Ok(line) if line.starts_with("\\c") => {
let database = line.trim_start_matches("\\c").trim();
if ctx.get_session_config().process_cli_command {
ctx.set_database(database);
} else {
bail!(
"Can't process \"\\c {}\", please add arg --process_cli_command",
database
)
}
}
Ok(line) => {
let line = line.trim_end();
query.push_str(line);
if line.ends_with(';') {
match exec_and_print(ctx, print_options, query).await {
match exec_and_print(ctx, print_options, query.clone()).await {
Ok(_) => {}
Err(err) => println!("{:?}", err),
Err(err) => {
eprintln!("{:?}", err);
if ctx.get_session_config().error_stop {
bail!("{} execute fail, STOP!", query)
}
}
}
query = "".to_owned();
} else {
Expand All @@ -48,26 +73,36 @@ pub async fn exec_from_lines(

// run the left over query if the last statement doesn't contain ‘;’
if !query.is_empty() {
match exec_and_print(ctx, print_options, query).await {
match exec_and_print(ctx, print_options, query.clone()).await {
Ok(_) => {}
Err(err) => println!("{:?}", err),
Err(err) => {
eprintln!("{:?}", err);
if ctx.get_session_config().error_stop {
bail!("{} execute fail, STOP!", query)
}
}
}
}
Ok(())
}

pub async fn exec_from_files(
files: Vec<String>,
ctx: &mut SessionContext,
print_options: &PrintOptions,
) {
) -> Result<()> {
let files = files
.into_iter()
.map(|file_path| File::open(file_path).unwrap())
.map(|file_path| File::open(&file_path).map(|f| (file_path, f)))
.collect::<Vec<_>>();
for file in files {
let (path, file) = file?;
let mut reader = BufReader::new(file);
exec_from_lines(ctx, &mut reader, print_options).await;
if let Err(e) = exec_from_lines(ctx, &mut reader, print_options).await {
bail!("Execute file {} fail, Error: {}", path, e)
};
}
Ok(())
}

/// run and execute SQL statements and commands against a context with the given print options
Expand Down Expand Up @@ -113,7 +148,7 @@ pub async fn exec_from_repl(ctx: &mut SessionContext, print_options: &mut PrintO
Ok(line) if parse_use_database(&line).is_some() => {
if let Some(db) = parse_use_database(&line) {
if connect_database(&db, ctx).await.is_err() {
println!("Cannot use database {}.", db);
eprintln!("Cannot use database {}.", db);
}
}
}
Expand Down
72 changes: 62 additions & 10 deletions client/src/main.rs
Expand Up @@ -2,17 +2,20 @@ use std::path::{Path, PathBuf};
use std::{env, fs};

use clap::builder::PossibleValuesParser;
use clap::{value_parser, Parser};
use clap::{value_parser, Args, Parser, Subcommand};
use client::ctx::{SessionConfig, SessionContext};
use client::print_format::PrintFormat;
use client::print_options::PrintOptions;
use client::{exec, CNOSDB_CLI_VERSION};
use fly_accept_encoding::Encoding;
use http_protocol::encoding::EncodingExt;

#[derive(Debug, Parser, PartialEq)]
#[derive(Debug, Clone, Parser, PartialEq)]
#[command(author, version, about, long_about= None)]
struct Args {
struct CliArgs {
#[command(subcommand)]
subcommand: Option<CliCommand>,

/// Host of CnosDB server.
#[arg(
short = 'H', long,
Expand Down Expand Up @@ -122,16 +125,46 @@ struct Args {

#[arg(long, default_value = "false")]
chunked: bool,

#[arg(long, default_value = "false")]
error_stop: bool,

#[arg(long, default_value = "false")]
process_cli_command: bool,
}

#[derive(Debug, Clone, Subcommand, PartialOrd, PartialEq)]
enum CliCommand {
DumpDDL(DumpDDL),

RestoreDumpDDL(RestoreDumpDDL),
}

#[derive(Debug, Clone, Args, PartialOrd, PartialEq)]
struct DumpDDL {
#[arg(short, long)]
tenant: Option<String>,
}

#[derive(Debug, Clone, Args, PartialOrd, PartialEq)]
struct RestoreDumpDDL {
#[arg(short, long)]
tenant: Option<String>,

#[arg()]
files: Vec<String>,
}

#[tokio::main]
pub async fn main() -> Result<(), anyhow::Error> {
env_logger::init();
let args = Args::parse();
let args = CliArgs::parse();

if !args.quiet {
if !args.quiet && args.subcommand.is_none() {
println!("CnosDB CLI v{}", CNOSDB_CLI_VERSION);
println!("Input arguments: {:?}", args);
let mut new_args = args.clone();
new_args.password = args.password.as_ref().map(|_p| "*****".to_string());
println!("Input arguments: {:?}", new_args);
}

if let Some(ref path) = args.data_path {
Expand Down Expand Up @@ -161,7 +194,9 @@ pub async fn main() -> Result<(), anyhow::Error> {
.with_ssl(args.use_ssl)
.with_unsafe_ssl(args.use_unsafe_ssl)
.with_ca_certs(args.cacert)
.with_chunked(args.chunked);
.with_chunked(args.chunked)
.with_process_cli_command(args.process_cli_command)
.with_error_stop(args.error_stop);

let mut ctx = SessionContext::new(session_config);
if let Some(ref path) = args.write_line_protocol {
Expand All @@ -174,7 +209,24 @@ pub async fn main() -> Result<(), anyhow::Error> {
quiet: args.quiet,
};

let files = args.file;
match args.subcommand {
Some(CliCommand::DumpDDL(d)) => {
let res = ctx.dump(d.tenant).await?;
println!("{}", res);
return Ok(());
}
Some(CliCommand::RestoreDumpDDL(r)) => {
ctx.get_mut_session_config().process_cli_command = true;
if let Some(t) = r.tenant {
ctx.set_tenant(t)
}
let files = r.files;
return exec::exec_from_files(files, &mut ctx, &print_options).await;
}
None => {}
}

let files = args.file.clone();
let rc = match args.rc {
Some(file) => file,
None => {
Expand All @@ -190,10 +242,10 @@ pub async fn main() -> Result<(), anyhow::Error> {
}
};
if !files.is_empty() {
exec::exec_from_files(files, &mut ctx, &print_options).await
exec::exec_from_files(files, &mut ctx, &print_options).await?;
} else {
if !rc.is_empty() {
exec::exec_from_files(rc, &mut ctx, &print_options).await
exec::exec_from_files(rc, &mut ctx, &print_options).await?;
}
exec::exec_from_repl(&mut ctx, &mut print_options).await;
}
Expand Down