Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,342 changes: 850 additions & 492 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions columnq/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "columnq"
version = "0.9.0"
version = "0.9.1"
homepage = "https://github.com/roapi/roapi"
license = "MIT"
authors = ["QP Hou <dave2008713@gmail.com>"]
Expand All @@ -13,9 +13,9 @@ path = "src/lib.rs"
[dependencies]
# pulling arrow-schema manually to enable the serde feature.
# TODO: add serde feature in datafusion to avoid this workaround
arrow-schema = { version = "51", features = ["serde"] }
arrow-schema = { version = "52", features = ["serde"] }

datafusion = "37"
datafusion = "39"
object_store = { version = "0", features = ["aws", "gcp", "azure"] }
percent-encoding = "2.2.0"
url = "2.2"
Expand Down Expand Up @@ -48,15 +48,15 @@ hyper-rustls = { version = "0.25", default-features = false, optional = true }
tokio-postgres = { version = "0.7.8", optional = true }

[dependencies.deltalake]
version = "0.17.3"
version = "0.18.1"
# git = "https://github.com/delta-io/delta-rs.git"
# rev = "63c14b3716428ff65e01404c6f7e62f341c98f05"
features = ["datafusion", "s3", "gcs", "azure"]
default-features = false

[dependencies.connectorx]
git = "https://github.com/roapi/connector-x.git"
rev = "0732ad7efb08fdb4c08793f8942ed2a76406f92a"
rev = "f7ba1c38130e554cdb7dc4e04d7a166e3286d4e7"
version = "0.3.3-alpha.1"
features = ["default", "dst_arrow"]
optional = true
Expand Down
1 change: 1 addition & 0 deletions columnq/src/columnq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ impl ColumnQ {

Ok(())
}

