Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support metadata api for flight sql #1173

Merged
merged 1 commit into from May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions e2e_test/Cargo.toml
Expand Up @@ -16,3 +16,6 @@ flatbuffers = { workspace = true }
tonic = { workspace = true, features = ["tls", "transport"] }
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true }
datafusion = { workspace = true }
arrow-flight = { workspace = true, features = ["flight-sql-experimental"] }
futures = { workspace = true, default-features = false, features = ["alloc"] }
144 changes: 144 additions & 0 deletions e2e_test/src/flight_sql.rs
@@ -0,0 +1,144 @@
#[cfg(test)]
mod test {
use std::time::Duration;

use arrow_flight::sql::client::FlightSqlServiceClient;
use arrow_flight::sql::{CommandGetDbSchemas, CommandGetTables};
use arrow_flight::utils::flight_data_to_batches;
use arrow_flight::FlightInfo;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty;
use datafusion::{arrow, assert_batches_eq};
use futures::TryStreamExt;
use tonic::transport::{Channel, Endpoint};

/// Opens a gRPC channel to the Flight SQL endpoint at `http://{host}:{port}`.
///
/// The endpoint is configured with 20-second connect and request timeouts,
/// `TCP_NODELAY`, and TCP / HTTP/2 keep-alive settings.
///
/// # Errors
///
/// Returns [`ArrowError::IoError`] when the endpoint URI cannot be built or
/// when the connection cannot be established. Both messages include the
/// underlying cause so a failing e2e run can be diagnosed from the log alone.
async fn flight_channel(host: &str, port: u16) -> Result<Channel, ArrowError> {
    let endpoint = Endpoint::new(format!("http://{host}:{port}"))
        // Keep the cause instead of discarding it, mirroring the connect error below.
        .map_err(|e| ArrowError::IoError(format!("Cannot create endpoint: {e}")))?
        .connect_timeout(Duration::from_secs(20))
        .timeout(Duration::from_secs(20))
        .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
        .tcp_keepalive(Some(Duration::from_secs(3600)))
        .http2_keep_alive_interval(Duration::from_secs(300))
        .keep_alive_timeout(Duration::from_secs(20))
        .keep_alive_while_idle(true);

    endpoint
        .connect()
        .await
        .map_err(|e| ArrowError::IoError(format!("Cannot connect to endpoint: {e}")))
}

/// Downloads and decodes every result stream referenced by `flight_info`.
///
/// Each endpoint that carries a ticket is fetched through `client.do_get`;
/// endpoints without a ticket are skipped. The decoded batches are returned
/// in endpoint order. Despite the name, nothing is printed here.
///
/// # Panics
///
/// Panics if a `do_get` call, the stream collection, or the decoding of the
/// flight data into record batches fails.
async fn fetch_result_and_print(
    flight_info: FlightInfo,
    client: &mut FlightSqlServiceClient,
) -> Vec<RecordBatch> {
    let mut collected = Vec::new();

    let tickets = flight_info
        .endpoint
        .iter()
        .filter_map(|endpoint| endpoint.ticket.as_ref());

    for ticket in tickets {
        let frames: Vec<_> = client
            .do_get(ticket.clone())
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let decoded = flight_data_to_batches(&frames).unwrap();
        collected.extend(decoded);
    }

    collected
}

/// Builds a Flight SQL client connected to `localhost:8904` and performs the
/// handshake as user `root` with an empty password.
///
/// # Panics
///
/// Panics if the channel cannot be opened or the handshake fails.
async fn authed_client() -> FlightSqlServiceClient {
    let mut client = flight_channel("localhost", 8904)
        .await
        .map(FlightSqlServiceClient::new)
        .unwrap();

    // Basic-authentication handshake; the value it returns is not needed here.
    client.handshake("root", "").await.unwrap();

    client
}

/// The catalog listing rendered as a table must mention `cnosdb`.
#[tokio::test]
async fn test_sql_client_get_catalogs() {
    let mut client = authed_client().await;

    let info = client.get_catalogs().await.unwrap();
    let batches = fetch_result_and_print(info, &mut client).await;

    let rendered = pretty::pretty_format_batches(&batches)
        .unwrap()
        .to_string();

    assert!(rendered.contains("cnosdb"));
}

/// `get_db_schemas` filtered by the pattern `usage_%` (no catalog filter)
/// must return exactly the `usage_schema` row of catalog `cnosdb`.
#[tokio::test]
async fn test_sql_client_get_db_schemas() {
    let mut client = authed_client().await;

    let flight_info = client
        .get_db_schemas(CommandGetDbSchemas {
            catalog: None,
            db_schema_filter_pattern: Some("usage_%".to_string()),
        })
        .await
        .unwrap();

    // `assert_batches_eq!` compares the pretty-printed table line by line, so
    // every cell must be padded to the column width given by the border lines.
    let expected = vec![
        "+--------------+---------------+",
        "| table_schem  | table_catalog |",
        "+--------------+---------------+",
        "| usage_schema | cnosdb        |",
        "+--------------+---------------+",
    ];
    let actual = fetch_result_and_print(flight_info, &mut client).await;

    assert_batches_eq!(expected, &actual);
}

/// `get_tables` restricted to schema `usage_schema`, table-name pattern
/// `coord_%_in` and the table types `TABLE` / `VIEW` must return exactly the
/// `coord_data_in` table.
#[tokio::test]
async fn test_sql_client_get_tables() {
    let mut client = authed_client().await;

    let flight_info = client
        .get_tables(CommandGetTables {
            catalog: None,
            db_schema_filter_pattern: Some("usage_schema".to_string()),
            table_name_filter_pattern: Some("coord_%_in".to_string()),
            table_types: vec!["TABLE".to_string(), "VIEW".to_string()],
            include_schema: false,
        })
        .await
        .unwrap();

    // `assert_batches_eq!` compares the pretty-printed table line by line, so
    // every cell must be padded to the column width given by the border lines.
    let expected = vec![
        "+-----------+--------------+---------------+------------+",
        "| table_cat | table_schem  | table_name    | table_type |",
        "+-----------+--------------+---------------+------------+",
        "| cnosdb    | usage_schema | coord_data_in | TABLE      |",
        "+-----------+--------------+---------------+------------+",
    ];
    let actual = fetch_result_and_print(flight_info, &mut client).await;

    assert_batches_eq!(expected, &actual);
}

/// `get_table_types` must report the three table types `TABLE`, `VIEW` and
/// `LOCAL TEMPORARY`, in that order.
#[tokio::test]
async fn test_sql_client_get_table_types() {
    let mut client = authed_client().await;

    let flight_info = client.get_table_types().await.unwrap();

    // `assert_batches_eq!` compares the pretty-printed table line by line, so
    // every cell must be padded to the column width given by the border lines.
    let expected = vec![
        "+-----------------+",
        "| table_type      |",
        "+-----------------+",
        "| TABLE           |",
        "| VIEW            |",
        "| LOCAL TEMPORARY |",
        "+-----------------+",
    ];
    let actual = fetch_result_and_print(flight_info, &mut client).await;

    assert_batches_eq!(expected, &actual);
}
}
1 change: 1 addition & 0 deletions e2e_test/src/lib.rs
@@ -1,2 +1,3 @@
mod flight_sql;
mod http_api_tests;
mod kv_service_tests;
119 changes: 95 additions & 24 deletions main/src/flight_sql/flight_sql_server.rs
Expand Up @@ -67,7 +67,7 @@ where
{
async fn precess_statement_query_req(
&self,
sql: String,
sql: impl Into<String>,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
// auth request
Expand All @@ -78,7 +78,7 @@ where
let ctx = self.construct_context(user, request.metadata())?;

// build query state machine
let query_state_machine = self.build_query_state_machine(sql, ctx).await?;
let query_state_machine = self.build_query_state_machine(sql.into(), ctx).await?;

// build logical plan
let logical_plan = self.build_logical_plan(query_state_machine.clone()).await?;
Expand Down Expand Up @@ -163,10 +163,10 @@ where

async fn build_query_state_machine(
&self,
sql: String,
sql: impl Into<String>,
ctx: Context,
) -> Result<QueryStateMachineRef, Status> {
let query = Query::new(ctx, sql);
let query = Query::new(ctx, sql.into());
let query_state_machine = self
.instance
.build_query_state_machine(query)
Expand Down Expand Up @@ -399,8 +399,6 @@ where
Ok(Response::new(flight_info))
}

/// TODO support
/// wait for <https://github.com/cnosdb/cnosdb/issues/642>
async fn get_flight_info_catalogs(
&self,
query: CommandGetCatalogs,
Expand All @@ -411,13 +409,18 @@ where
query, request
);

Err(Status::unimplemented(
"get_flight_info_catalogs not implemented",
))
self.precess_statement_query_req(
"SELECT
TENANT_NAME AS TABLE_CAT
FROM
CLUSTER_SCHEMA.TENANTS
ORDER BY
TABLE_CAT",
request,
)
.await
}

/// TODO support
/// wait for <https://github.com/cnosdb/cnosdb/issues/642>
async fn get_flight_info_schemas(
&self,
query: CommandGetDbSchemas,
Expand All @@ -428,13 +431,38 @@ where
query, request
);

Err(Status::unimplemented(
"get_flight_info_schemas not implemented",
))
let CommandGetDbSchemas {
catalog,
db_schema_filter_pattern,
} = query;

let mut filters = vec![];
let _ = catalog.map(|e| filters.push(format!("TABLE_CATALOG = '{}'", e)));
let _ =
db_schema_filter_pattern.map(|e| filters.push(format!("DATABASE_NAME LIKE '{e}'",)));

let filter = if filters.is_empty() {
"".to_string()
} else {
format!("WHERE {}", filters.join(" AND "))
};

self.precess_statement_query_req(
format!(
"SELECT
DATABASE_NAME AS TABLE_SCHEM,
TENANT_NAME AS TABLE_CATALOG
FROM
INFORMATION_SCHEMA.DATABASES
{filter}
ORDER BY
TABLE_CATALOG, TABLE_SCHEM"
),
request,
)
.await
}

