Skip to content

Commit

Permalink
feat: sqllogicaltest can accept shard and replication as argument
Browse files Browse the repository at this point in the history
  • Loading branch information
h4ofanya committed May 21, 2024
1 parent e39d7bc commit ec6147e
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 24 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion query_server/sqllogicaltests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ nom = { workspace = true }
humantime = { workspace = true }
reqwest = { workspace = true }
url = { workspace = true }

clap = {workspace = true, features = ["derive"]}
regex = {workspace = true}
[features]
default = []
backtrace = ["async-backtrace"]
6 changes: 4 additions & 2 deletions query_server/sqllogicaltests/src/db_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use nom::IResult;

use crate::error::SqlError;
use crate::instance::{
run_lp_write, run_open_tsdb_json_write, run_open_tsdb_write, run_query, SqlClientOptions,
run_lp_write, run_open_tsdb_json_write, run_open_tsdb_write, run_query, CreateOptions,
SqlClientOptions,
};

type Result<T, E = SqlError> = std::result::Result<T, E>;
Expand Down Expand Up @@ -118,12 +119,13 @@ impl DBRequest {
pub async fn execute(
&self,
options: &SqlClientOptions,
create_option: &CreateOptions,
path: &Path,
) -> Result<(Schema, Vec<RecordBatch>)> {
match self {
DBRequest::Sql(sql) => {
println!("[{}] Execute Sql: \"{}\"", path.display(), sql);
Ok(run_query(options, sql).await?)
Ok(run_query(options, create_option, sql).await?)
}
DBRequest::LineProtocol(lp) => {
println!("[{}] Execute LineProtocol: \"{}\"", path.display(), lp);
Expand Down
47 changes: 45 additions & 2 deletions query_server/sqllogicaltests/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use arrow_flight::sql::client::FlightSqlServiceClient;
use arrow_flight::utils::flight_data_to_batches;
use async_trait::async_trait;
use futures::TryStreamExt;
use regex::Regex;
use reqwest::{Client, Method, Request, Url};
use sqllogictest::{ColumnType, DBOutput};
use tonic::transport::{Channel, Endpoint};
Expand All @@ -22,16 +23,19 @@ use crate::utils::normalize;
pub struct CnosDBClient {
relative_path: PathBuf,
options: SqlClientOptions,
create_options: CreateOptions,
}

impl CnosDBClient {
pub fn new(
relative_path: impl Into<PathBuf>,
options: SqlClientOptions,
create_options: CreateOptions,
) -> Result<Self, SqlError> {
Ok(Self {
relative_path: relative_path.into(),
options,
create_options,
})
}
}
Expand All @@ -48,7 +52,9 @@ impl sqllogictest::AsyncDB for CnosDBClient {
) -> std::result::Result<DBOutput<Self::ColumnType>, Self::Error> {
let request = DBRequest::parse_db_request(request, &mut self.options)?;

let (schema, batches) = request.execute(&self.options, &self.relative_path).await?;
let (schema, batches) = request
.execute(&self.options, &self.create_options, &self.relative_path)
.await?;
let types = normalize::convert_schema_to_types(schema.fields());
let rows = normalize::convert_batches(batches)?;

Expand Down Expand Up @@ -163,6 +169,7 @@ fn build_http_write_request(option: &SqlClientOptions, url: Url, body: &str) ->

pub async fn run_query(
options: &SqlClientOptions,
create_option: &CreateOptions,
sql: impl Into<String>,
) -> Result<(Schema, Vec<RecordBatch>)> {
let SqlClientOptions {
Expand All @@ -187,7 +194,37 @@ pub async fn run_query(
let _ = client.handshake(username, password).await?;

// 2. execute query, get result metadata
let mut stmt = client.prepare(sql.into(), None).await?;
let mut sql = Into::<String>::into(sql);
let re = Regex::new(r"create\s+database").unwrap();
if re.is_match(&sql) {
let with = sql.to_ascii_lowercase().find("with");
if with.is_some() {
let with = with.unwrap();
if !sql.contains("shard") {
sql.insert_str(
with + 4,
format!(" shard {}", create_option.shard_num).as_str(),
);
}

if !sql.contains("REPLICA") {
sql.insert_str(
with + 4,
format!(" REPLICA {}", create_option.replication_num).as_str(),
);
}
} else {
sql.insert_str(
sql.len() - 1,
format!(
" with shard {} replica {}",
create_option.shard_num, create_option.replication_num
)
.as_str(),
);
}
}
let mut stmt = client.prepare(sql, None).await?;
let flight_info = stmt.execute().await?;

let mut batches = vec![];
Expand Down Expand Up @@ -301,6 +338,12 @@ impl ColumnType for CnosDBColumnType {
}
}

#[derive(Debug, Clone)]
pub struct CreateOptions {
pub replication_num: u32,
pub shard_num: u32,
}

#[derive(Debug, Clone)]
pub struct SqlClientOptions {
pub flight_host: String,
Expand Down
64 changes: 48 additions & 16 deletions query_server/sqllogicaltests/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::error::Error;
use std::path::{Path, PathBuf};

use crate::instance::SqlClientOptions;
use clap::Parser;

use crate::instance::{CreateOptions, SqlClientOptions};

mod db_request;
mod error;
Expand Down Expand Up @@ -46,6 +48,11 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
chunked: None,
};

let create_options = CreateOptions {
shard_num: options.shard_count,
replication_num: options.replica_count,
};

println!("{options:?}");
println!("{db_options:?}");

Expand All @@ -63,9 +70,21 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
}
}
if options.complete_mode {
os::run_complete_file(&path, relative_path, db_options.clone()).await?;
os::run_complete_file(
&path,
relative_path,
db_options.clone(),
create_options.clone(),
)
.await?;
} else {
os::run_test_file(&path, relative_path, db_options.clone()).await?;
os::run_test_file(
&path,
relative_path,
db_options.clone(),
create_options.clone(),
)
.await?;
}
}

Expand Down Expand Up @@ -113,17 +132,37 @@ struct Options {
/// Auto complete mode to fill out expected results
complete_mode: bool,

/// Number of shards
shard_count: u32,

/// Number of replication sets
replica_count: u32,

flight_host: String,
flight_port: u16,
http_host: String,
http_port: u16,
}

#[derive(Debug, Parser)]
struct CliOptions {
#[arg()]
filters: Vec<String>,

#[arg(long = "shard", default_value = "1")]
shard_count: u32,

#[arg(long = "replica", default_value = "1")]
replica_count: u32,

#[arg(long = "complete", default_value = "false")]
complete_mode: bool,
}

impl Options {
fn new() -> Self {
let args: Vec<_> = std::env::args().collect();
let args = CliOptions::parse();

let complete_mode = args.iter().any(|a| a == "--complete");
let flight_host =
std::env::var(CNOSDB_FLIGHT_HOST_ENV).unwrap_or(CNOSDB_FLIGHT_HOST_DEFAULT.into());
let flight_port = std::env::var(CNOSDB_FLIGHT_PORT_ENV)
Expand All @@ -138,19 +177,12 @@ impl Options {
});

// treat args after the first as filters to run (substring matching)
let filters = if !args.is_empty() {
args.into_iter()
.skip(1)
// ignore command line arguments like `--complete`
.filter(|arg| !arg.as_str().starts_with("--"))
.collect::<Vec<_>>()
} else {
vec![]
};

Self {
filters,
complete_mode,
filters: args.filters,
complete_mode: args.complete_mode,
shard_count: args.shard_count,
replica_count: args.replica_count,
flight_host,
flight_port,
http_host,
Expand Down
16 changes: 13 additions & 3 deletions query_server/sqllogicaltests/src/os/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@ use std::path::{Path, PathBuf};
use sqllogictest::{default_column_validator, default_validator};
use trace::info;

use crate::instance::{CnosDBClient, SqlClientOptions};
use crate::instance::{CnosDBClient, CreateOptions, SqlClientOptions};

pub async fn run_test_file(
path: &Path,
relative_path: PathBuf,
options: SqlClientOptions,
createoptions: CreateOptions,
) -> Result<(), Box<dyn Error>> {
info!("Running with DataFusion runner: {}", path.display());
let mut runner = sqllogictest::Runner::new(|| async {
CnosDBClient::new(relative_path.clone(), options.clone())
CnosDBClient::new(
relative_path.clone(),
options.clone(),
createoptions.clone(),
)
});
runner.run_file_async(path).await?;
Ok(())
Expand All @@ -23,12 +28,17 @@ pub async fn run_complete_file(
path: &Path,
relative_path: PathBuf,
options: SqlClientOptions,
createoptions: CreateOptions,
) -> Result<(), Box<dyn Error>> {
info!("Using complete mode to complete: {}", path.display());

// let mut data = 3;
let mut runner = sqllogictest::Runner::new(|| async {
CnosDBClient::new(relative_path.clone(), options.clone())
CnosDBClient::new(
relative_path.clone(),
options.clone(),
createoptions.clone(),
)
});
let col_separator = " ";
let validator = default_validator;
Expand Down

0 comments on commit ec6147e

Please sign in to comment.