From 03599af024591a192b61128835996307cfbd4e99 Mon Sep 17 00:00:00 2001
From: "yukkit.zhang"
Date: Thu, 11 May 2023 13:43:50 +0800
Subject: [PATCH] feat: support metadata api for flight sql

---
 Cargo.lock                                         |   3 +
 e2e_test/Cargo.toml                                |   3 +
 e2e_test/src/flight_sql.rs                         | 144 ++++++++++++++++++
 e2e_test/src/lib.rs                                |   1 +
 main/src/flight_sql/flight_sql_server.rs           | 119 ++++++++++++---
 .../builder/tables.rs                              |   2 +-
 query_server/sqllogicaltests/src/instance.rs       |   4 +-
 .../test/cases/ddl/create_stream_table.result      |   2 +-
 .../dml/explain/explain_stream_query.result        |   6 +-
 .../information_schema/tables.result               |   6 +-
 10 files changed, 256 insertions(+), 34 deletions(-)
 create mode 100644 e2e_test/src/flight_sql.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1c43d08ae..6848716ce 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2247,7 +2247,10 @@ name = "e2e_test"
 version = "2.3.0"
 dependencies = [
  "actix-rt",
+ "arrow-flight",
+ "datafusion",
  "flatbuffers 22.12.6",
+ "futures",
  "http_protocol",
  "protos",
  "tokio",
diff --git a/e2e_test/Cargo.toml b/e2e_test/Cargo.toml
index 1e3d7a988..57610bc97 100644
--- a/e2e_test/Cargo.toml
+++ b/e2e_test/Cargo.toml
@@ -16,3 +16,6 @@ flatbuffers = { workspace = true }
 tonic = { workspace = true, features = ["tls", "transport"] }
 tokio = { workspace = true, features = ["full"] }
 tokio-stream = { workspace = true }
+datafusion = { workspace = true }
+arrow-flight = { workspace = true, features = ["flight-sql-experimental"] }
+futures = { workspace = true, default-features = false, features = ["alloc"] }
diff --git a/e2e_test/src/flight_sql.rs b/e2e_test/src/flight_sql.rs
new file mode 100644
index 000000000..c66a18e9b
--- /dev/null
+++ b/e2e_test/src/flight_sql.rs
@@ -0,0 +1,144 @@
+#[cfg(test)]
+mod test {
+    use std::time::Duration;
+
+    use arrow_flight::sql::client::FlightSqlServiceClient;
+    use arrow_flight::sql::{CommandGetDbSchemas, CommandGetTables};
+    use arrow_flight::utils::flight_data_to_batches;
+    use arrow_flight::FlightInfo;
+    use datafusion::arrow::error::ArrowError;
+    use datafusion::arrow::record_batch::RecordBatch;
+    use datafusion::arrow::util::pretty;
+    use datafusion::{arrow, assert_batches_eq};
+    use futures::TryStreamExt;
+    use tonic::transport::{Channel, Endpoint};
+
+    async fn flight_channel(host: &str, port: u16) -> Result<Channel, ArrowError> {
+        let endpoint = Endpoint::new(format!("http://{}:{}", host, port))
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Cannot connect to endpoint: {e}")))?;
+
+        Ok(channel)
+    }
+
+    async fn fetch_result_and_print(
+        flight_info: FlightInfo,
+        client: &mut FlightSqlServiceClient<Channel>,
+    ) -> Vec<RecordBatch> {
+        let mut batches = vec![];
+        for ep in &flight_info.endpoint {
+            if let Some(tkt) = &ep.ticket {
+                let stream = client.do_get(tkt.clone()).await.unwrap();
+                let flight_data = stream.try_collect::<Vec<_>>().await.unwrap();
+                batches.extend(flight_data_to_batches(&flight_data).unwrap());
+            };
+        }
+
+        batches
+    }
+
+    async fn authed_client() -> FlightSqlServiceClient<Channel> {
+        let channel = flight_channel("localhost", 8904).await.unwrap();
+        let mut client = FlightSqlServiceClient::new(channel);
+
+        // 1. handshake, basic authentication
+        let _ = client.handshake("root", "").await.unwrap();
+
+        client
+    }
+
+    #[tokio::test]
+    async fn test_sql_client_get_catalogs() {
+        let mut client = authed_client().await;
+
+        let flight_info = client.get_catalogs().await.unwrap();
+
+        let actual = fetch_result_and_print(flight_info, &mut client).await;
+
+        let actual_str = pretty::pretty_format_batches(&actual).unwrap();
+
+        assert!(format!("{actual_str}").contains("cnosdb"));
+    }
+
+    #[tokio::test]
+    async fn test_sql_client_get_db_schemas() {
+        let mut client = authed_client().await;
+
+        let flight_info = client
+            .get_db_schemas(CommandGetDbSchemas {
+                catalog: None,
+                db_schema_filter_pattern: Some("usage_%".to_string()),
+            })
+            .await
+            .unwrap();
+
+        let expected = vec![
+            "+--------------+---------------+",
+            "| table_schem  | table_catalog |",
+            "+--------------+---------------+",
+            "| usage_schema | cnosdb        |",
+            "+--------------+---------------+",
+        ];
+        let actual = fetch_result_and_print(flight_info, &mut client).await;
+
+        assert_batches_eq!(expected, &actual);
+    }
+
+    #[tokio::test]
+    async fn test_sql_client_get_tables() {
+        let mut client = authed_client().await;
+
+        let flight_info = client
+            .get_tables(CommandGetTables {
+                catalog: None,
+                db_schema_filter_pattern: Some("usage_schema".to_string()),
+                table_name_filter_pattern: Some("coord_%_in".to_string()),
+                table_types: vec!["TABLE".to_string(), "VIEW".to_string()],
+                include_schema: false,
+            })
+            .await
+            .unwrap();
+
+        let expected = vec![
+            "+-----------+--------------+---------------+------------+",
+            "| table_cat | table_schem  | table_name    | table_type |",
+            "+-----------+--------------+---------------+------------+",
+            "| cnosdb    | usage_schema | coord_data_in | TABLE      |",
+            "+-----------+--------------+---------------+------------+",
+        ];
+        let actual = fetch_result_and_print(flight_info, &mut client).await;
+
+        assert_batches_eq!(expected, &actual);
+    }
+
+    #[tokio::test]
+    async fn test_sql_client_get_table_types() {
+        let mut client = authed_client().await;
+
+        let flight_info = client.get_table_types().await.unwrap();
+
+        let expected = vec![
+            "+-----------------+",
+            "| table_type      |",
+            "+-----------------+",
+            "| TABLE           |",
+            "| VIEW            |",
+            "| LOCAL TEMPORARY |",
+            "+-----------------+",
+        ];
+        let actual = fetch_result_and_print(flight_info, &mut client).await;
+
+        assert_batches_eq!(expected, &actual);
+    }
+}
diff --git a/e2e_test/src/lib.rs b/e2e_test/src/lib.rs
index e032243a7..7a2801238 100644
--- a/e2e_test/src/lib.rs
+++ b/e2e_test/src/lib.rs
@@ -1,2 +1,3 @@
+mod flight_sql;
 mod http_api_tests;
 mod kv_service_tests;
diff --git a/main/src/flight_sql/flight_sql_server.rs b/main/src/flight_sql/flight_sql_server.rs
index e43fd4340..fbed61c7a 100644
--- a/main/src/flight_sql/flight_sql_server.rs
+++ b/main/src/flight_sql/flight_sql_server.rs
@@ -67,7 +67,7 @@ where
 {
     async fn precess_statement_query_req(
         &self,
-        sql: String,
+        sql: impl Into<String>,
         request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
         // auth request
@@ -78,7 +78,7 @@ where
         let ctx = self.construct_context(user, request.metadata())?;
 
         // build query state machine
-        let query_state_machine = self.build_query_state_machine(sql, ctx).await?;
+        let query_state_machine = self.build_query_state_machine(sql.into(), ctx).await?;
 
         // build logical plan
         let logical_plan = self.build_logical_plan(query_state_machine.clone()).await?;
@@ -163,10 +163,10 @@ where
 
     async fn build_query_state_machine(
         &self,
-        sql: String,
+        sql: impl Into<String>,
         ctx: Context,
     ) -> Result<QueryStateMachineRef, Status> {
-        let query = Query::new(ctx, sql);
+        let query = Query::new(ctx, sql.into());
         let query_state_machine = self
             .instance
             .build_query_state_machine(query)
@@ -399,8 +399,6 @@ where
         Ok(Response::new(flight_info))
     }
 
-    /// TODO support
-    /// wait for
     async fn get_flight_info_catalogs(
         &self,
         query: CommandGetCatalogs,
@@ -411,13 +409,18 @@ where
             query, request
         );
 
-        Err(Status::unimplemented(
-            "get_flight_info_catalogs not implemented",
-        ))
+        self.precess_statement_query_req(
+            "SELECT
+            TENANT_NAME AS TABLE_CAT
+        FROM
+            CLUSTER_SCHEMA.TENANTS
+        ORDER BY
+            TABLE_CAT",
+            request,
+        )
+        .await
     }
 
-    /// TODO support
-    /// wait for
     async fn get_flight_info_schemas(
         &self,
         query: CommandGetDbSchemas,
@@ -428,13 +431,38 @@ where
             query, request
         );
 
-        Err(Status::unimplemented(
-            "get_flight_info_schemas not implemented",
-        ))
+        let CommandGetDbSchemas {
+            catalog,
+            db_schema_filter_pattern,
+        } = query;
+
+        let mut filters = vec![];
+        let _ = catalog.map(|e| filters.push(format!("TABLE_CATALOG = '{}'", e)));
+        let _ =
+            db_schema_filter_pattern.map(|e| filters.push(format!("DATABASE_NAME LIKE '{e}'",)));
+
+        let filter = if filters.is_empty() {
+            "".to_string()
+        } else {
+            format!("WHERE {}", filters.join(" AND "))
+        };
+
+        self.precess_statement_query_req(
+            format!(
+                "SELECT
+                DATABASE_NAME AS TABLE_SCHEM,
+                TENANT_NAME AS TABLE_CATALOG
+            FROM
+                INFORMATION_SCHEMA.DATABASES
+            {filter}
+            ORDER BY
+                TABLE_CATALOG, TABLE_SCHEM"
+            ),
+            request,
+        )
+        .await
     }
 
-    /// TODO support
-    /// wait for
     async fn get_flight_info_tables(
         &self,
         query: CommandGetTables,
@@ -445,13 +473,52 @@ where
             query, request
         );
 
-        Err(Status::unimplemented(
-            "get_flight_info_tables not implemented",
-        ))
+        let CommandGetTables {
+            catalog,
+            db_schema_filter_pattern,
+            table_name_filter_pattern,
+            table_types,
+            include_schema: _,
+        } = query;
+
+        let mut filters = vec![];
+        let _ = catalog.map(|e| filters.push(format!("TABLE_CATALOG = '{}'", e)));
+        let _ =
+            db_schema_filter_pattern.map(|e| filters.push(format!("TABLE_DATABASE LIKE '{e}'")));
+        let _ = table_name_filter_pattern.map(|e| filters.push(format!("TABLE_NAME LIKE '{e}'")));
+        if !table_types.is_empty() {
+            let table_types = table_types
+                .iter()
+                .map(|e| format!("'{}'", e))
+                .collect::<Vec<_>>()
+                .join(",");
+            filters.push(format!("TABLE_TYPE IN ({})", table_types));
+        }
+
+        let filter = if filters.is_empty() {
+            "".to_string()
+        } else {
+            format!("WHERE {}", filters.join(" AND "))
+        };
+
+        let sql = format!(
+            "SELECT
+            TABLE_TENANT as TABLE_CAT,
+            TABLE_DATABASE as TABLE_SCHEM,
+            TABLE_NAME,
+            TABLE_TYPE
+        FROM
+            INFORMATION_SCHEMA.TABLES
+        {filter}
+        ORDER BY
+            TABLE_TYPE, TABLE_CAT, TABLE_SCHEM, TABLE_NAME"
+        );
+
+        trace::warn!("CommandGetTables:\n{sql}");
+
+        self.precess_statement_query_req(sql, request).await
     }
 
