Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions src/query/service/src/servers/http/clickhouse_federated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_expression::types::StringType;
use common_expression::utils::FromData;
use common_expression::DataBlock;
use common_expression::TableDataType;
use common_expression::TableField;
use common_expression::TableSchemaRef;
use common_expression::TableSchemaRefExt;
use ctor::ctor;
use regex::Regex;

use crate::servers::federated_helper::FederatedHelper;
use crate::servers::http::CLICKHOUSE_VERSION;

pub struct ClickHouseFederated {}

#[ctor]
static FORMAT_REGEX: Regex = Regex::new(r".*(?i)FORMAT\s*([[:alpha:]]*)\s*;?$").unwrap();

impl ClickHouseFederated {
// Build block for select function.
// Format:
// |function_name()|
// |value|
fn select_function_block(name: &str, value: &str) -> Option<(TableSchemaRef, DataBlock)> {
let schema = TableSchemaRefExt::create(vec![TableField::new(name, TableDataType::String)]);
let block = DataBlock::new_from_columns(vec![StringType::from_data(vec![
value.as_bytes().to_vec(),
])]);
Some((schema, block))
}

pub fn get_format(query: &str) -> Option<String> {
match FORMAT_REGEX.captures(query) {
Some(x) => x.get(1).map(|s| s.as_str().to_owned()),
None => None,
}
}

pub fn check(query: &str) -> Option<(TableSchemaRef, DataBlock)> {
#[ctor]
static SELECT_VERSION_RULES: Vec<(Regex, Option<(TableSchemaRef, DataBlock)>)> = vec![(
Regex::new(r".*(?i)SELECT\s*VERSION\(\)\s*;?$").unwrap(),
ClickHouseFederated::select_function_block("version()", CLICKHOUSE_VERSION),
)];

FederatedHelper::block_match_rule(query, &SELECT_VERSION_RULES)
}
}
48 changes: 0 additions & 48 deletions src/query/service/src/servers/http/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::ToErrorCode;
use common_expression::infer_table_schema;
use common_expression::DataBlock;
use common_expression::TableSchemaRef;
use common_formats::ClickhouseFormatType;
use common_formats::FileFormatOptionsExt;
use common_formats::FileFormatTypeExt;
Expand Down Expand Up @@ -57,7 +55,6 @@ use tracing::info;
use crate::interpreters::InterpreterFactory;
use crate::interpreters::InterpreterPtr;
use crate::servers::http::v1::HttpQueryContext;
use crate::servers::http::ClickHouseFederated;
use crate::sessions::QueryContext;
use crate::sessions::SessionType;
use crate::sessions::TableContext;
Expand Down Expand Up @@ -241,18 +238,6 @@ pub async fn clickhouse_handler_get(

let default_format = get_default_format(&params, headers).map_err(BadRequest)?;
let sql = params.query();
if let Some((schema, block)) = ClickHouseFederated::check(&sql) {
return serialize_one_block(
context.clone(),
schema,
block,
&sql,
&params,
default_format,
)
.map_err(InternalServerError);
}

let mut planner = Planner::new(context.clone());
let (plan, _, fmt) = planner
.plan_sql(&sql)
Expand Down Expand Up @@ -312,11 +297,6 @@ pub async fn clickhouse_handler_post(
};
info!("receive clickhouse http post, (query + body) = {}", &msg);

if let Some((schema, block)) = ClickHouseFederated::check(&sql) {
return serialize_one_block(ctx.clone(), schema, block, &sql, &params, default_format)
.map_err(InternalServerError);
}

let mut planner = Planner::new(ctx.clone());
let (mut plan, _, fmt) = planner
.plan_sql(&sql)
Expand Down Expand Up @@ -484,34 +464,6 @@ fn compress_block(input: Vec<u8>) -> Result<Vec<u8>> {
}
}

