Skip to content

Commit

Permalink
feat: support metadata api for flight sql
Browse files Browse the repository at this point in the history
  • Loading branch information
yukkit authored and roseboy-liu committed May 16, 2023
1 parent 5717a7c commit 03599af
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 34 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions e2e_test/Cargo.toml
Expand Up @@ -16,3 +16,6 @@ flatbuffers = { workspace = true }
tonic = { workspace = true, features = ["tls", "transport"] }
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true }
datafusion = { workspace = true }
arrow-flight = { workspace = true, features = ["flight-sql-experimental"] }
futures = { workspace = true, default-features = false, features = ["alloc"] }
144 changes: 144 additions & 0 deletions e2e_test/src/flight_sql.rs
@@ -0,0 +1,144 @@
#[cfg(test)]
mod test {
use std::time::Duration;

use arrow_flight::sql::client::FlightSqlServiceClient;
use arrow_flight::sql::{CommandGetDbSchemas, CommandGetTables};
use arrow_flight::utils::flight_data_to_batches;
use arrow_flight::FlightInfo;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty;
use datafusion::{arrow, assert_batches_eq};
use futures::TryStreamExt;
use tonic::transport::{Channel, Endpoint};

/// Opens a gRPC [`Channel`] to the Flight SQL service at `http://{host}:{port}`.
///
/// The endpoint is configured with:
/// - a 20 s connect timeout and a 20 s `timeout`,
/// - `TCP_NODELAY` enabled,
/// - a TCP keepalive of 3600 s,
/// - an HTTP/2 keep-alive interval of 300 s with a 20 s keep-alive timeout,
///   also sent while the connection is idle.
///
/// # Errors
///
/// Returns [`ArrowError::IoError`] when the URI cannot be turned into an
/// endpoint or when the connection attempt fails. Both messages include the
/// underlying error so a failing e2e run can be diagnosed from the log alone.
async fn flight_channel(host: &str, port: u16) -> Result<Channel, ArrowError> {
    let endpoint = Endpoint::new(format!("http://{host}:{port}"))
        // Keep the cause: a bare "Cannot create endpoint" hides what was wrong with the URI.
        .map_err(|e| ArrowError::IoError(format!("Cannot create endpoint: {e}")))?
        .connect_timeout(Duration::from_secs(20))
        .timeout(Duration::from_secs(20))
        .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
        .tcp_keepalive(Some(Duration::from_secs(3600)))
        .http2_keep_alive_interval(Duration::from_secs(300))
        .keep_alive_timeout(Duration::from_secs(20))
        .keep_alive_while_idle(true);

    endpoint
        .connect()
        .await
        .map_err(|e| ArrowError::IoError(format!("Cannot connect to endpoint: {e}")))
}

/// Downloads every record batch referenced by `flight_info`.
///
/// For each endpoint that carries a ticket, this issues `do_get` through
/// `client`, collects the whole Flight data stream, decodes it into record
/// batches and appends them to the result. Endpoints without a ticket are
/// skipped. Despite the name, nothing is printed here.
///
/// # Panics
///
/// Panics if `do_get` fails, if the stream yields an error, or if the
/// collected Flight data cannot be decoded into batches.
async fn fetch_result_and_print(
    flight_info: FlightInfo,
    client: &mut FlightSqlServiceClient,
) -> Vec<RecordBatch> {
    let mut collected = Vec::new();

    let tickets = flight_info
        .endpoint
        .iter()
        .filter_map(|ep| ep.ticket.as_ref());

    for ticket in tickets {
        let frames = client
            .do_get(ticket.clone())
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        let decoded = flight_data_to_batches(&frames).unwrap();
        collected.extend(decoded);
    }

    collected
}

/// Creates a Flight SQL client connected to `localhost:8904` and performs the
/// basic-authentication handshake as user `root` with an empty password.
///
/// # Panics
///
/// Panics if the channel cannot be established or the handshake fails.
async fn authed_client() -> FlightSqlServiceClient {
    let mut sql_client =
        FlightSqlServiceClient::new(flight_channel("localhost", 8904).await.unwrap());

    // Basic authentication; the handshake payload itself is not needed afterwards.
    sql_client.handshake("root", "").await.unwrap();

    sql_client
}

/// Checks that the catalog listing returned over Flight SQL mentions `cnosdb`.
#[tokio::test]
async fn test_sql_client_get_catalogs() {
    let mut client = authed_client().await;

    let info = client.get_catalogs().await.unwrap();
    let batches = fetch_result_and_print(info, &mut client).await;

    // Render the batches as a text table and look for the tenant name in it.
    let rendered = pretty::pretty_format_batches(&batches)
        .unwrap()
        .to_string();

    assert!(rendered.contains("cnosdb"));
}

/// Checks that `CommandGetDbSchemas` with the pattern `usage_%` returns exactly
/// the `usage_schema` database of the `cnosdb` catalog.
#[tokio::test]
async fn test_sql_client_get_db_schemas() {
    let mut client = authed_client().await;

    let flight_info = client
        .get_db_schemas(CommandGetDbSchemas {
            catalog: None,
            db_schema_filter_pattern: Some("usage_%".to_string()),
        })
        .await
        .unwrap();

    // `assert_batches_eq!` compares the pretty-printed table line by line, so
    // every cell must be padded to the column width given by the border rows.
    let expected = vec![
        "+--------------+---------------+",
        "| table_schem  | table_catalog |",
        "+--------------+---------------+",
        "| usage_schema | cnosdb        |",
        "+--------------+---------------+",
    ];
    let actual = fetch_result_and_print(flight_info, &mut client).await;

    assert_batches_eq!(expected, &actual);
}

/// Checks that `CommandGetTables`, filtered to schema `usage_schema`, table
/// name pattern `coord_%_in` and table types `TABLE`/`VIEW`, returns exactly
/// the `coord_data_in` table.
#[tokio::test]
async fn test_sql_client_get_tables() {
    let mut client = authed_client().await;

    let flight_info = client
        .get_tables(CommandGetTables {
            catalog: None,
            db_schema_filter_pattern: Some("usage_schema".to_string()),
            table_name_filter_pattern: Some("coord_%_in".to_string()),
            table_types: vec!["TABLE".to_string(), "VIEW".to_string()],
            include_schema: false,
        })
        .await
        .unwrap();

    // `assert_batches_eq!` compares the pretty-printed table line by line, so
    // every cell must be padded to the column width given by the border rows.
    let expected = vec![
        "+-----------+--------------+---------------+------------+",
        "| table_cat | table_schem  | table_name    | table_type |",
        "+-----------+--------------+---------------+------------+",
        "| cnosdb    | usage_schema | coord_data_in | TABLE      |",
        "+-----------+--------------+---------------+------------+",
    ];
    let actual = fetch_result_and_print(flight_info, &mut client).await;

    assert_batches_eq!(expected, &actual);
}

/// Checks that the table-type listing returned over Flight SQL is exactly
/// `TABLE`, `VIEW` and `LOCAL TEMPORARY`, in that order.
#[tokio::test]
async fn test_sql_client_get_table_types() {
    let mut client = authed_client().await;

    let flight_info = client.get_table_types().await.unwrap();

    // `assert_batches_eq!` compares the pretty-printed table line by line, so
    // every cell must be padded to the column width given by the border rows.
    let expected = vec![
        "+-----------------+",
        "| table_type      |",
        "+-----------------+",
        "| TABLE           |",
        "| VIEW            |",
        "| LOCAL TEMPORARY |",
        "+-----------------+",
    ];
    let actual = fetch_result_and_print(flight_info, &mut client).await;

    assert_batches_eq!(expected, &actual);
}
}
1 change: 1 addition & 0 deletions e2e_test/src/lib.rs
@@ -1,2 +1,3 @@
mod flight_sql;
mod http_api_tests;
mod kv_service_tests;
119 changes: 95 additions & 24 deletions main/src/flight_sql/flight_sql_server.rs
Expand Up @@ -67,7 +67,7 @@ where
{
async fn precess_statement_query_req(
&self,
sql: String,
sql: impl Into<String>,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
// auth request
Expand All @@ -78,7 +78,7 @@ where
let ctx = self.construct_context(user, request.metadata())?;

// build query state machine
let query_state_machine = self.build_query_state_machine(sql, ctx).await?;
let query_state_machine = self.build_query_state_machine(sql.into(), ctx).await?;

// build logical plan
let logical_plan = self.build_logical_plan(query_state_machine.clone()).await?;
Expand Down Expand Up @@ -163,10 +163,10 @@ where

async fn build_query_state_machine(
&self,
sql: String,
sql: impl Into<String>,
ctx: Context,
) -> Result<QueryStateMachineRef, Status> {
let query = Query::new(ctx, sql);
let query = Query::new(ctx, sql.into());
let query_state_machine = self
.instance
.build_query_state_machine(query)
Expand Down Expand Up @@ -399,8 +399,6 @@ where
Ok(Response::new(flight_info))
}

/// TODO support
/// wait for <https://github.com/cnosdb/cnosdb/issues/642>
async fn get_flight_info_catalogs(
&self,
query: CommandGetCatalogs,
Expand All @@ -411,13 +409,18 @@ where
query, request
);

Err(Status::unimplemented(
"get_flight_info_catalogs not implemented",
))
self.precess_statement_query_req(
"SELECT
TENANT_NAME AS TABLE_CAT
FROM
CLUSTER_SCHEMA.TENANTS
ORDER BY
TABLE_CAT",
request,
)
.await
}

/// TODO support
/// wait for <https://github.com/cnosdb/cnosdb/issues/642>
async fn get_flight_info_schemas(
&self,
query: CommandGetDbSchemas,
Expand All @@ -428,13 +431,38 @@ where
query, request
);

Err(Status::unimplemented(
"get_flight_info_schemas not implemented",
))
let CommandGetDbSchemas {
catalog,
db_schema_filter_pattern,
} = query;

let mut filters = vec![];
let _ = catalog.map(|e| filters.push(format!("TABLE_CATALOG = '{}'", e)));
let _ =
db_schema_filter_pattern.map(|e| filters.push(format!("DATABASE_NAME LIKE '{e}'",)));

let filter = if filters.is_empty() {
"".to_string()
} else {
format!("WHERE {}", filters.join(" AND "))
};

self.precess_statement_query_req(
format!(
"SELECT
DATABASE_NAME AS TABLE_SCHEM,
TENANT_NAME AS TABLE_CATALOG
FROM
INFORMATION_SCHEMA.DATABASES
{filter}
ORDER BY
TABLE_CATALOG, TABLE_SCHEM"
),
request,
)
.await
}