/// TODO support
/// wait for <https://github.com/cnosdb/cnosdb/issues/642>
async fn get_flight_info_tables(
&self,
query: CommandGetTables,
Expand All @@ -445,13 +473,52 @@ where
query, request
);

Err(Status::unimplemented(
"get_flight_info_tables not implemented",
))
let CommandGetTables {
catalog,
db_schema_filter_pattern,
table_name_filter_pattern,
table_types,
include_schema: _,
} = query;

let mut filters = vec![];
let _ = catalog.map(|e| filters.push(format!("TABLE_CATALOG = '{}'", e)));
let _ =
db_schema_filter_pattern.map(|e| filters.push(format!("TABLE_DATABASE LIKE '{e}'")));
let _ = table_name_filter_pattern.map(|e| filters.push(format!("TABLE_NAME LIKE '{e}'")));
if !table_types.is_empty() {
let table_types = table_types
.iter()
.map(|e| format!("'{}'", e))
.collect::<Vec<_>>()
.join(",");
filters.push(format!("TABLE_TYPE IN ({})", table_types));
}

let filter = if filters.is_empty() {
"".to_string()
} else {
format!("WHERE {}", filters.join(" AND "))
};

let sql = format!(
"SELECT
TABLE_TENANT as TABLE_CAT,
TABLE_DATABASE as TABLE_SCHEM,
TABLE_NAME,
TABLE_TYPE
FROM
INFORMATION_SCHEMA.TABLES
{filter}
ORDER BY
TABLE_TYPE, TABLE_CAT, TABLE_SCHEM, TABLE_NAME"
);