-    /// TODO support
-    /// wait for
     async fn get_flight_info_table_types(
         &self,
         query: CommandGetTableTypes,
@@ -462,9 +529,13 @@ where
             query, request
         );
 
-        Err(Status::unimplemented(
-            "get_flight_info_table_types not implemented",
-        ))
+        self.precess_statement_query_req(
+            "SELECT TABLE_TYPE
+        FROM
+            (VALUES('TABLE'),('VIEW'),('LOCAL TEMPORARY')) t(TABLE_TYPE)",
+            request,
+        )
+        .await
     }
 
     /// not support
diff --git a/query_server/query/src/metadata/information_schema_provider/builder/tables.rs b/query_server/query/src/metadata/information_schema_provider/builder/tables.rs
index 660b1806c..e9114f55c 100644
--- a/query_server/query/src/metadata/information_schema_provider/builder/tables.rs
+++ b/query_server/query/src/metadata/information_schema_provider/builder/tables.rs
@@ -58,7 +58,7 @@ impl InformationSchemaTablesBuilder {
         self.database_names.append_value(database_name.as_ref());
         self.table_names.append_value(table_name.as_ref());
         self.table_types.append_value(match table_type {
-            TableType::Base => "BASE TABLE",
+            TableType::Base => "TABLE",
             TableType::View => "VIEW",
             TableType::Temporary => "LOCAL TEMPORARY",
         });
diff --git a/query_server/sqllogicaltests/src/instance.rs b/query_server/sqllogicaltests/src/instance.rs
index 993be449a..5605571f8 100644
--- a/query_server/sqllogicaltests/src/instance.rs
+++ b/query_server/sqllogicaltests/src/instance.rs
@@ -78,7 +78,7 @@ async fn run_query(
         target_partitions,
     } = options;
 
-    let channel = fligjt_channel(host, *port).await?;
+    let channel = flight_channel(host, *port).await?;
 
     let mut client = FlightSqlServiceClient::new(channel);
     client.set_header("TENANT", tenant);
@@ -108,7 +108,7 @@ async fn run_query(
     Ok((schema, batches))
 }
 
-async fn fligjt_channel(host: &str, port: u16) -> Result<Channel, ArrowError> {
+async fn flight_channel(host: &str, port: u16) -> Result<Channel, ArrowError> {
     let endpoint = Endpoint::new(format!("http://{}:{}", host, port))
         .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
         .connect_timeout(Duration::from_secs(20))
diff --git a/query_server/test/cases/ddl/create_stream_table.result b/query_server/test/cases/ddl/create_stream_table.result
index fd0a3daf3..74af650df 100644
--- a/query_server/test/cases/ddl/create_stream_table.result
+++ b/query_server/test/cases/ddl/create_stream_table.result
@@ -18,4 +18,4 @@
 -- EXECUTE SQL: select * from information_schema.tables where table_database = 'createstreamtable' order by table_name;
 -- 200 OK
 table_tenant,table_database,table_name,table_type,table_engine,table_options
-cnosdb,createstreamtable,test0,BASE TABLE,TSKV,TODO
+cnosdb,createstreamtable,test0,TABLE,TSKV,TODO
diff --git a/query_server/test/cases/dml/explain/explain_stream_query.result b/query_server/test/cases/dml/explain/explain_stream_query.result
index 7bb91dbae..b9a4b8bb9 100644
--- a/query_server/test/cases/dml/explain/explain_stream_query.result
+++ b/query_server/test/cases/dml/explain/explain_stream_query.result
@@ -36,9 +36,9 @@
 -- EXECUTE SQL: select * from information_schema.tables where table_database = 'explain_stream_query' order by table_name;
 -- 200 OK
 table_tenant,table_database,table_name,table_type,table_engine,table_options
-cnosdb,explain_stream_query,test0,BASE TABLE,TSKV,TODO
-cnosdb,explain_stream_query,tskvtable,BASE TABLE,STREAM,TODO
-cnosdb,explain_stream_query,tskvtablewithoutschema,BASE TABLE,STREAM,TODO
+cnosdb,explain_stream_query,test0,TABLE,TSKV,TODO
+cnosdb,explain_stream_query,tskvtable,TABLE,STREAM,TODO
+cnosdb,explain_stream_query,tskvtablewithoutschema,TABLE,STREAM,TODO
 
 -- EXECUTE SQL: explain select * from TskvTable;
 -- 200 OK
diff --git a/query_server/test/cases/sys_table/information_schema/tables.result b/query_server/test/cases/sys_table/information_schema/tables.result
index 38cf1fa8d..857086bee 100644
--- a/query_server/test/cases/sys_table/information_schema/tables.result
+++ b/query_server/test/cases/sys_table/information_schema/tables.result
@@ -46,17 +46,17 @@
 -- AFTER_SORT
 -- 200 OK
 table_tenant,table_database,table_name,table_type,table_engine,table_options
-test_tbls_tenant1,test_tbls_db1,test_info_schema_tbl,BASE TABLE,TSKV,TODO
+test_tbls_tenant1,test_tbls_db1,test_info_schema_tbl,TABLE,TSKV,TODO
 
 -- EXECUTE SQL: select * from information_schema.tables;
 --
 -- AFTER_SORT
 -- 200 OK
 table_tenant,table_database,table_name,table_type,table_engine,table_options
-test_tbls_tenant1,test_tbls_db1,test_info_schema_tbl,BASE TABLE,TSKV,TODO
+test_tbls_tenant1,test_tbls_db1,test_info_schema_tbl,TABLE,TSKV,TODO
 
 -- EXECUTE SQL: select * from information_schema.tables;
 --
 -- AFTER_SORT
 -- 200 OK
 table_tenant,table_database,table_name,table_type,table_engine,table_options
-test_tbls_tenant1,test_tbls_db1,test_info_schema_tbl,BASE TABLE,TSKV,TODO
+test_tbls_tenant1,test_tbls_db1,test_info_schema_tbl,TABLE,TSKV,TODO