/// TODO support
/// wait for <https://github.com/cnosdb/cnosdb/issues/642>
async fn get_flight_info_tables(
&self,
query: CommandGetTables,
Expand All @@ -445,13 +473,52 @@ where
query, request
);

Err(Status::unimplemented(
"get_flight_info_tables not implemented",
))
let CommandGetTables {
catalog,
db_schema_filter_pattern,
table_name_filter_pattern,
table_types,
include_schema: _,
} = query;

let mut filters = vec![];
let _ = catalog.map(|e| filters.push(format!("TABLE_CATALOG = '{}'", e)));
let _ =
db_schema_filter_pattern.map(|e| filters.push(format!("TABLE_DATABASE LIKE '{e}'")));
let _ = table_name_filter_pattern.map(|e| filters.push(format!("TABLE_NAME LIKE '{e}'")));
if !table_types.is_empty() {
let table_types = table_types
.iter()
.map(|e| format!("'{}'", e))
.collect::<Vec<_>>()
.join(",");
filters.push(format!("TABLE_TYPE IN ({})", table_types));
}

let filter = if filters.is_empty() {
"".to_string()
} else {
format!("WHERE {}", filters.join(" AND "))
};

let sql = format!(
"SELECT
TABLE_TENANT as TABLE_CAT,
TABLE_DATABASE as TABLE_SCHEM,
TABLE_NAME,
TABLE_TYPE
FROM
INFORMATION_SCHEMA.TABLES
{filter}
ORDER BY
TABLE_TYPE, TABLE_CAT, TABLE_SCHEM, TABLE_NAME"
);

