Skip to content

Commit

Permalink
feat: sqllogicaltest can accept shard and replication as argument (#2137
Browse files Browse the repository at this point in the history
)

* feat: sqllogicaltest can accept shard and replication as argument

* feat: support cluster and singleton
  • Loading branch information
h4ofanya committed May 29, 2024
1 parent 663711e commit ee7cbfd
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 25 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion query_server/sqllogicaltests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ nom = { workspace = true }
humantime = { workspace = true }
reqwest = { workspace = true }
url = { workspace = true }

clap = {workspace = true, features = ["derive"]}
regex = {workspace = true}
[features]
default = []
backtrace = ["async-backtrace"]
6 changes: 4 additions & 2 deletions query_server/sqllogicaltests/src/db_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use nom::IResult;

use crate::error::SqlError;
use crate::instance::{
run_lp_write, run_open_tsdb_json_write, run_open_tsdb_write, run_query, SqlClientOptions,
run_lp_write, run_open_tsdb_json_write, run_open_tsdb_write, run_query, CreateOptions,
SqlClientOptions,
};

type Result<T, E = SqlError> = std::result::Result<T, E>;
Expand Down Expand Up @@ -118,12 +119,13 @@ impl DBRequest {
pub async fn execute(
&self,
options: &SqlClientOptions,
create_option: &CreateOptions,
path: &Path,
) -> Result<(Schema, Vec<RecordBatch>)> {
match self {
DBRequest::Sql(sql) => {
println!("[{}] Execute Sql: \"{}\"", path.display(), sql);
Ok(run_query(options, sql).await?)
Ok(run_query(options, create_option, sql).await?)
}
DBRequest::LineProtocol(lp) => {
println!("[{}] Execute LineProtocol: \"{}\"", path.display(), lp);
Expand Down
52 changes: 49 additions & 3 deletions query_server/sqllogicaltests/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use arrow_flight::sql::client::FlightSqlServiceClient;
use arrow_flight::utils::flight_data_to_batches;
use async_trait::async_trait;
use futures::TryStreamExt;
use regex::Regex;
use reqwest::{Client, Method, Request, Url};
use sqllogictest::{ColumnType, DBOutput};
use tonic::transport::{Channel, Endpoint};
Expand All @@ -20,18 +21,24 @@ use crate::error::{Result, SqlError};
use crate::utils::normalize;

pub struct CnosDBClient {
engine_name: String,
relative_path: PathBuf,
options: SqlClientOptions,
create_options: CreateOptions,
}

impl CnosDBClient {
pub fn new(
engine_name: impl Into<String>,
relative_path: impl Into<PathBuf>,
options: SqlClientOptions,
create_options: CreateOptions,
) -> Result<Self, SqlError> {
Ok(Self {
engine_name: engine_name.into(),
relative_path: relative_path.into(),
options,
create_options,
})
}
}
Expand All @@ -48,7 +55,9 @@ impl sqllogictest::AsyncDB for CnosDBClient {
) -> std::result::Result<DBOutput<Self::ColumnType>, Self::Error> {
let request = DBRequest::parse_db_request(request, &mut self.options)?;

let (schema, batches) = request.execute(&self.options, &self.relative_path).await?;
let (schema, batches) = request
.execute(&self.options, &self.create_options, &self.relative_path)
.await?;
let types = normalize::convert_schema_to_types(schema.fields());
let rows = normalize::convert_batches(batches)?;

Expand All @@ -60,7 +69,7 @@ impl sqllogictest::AsyncDB for CnosDBClient {
}

fn engine_name(&self) -> &str {
"CnosDB"
&self.engine_name
}

async fn sleep(dur: Duration) {
Expand Down Expand Up @@ -163,6 +172,7 @@ fn build_http_write_request(option: &SqlClientOptions, url: Url, body: &str) ->

pub async fn run_query(
options: &SqlClientOptions,
create_option: &CreateOptions,
sql: impl Into<String>,
) -> Result<(Schema, Vec<RecordBatch>)> {
let SqlClientOptions {
Expand All @@ -187,7 +197,37 @@ pub async fn run_query(
let _ = client.handshake(username, password).await?;

// 2. execute query, get result metadata
let mut stmt = client.prepare(sql.into(), None).await?;
let mut sql = Into::<String>::into(sql);
let re = Regex::new(r"create\s+database").unwrap();
if re.is_match(&sql) {
let with = sql.to_ascii_lowercase().find("with");
if with.is_some() {
let with = with.unwrap();
if !sql.contains("shard") {
sql.insert_str(
with + 4,
format!(" shard {}", create_option.shard_num).as_str(),
);
}

if !sql.contains("REPLICA") {
sql.insert_str(
with + 4,
format!(" REPLICA {}", create_option.replication_num).as_str(),
);
}
} else {
sql.insert_str(
sql.len() - 1,
format!(
" with shard {} replica {}",
create_option.shard_num, create_option.replication_num
)
.as_str(),
);
}
}
let mut stmt = client.prepare(sql, None).await?;
let flight_info = stmt.execute().await?;

let mut batches = vec![];
Expand Down Expand Up @@ -301,6 +341,12 @@ impl ColumnType for CnosDBColumnType {
}
}

#[derive(Debug, Clone)]
pub struct CreateOptions {
pub replication_num: u32,
pub shard_num: u32,
}

#[derive(Debug, Clone)]
pub struct SqlClientOptions {
pub flight_host: String,
Expand Down
78 changes: 62 additions & 16 deletions query_server/sqllogicaltests/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::error::Error;
use std::path::{Path, PathBuf};
use std::process::exit;

use crate::instance::SqlClientOptions;
use clap::Parser;

use crate::instance::{CreateOptions, SqlClientOptions};

mod db_request;
mod error;
Expand Down Expand Up @@ -31,6 +34,11 @@ const CNOSDB_TARGET_PARTITIONS_DEFAULT: usize = 8;
pub async fn main() -> Result<(), Box<dyn Error>> {
let options = Options::new();

if options.mode != "singleton" && options.mode != "cluster" {
eprintln!("Unsupported mode: {}", options.mode);
exit(1);
}

let db_options = SqlClientOptions {
flight_host: options.flight_host.clone(),
flight_port: options.flight_port,
Expand All @@ -46,6 +54,11 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
chunked: None,
};

let create_options = CreateOptions {
shard_num: options.shard_count,
replication_num: options.replica_count,
};

println!("{options:?}");
println!("{db_options:?}");

Expand All @@ -63,9 +76,23 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
}
}
if options.complete_mode {
os::run_complete_file(&path, relative_path, db_options.clone()).await?;
os::run_complete_file(
options.mode.clone(),
&path,
relative_path,
db_options.clone(),
create_options.clone(),
)
.await?;
} else {
os::run_test_file(&path, relative_path, db_options.clone()).await?;
os::run_test_file(
options.mode.clone(),
&path,
relative_path,
db_options.clone(),
create_options.clone(),
)
.await?;
}
}

Expand Down Expand Up @@ -113,17 +140,42 @@ struct Options {
/// Auto complete mode to fill out expected results
complete_mode: bool,

/// Number of shards
shard_count: u32,

/// Number of replication sets
replica_count: u32,

mode: String,

flight_host: String,
flight_port: u16,
http_host: String,
http_port: u16,
}

#[derive(Debug, Parser)]
struct CliOptions {
#[arg()]
filters: Vec<String>,

#[arg(long = "shard", default_value = "1")]
shard_count: u32,

#[arg(long = "replica", default_value = "1")]
replica_count: u32,

#[arg(long = "complete", default_value = "false")]
complete_mode: bool,

#[arg(short = 'M', long = "mode", default_value = "singleton")]
mode: String,
}

impl Options {
fn new() -> Self {
let args: Vec<_> = std::env::args().collect();
let args = CliOptions::parse();

let complete_mode = args.iter().any(|a| a == "--complete");
let flight_host =
std::env::var(CNOSDB_FLIGHT_HOST_ENV).unwrap_or(CNOSDB_FLIGHT_HOST_DEFAULT.into());
let flight_port = std::env::var(CNOSDB_FLIGHT_PORT_ENV)
Expand All @@ -138,19 +190,13 @@ impl Options {
});

// treat args after the first as filters to run (substring matching)
let filters = if !args.is_empty() {
args.into_iter()
.skip(1)
// ignore command line arguments like `--complete`
.filter(|arg| !arg.as_str().starts_with("--"))
.collect::<Vec<_>>()
} else {
vec![]
};

Self {
filters,
complete_mode,
filters: args.filters,
complete_mode: args.complete_mode,
shard_count: args.shard_count,
replica_count: args.replica_count,
mode: args.mode,
flight_host,
flight_port,
http_host,
Expand Down
20 changes: 17 additions & 3 deletions query_server/sqllogicaltests/src/os/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,45 @@ use std::path::{Path, PathBuf};
use sqllogictest::{default_column_validator, default_validator};
use trace::info;

use crate::instance::{CnosDBClient, SqlClientOptions};
use crate::instance::{CnosDBClient, CreateOptions, SqlClientOptions};

pub async fn run_test_file(
engine_name: String,
path: &Path,
relative_path: PathBuf,
options: SqlClientOptions,
createoptions: CreateOptions,
) -> Result<(), Box<dyn Error>> {
info!("Running with DataFusion runner: {}", path.display());
let mut runner = sqllogictest::Runner::new(|| async {
CnosDBClient::new(relative_path.clone(), options.clone())
CnosDBClient::new(
engine_name.clone(),
relative_path.clone(),
options.clone(),
createoptions.clone(),
)
});
runner.run_file_async(path).await?;
Ok(())
}

pub async fn run_complete_file(
engine_name: String,
path: &Path,
relative_path: PathBuf,
options: SqlClientOptions,
createoptions: CreateOptions,
) -> Result<(), Box<dyn Error>> {
info!("Using complete mode to complete: {}", path.display());

// let mut data = 3;
let mut runner = sqllogictest::Runner::new(|| async {
CnosDBClient::new(relative_path.clone(), options.clone())
CnosDBClient::new(
engine_name.clone(),
relative_path.clone(),
options.clone(),
createoptions.clone(),
)
});
let col_separator = " ";
let validator = default_validator;
Expand Down

0 comments on commit ee7cbfd

Please sign in to comment.