diff --git a/packages/cubejs-backend-native/js/index.ts b/packages/cubejs-backend-native/js/index.ts
index 4aa4d6b5a63ec..24b4b875b0146 100644
--- a/packages/cubejs-backend-native/js/index.ts
+++ b/packages/cubejs-backend-native/js/index.ts
@@ -1,5 +1,6 @@
 import fs from 'fs';
 import path from 'path';
+import { Writable } from 'stream';
 // import { getEnv } from '@cubejs-backend/shared';
 
 export interface BaseMeta {
@@ -115,6 +116,9 @@ function wrapNativeFunctionWithChannelCallback(
   };
 };
 
+const errorString = (err: any) =>
+  err.message || err.stack?.toString() || (typeof err === 'string' ? err.toString() : JSON.stringify(err));
+
 // TODO: Refactor - define classes
 function wrapNativeFunctionWithStream(
   fn: (extra: any) => unknown | Promise<unknown>
@@ -129,27 +133,48 @@ function wrapNativeFunctionWithStream(
       streamResponse = await fn(JSON.parse(extra));
       if (streamResponse && streamResponse.stream) {
         writer.start();
-        let chunk: object[] = [];
-        streamResponse.stream.on('data', (c: object) => {
-          chunk.push(c);
-          if (chunk.length >= chunkLength) {
-            if (!writer.chunk(JSON.stringify(chunk))) {
-              // TODO replace with actual stream and high watermark implementation
-              streamResponse.stream.destroy({
-                stack: "Rejected by client"
-              });
+
+        let chunkBuffer: any[] = [];
+        const writable = new Writable({
+          objectMode: true,
+          highWaterMark: chunkLength,
+          write(row: any, encoding: BufferEncoding, callback: (error?: (Error | null)) => void) {
+            chunkBuffer.push(row);
+            if (chunkBuffer.length < chunkLength) {
+              callback(null);
+            } else {
+              const toSend = chunkBuffer;
+              chunkBuffer = [];
+              writer.chunk(toSend, callback);
             }
-            chunk = [];
-          }
-        });
-        streamResponse.stream.on('close', () => {
-          if (chunk.length > 0) {
-            writer.chunk(JSON.stringify(chunk));
+          },
+          final(callback: (error?: (Error | null)) => void) {
+            const end = (err: any) => {
+              if (err) {
+                callback(err);
+              } else {
+                writer.end(callback);
+              }
+            };
+            if (chunkBuffer.length > 0) {
+              const toSend = chunkBuffer;
+              chunkBuffer = [];
+              writer.chunk(toSend, end);
+            } else {
+              end(null);
+            }
+          },
+          destroy(error: Error | null, callback: (error: (Error | null)) => void) {
+            if (error) {
+              writer.reject(errorString(error));
+            }
+            callback(null);
           }
-          writer.end("");
         });
+        streamResponse.stream.pipe(writable);
         streamResponse.stream.on('error', (err: any) => {
-          writer.reject(err.message || err.toString());
+          writable.destroy(err);
         });
       } else {
         throw new Error(`Expected stream but nothing returned`);
@@ -158,7 +183,7 @@ function wrapNativeFunctionWithStream(
       if (!!streamResponse && !!streamResponse.stream) {
         streamResponse.stream.destroy(e);
       }
-      writer.reject(e.message || e.toString());
+      writer.reject(errorString(e));
     }
   };
 };
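The rewrite above drops the hand-rolled `'data'`/`'close'` handlers in favor of a `Writable` that the source stream is piped into, so backpressure comes from Node's own stream machinery: `write()` withholds its callback until the native `writer.chunk()` acknowledges the batch, which pauses the upstream reader once `highWaterMark` rows are in flight. A minimal, self-contained sketch of the same pattern, where `sendBatch` is a hypothetical stand-in for the native `writer.chunk(rows, callback)` call:

```ts
import { Readable, Writable } from 'stream';

// Batching sink: buffers rows and forwards them in groups of `batchSize`.
// The write() callback is deferred to sendBatch's callback, so the upstream
// source pauses automatically whenever the consumer is slower than the producer.
function batchingSink(
  batchSize: number,
  sendBatch: (rows: object[], cb: (err?: Error | null) => void) => void,
): Writable {
  let buffer: object[] = [];
  return new Writable({
    objectMode: true,
    highWaterMark: batchSize,
    write(row: object, _encoding: BufferEncoding, callback: (err?: Error | null) => void) {
      buffer.push(row);
      if (buffer.length < batchSize) {
        callback(null); // accept the next row immediately
      } else {
        const toSend = buffer;
        buffer = [];
        sendBatch(toSend, callback); // backpressure: upstream waits for this callback
      }
    },
    final(callback: (err?: Error | null) => void) {
      // flush the partial batch before signalling end-of-stream
      if (buffer.length > 0) {
        sendBatch(buffer, callback);
      } else {
        callback(null);
      }
    },
  });
}

// Usage: any object-mode source can be piped into the sink.
const source = Readable.from([{ a: 1 }, { a: 2 }, { a: 3 }]);
source.pipe(batchingSink(2, (rows, cb) => { console.log(rows); cb(null); }));
```

Note how `destroy()` in the diff maps stream teardown onto `writer.reject()`, so a consumer-side rejection now travels as an ordinary stream error instead of the old `destroy({ stack: ... })` workaround.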
diff --git a/packages/cubejs-backend-native/src/stream.rs b/packages/cubejs-backend-native/src/stream.rs
index 4aedd996572b5..baf2d425d0324 100644
--- a/packages/cubejs-backend-native/src/stream.rs
+++ b/packages/cubejs-backend-native/src/stream.rs
@@ -1,3 +1,7 @@
+use cubesql::compile::engine::df::scan::{
+    transform_response, FieldValue, MemberField, RecordBatch, SchemaRef, ValueObject,
+};
+use std::future::Future;
 use std::sync::{Arc, Mutex};
 
 use cubesql::CubeError;
@@ -10,11 +14,14 @@
 use crate::utils::bind_method;
 use tokio::sync::mpsc::{channel as mpsc_channel, Receiver, Sender};
 use tokio::sync::oneshot;
 
-type Chunk = Result<String, CubeError>;
+type Chunk = Option<Result<RecordBatch, CubeError>>;
 
 pub struct JsWriteStream {
     sender: Sender<Chunk>,
     ready_sender: Mutex<Option<oneshot::Sender<Result<(), CubeError>>>>,
+    tokio_handle: tokio::runtime::Handle,
+    schema: SchemaRef,
+    member_fields: Vec<MemberField>,
 }
 
 impl Finalize for JsWriteStream {}
@@ -45,10 +52,13 @@ impl JsWriteStream {
         Ok(obj)
     }
 
-    fn push_chunk(&self, chunk: String) -> bool {
-        match self.sender.try_send(Ok(chunk)) {
-            Err(_) => false,
-            Ok(_) => true,
+    fn push_chunk(&self, chunk: RecordBatch) -> impl Future<Output = Result<(), CubeError>> {
+        let sender = self.sender.clone();
+        async move {
+            sender
+                .send(Some(Ok(chunk)))
+                .await
+                .map_err(|e| CubeError::user(format!("Can't send to channel: {}", e)))
         }
     }
 
@@ -58,29 +68,119 @@ impl JsWriteStream {
         }
     }
 
-    fn end(&self) {
-        self.push_chunk("".to_string());
+    fn end(&self) -> impl Future<Output = Result<(), CubeError>> {
+        let sender = self.sender.clone();
+        async move {
+            sender
+                .send(None)
+                .await
+                .map_err(|e| CubeError::user(format!("Can't send to channel: {}", e)))
+        }
     }
 
     fn reject(&self, err: String) {
         if let Some(ready_sender) = self.ready_sender.lock().unwrap().take() {
             let _ = ready_sender.send(Err(CubeError::internal(err.to_string())));
         }
-        let _ = self.sender.try_send(Err(CubeError::internal(err)));
+        let _ = self.sender.try_send(Some(Err(CubeError::internal(err))));
    }
 }
 
-fn js_stream_push_chunk(mut cx: FunctionContext) -> JsResult<JsBoolean> {
+fn wait_for_future_and_execute_callback(
+    tokio_handle: tokio::runtime::Handle,
+    channel: Channel,
+    callback: Root<JsFunction>,
+    future: impl Future<Output = Result<(), CubeError>> + Send + Sync + 'static,
+) {
+    tokio_handle.spawn(async move {
+        let push_result = future.await;
+        let send_result = channel.try_send(move |mut cx| {
+            let undefined = cx.undefined();
+            let result = match push_result {
+                Ok(()) => {
+                    let args = vec![cx.null().upcast::<JsValue>(), cx.null().upcast::<JsValue>()];
+                    callback.into_inner(&mut cx).call(&mut cx, undefined, args)
+                }
+                Err(e) => {
+                    let args = vec![cx.string(e.message).upcast::<JsValue>()];
+                    callback.into_inner(&mut cx).call(&mut cx, undefined, args)
+                }
+            };
+            if let Err(e) = result {
+                log::error!("Error during callback execution: {}", e);
+            }
+            Ok(())
+        });
+        if let Err(e) = send_result {
+            log::error!("Can't execute callback on node event loop: {}", e);
+        }
+    });
+}
+
+pub struct JsValueObject<'a> {
+    pub cx: FunctionContext<'a>,
+    pub handle: Handle<'a, JsArray>,
+}
+
+impl ValueObject for JsValueObject<'_> {
+    fn len(&mut self) -> Result<usize, CubeError> {
+        Ok(self.handle.len(&mut self.cx) as usize)
+    }
+
+    fn get(&mut self, index: usize, field_name: &str) -> Result<FieldValue, CubeError> {
+        let value = self
+            .handle
+            .get::<JsObject, _, _>(&mut self.cx, index as u32)
+            .map_err(|e| {
+                CubeError::user(format!("Can't get object at array index {}: {}", index, e))
+            })?
+            .get::<JsValue, _, _>(&mut self.cx, field_name)
+            .map_err(|e| {
+                CubeError::user(format!("Can't get '{}' field value: {}", field_name, e))
+            })?;
+        if let Ok(s) = value.downcast::<JsString, _>(&mut self.cx) {
+            Ok(FieldValue::String(s.value(&mut self.cx)))
+        } else if let Ok(n) = value.downcast::<JsNumber, _>(&mut self.cx) {
+            Ok(FieldValue::Number(n.value(&mut self.cx)))
+        } else if let Ok(b) = value.downcast::<JsBoolean, _>(&mut self.cx) {
+            Ok(FieldValue::Bool(b.value(&mut self.cx)))
+        } else if value.downcast::<JsUndefined, _>(&mut self.cx).is_ok()
+            || value.downcast::<JsNull, _>(&mut self.cx).is_ok()
+        {
+            Ok(FieldValue::Null)
+        } else {
+            Err(CubeError::user(format!(
+                "Expected primitive value but found: {:?}",
+                value
+            )))
+        }
+    }
+}
+
+fn js_stream_push_chunk(mut cx: FunctionContext) -> JsResult<JsUndefined> {
     #[cfg(build = "debug")]
     trace!("JsWriteStream.push_chunk");
 
     let this = cx
         .this()
         .downcast_or_throw::<JsBox<JsWriteStream>, _>(&mut cx)?;
-    let result = cx.argument::<JsString>(0)?;
-    let result = this.push_chunk(result.value(&mut cx));
-
-    Ok(cx.boolean(result))
+    let chunk_array = cx.argument::<JsArray>(0)?;
+    let callback = cx.argument::<JsFunction>(1)?.root(&mut cx);
+    let mut value_object = JsValueObject {
+        cx,
+        handle: chunk_array,
+    };
+    let value =
+        transform_response(&mut value_object, this.schema.clone(), &this.member_fields).unwrap();
+    let future = this.push_chunk(value);
+    wait_for_future_and_execute_callback(
+        this.tokio_handle.clone(),
+        value_object.cx.channel(),
+        callback,
+        future,
+    );
+
+    Ok(value_object.cx.undefined())
 }
 
 fn js_stream_start(mut cx: FunctionContext) -> JsResult<JsUndefined> {
@@ -102,7 +202,9 @@ fn js_stream_end(mut cx: FunctionContext) -> JsResult<JsUndefined> {
     let this = cx
         .this()
         .downcast_or_throw::<JsBox<JsWriteStream>, _>(&mut cx)?;
-    this.end();
+    let future = this.end();
+    let callback = cx.argument::<JsFunction>(0)?.root(&mut cx);
+    wait_for_future_and_execute_callback(this.tokio_handle.clone(), cx.channel(), callback, future);
 
     Ok(cx.undefined())
 }
@@ -123,16 +225,19 @@ pub async fn call_js_with_stream_as_callback(
     channel: Arc<Channel>,
     js_method: Arc<Root<JsFunction>>,
     query: Option<String>,
+    schema: SchemaRef,
+    member_fields: Vec<MemberField>,
 ) -> Result<CubeStreamReceiver, CubeError> {
-    let chunk_size = std::env::var("CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK")
+    let channel_size = std::env::var("CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK")
         .ok()
         .map(|v| v.parse::<usize>().unwrap())
        .unwrap_or(8192);
-    let channel_size = 1_000_000 / chunk_size;
 
     let (sender, receiver) = mpsc_channel::<Chunk>(channel_size);
     let (ready_sender, ready_receiver) = oneshot::channel();
 
+    let tokio_handle = tokio::runtime::Handle::current();
+
     channel
         .try_send(move |mut cx| {
             // https://github.com/neon-bindings/neon/issues/672
@@ -144,6 +249,9 @@ pub async fn call_js_with_stream_as_callback(
             let stream = JsWriteStream {
                 sender,
                 ready_sender: Mutex::new(Some(ready_sender)),
+                tokio_handle,
+                schema,
+                member_fields,
             };
             let this = cx.undefined();
             let args: Vec<Handle<JsValue>> = vec![
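On the Rust side, `push_chunk` and `end` now return futures that resolve only once the bounded tokio channel accepts the message, and `wait_for_future_and_execute_callback` invokes the Node callback from the tokio runtime when that happens; this is what lets the JS `Writable` above delay its `write()`/`final()` callbacks. The native writer therefore presents roughly this surface to JS (an inferred sketch for illustration, not a published typing):

```ts
// Sketch of the contract implied by js_stream_push_chunk, js_stream_end and
// js_stream_reject. The real object is constructed natively; this interface
// exists only to document the callback protocol.
interface NativeWriter {
  start(): void;
  // Rows are passed as plain JS objects; the callback fires once the converted
  // RecordBatch has been accepted by the channel, or with an error string.
  chunk(rows: object[], callback: (err?: string | null) => void): void;
  // end() now also takes a callback, since sending the terminating `None`
  // awaits channel capacity too.
  end(callback: (err?: string | null) => void): void;
  reject(errorMessage: string): void;
}
```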
diff --git a/packages/cubejs-backend-native/src/transport.rs b/packages/cubejs-backend-native/src/transport.rs
index ca5608384c11e..1add7aad3bbe0 100644
--- a/packages/cubejs-backend-native/src/transport.rs
+++ b/packages/cubejs-backend-native/src/transport.rs
@@ -3,6 +3,7 @@ use neon::prelude::*;
 use async_trait::async_trait;
 use cubeclient::models::{V1Error, V1LoadRequestQuery, V1LoadResponse, V1MetaResponse};
+use cubesql::compile::engine::df::scan::{MemberField, SchemaRef};
 use cubesql::{
     di_service,
     sql::AuthContextRef,
@@ -175,6 +176,8 @@ impl TransportService for NodeBridgeTransport {
         query: V1LoadRequestQuery,
         ctx: AuthContextRef,
         meta: LoadRequestMeta,
+        schema: SchemaRef,
+        member_fields: Vec<MemberField>,
     ) -> Result<CubeStreamReceiver, CubeError> {
         trace!("[transport] Request ->");
@@ -201,11 +204,13 @@ impl TransportService for NodeBridgeTransport {
                 self.channel.clone(),
                 self.on_load_stream.clone(),
                 Some(extra),
+                schema.clone(),
+                member_fields.clone(),
             )
             .await;
 
             if let Err(e) = &res {
-                if e.message.to_lowercase() == "continue wait" {
+                if e.message.to_lowercase().contains("continue wait") {
                     continue;
                 }
             }
diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts
index 0d03a56985bc4..cb2b5ea40fdff 100644
--- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts
+++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts
@@ -601,10 +601,10 @@ export class QueryCache {
       .then(([client]) => client.streamQuery(req.query, req.values))
       .then((source) => {
         const cleanup = (error) => {
-          if (!source.destroyed) {
+          if (error && !source.destroyed) {
             source.destroy(error);
           }
-          if (!target.destroyed) {
+          if (error && !target.destroyed) {
             target.destroy(error);
           }
           if (!logged && source.destroyed && target.destroyed) {
@@ -625,13 +625,13 @@ export class QueryCache {
           }
         };
 
-        source.once('end', cleanup);
+        source.once('end', () => cleanup(undefined));
         source.once('error', cleanup);
-        source.once('close', cleanup);
+        source.once('close', () => cleanup(undefined));
 
-        target.once('end', cleanup);
+        target.once('end', () => cleanup(undefined));
         target.once('error', cleanup);
-        target.once('close', cleanup);
+        target.once('close', () => cleanup(undefined));
 
         source.pipe(target);
       })
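The `QueryCache` hunk makes the shared `cleanup` handler error-only: `destroy()` is invoked solely when a real error is present, while `'end'`/`'close'` now call `cleanup(undefined)`, so a stream that finished normally is no longer destroyed after the fact. The same pattern as a stand-alone sketch (`wireCleanup` and `onDone` are hypothetical names; the real code lives inline in `QueryCache`):

```ts
import { Duplex } from 'stream';

// Guarded cleanup: destroy both streams only on error, and report completion
// exactly once when both sides have shut down.
function wireCleanup(source: Duplex, target: Duplex, onDone: (err?: Error) => void) {
  let logged = false;
  const cleanup = (error?: Error) => {
    if (error && !source.destroyed) source.destroy(error);
    if (error && !target.destroyed) target.destroy(error);
    if (!logged && source.destroyed && target.destroyed) {
      logged = true;
      onDone(error);
    }
  };
  source.once('end', () => cleanup(undefined));
  source.once('error', cleanup);
  source.once('close', () => cleanup(undefined));
  target.once('end', () => cleanup(undefined));
  target.once('error', cleanup);
  target.once('close', () => cleanup(undefined));
  source.pipe(target);
}
```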
Field "{}" doesn't exist in row: {:?}"#, + field_name, as_object + )))?; + Ok(match value { + Value::String(s) => FieldValue::String(s), + Value::Number(n) => FieldValue::Number(n.as_f64().ok_or( + DataFusionError::Execution(format!("Can't convert {:?} to float", n)), + )?), + Value::Bool(b) => FieldValue::Bool(b), + Value::Null => FieldValue::Null, + x => { + return Err(CubeError::user(format!( + "Expected primitive value but found: {:?}", + x + ))); + } + }) + } +} + macro_rules! build_column { ($data_type:expr, $builder_ty:ty, $response:expr, $field_name:expr, { $($builder_block:tt)* }, { $($scalar_block:tt)* }) => {{ - let mut builder = <$builder_ty>::new($response.len()); + let len = $response.len()?; + let mut builder = <$builder_ty>::new(len); match $field_name { MemberField::Member(field_name) => { - for row in $response.iter() { - let as_object = row.as_object().ok_or( - DataFusionError::Execution( - format!("Unexpected response from Cube.js, row is not an object, actual: {}", row) - ), - )?; - let value = as_object.get(field_name).ok_or( - DataFusionError::Execution( - format!(r#"Unexpected response from Cube.js, Field "{}" doesn't exist in row"#, field_name) - ), - )?; - match (&value, &mut builder) { - (serde_json::Value::Null, builder) => builder.append_null()?, + for i in 0..len { + let value = $response.get(i, field_name)?; + match (value, &mut builder) { + (FieldValue::Null, builder) => builder.append_null()?, $($builder_block)* + #[allow(unreachable_patterns)] (v, _) => { - return Err(DataFusionError::Execution(format!( + return Err(CubeError::user(format!( "Unable to map value {:?} to {:?}", v, $data_type @@ -203,11 +262,11 @@ macro_rules! build_column { } } MemberField::Literal(value) => { - for _ in 0..$response.len() { + for _ in 0..len { match (value, &mut builder) { $($scalar_block)* (v, _) => { - return Err(DataFusionError::Execution(format!( + return Err(CubeError::user(format!( "Unable to map value {:?} to {:?}", v, $data_type @@ -297,11 +356,16 @@ impl ExecutionPlan for CubeScanExecutionPlan { if stream_mode { let result = self .transport - .load_stream(self.request.clone(), self.auth_context.clone(), meta) + .load_stream( + self.request.clone(), + self.auth_context.clone(), + meta, + self.schema.clone(), + self.member_fields.clone(), + ) .await; let stream = result.map_err(|err| DataFusionError::Execution(err.to_string()))?; - let main_stream = - CubeScanMemoryStream::new(stream, self.schema.clone(), self.member_fields.clone()); + let main_stream = CubeScanMemoryStream::new(stream); return Ok(Box::pin(CubeScanStreamRouter::new( Some(main_stream), @@ -310,7 +374,7 @@ impl ExecutionPlan for CubeScanExecutionPlan { ))); } - one_shot_stream.data = Some(transform_response( + let mut response = JsonValueObject::new( load_data( request, self.auth_context.clone(), @@ -320,9 +384,15 @@ impl ExecutionPlan for CubeScanExecutionPlan { ) .await? 
.data, - one_shot_stream.schema.clone(), - &one_shot_stream.member_fields, - )?); + ); + one_shot_stream.data = Some( + transform_response( + &mut response, + one_shot_stream.schema.clone(), + &one_shot_stream.member_fields, + ) + .map_err(|e| DataFusionError::Execution(e.message.to_string()))?, + ); Ok(Box::pin(CubeScanStreamRouter::new( None, @@ -393,28 +463,18 @@ impl CubeScanOneShotStream { struct CubeScanMemoryStream { receiver: CubeStreamReceiver, - /// Schema representing the data - schema: SchemaRef, - member_fields: Vec, } impl CubeScanMemoryStream { - pub fn new( - receiver: CubeStreamReceiver, - schema: SchemaRef, - member_fields: Vec, - ) -> Self { - Self { - receiver, - schema, - member_fields, - } + pub fn new(receiver: CubeStreamReceiver) -> Self { + Self { receiver } } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - self.receiver.poll_recv(cx).map(|poll| match poll { - Some(Ok(chunk)) => parse_chunk(chunk, self.schema.clone(), &self.member_fields), - Some(Err(err)) => Some(Err(ArrowError::ComputeError(err.to_string()))), + self.receiver.poll_recv(cx).map(|res| match res { + Some(Some(Ok(chunk))) => Some(Ok(chunk)), + Some(Some(Err(err))) => Some(Err(ArrowError::ComputeError(err.to_string()))), + Some(None) => None, None => None, }) } @@ -545,38 +605,24 @@ fn load_to_stream_sync(one_shot_stream: &mut CubeScanOneShotStream) -> Result<() .join() .map_err(|_| DataFusionError::Execution(format!("Can't load to stream")))?; - one_shot_stream.data = Some(transform_response( - res.unwrap().data, - one_shot_stream.schema.clone(), - &one_shot_stream.member_fields, - )?); + let mut response = JsonValueObject::new(res.unwrap().data); + one_shot_stream.data = Some( + transform_response( + &mut response, + one_shot_stream.schema.clone(), + &one_shot_stream.member_fields, + ) + .map_err(|e| DataFusionError::Execution(e.message.to_string()))?, + ); Ok(()) } -fn parse_chunk( - chunk: String, +pub fn transform_response( + response: &mut V, schema: SchemaRef, member_fields: &Vec, -) -> Option> { - if chunk.is_empty() { - return None; - } - - match serde_json::from_str::>(&chunk) { - Ok(data) => match transform_response(data, schema, member_fields) { - Ok(batch) => Some(Ok(batch)), - Err(err) => Some(Err(err.into())), - }, - Err(e) => Some(Err(e.into())), - } -} - -fn transform_response( - response: Vec, - schema: SchemaRef, - member_fields: &Vec, -) -> Result { +) -> std::result::Result { let mut columns = vec![]; for (i, schema_field) in schema.fields().iter().enumerate() { @@ -589,9 +635,9 @@ fn transform_response( response, field_name, { - (serde_json::Value::String(v), builder) => builder.append_value(v)?, - (serde_json::Value::Bool(v), builder) => builder.append_value(if *v { "true" } else { "false" })?, - (serde_json::Value::Number(v), builder) => builder.append_value(v.to_string())?, + (FieldValue::String(v), builder) => builder.append_value(v)?, + (FieldValue::Bool(v), builder) => builder.append_value(if v { "true" } else { "false" })?, + (FieldValue::Number(v), builder) => builder.append_value(v.to_string())?, }, { (ScalarValue::Utf8(v), builder) => builder.append_option(v.as_ref())?, @@ -605,11 +651,8 @@ fn transform_response( response, field_name, { - (serde_json::Value::Number(number), builder) => match number.as_i64() { - Some(v) => builder.append_value(v)?, - None => builder.append_null()?, - }, - (serde_json::Value::String(s), builder) => match s.parse::() { + (FieldValue::Number(number), builder) => builder.append_value(number.round() as i64)?, + 
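With `transform_response` made generic over `ValueObject`, one Arrow-building path now serves both the one-shot JSON response (`JsonValueObject`) and the streaming path (`JsValueObject` over live JS arrays in stream.rs). Two behavioral changes are easy to miss in the hunks: `Int64` members now round an incoming `f64` (`number.round() as i64`) where the old code appended NULL when `as_i64()` failed, and timestamps accept `%Y-%m-%d %H:%M:%S.%f` as a fallback to the `T`-separated format. A TS mirror of the visible Int64 rule, purely for illustration (the real coercion runs in Rust against Arrow builders):

```ts
// Illustrative mirror of the Int64 arm of build_column! above.
type FieldValue = string | number | boolean | null;

function coerceToInt64(v: FieldValue): number | null {
  if (v === null) return null;                     // FieldValue::Null => append_null
  if (typeof v === 'number') return Math.round(v); // numbers arrive as f64 and are rounded
  if (typeof v === 'string') {
    const parsed = Number.parseInt(v, 10);
    if (Number.isNaN(parsed)) {
      console.warn(`Unable to parse '${v}' as Int64, appending NULL`);
      return null; // matches the warn-then-null behavior of the context lines
    }
    return parsed;
  }
  throw new Error(`Unable to map value ${v} to Int64`); // CubeError::user in Rust
}
```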
diff --git a/rust/cubesql/cubesql/src/compile/test/mod.rs b/rust/cubesql/cubesql/src/compile/test/mod.rs
index 4d1c3e93785ec..574079bd11f4c 100644
--- a/rust/cubesql/cubesql/src/compile/test/mod.rs
+++ b/rust/cubesql/cubesql/src/compile/test/mod.rs
@@ -5,8 +5,10 @@ use cubeclient::models::{
     V1CubeMeta, V1CubeMetaDimension, V1CubeMetaJoin, V1CubeMetaMeasure, V1CubeMetaSegment,
     V1LoadRequestQuery, V1LoadResponse,
 };
+use datafusion::arrow::datatypes::SchemaRef;
 
 use crate::{
+    compile::engine::df::scan::MemberField,
     sql::{
         session::DatabaseProtocol, AuthContextRef, AuthenticateResponse, HttpAuthContext,
         ServerManager, Session, SessionManager, SqlAuthService,
@@ -255,6 +257,8 @@ pub fn get_test_transport() -> Arc<dyn TransportService> {
            _query: V1LoadRequestQuery,
            _ctx: AuthContextRef,
            _meta_fields: LoadRequestMeta,
+           _schema: SchemaRef,
+           _member_fields: Vec<MemberField>,
        ) -> Result<CubeStreamReceiver, CubeError> {
            panic!("It's a fake transport");
        }
diff --git a/rust/cubesql/cubesql/src/transport/service.rs b/rust/cubesql/cubesql/src/transport/service.rs
index 1dda87de38d2e..39a15cd1f630f 100644
--- a/rust/cubesql/cubesql/src/transport/service.rs
+++ b/rust/cubesql/cubesql/src/transport/service.rs
@@ -4,6 +4,7 @@ use cubeclient::{
     models::{V1LoadRequest, V1LoadRequestQuery, V1LoadResponse},
 };
 
+use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use serde_derive::*;
 use std::{fmt::Debug, sync::Arc, time::Duration};
 use tokio::{
@@ -12,7 +13,7 @@ use tokio::{
 };
 
 use crate::{
-    compile::MetaContext,
+    compile::{engine::df::scan::MemberField, MetaContext},
     sql::{AuthContextRef, HttpAuthContext},
     CubeError,
 };
@@ -67,10 +68,12 @@ pub trait TransportService: Send + Sync + Debug {
         query: V1LoadRequestQuery,
         ctx: AuthContextRef,
         meta_fields: LoadRequestMeta,
+        schema: SchemaRef,
+        member_fields: Vec<MemberField>,
     ) -> Result<CubeStreamReceiver, CubeError>;
 }
 
-pub type CubeStreamReceiver = Receiver<Result<String, CubeError>>;
+pub type CubeStreamReceiver = Receiver<Option<Result<RecordBatch, CubeError>>>;
 
 #[derive(Debug)]
 struct MetaCacheBucket {
@@ -172,6 +175,8 @@ impl TransportService for HttpTransport {
         _query: V1LoadRequestQuery,
         _ctx: AuthContextRef,
         _meta_fields: LoadRequestMeta,
+        _schema: SchemaRef,
+        _member_fields: Vec<MemberField>,
     ) -> Result<CubeStreamReceiver, CubeError> {
         panic!("Does not work for standalone mode yet");
     }
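`CubeStreamReceiver` now delivers ready-made `RecordBatch`es with an explicit `Option` terminator instead of JSON strings terminated by an empty string. Assuming the writer semantics shown in stream.rs, the three writer operations map onto channel messages as follows (a purely illustrative TS model of `Receiver<Option<Result<RecordBatch, CubeError>>>`):

```ts
// Illustrative model of the channel protocol; RecordBatch is opaque here.
type ChunkMessage =
  | { kind: 'batch'; batch: unknown }   // Some(Ok(chunk))  <- writer.chunk(...)
  | { kind: 'error'; message: string }  // Some(Err(e))     <- writer.reject(...)
  | { kind: 'done' };                   // None             <- writer.end(...)

// Mirrors CubeScanMemoryStream::poll_next: the explicit 'done' sentinel and a
// closed channel (undefined) both translate to end-of-stream (null); errors
// surface as exceptions, and data passes through untouched.
function toArrowPoll(msg: ChunkMessage | undefined): unknown | null {
  if (msg === undefined || msg.kind === 'done') return null;
  if (msg.kind === 'error') throw new Error(msg.message);
  return msg.batch;
}
```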