trace::warn!("CommandGetTables:\n{sql}");

self.precess_statement_query_req(sql, request).await
}

/// TODO support
/// wait for <https://github.com/cnosdb/cnosdb/issues/642>
async fn get_flight_info_table_types(
&self,
query: CommandGetTableTypes,
Expand All @@ -462,9 +529,13 @@ where
query, request
);

Err(Status::unimplemented(
"get_flight_info_table_types not implemented",
))
self.precess_statement_query_req(
"SELECT TABLE_TYPE
FROM
(VALUES('TABLE'),('VIEW'),('LOCAL TEMPORARY')) t(TABLE_TYPE)",
request,
)
.await
}

/// not support
Expand Down
Expand Up @@ -58,7 +58,7 @@ impl InformationSchemaTablesBuilder {
self.database_names.append_value(database_name.as_ref());
self.table_names.append_value(table_name.as_ref());
self.table_types.append_value(match table_type {
TableType::Base => "BASE TABLE",
TableType::Base => "TABLE",
TableType::View => "VIEW",
TableType::Temporary => "LOCAL TEMPORARY",
});
Expand Down
4 changes: 2 additions & 2 deletions query_server/sqllogicaltests/src/instance.rs
Expand Up @@ -78,7 +78,7 @@ async fn run_query(
target_partitions,
} = options;

let channel = fligjt_channel(host, *port).await?;
let channel = flight_channel(host, *port).await?;

let mut client = FlightSqlServiceClient::new(channel);
client.set_header("TENANT", tenant);
Expand Down Expand Up @@ -108,7 +108,7 @@ async fn run_query(
Ok((schema, batches))
}

async fn fligjt_channel(host: &str, port: u16) -> Result<Channel> {
async fn flight_channel(host: &str, port: u16) -> Result<Channel> {
let endpoint = Endpoint::new(format!("http://{}:{}", host, port))
.map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
.connect_timeout(Duration::from_secs(20))
Expand Down
2 changes: 1 addition & 1 deletion query_server/test/cases/ddl/create_stream_table.result
Expand Up @@ -18,4 +18,4 @@
-- EXECUTE SQL: select * from information_schema.tables where table_database = 'createstreamtable' order by table_name; --
200 OK
table_tenant,table_database,table_name,table_type,table_engine,table_options
cnosdb,createstreamtable,test0,BASE TABLE,TSKV,TODO
cnosdb,createstreamtable,test0,TABLE,TSKV,TODO
Expand Up @@ -36,9 +36,9 @@
-- EXECUTE SQL: select * from information_schema.tables where table_database = 'explain_stream_query' order by table_name; --
200 OK
table_tenant,table_database,table_name,table_type,table_engine,table_options
cnosdb,explain_stream_query,test0,BASE TABLE,TSKV,TODO
cnosdb,explain_stream_query,tskvtable,BASE TABLE,STREAM,TODO
cnosdb,explain_stream_query,tskvtablewithoutschema,BASE TABLE,STREAM,TODO
cnosdb,explain_stream_query,test0,TABLE,TSKV,TODO
cnosdb,explain_stream_query,tskvtable,TABLE,STREAM,TODO
cnosdb,explain_stream_query,tskvtablewithoutschema,TABLE,STREAM,TODO

-- EXECUTE SQL: explain select * from TskvTable; --
200 OK
Expand Down

0 comments on commit 03599af

Please sign in to comment.