Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions packages/cubejs-backend-native/src/node_export.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use cubesql::compile::parser::parse_sql_to_statement;
use cubesql::compile::{convert_statement_to_cube_query, get_df_batches};
use cubesql::config::processing_loop::ShutdownMode;
use cubesql::sql::dataframe::{arrow_to_column_type, Column};
use cubesql::sql::ColumnFlags;
use cubesql::sql::dataframe::arrow_to_column_type;
use cubesql::sql::ColumnType;
use cubesql::transport::{SpanId, TransportService};
use futures::StreamExt;

Expand Down Expand Up @@ -33,6 +33,8 @@ use cubesql::{telemetry::ReportingLogger, CubeError};
#[cfg(feature = "async-log")]
use log_nonblock::NonBlockingLoggerBuilder;
use neon::prelude::*;
use serde::Serialize;

use neon::result::Throw;
#[cfg(not(feature = "async-log"))]
use simple_logger::SimpleLogger;
Expand All @@ -43,6 +45,14 @@ pub(crate) struct SQLInterface {

impl Finalize for SQLInterface {}

#[derive(Serialize)]
pub(crate) struct SchemaColumn {
name: String,
column_type: ColumnType,
#[serde(skip_serializing_if = "Option::is_none")]
format: Option<serde_json::Value>,
}

impl SQLInterface {
pub fn new(services: Arc<NodeCubeServices>) -> Self {
Self { services }
Expand Down Expand Up @@ -302,7 +312,7 @@ async fn handle_sql_query(
parse_sql_to_statement(sql_query, session.state.protocol.clone(), &mut None)?;
let query_plan = convert_statement_to_cube_query(
stmt,
meta_context,
meta_context.clone(),
session,
&mut None,
span_id_clone,
Expand All @@ -321,23 +331,37 @@ async fn handle_sql_query(

drain_handler.handle(stream_methods.on.clone()).await?;

// Get schema from stream and convert to DataFrame columns format
let stream_schema = stream.schema();
let mut columns = Vec::with_capacity(stream_schema.fields().len());
for field in stream_schema.fields().iter() {
columns.push(Column::new(
field.name().clone(),
arrow_to_column_type(field.data_type().clone())?,
ColumnFlags::empty(),
));
// Get schema from stream and convert to schema columns with format
let mut columns = Vec::with_capacity(stream.schema().fields().len());

for field in stream.schema().fields().iter() {
let format = field
.metadata()
.and_then(|m| m.get("member_name"))
.and_then(|member_name| {
meta_context
.find_measure_with_name(member_name)
.and_then(|m| m.format.as_ref())
.or_else(|| {
meta_context
.find_dimension_with_name(member_name)
.and_then(|d| d.format.as_ref())
})
})
.and_then(|fmt| serde_json::to_value(fmt.as_ref()).ok());

columns.push(SchemaColumn {
name: field.name().clone(),
column_type: arrow_to_column_type(field.data_type().clone())?,
format,
});
}

// Send schema first
let columns_json = serde_json::to_value(&columns)?;
let mut schema_response = Map::new();
schema_response.insert("schema".into(), columns_json);
schema_response.insert("schema".into(), serde_json::to_value(&columns)?);

if let Some(last_refresh_time) = stream_schema.metadata().get("lastRefreshTime") {
if let Some(last_refresh_time) = stream.schema().metadata().get("lastRefreshTime") {
schema_response.insert(
"lastRefreshTime".into(),
serde_json::Value::String(last_refresh_time.clone()),
Expand Down
3 changes: 3 additions & 0 deletions packages/cubejs-client-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import HttpTransport, { ErrorResponse, ITransport, TransportOptions } from './Ht
import RequestError from './RequestError';
import {
CacheMode,
DimensionFormat,
ExtractTimeMembers,
LoadResponse,
MeasureFormat,
MetaResponse,
PivotQuery,
ProgressResponse,
Expand Down Expand Up @@ -123,6 +125,7 @@ export type CubeSqlSchemaColumn = {
name: string;
// eslint-disable-next-line camelcase
column_type: string;
format?: DimensionFormat | MeasureFormat;
};

export type CubeSqlResult = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ cube(`Orders`, {
totalAmount: {
sql: `amount`,
type: `sum`,
format: `currency`,
},
toRemove: {
type: `count`,
},
numberTotal: {
sql: `${totalAmount}`,
type: `number`
type: `number`,
format: '$,.2f',
},
amountRank: {
multi_stage: true,
Expand Down Expand Up @@ -121,7 +123,8 @@ cube(`Orders`, {

createdAt: {
sql: `created_at`,
type: `time`
type: `time`,
format: '%Y-%m-%d',
},

updatedAt: {
Expand Down
31 changes: 27 additions & 4 deletions packages/cubejs-testing/test/smoke-cubesql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ describe('SQL API', () => {
const execute = () => new Promise<void>((resolve, reject) => {
const onData = jest.fn((chunk: Buffer) => {
const chunkStr = chunk.toString('utf-8');

if (isFirstChunk) {
isFirstChunk = false;
const json = JSON.parse(chunkStr);
Expand Down Expand Up @@ -201,13 +201,13 @@ describe('SQL API', () => {
});

await execute();

// Verify schema was sent first
expect(schemaReceived).toBe(true);

// Verify empty data was sent
expect(emptyDataReceived).toBe(true);

// Verify no actual rows were returned
const dataLines = data.split('\n').filter((it) => it.trim());
if (dataLines.length > 0) {
Expand All @@ -218,6 +218,29 @@ describe('SQL API', () => {
}
});

it('includes format in schema columns', async () => {
const response = await fetch(`${birdbox.configuration.apiUrl}/cubesql`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: token,
},
body: JSON.stringify({
query: 'SELECT DATE_TRUNC(\'year\', createdAt) AS createdAt, totalAmount, numberTotal, status FROM Orders LIMIT 1',
}),
});

const text = await response.text();
const { schema } = JSON.parse(text.split('\n')[0]);

expect(schema).toEqual([
{ name: 'createdAt', column_type: 'Timestamp', format: { type: 'custom-time', value: '%Y-%m-%d' } },
{ name: 'totalAmount', column_type: 'Double', format: 'currency' },
{ name: 'numberTotal', column_type: 'Double', format: { type: 'custom-numeric', value: '$,.2f' } },
{ name: 'status', column_type: 'String' },
]);
});

describe('sql4sql', () => {
async function generateSql(query: string, disablePostPprocessing: boolean = false) {
const response = await fetch(`${birdbox.configuration.apiUrl}/sql`, {
Expand Down
18 changes: 17 additions & 1 deletion rust/cubesql/cubesql/src/compile/rewrite/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2090,7 +2090,23 @@ impl LanguageToLogicalPlanConverter {

let node = Arc::new(CubeScanNode::new(
Arc::new(DFSchema::new_with_metadata(
fields.into_iter().map(|(f, _)| f).collect(),
fields
.into_iter()
.map(|(f, m)| {
if let MemberField::Member(member) = &m {
let mut metadata =
f.field().metadata().cloned().unwrap_or_default();
metadata
.insert("member_name".to_string(), member.member.clone());
DFField::from_qualified(
f.qualifier().unwrap_or(&"".to_string()),
f.field().clone().with_metadata(Some(metadata)),
)
} else {
f
}
})
.collect(),
HashMap::new(),
)?),
member_fields,
Expand Down
Loading