trace::warn!("CommandGetTables:\n{sql}");

self.precess_statement_query_req(sql, request).await
}

/// TODO support
/// wait for <https://github.com/cnosdb/cnosdb/issues/642>
async fn get_flight_info_table_types(
&self,
query: CommandGetTableTypes,
Expand All @@ -462,9 +529,13 @@ where
query, request
);

Err(Status::unimplemented(
"get_flight_info_table_types not implemented",
))
self.precess_statement_query_req(
"SELECT TABLE_TYPE
FROM
(VALUES('TABLE'),('VIEW'),('LOCAL TEMPORARY')) t(TABLE_TYPE)",
request,
)
.await
}

/// not support
Expand Down
Expand Up @@ -58,7 +58,7 @@ impl InformationSchemaTablesBuilder {
self.database_names.append_value(database_name.as_ref());
self.table_names.append_value(table_name.as_ref());
self.table_types.append_value(match table_type {
TableType::Base => "BASE TABLE",
TableType::Base => "TABLE",
TableType::View => "VIEW",
TableType::Temporary => "LOCAL TEMPORARY",
});
Expand Down
4 changes: 2 additions & 2 deletions query_server/sqllogicaltests/src/instance.rs
Expand Up @@ -78,7 +78,7 @@ async fn run_query(
target_partitions,
} = options;

let channel = fligjt_channel(host, *port).await?;
let channel = flight_channel(host, *port).await?;

let mut client = FlightSqlServiceClient::new(channel);
client.set_header("TENANT", tenant);
Expand Down Expand Up @@ -108,7 +108,7 @@ async fn run_query(
Ok((schema, batches))
}

async fn fligjt_channel(host: &str, port: u16) -> Result<Channel> {
async fn flight_channel(host: &str, port: u16) -> Result<Channel> {
let endpoint = Endpoint::new(format!("http://{}:{}", host, port))
.map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
.connect_timeout(Duration::from_secs(20))
Expand Down
2 changes: 1 addition & 1 deletion query_server/test/cases/ddl/create_stream_table.result
Expand Up @@ -18,4 +18,4 @@
-- EXECUTE SQL: select * from information_schema.tables where table_database = 'createstreamtable' order by table_name; --
200 OK
table_tenant,table_database,table_name,table_type,table_engine,table_options
cnosdb,createstreamtable,test0,BASE TABLE,TSKV,TODO
cnosdb,createstreamtable,test0,TABLE,TSKV,TODO
Expand Up @@ -36,9 +36,9 @@
-- EXECUTE SQL: select * from information_schema.tables where table_database = 'explain_stream_query' order by table_name; --
200 OK
table_tenant,table_database,table_name,table_type,table_engine,table_options
cnosdb,explain_stream_query,test0,BASE TABLE,TSKV,TODO
cnosdb,explain_stream_query,tskvtable,BASE TABLE,STREAM,TODO
cnosdb,explain_stream_query,tskvtablewithoutschema,BASE TABLE,STREAM,TODO
cnosdb,explain_stream_query,test0,TABLE,TSKV,TODO
cnosdb,explain_stream_query,tskvtable,TABLE,STREAM,TODO
cnosdb,explain_stream_query,tskvtablewithoutschema,TABLE,STREAM,TODO

-- EXECUTE SQL: explain select * from TskvTable; --
200 OK
Expand Down