pub fn register_object_storage(
&mut self,
url: &Url,
Expand Down
14 changes: 5 additions & 9 deletions columnq/src/table/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::sync::Arc;
use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
Expand Down Expand Up @@ -43,18 +42,17 @@ pub async fn to_datafusion_table(
.option
.clone()
.unwrap_or_else(|| TableLoadOption::csv(TableOptionCsv::default()));
if opt
let opt = opt
.as_csv()
.expect("Invalid table format option, expect csv")
.use_memory_table
{
.expect("Invalid table format option, expect csv");
if opt.use_memory_table {
return to_mem_table(t, dfctx).await;
}
let table_url =
ListingTableUrl::parse(t.get_uri_str()).with_context(|_| table::ListingTableUriSnafu {
uri: t.get_uri_str().to_string(),
})?;
let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
let mut options = ListingOptions::new(Arc::new(opt.as_df_csv_format()));
if let Some(partition_cols) = t.datafusion_partition_cols() {
options = options.with_table_partition_cols(partition_cols)
}
Expand Down Expand Up @@ -96,12 +94,10 @@ pub async fn to_mem_table(
let schema_ref: arrow::datatypes::SchemaRef = match &t.schema {
Some(s) => Arc::new(s.into()),
None => {
let fmt = opt.as_arrow_csv_format();
let schemas = partitions_from_table_source!(
t,
|r| {
let fmt = arrow::csv::reader::Format::default()
.with_delimiter(delimiter)
.with_header(has_header);
let (schema, record_count) = fmt
.infer_schema(r, None)
.context(InferSchemaSnafu)
Expand Down
4 changes: 2 additions & 2 deletions columnq/src/table/excel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ mod tests {
infer_schema(&range, &TableOptionExcel::default(), &Some(table_schema)).unwrap();

assert_eq!(
schema.all_fields(),
schema.flattened_fields(),
vec![
&Field::new("float_column", DataType::Float64, true),
&Field::new("integer_column", DataType::Int64, true),
Expand Down Expand Up @@ -727,7 +727,7 @@ option:
let rb = excel_range_to_record_batch(range, &TableOptionExcel::default(), shema).unwrap();

assert_eq!(
rb.schema().all_fields(),
rb.schema().flattened_fields(),
vec![
&Field::new("float_column", DataType::Float64, true),
&Field::new("integer_column", DataType::Int64, true),
Expand Down
29 changes: 20 additions & 9 deletions columnq/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::io::Read;
use std::path::Path;
use std::sync::Arc;

use datafusion::datasource::TableProvider;

use datafusion::arrow;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use datafusion::datasource::TableProvider;
use serde::de::{Deserialize, Deserializer};
use serde_derive::Deserialize;
use snafu::prelude::*;
Expand Down Expand Up @@ -216,6 +216,18 @@ impl TableOptionCsv {
pub fn default_use_memory_table() -> bool {
true
}

pub fn as_arrow_csv_format(&self) -> arrow::csv::reader::Format {
arrow::csv::reader::Format::default()
.with_delimiter(self.delimiter)
.with_header(self.has_header)
}

pub fn as_df_csv_format(&self) -> CsvFormat {
CsvFormat::default()
.with_has_header(self.has_header)
.with_delimiter(self.delimiter)
}
}

impl Default for TableOptionCsv {
Expand Down Expand Up @@ -604,14 +616,13 @@ pub async fn datafusion_get_or_infer_schema(
.expect("Failed to create file url"),
)
.context(InferSchemaSnafu)?;
let inferred_schema = listing_options
.infer_schema(&dfctx.state(), &file_url)
.await
.context(InferSchemaSnafu)?;
schemas.push(
Arc::into_inner(
listing_options
.infer_schema(&dfctx.state(), &file_url)
.await
.context(InferSchemaSnafu)?,
)
.expect("Failed to unwrap schemaref into schema on merge"),
Arc::into_inner(inferred_schema)
.expect("Failed to unwrap schemaref into schema on merge"),
);
}
Arc::new(arrow::datatypes::Schema::try_merge(schemas).context(MergeSchemaSnafu)?)
Expand Down
14 changes: 7 additions & 7 deletions roapi/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "roapi"
version = "0.12.0"
version = "0.12.1"
authors = ["QP Hou <dave2008713@gmail.com>"]
homepage = "https://github.com/roapi/roapi"
license = "MIT"
Expand Down Expand Up @@ -45,9 +45,9 @@ thiserror = "1"
snafu = "0"

# flight-sql
arrow-flight = { version = "51", features = ["flight-sql-experimental"] }
arrow-flight = { version = "52", features = ["flight-sql-experimental"] }
tonic = { version = "0.11", features = ["tls"] }
prost = "0"
prost = "0.12"
futures = "0"
# TODO: remove once_cell dependency
once_cell = "*"
Expand All @@ -56,13 +56,13 @@ uuid = "1"

[dependencies.convergence]
version = "0"
git = "https://github.com/returnString/convergence.git"
rev = "8360bb4f6ee3778f4a4951026a2e1a3cdb6f4df7"
git = "https://github.com/roapi/convergence.git"
rev = "40c5fca38d83611f6c941c9ffe86b597c2e5851b"

[dependencies.convergence-arrow]
version = "0"
git = "https://github.com/returnString/convergence.git"
rev = "8360bb4f6ee3778f4a4951026a2e1a3cdb6f4df7"
git = "https://github.com/roapi/convergence.git"
rev = "40c5fca38d83611f6c941c9ffe86b597c2e5851b"

[features]
default = ["rustls", "snmalloc"]
Expand Down
6 changes: 3 additions & 3 deletions roapi/src/server/flight_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use arrow_flight::sql::{
CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys, CommandGetPrimaryKeys,
CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
CommandStatementSubstraitPlan, CommandStatementUpdate, Nullable, ProstMessageExt, Searchable,
SqlInfo, TicketStatementQuery, XdbcDataType,
CommandStatementSubstraitPlan, CommandStatementUpdate, DoPutPreparedStatementResult, Nullable,
ProstMessageExt, Searchable, SqlInfo, TicketStatementQuery, XdbcDataType,
};
use arrow_flight::{
flight_service_server::FlightService, Action, FlightDescriptor, FlightEndpoint, FlightInfo,
Expand Down Expand Up @@ -800,7 +800,7 @@ impl<H: RoapiContext> FlightSqlService for RoapiFlightSqlService<H> {
&self,
_query: CommandPreparedStatementQuery,
_request: Request<PeekableFlightDataStream>,
) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
) -> Result<DoPutPreparedStatementResult, Status> {
Err(Status::unimplemented(
"do_put_prepared_statement_query not implemented",
))
Expand Down
2 changes: 1 addition & 1 deletion roapi/tests/flight_sql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async fn test_flight_sql_get_tables() {
let schema_bytes = schema_arr.value(0);
let schema = try_schema_from_ipc_buffer(schema_bytes).expect("Invalid schema data");
assert_eq!(
schema.all_fields(),
schema.flattened_fields(),
vec![
&Field::new("city", DataType::Utf8, true),
&Field::new("lat", DataType::Float64, true),
Expand Down
11 changes: 9 additions & 2 deletions roapi/tests/postgres_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ async fn test_postgres_count() {
.unwrap();

match &rows[0] {
tokio_postgres::SimpleQueryMessage::RowDescription(_) => {}
_ => {
panic!("expect row description from query result.");
}
}

match &rows[1] {
tokio_postgres::SimpleQueryMessage::Row(row) => {
assert_eq!(row.get(0).unwrap(), "132");
}
Expand All @@ -34,7 +41,7 @@ async fn test_postgres_count() {
}
}

match &rows[1] {
match &rows[2] {
tokio_postgres::SimpleQueryMessage::CommandComplete(modified) => {
assert_eq!(modified, &1);
}
Expand All @@ -43,5 +50,5 @@ async fn test_postgres_count() {
}
}

assert_eq!(rows.len(), 2);
assert_eq!(rows.len(), 3);
}