fn serialize_one_block(
ctx: Arc<QueryContext>,
schema: TableSchemaRef,
block: DataBlock,
sql: &str,
params: &StatementHandlerParams,
default_format: ClickhouseFormatType,
) -> Result<WithContentType<Body>> {
let format = match ClickHouseFederated::get_format(sql) {
Some(format) => ClickhouseFormatType::parse_clickhouse_format(&format)?,
None => default_format,
};
let format_typ = format.typ.clone();
let mut output_format = FileFormatOptionsExt::get_output_format_from_clickhouse_format(
format,
schema,
&ctx.get_settings(),
)?;
let mut res = output_format.serialize_prefix()?;
let mut data = output_format.serialize_block(&block)?;
if params.compress() {
data = compress_block(data)?;
}
res.append(&mut data);
res.append(&mut output_format.finalize()?);
Ok(Body::from(res).with_content_type(format_typ.get_content_type()))
}

fn get_default_format(
params: &StatementHandlerParams,
headers: &HeaderMap,
Expand Down
7 changes: 0 additions & 7 deletions src/query/service/src/servers/mysql/mysql_federated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,6 @@ impl MySQLFederated {
fn federated_mixed_check(&self, query: &str) -> Option<(TableSchemaRef, DataBlock)> {
#[ctor]
static MIXED_RULES: Vec<(Regex, Option<(TableSchemaRef, DataBlock)>)> = vec![
(
Regex::new(r"(?i)^(SELECT VERSION\(\s*\))").unwrap(),
MySQLFederated::select_function_block(
"version()",
format!("{}-{}", MYSQL_VERSION, DATABEND_COMMIT_VERSION.clone()).as_str(),
),
),
// Txn.
(Regex::new("(?i)^(ROLLBACK(.*))").unwrap(), None),
(Regex::new("(?i)^(COMMIT(.*))").unwrap(), None),
Expand Down
14 changes: 13 additions & 1 deletion src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,16 @@ use crate::sessions::ProcessInfo;
use crate::sessions::QueryContextShared;
use crate::sessions::Session;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
use crate::storages::Table;
const MYSQL_VERSION: &str = "8.0.26";
const CLICKHOUSE_VERSION: &str = "8.12.14";

#[derive(Clone)]
pub struct QueryContext {
version: String,
mysql_version: String,
clickhouse_version: String,
partition_queue: Arc<RwLock<VecDeque<PartInfoPtr>>>,
shared: Arc<QueryContextShared>,
fragment_id: Arc<AtomicUsize>,
Expand All @@ -90,6 +95,8 @@ impl QueryContext {
Arc::new(QueryContext {
partition_queue: Arc::new(RwLock::new(VecDeque::new())),
version: format!("DatabendQuery {}", *DATABEND_COMMIT_VERSION),
mysql_version: format!("{}-{}", MYSQL_VERSION, *DATABEND_COMMIT_VERSION),
clickhouse_version: CLICKHOUSE_VERSION.to_string(),
shared,
fragment_id: Arc::new(AtomicUsize::new(0)),
})
Expand Down Expand Up @@ -354,7 +361,12 @@ impl TableContext for QueryContext {
}

fn get_fuse_version(&self) -> String {
self.version.clone()
let session = self.get_current_session();
match session.get_type() {
SessionType::ClickHouseHttpHandler => self.clickhouse_version.clone(),
SessionType::MySQL => self.mysql_version.clone(),
_ => self.version.clone(),
}
}

fn get_format_settings(&self) -> Result<FormatSettings> {
Expand Down
15 changes: 0 additions & 15 deletions src/query/service/tests/it/servers/mysql/mysql_federated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,6 @@ fn test_mysql_federated() -> Result<()> {
assert!(result.is_none());
}

// select version()
{
let query = "select version()";
let result = federated.check(query);
assert!(result.is_some());

if let Some(block) = result {
assert!(!block.1.is_empty())
}

let query = "select versiona";
let result = federated.check(query);
assert!(result.is_none());
}

// variables
{
let query = "select @@tx_isolation, @@session.tx_isolation";
Expand Down
7 changes: 7 additions & 0 deletions tests/sqllogictests/suites/query/case_sensitivity/query.test
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,10 @@ query I
select A.* from (WITH source2 AS (select 1 as e) SELECT * FROM source2) A
----
1

onlyif mysql
query T
select substr(version(), 1, 6);
----
8.0.26