From fdd89d98e88a5832d11907d7b3f87163cf5c1f71 Mon Sep 17 00:00:00 2001
From: schwarzam
Date: Tue, 4 Jun 2024 02:04:31 +0000
Subject: [PATCH 1/4] testing new approach

---
 Cargo.toml             |  3 +++
 src/loaders/parquet.rs | 36 ++++++++++++++++++++++++++++++++++++
 src/routes.rs          |  2 +-
 3 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index d34eaa4..bfee149 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,6 +17,9 @@ name = "lsdb_server"
 path = "src/bin.rs"

 [dependencies]
+futures-util = "0.3.30"
+arrow = "51.0.0"
+parquet = { version = "51.0.0", features = ["arrow", "async"] }
 axum = "0.7.5"
 polars = { version = "0.40.0", features = ["lazy", "parquet", "dtype-u8"] }
 tokio = { version = "1.37.0", features = ["full"] }
diff --git a/src/loaders/parquet.rs b/src/loaders/parquet.rs
index c6831ef..ae571e2 100644
--- a/src/loaders/parquet.rs
+++ b/src/loaders/parquet.rs
@@ -1,5 +1,6 @@

 use std::collections::HashMap;
+use std::os::fd::FromRawFd;

 use polars::prelude::*;
 use polars::io::HiveOptions;
@@ -79,3 +80,38 @@ pub async fn process_and_return_parquet_file_lazy(
     Ok(buf)
 }

+use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+use parquet::arrow::arrow_writer::ArrowWriter;
+use futures_util::stream::StreamExt;
+
+pub async fn process_and_return_parquet_file(
+    file_path: &str,
+    params: &HashMap<String, String>
+) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
+
+    // Open async file containing parquet data
+    let std_file = std::fs::File::open(file_path)?;
+    let mut file = tokio::fs::File::from_std(std_file);
+
+    // Construct the reader
+    let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
+    let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
+        file.try_clone().await?,
+        meta.clone(),
+
+    ).with_row_filter(filter).with_batch_size(8192).build()?;
+
+    let mut buf = Vec::new();
+    let mut writer = ArrowWriter::try_new(&mut buf, stream.schema().clone(), None)?;
+
+    // Collect all batches and write them to the buffer
+    while let Some(batch) = stream.next().await {
+        let batch = batch?;
+        writer.write(&batch)?;
+    }
+
+    writer.finish()?;
+    let _ = writer.close();
+    Ok(buf)
+}
\ No newline at end of file
diff --git a/src/routes.rs b/src/routes.rs
index 2dd53eb..a24f173 100644
--- a/src/routes.rs
+++ b/src/routes.rs
@@ -25,7 +25,7 @@ pub async fn entry_route(uri: OriginalUri, Query(params): Query
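A minimal, self-contained sketch (not part of the patch) of the async read and re-encode round trip this first commit is reaching for, assuming the parquet/arrow/tokio versions added to Cargo.toml above; "example.parquet" is a placeholder path, and the row filter wired in by later commits is omitted here:

    // Sketch only: stream record batches from a Parquet file with the async
    // reader, then re-encode them into an in-memory buffer with ArrowWriter.
    // "example.parquet" is a placeholder path, not a file from this repository.
    use futures_util::stream::StreamExt;
    use parquet::arrow::arrow_writer::ArrowWriter;
    use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let file = tokio::fs::File::open("example.parquet").await?;
        let mut stream = ParquetRecordBatchStreamBuilder::new(file)
            .await?
            .with_batch_size(8192)
            .build()?;

        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, stream.schema().clone(), None)?;
        // Copy every batch from the reader stream into the writer.
        while let Some(batch) = stream.next().await {
            writer.write(&batch?)?;
        }
        writer.close()?;
        println!("re-encoded {} bytes", buf.len());
        Ok(())
    }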
Date: Tue, 4 Jun 2024 12:17:01 +0000
Subject: [PATCH 2/4] getting to work with low level parquet

---
 src/loaders/parquet.rs              | 57 ++++++++++++++++++++++++-----
 src/loaders/parsers/parse_params.rs | 17 +++++++++
 2 files changed, 64 insertions(+), 10 deletions(-)

diff --git a/src/loaders/parquet.rs b/src/loaders/parquet.rs
index ae571e2..1bc5b23 100644
--- a/src/loaders/parquet.rs
+++ b/src/loaders/parquet.rs
@@ -80,38 +80,75 @@ pub async fn process_and_return_parquet_file_lazy(
     Ok(buf)
 }

+use arrow::array::{ArrayRef, NullArray};
+use arrow::array::{Int32Array, new_null_array};
+use arrow::array::{make_array, ArrayDataBuilder};
+use arrow::record_batch::RecordBatch;
+use futures_util::stream::StreamExt;
 use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
 use parquet::arrow::arrow_reader::ArrowReaderMetadata;
 use parquet::arrow::arrow_writer::ArrowWriter;
-use futures_util::stream::StreamExt;
+use std::error::Error;
+use std::sync::Arc;
+use tokio::fs::File;
+use parquet::file::properties::WriterProperties;
+

 pub async fn process_and_return_parquet_file(
     file_path: &str,
     params: &HashMap<String, String>
-) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
-
+) -> Result<Vec<u8>, Box<dyn Error>> {
     // Open async file containing parquet data
     let std_file = std::fs::File::open(file_path)?;
-    let mut file = tokio::fs::File::from_std(std_file);
+    let mut file = File::from_std(std_file);

+    // Parse selected columns from params
+    let selected_cols = parse_params::parse_columns_from_params_to_str(&params).unwrap_or(Vec::new());
+
     // Construct the reader
     let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
     let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
         file.try_clone().await?,
         meta.clone(),
-
-    ).with_row_filter(filter).with_batch_size(8192).build()?;
+    )
+    .with_batch_size(8192).build()?;

+    let original_metadata = meta.metadata();
+    let metadata_keys = original_metadata.file_metadata().key_value_metadata().unwrap().clone();
+    let original_schema = stream.schema().clone();
+
     let mut buf = Vec::new();
-    let mut writer = ArrowWriter::try_new(&mut buf, stream.schema().clone(), None)?;
+
+    // Set writer properties with the original metadata
+    let writer_properties = WriterProperties::builder()
+        .set_key_value_metadata(Some(metadata_keys))
+        .build();
+
+    let mut writer = ArrowWriter::try_new(&mut buf, original_schema.clone(), Some(writer_properties))?;

     // Collect all batches and write them to the buffer
     while let Some(batch) = stream.next().await {
         let batch = batch?;
-        writer.write(&batch)?;
+
+        let selected_arrays = original_schema.fields().iter()
+            .map(|field| {
+                if let Ok(index) = batch.schema().index_of(field.name()) {
+                    if selected_cols.contains(&field.name().to_string()) {
+                        batch.column(index).clone()
+                    } else {
+                        new_null_array(field.data_type(), batch.num_rows())
+                    }
+                } else {
+                    Arc::new(NullArray::new(batch.num_rows())) as ArrayRef
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let selected_batch = RecordBatch::try_new(original_schema.clone(), selected_arrays)?;
+        writer.write(&selected_batch)?;
     }
-
+
     writer.finish()?;
     let _ = writer.close();
     Ok(buf)
-}
\ No newline at end of file
+}
diff --git a/src/loaders/parsers/parse_params.rs b/src/loaders/parsers/parse_params.rs
index 14b37c1..53aca7e 100644
--- a/src/loaders/parsers/parse_params.rs
+++ b/src/loaders/parsers/parse_params.rs
@@ -19,6 +19,23 @@ pub fn parse_columns_from_params( params: &HashMap<String, String> ) -> Option<Vec<Expr>> {
     None
 }

+/// # Arguments
+///
+/// * `params` - A reference to a HashMap of parameters containing 'columns' key.
+///
+/// # Returns
+///
+/// A vector of Polars with the selected columns.
+pub fn parse_columns_from_params_to_str( params: &HashMap<String, String> ) -> Option<Vec<String>> {
+    // Parse columns from params
+    if let Some(cols) = params.get("columns") {
+        let cols = cols.split(",").collect::<Vec<&str>>();
+        let select_cols = cols.iter().map(|x| x.to_string()).collect::<Vec<String>>();
+        return Some(select_cols);
+    }
+    None
+}
+
 /// Parses a list of filter conditions from query parameter of hashmap.
 ///
 /// # Arguments
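This commit keeps the file's full schema and swaps unrequested columns for same-length null arrays rather than dropping them, so the response schema never changes. A small standalone sketch (not part of the patch) of that masking step, using placeholder column names and the same arrow calls, new_null_array and RecordBatch::try_new:

    // Sketch only: replace any column not in `selected` with a null array of
    // the same length. "id" and "mag" are placeholder column names.
    use std::sync::Arc;
    use arrow::array::{new_null_array, ArrayRef, Float32Array, Int64Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("mag", DataType::Float32, true),
        ]));
        let batch = RecordBatch::try_new(schema.clone(), vec![
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
            Arc::new(Float32Array::from(vec![19.2_f32, 21.7, 18.4])) as ArrayRef,
        ])?;

        let selected = vec!["id".to_string()];
        // Keep requested columns, null out the rest while preserving the schema.
        let masked: Vec<ArrayRef> = schema.fields().iter().enumerate()
            .map(|(i, field)| {
                if selected.contains(&field.name().to_string()) {
                    batch.column(i).clone()
                } else {
                    new_null_array(field.data_type(), batch.num_rows())
                }
            })
            .collect();
        let masked_batch = RecordBatch::try_new(schema.clone(), masked)?;
        println!("{} rows, {} columns", masked_batch.num_rows(), masked_batch.num_columns());
        Ok(())
    }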
From 808b9b2ffc7e92336c5a1cf85b586157e541a638 Mon Sep 17 00:00:00 2001
From: schwarzam
Date: Sat, 8 Jun 2024 17:08:23 +0000
Subject: [PATCH 3/4] got columns and filters to work to some extent

---
 Cargo.toml             |  2 +-
 src/loaders/parquet.rs | 65 +++++++++++++++++++++++++++++++++++-------
 2 files changed, 55 insertions(+), 12 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index bfee149..ffb37ef 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,7 +21,7 @@ futures-util = "0.3.30"
 arrow = "51.0.0"
 parquet = { version = "51.0.0", features = ["arrow", "async"] }
 axum = "0.7.5"
-polars = { version = "0.40.0", features = ["lazy", "parquet", "dtype-u8"] }
+polars = { version = "0.40.0", features = ["lazy", "parquet", "dtype-u8", "dtype-full"] }
 tokio = { version = "1.37.0", features = ["full"] }
 hyper = { version="1.3.1", features = ["full"] }
 tower = "0.4.13"
diff --git a/src/loaders/parquet.rs b/src/loaders/parquet.rs
index 1bc5b23..ae87f11 100644
--- a/src/loaders/parquet.rs
+++ b/src/loaders/parquet.rs
@@ -70,18 +70,39 @@ pub async fn process_and_return_parquet_file_lazy(
         if !df.get_column_names().contains(&col.as_str()) {
             let series = Series::full_null(col, df.height(), &dtype);
             df.with_column(series)?;
+
         }
     }
     df = df.select(&all_columns.iter().map(|(col, _)| col.as_str()).collect::<Vec<_>>())?;

+    let col_names = df.get_column_names_owned();
+    for (index, name) in col_names.iter().enumerate() {
+        let col = df.column(&name).unwrap();
+        if col.dtype() == &ArrowDataType::LargeUtf8 {
+            //modifying the column to categorical
+            // modify the schema to be categorica
+            // df.try_apply(name, |s| s.categorical().cloned())?;
+        }
+    }
+    //println!("{:?}", df.schema());
+
+    // Checking if anything changed
+    // for (index, name) in col_names.iter().enumerate() {
+    //     let col = df.column(&name).unwrap();
+    //     if col.dtype() == &ArrowDataType::LargeUtf8 {
+    //         println!("Column in LargeUtf8 {:?}", name);
+    //     }
+    // }
+
     let mut buf = Vec::new();
     ParquetWriter::new(&mut buf)
         .finish(&mut df)?;
     Ok(buf)
 }

-use arrow::array::{ArrayRef, NullArray};
-use arrow::array::{Int32Array, new_null_array};
+use arrow::array::{ArrayRef, Float64Array, NullArray};
+use arrow::array::{BooleanArray, Float32Array, new_null_array};
 use arrow::array::{make_array, ArrayDataBuilder};
 use arrow::record_batch::RecordBatch;
 use futures_util::stream::StreamExt;
@@ -93,6 +114,22 @@ use std::sync::Arc;
 use tokio::fs::File;
 use parquet::file::properties::WriterProperties;

+fn create_boolean_mask(batch: &RecordBatch, original_schema: &Arc<Schema>) -> arrow::error::Result<Arc<BooleanArray>> {
+    // Extract the "PROB_GAL" column and downcast it to Float32Array
+    let prob_gal = batch.column(original_schema.index_of("PROB_GAL")?);
+    // Downcast to original schema type
+    let prob_gal = prob_gal.as_any().downcast_ref::<Float32Array>().unwrap();
+
+    // Create a boolean mask where true is prob_gal > 0.8
+    let mut builder = BooleanArray::builder(prob_gal.len());
+    for value in prob_gal.iter() {
+        builder.append_value(value.map_or(false, |v| v > 0.8));
+    }
+
+    let filter_mask = builder.finish();
+    Ok(Arc::new(filter_mask))
+}
+

 pub async fn process_and_return_parquet_file(
     file_path: &str,
     params: &HashMap<String, String>
 ) -> Result<Vec<u8>, Box<dyn Error>> {
     // Open async file containing parquet data
     let std_file = std::fs::File::open(file_path)?;
     let mut file = File::from_std(std_file);

     // Parse selected columns from params
     let selected_cols = parse_params::parse_columns_from_params_to_str(&params).unwrap_or(Vec::new());

-    // Construct the reader
     let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
-    let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
+
+    let stream_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
         file.try_clone().await?,
-        meta.clone(),
-    )
-    .with_batch_size(8192).build()?;
-
+        meta.clone()
+    );
     let original_metadata = meta.metadata();
     let metadata_keys = original_metadata.file_metadata().key_value_metadata().unwrap().clone();
-    let original_schema = stream.schema().clone();
+    let original_schema = stream_builder.schema().clone();
+
+    // Construct the reader
+    let mut stream = stream_builder
+        .with_batch_size(8192)
+        .build()?;
+
     let mut buf = Vec::new();

     // Set writer properties with the original metadata
     let writer_properties = WriterProperties::builder()
         .set_key_value_metadata(Some(metadata_keys))
         .build();

     let mut writer = ArrowWriter::try_new(&mut buf, original_schema.clone(), Some(writer_properties))?;

     // Collect all batches and write them to the buffer
     while let Some(batch) = stream.next().await {
-        let batch = batch?;
+        let mut batch = batch?;
+
+        //let predicate = arrow::compute::FilterBuilder::new(&batch, &projection)?;
+        batch = arrow::compute::filter_record_batch(&batch, &create_boolean_mask(&batch, &original_schema).unwrap())?;

         let selected_arrays = original_schema.fields().iter()
             .map(|field| {
                 if let Ok(index) = batch.schema().index_of(field.name()) {
-                    if selected_cols.contains(&field.name().to_string()) {
+                    if selected_cols.contains(&field.name().to_string()) || &field.name().to_string() == "_hipscat_index" {
                         batch.column(index).clone()
                     } else {
                         new_null_array(field.data_type(), batch.num_rows())
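At this point the row cut is still hard-coded to PROB_GAL > 0.8. The arrow kernel it leans on is filter_record_batch, which takes a RecordBatch plus a same-length BooleanArray mask; a small standalone sketch (not part of the patch) of that step on toy data:

    // Sketch only: build a boolean mask from a float column and keep the rows
    // where it is true. The "PROB_GAL" name and 0.8 cut mirror the commit above.
    use std::sync::Arc;
    use arrow::array::{ArrayRef, BooleanArray, Float32Array};
    use arrow::compute::filter_record_batch;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("PROB_GAL", DataType::Float32, true)]));
        let probs = Float32Array::from(vec![Some(0.95_f32), Some(0.42), None, Some(0.81)]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(probs.clone()) as ArrayRef])?;

        // true where PROB_GAL > 0.8, false for nulls
        let mask: BooleanArray = probs.iter().map(|v| Some(v.map_or(false, |p| p > 0.8))).collect();
        let filtered = filter_record_batch(&batch, &mask)?;
        println!("{} of {} rows kept", filtered.num_rows(), batch.num_rows());
        Ok(())
    }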
From 78794ad2ad729df5d4004b0bf266112a8ac199a1 Mon Sep 17 00:00:00 2001
From: schwarzam
Date: Sun, 9 Jun 2024 04:26:31 +0000
Subject: [PATCH 4/4] REDONE all with parquet, not polars

---
 Cargo.toml                          |   5 +-
 src/loaders/mod.rs                  |   3 +-
 src/loaders/parquet.rs              | 197 ----------------------------
 src/loaders/parquet/helpers.rs      |  96 ++++++++++++++
 src/loaders/parquet/mod.rs          |   3 +
 src/loaders/parquet/parquet.rs      | 125 ++++++++++++++++++
 src/loaders/parquet/parse_params.rs |  46 +++++++
 src/loaders/parsers/helpers.rs      |  91 -------------
 src/loaders/parsers/mod.rs          |   2 -
 src/loaders/parsers/parse_params.rs |  94 -------------
 src/routes.rs                       |   2 +-
 tests/parquet.rs                    |   5 +-
 tests/parsers.rs                    |  17 ---
 13 files changed, 275 insertions(+), 411 deletions(-)
 delete mode 100644 src/loaders/parquet.rs
 create mode 100644 src/loaders/parquet/helpers.rs
 create mode 100644 src/loaders/parquet/mod.rs
 create mode 100644 src/loaders/parquet/parquet.rs
 create mode 100644 src/loaders/parquet/parse_params.rs
 delete mode 100644 src/loaders/parsers/helpers.rs
 delete mode 100644 src/loaders/parsers/mod.rs
 delete mode 100644 src/loaders/parsers/parse_params.rs

diff --git a/Cargo.toml b/Cargo.toml
index ffb37ef..12c4056 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,10 +18,9 @@ path = "src/bin.rs"

 [dependencies]
 futures-util = "0.3.30"
-arrow = "51.0.0"
-parquet = { version = "51.0.0", features = ["arrow", "async"] }
+arrow = "52.0.0"
+parquet = { version = "52.0.0", features = ["arrow", "async"] }
 axum = "0.7.5"
-polars = { version = "0.40.0", features = ["lazy", "parquet", "dtype-u8", "dtype-full"] }
 tokio = { version = "1.37.0", features = ["full"] }
 hyper = { version="1.3.1", features = ["full"] }
 tower = "0.4.13"
diff --git a/src/loaders/mod.rs b/src/loaders/mod.rs
index aee2043..4f83702 100644
--- a/src/loaders/mod.rs
+++ b/src/loaders/mod.rs
@@ -1,2 +1 @@
-pub mod parquet;
-pub mod parsers;
\ No newline at end of file
+pub mod parquet;
\ No newline at end of file
diff --git a/src/loaders/parquet.rs b/src/loaders/parquet.rs
deleted file mode 100644
index ae87f11..0000000
--- a/src/loaders/parquet.rs
+++ /dev/null
@@ -1,197 +0,0 @@
-
-use std::collections::HashMap;
-use std::os::fd::FromRawFd;
-
-use polars::prelude::*;
-use polars::io::HiveOptions;
-use crate::loaders::parsers::parse_params;
-
-
-pub async fn process_and_return_parquet_file_lazy(
-    file_path: &str,
-    params: &HashMap<String, String>
-) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
-    let mut args = ScanArgsParquet::default();
-
-    // TODO: fix the parquet reader hive_options with _hipscat_index
-    args.hive_options = HiveOptions{enabled:false, schema: None};
-
-    let lf = LazyFrame::scan_parquet(file_path, args).unwrap();
-
-    // Retrieve the schema of the LazyFrame
-    let schema = lf.schema()?;
-    let all_columns: Vec<(String, DataType)> = schema
-        .iter_fields()
-        .map(|field| (field.name().to_string(), field.data_type().clone()))
-        .collect();
-
-    let mut selected_cols = parse_params::parse_columns_from_params(&params).unwrap_or(Vec::new());
-    selected_cols = parse_params::parse_exclude_columns_from_params(&params, &lf).unwrap_or(selected_cols);
-
-    //println!("{:?}", &params.get("filters").unwrap());
-    let filters = parse_params::parse_filters_from_params(&params);
-
-    // HACK: Find a better way to handle each combination of selected params
-    let mut df;
-    //In case we have selected columns and filters
-    if filters.is_ok() && selected_cols.len() > 0{
-        df = lf
-            .drop(["_hipscat_index"])
-            .filter(
-                // only if combined_condition is not empty
-                filters?
-            )
-            .select(selected_cols)
-            .collect()?;
-    }
-    // In case we have only filters
-    else if filters.is_ok() {
-        df = lf
-            //TODO: fix the parquet reader hive_options with _hipscat_index
-            .drop(["_hipscat_index"])
-            .filter(
-                // only if combined_condition is not empty
-                filters?
-            )
-            .collect()?;
-    }
-    // In case we have only selected columns
-    else if selected_cols.len() > 0 {
-        df = lf
-            .select(selected_cols)
-            .collect()?;
-    }
-    // In case we have no selected columns or filters, return whole dataframe
-    else {
-        df = lf.drop(["_hipscat_index"]).collect()?;
-    }
-
-    for (col, dtype) in &all_columns {
-        if !df.get_column_names().contains(&col.as_str()) {
-            let series = Series::full_null(col, df.height(), &dtype);
-            df.with_column(series)?;
-
-        }
-    }
-    df = df.select(&all_columns.iter().map(|(col, _)| col.as_str()).collect::<Vec<_>>())?;
-
-    let col_names = df.get_column_names_owned();
-    for (index, name) in col_names.iter().enumerate() {
-        let col = df.column(&name).unwrap();
-        if col.dtype() == &ArrowDataType::LargeUtf8 {
-            //modifying the column to categorical
-            // modify the schema to be categorica
-            // df.try_apply(name, |s| s.categorical().cloned())?;
-        }
-    }
-    //println!("{:?}", df.schema());
-
-    // Checking if anything changed
-    // for (index, name) in col_names.iter().enumerate() {
-    //     let col = df.column(&name).unwrap();
-    //     if col.dtype() == &ArrowDataType::LargeUtf8 {
-    //         println!("Column in LargeUtf8 {:?}", name);
-    //     }
-    // }
-
-    let mut buf = Vec::new();
-    ParquetWriter::new(&mut buf)
-        .finish(&mut df)?;
-    Ok(buf)
-}
-
-
-use arrow::array::{ArrayRef, Float64Array, NullArray};
-use arrow::array::{BooleanArray, Float32Array, new_null_array};
-use arrow::array::{make_array, ArrayDataBuilder};
-use arrow::record_batch::RecordBatch;
-use futures_util::stream::StreamExt;
-use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
-use parquet::arrow::arrow_reader::ArrowReaderMetadata;
-use parquet::arrow::arrow_writer::ArrowWriter;
-use std::error::Error;
-use std::sync::Arc;
-use tokio::fs::File;
-use parquet::file::properties::WriterProperties;
-
-fn create_boolean_mask(batch: &RecordBatch, original_schema: &Arc<Schema>) -> arrow::error::Result<Arc<BooleanArray>> {
-    // Extract the "PROB_GAL" column and downcast it to Float32Array
-    let prob_gal = batch.column(original_schema.index_of("PROB_GAL")?);
-    // Downcast to original schema type
-    let prob_gal = prob_gal.as_any().downcast_ref::<Float32Array>().unwrap();
-
-    // Create a boolean mask where true is prob_gal > 0.8
-    let mut builder = BooleanArray::builder(prob_gal.len());
-    for value in prob_gal.iter() {
-        builder.append_value(value.map_or(false, |v| v > 0.8));
-    }
-
-    let filter_mask = builder.finish();
-    Ok(Arc::new(filter_mask))
-}
-
-
-pub async fn process_and_return_parquet_file(
-    file_path: &str,
-    params: &HashMap<String, String>
-) -> Result<Vec<u8>, Box<dyn Error>> {
-    // Open async file containing parquet data
-    let std_file = std::fs::File::open(file_path)?;
-    let mut file = File::from_std(std_file);
-
-    // Parse selected columns from params
-    let selected_cols = parse_params::parse_columns_from_params_to_str(&params).unwrap_or(Vec::new());
-
-    let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
-
-    let stream_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
-        file.try_clone().await?,
-        meta.clone()
-    );
-    let original_metadata = meta.metadata();
-    let metadata_keys = original_metadata.file_metadata().key_value_metadata().unwrap().clone();
-    let original_schema = stream_builder.schema().clone();
-
-    // Construct the reader
-    let mut stream = stream_builder
-        .with_batch_size(8192)
-        .build()?;
-
-    let mut buf = Vec::new();
-
-    // Set writer properties with the original metadata
-    let writer_properties = WriterProperties::builder()
-        .set_key_value_metadata(Some(metadata_keys))
-        .build();
-
-    let mut writer = ArrowWriter::try_new(&mut buf, original_schema.clone(), Some(writer_properties))?;
-
-    // Collect all batches and write them to the buffer
-    while let Some(batch) = stream.next().await {
-        let mut batch = batch?;
-
-        //let predicate = arrow::compute::FilterBuilder::new(&batch, &projection)?;
-        batch = arrow::compute::filter_record_batch(&batch, &create_boolean_mask(&batch, &original_schema).unwrap())?;
-
-        let selected_arrays = original_schema.fields().iter()
-            .map(|field| {
-                if let Ok(index) = batch.schema().index_of(field.name()) {
-                    if selected_cols.contains(&field.name().to_string()) || &field.name().to_string() == "_hipscat_index" {
-                        batch.column(index).clone()
-                    } else {
-                        new_null_array(field.data_type(), batch.num_rows())
-                    }
-                } else {
-                    Arc::new(NullArray::new(batch.num_rows())) as ArrayRef
-                }
-            })
-            .collect::<Vec<_>>();
-
-        let selected_batch = RecordBatch::try_new(original_schema.clone(), selected_arrays)?;
-        writer.write(&selected_batch)?;
-    }
-
-    writer.finish()?;
-    let _ = writer.close();
-    Ok(buf)
-}
diff --git a/src/loaders/parquet/helpers.rs b/src/loaders/parquet/helpers.rs
new file mode 100644
index 0000000..6770c10
--- /dev/null
+++ b/src/loaders/parquet/helpers.rs
@@ -0,0 +1,96 @@
+use arrow::array::{Float64Array, Float32Array, Int16Array, Int32Array, Int64Array, Int8Array, BooleanArray};
+use arrow::record_batch::RecordBatch;
+use arrow::array::BooleanBuilder;
+use arrow::datatypes::Schema;
+use std::sync::Arc;
+
+/// Create a boolean mask based on the filters provided.
+///
+/// # Arguments
+///
+/// * `batch` - A reference to a RecordBatch that will be filtered.
+/// * `original_schema` - A reference to the original schema of the RecordBatch.
+/// * `filters` - A vector of tuples containing the column name, the comparison operator and the value to compare.
+///
+/// # Returns
+///
+/// This function returns an Arrow Result with the boolean mask.
+pub fn create_boolean_mask(batch: &RecordBatch, original_schema: &Arc<Schema>, filters: Vec<(&str, &str, &str)>) -> arrow::error::Result<Arc<BooleanArray>> {
+    let num_rows = batch.num_rows();
+    let mut boolean_builder = BooleanBuilder::new();
+
+    // Initialize all rows as true
+    for _ in 0..num_rows {
+        boolean_builder.append_value(true);
+    }
+    let mut boolean_mask = boolean_builder.finish();
+
+    for filter in filters.iter() {
+        let column = batch.column(original_schema.index_of(filter.0).unwrap());
+
+        if column.data_type() == &arrow::datatypes::DataType::Float32 {
+            let column = column.as_any().downcast_ref::<Float32Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Float64 {
+            let column = column.as_any().downcast_ref::<Float64Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Int16 {
+            let column = column.as_any().downcast_ref::<Int16Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Int32 {
+            let column = column.as_any().downcast_ref::<Int32Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Int64 {
+            let column = column.as_any().downcast_ref::<Int64Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Int8 {
+            let column = column.as_any().downcast_ref::<Int8Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Boolean {
+            let column = column.as_any().downcast_ref::<BooleanArray>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else {
+            return Err(arrow::error::ArrowError::NotYetImplemented(format!("Data type {:?} not yet implemented", column.data_type())));
+        }
+    }
+    Ok(Arc::new(boolean_mask))
+}
+
+/// Apply a filter to a column and update the boolean mask.
+///
+/// # Arguments
+///
+/// * `boolean_mask` - A mutable reference to a BooleanArray that will be updated with the filter results.
+/// * `column` - A reference to a PrimitiveArray that will be filtered.
+/// * `filter` - A tuple containing the column name, the comparison operator and the value to compare.
+///
+/// # Returns
+///
+/// This function returns an Arrow Result.
+fn apply_filter<T>(boolean_mask: &mut BooleanArray, column: &arrow::array::PrimitiveArray<T>, filter: &(&str, &str, &str)) -> arrow::error::Result<()>
+where
+    T: arrow::datatypes::ArrowPrimitiveType,
+    T::Native: std::cmp::PartialOrd + std::str::FromStr,
+    <T::Native as std::str::FromStr>::Err: std::fmt::Debug,
+{
+    let filter_value = filter.2.parse::<T::Native>().unwrap();
+    let mut new_mask = BooleanBuilder::new();
+
+    for (index, value) in column.iter().enumerate() {
+        let current_mask = boolean_mask.value(index);
+        let result = match filter.1 {
+            ">" => value.map_or(false, |v| v > filter_value),
+            "<" => value.map_or(false, |v| v < filter_value),
+            "=" => value.map_or(false, |v| v == filter_value),
+            "!=" => value.map_or(false, |v| v != filter_value),
+            ">=" => value.map_or(false, |v| v >= filter_value),
+            "<=" => value.map_or(false, |v| v <= filter_value),
+            "==" => value.map_or(false, |v| v == filter_value),
+            _ => false,
+        };
+        new_mask.append_value(current_mask && result);
+    }
+
+    *boolean_mask = new_mask.finish();
+    Ok(())
+}
\ No newline at end of file
diff --git a/src/loaders/parquet/mod.rs b/src/loaders/parquet/mod.rs
new file mode 100644
index 0000000..aecf1a5
--- /dev/null
+++ b/src/loaders/parquet/mod.rs
@@ -0,0 +1,3 @@
+pub mod parse_params;
+pub mod helpers;
+pub mod parquet;
\ No newline at end of file
diff --git a/src/loaders/parquet/parquet.rs b/src/loaders/parquet/parquet.rs
new file mode 100644
index 0000000..17f632f
--- /dev/null
+++ b/src/loaders/parquet/parquet.rs
@@ -0,0 +1,125 @@
+
+use std::collections::HashMap;
+use std::error::Error;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, NullArray};
+use arrow::array::new_null_array;
+use arrow::record_batch::RecordBatch;
+
+use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+use parquet::arrow::arrow_writer::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+
+use futures_util::stream::StreamExt;
+use tokio::fs::File;
+
+use crate::loaders::parquet::parse_params;
+use crate::loaders::parquet::helpers::create_boolean_mask;
+
+/// Process a Parquet file and return the content as a byte stream.
+///
+/// # Arguments
+///
+/// * `file_path` - A reference to a string containing the path to the Parquet file.
+/// * `params` - A reference to a HashMap of parameters containing 'columns' and 'filters' keys.
+///
+/// # Returns
+///
+/// This function returns a byte stream that can be directly used as an HTTP response body.
+pub async fn process_and_return_parquet_file(
+    file_path: &str,
+    params: &HashMap<String, String>
+) -> Result<Vec<u8>, Box<dyn Error>> {
+    // Open async file containing parquet data
+    let std_file = std::fs::File::open(file_path)?;
+    let mut file = File::from_std(std_file);
+
+    let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
+    let stream_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
+        file.try_clone().await?,
+        meta.clone()
+    );
+    let original_metadata = meta.metadata();
+    let metadata_keys = original_metadata
+        .file_metadata()
+        .key_value_metadata()
+        .unwrap()
+        .clone();
+
+    let original_schema = stream_builder
+        .schema()
+        .clone();
+
+    let all_columns = original_schema
+        .fields()
+        .iter()
+        .map(|field| field.name().to_string())
+        .collect::<Vec<String>>();
+
+    // Parse selected columns from params
+    let columns = parse_params::parse_columns_from_params_to_str(&params)
+        .unwrap_or(all_columns);
+
+    let filters = parse_params::parse_filters(&params);
+
+    // Construct the reader stream
+    let mut stream = stream_builder
+        .with_batch_size(8192)
+        .build()?;
+
+    // Set writer properties with the original metadata
+    let writer_properties = WriterProperties::builder()
+        .set_key_value_metadata(Some(metadata_keys))
+        .build();
+
+    let mut out_buffer = Vec::new();
+    let mut writer = ArrowWriter::try_new(
+        &mut out_buffer,
+        original_schema.clone(),
+        Some(writer_properties)
+    )?;
+
+    // Collect all batches and write them to the buffer
+    while let Some(batch) = stream.next().await {
+        let mut batch = batch?;
+
+        //let predicate = arrow::compute::FilterBuilder::new(&batch, &projection)?;
+        if filters.is_some() {
+            let filter_mask = &create_boolean_mask(
+                &batch,
+                &original_schema,
+                filters.clone().unwrap()
+            ).unwrap();
+            batch = arrow::compute::filter_record_batch(
+                &batch,
+                &filter_mask
+            )?;
+        }
+
+        let selected_arrays = original_schema.fields().iter()
+            .map(|field| {
+                if let Ok(index) = batch.schema().index_of(field.name()) {
+                    if columns.contains(&field.name().to_string()) || &field.name().to_string() == "_hipscat_index" {
+                        batch.column(index).clone()
+                    } else {
+                        new_null_array(
+                            field.data_type(),
+                            batch.num_rows()
+                        )
+                    }
+                } else {
+                    Arc::new(NullArray::new(batch.num_rows())) as ArrayRef
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let selected_batch = RecordBatch::try_new(original_schema.clone(), selected_arrays)?;
+        writer.write(&selected_batch)?;
+    }
+
+    writer.finish()?;
+    let _ = writer.close();
+    Ok(out_buffer)
+}
diff --git a/src/loaders/parquet/parse_params.rs b/src/loaders/parquet/parse_params.rs
new file mode 100644
index 0000000..8c59302
--- /dev/null
+++ b/src/loaders/parquet/parse_params.rs
@@ -0,0 +1,46 @@
+use std::collections::HashMap;
+use regex::Regex;
+
+/// # Arguments
+///
+/// * `params` - A reference to a HashMap of parameters containing 'columns' key.
+///
+/// # Returns
+///
+/// A vector of strings with the selected column names.
+pub fn parse_columns_from_params_to_str( params: &HashMap<String, String> ) -> Option<Vec<String>> {
+    // Parse columns from params
+    if let Some(cols) = params.get("columns") {
+        let cols = cols.split(",").collect::<Vec<&str>>();
+        let select_cols = cols.iter().map(|x| x.to_string()).collect::<Vec<String>>();
+        return Some(select_cols);
+    }
+    None
+}
+
+/// # Arguments
+///
+/// * `params` - A reference to a HashMap of parameters containing 'filters' key.
+///
+/// # Returns
+///
+/// A vector of tuples containing the column name, the comparison operator and the value to compare.
+pub fn parse_filters(params: &HashMap<String, String>) -> Option<Vec<(&str, &str, &str)>> {
+    let mut filters = Vec::new();
+    if let Some(query) = params.get("filters") {
+        filters = query.split(",").collect::<Vec<&str>>();
+    }
+
+    if filters.len() == 0 {
+        return None
+    }
+
+    let re = Regex::new(r"([a-zA-Z_]+)([<>=]+)([-+]?[0-9]*\.?[0-9]*)").unwrap();
+    let mut filter_vec = Vec::new();
+    for filter in filters {
+        let f_vec = re.captures(filter).unwrap();
+        filter_vec.push((f_vec.get(1).unwrap().as_str(), f_vec.get(2).unwrap().as_str(), f_vec.get(3).unwrap().as_str()));
+    }
+
+    Some(filter_vec)
+}
\ No newline at end of file
diff --git a/src/loaders/parsers/helpers.rs b/src/loaders/parsers/helpers.rs
deleted file mode 100644
index 89f8541..0000000
--- a/src/loaders/parsers/helpers.rs
+++ /dev/null
@@ -1,91 +0,0 @@
-use polars::prelude::*;
-
-/// Returns the column names of a LazyFrame.
-///
-/// # Arguments
-///
-/// * `lf` - A reference to a LazyFrame.
-///
-/// # Returns
-///
-/// A vector of strings representing the column names of the DataFrame.
-pub fn get_lazyframe_column_names(lf : &LazyFrame) -> Vec<String> {
-    let df = lf.clone().first().collect().unwrap();
-    df.get_column_names().iter().map(|x| x.to_string()).collect()
-}
-
-/// Parses a filter condition from a string into a Polars expression.
-///
-/// The expected format for `condition` is "{column_name} {operator} {value}", where:
-/// - `column_name` identifies a DataFrame column.
-/// - `operator` is one of `<`, `<=`, `>`, `>=`, or `=`.
-/// - `value` is a number compared against the column's values.
-///
-/// # Parameters
-/// * `condition` - A string slice representing the filter condition.
-///
-/// # Returns
-/// A `Result` containing either:
-/// - `Ok(Expr)`: A Polars expression if the parsing succeeds.
-/// - `Err(Box<dyn Error>)`: An error if the format is incorrect or parsing fails.
-pub fn str_filter_to_expr(condition: &str) -> Result<Expr, Box<dyn std::error::Error>> {
-    use regex::Regex;
-
-    // Regex to catch "{column_name} {operator} {value}"
-    let re = Regex::new(r"([a-zA-Z_]+)([<>=]+)([-+]?[0-9]*\.?[0-9]*)").unwrap();
-    let parts = re.captures(condition).unwrap();
-
-    if parts.len() == 4 {
-        let column = parts.get(1).unwrap().as_str();
-        let operator = parts.get(2).unwrap().as_str();
-        let value = parts.get(3).unwrap().as_str();
-
-        match operator {
-            "<" => Ok(col(column).lt(lit(value.parse::<f64>()?))),
-            "<=" => Ok(col(column).lt_eq(lit(value.parse::<f64>()?))),
-            ">" => Ok(col(column).gt(lit(value.parse::<f64>()?))),
-            ">=" => Ok(col(column).gt_eq(lit(value.parse::<f64>()?))),
-            "=" => Ok(col(column).eq(lit(value.parse::<f64>()?))),
-            _ => Err("Unsupported operator".into()),
-        }
-    } else {
-        Err("Invalid condition format".into())
-    }
-}
-
-
-/// Parses filter conditions from a list of tuples into Polars expressions.
-///
-/// The expected format for each tuple in `filters` is (column_name, operator, value), where:
-/// - `column_name` identifies a DataFrame column.
-/// - `operator` is one of "==", "=", ">", ">=", "<", "<=", "!=", "in", "not in".
-/// - `value` is a number or a list of values compared against the column's values.
-///
-/// # Parameters
-/// * `filters` - An optional vector of tuples representing the filter conditions.
-///
-/// # Returns
-/// A `Result` containing either:
-/// - `Ok(Vec<Expr>)`: A vector of Polars expressions if parsing succeeds.
-/// - `Err(Box<dyn Error>)`: An error if the format is incorrect or parsing fails.
-pub fn filters_to_expr(filters: Option<Vec<(String, String, Vec<f64>)>>) -> Result<Vec<Expr>, Box<dyn std::error::Error>> {
-    let mut expressions = Vec::new();
-
-    if let Some(conditions) = filters {
-        for (column, operator, values) in conditions {
-            let expression = match operator.as_str() {
-                "=" | "==" => col(&column).eq(lit(values[0])),
-                "!=" => col(&column).neq(lit(values[0])),
-                ">" => col(&column).gt(lit(values[0])),
-                ">=" => col(&column).gt_eq(lit(values[0])),
-                "<" => col(&column).lt(lit(values[0])),
-                "<=" => col(&column).lt_eq(lit(values[0])),
-                _ => return Err("Unsupported operator".into()),
-            };
-            expressions.push(expression);
-        }
-    }
-
-    Ok(expressions)
-}
-
diff --git a/src/loaders/parsers/mod.rs b/src/loaders/parsers/mod.rs
deleted file mode 100644
index 7f23e7e..0000000
--- a/src/loaders/parsers/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod parse_params;
-pub mod helpers;
\ No newline at end of file
diff --git a/src/loaders/parsers/parse_params.rs b/src/loaders/parsers/parse_params.rs
deleted file mode 100644
index 53aca7e..0000000
--- a/src/loaders/parsers/parse_params.rs
+++ /dev/null
@@ -1,94 +0,0 @@
-use polars::{lazy::dsl::col, prelude::*};
-use std::collections::HashMap;
-use crate::loaders::parsers::helpers;
-
-/// # Arguments
-///
-/// * `params` - A reference to a HashMap of parameters containing 'columns' key.
-///
-/// # Returns
-///
-/// A vector of Polars with the selected columns.
-pub fn parse_columns_from_params( params: &HashMap<String, String> ) -> Option<Vec<Expr>> {
-    // Parse columns from params
-    if let Some(cols) = params.get("columns") {
-        let cols = cols.split(",").collect::<Vec<&str>>();
-        let select_cols = cols.iter().map(|x| col(x)).collect::<Vec<Expr>>();
-        return Some(select_cols);
-    }
-    None
-}
-
-/// # Arguments
-///
-/// * `params` - A reference to a HashMap of parameters containing 'columns' key.
-///
-/// # Returns
-///
-/// A vector of Polars with the selected columns.
-pub fn parse_columns_from_params_to_str( params: &HashMap<String, String> ) -> Option<Vec<String>> {
-    // Parse columns from params
-    if let Some(cols) = params.get("columns") {
-        let cols = cols.split(",").collect::<Vec<&str>>();
-        let select_cols = cols.iter().map(|x| x.to_string()).collect::<Vec<String>>();
-        return Some(select_cols);
-    }
-    None
-}
-
-/// Parses a list of filter conditions from query parameter of hashmap.
-///
-/// # Arguments
-///
-/// * `params` - A reference to a HashMap of parameters.
-///
-/// # Returns
-///
-/// A Polars expression representing the combined filter conditions.
-pub fn parse_filters_from_params(params: &HashMap<String, String>) -> Result<Expr, Box<dyn std::error::Error>> {
-    let mut filters = Vec::new();
-    if let Some(query) = params.get("filters") {
-        filters = query.split(",").collect::<Vec<&str>>();
-    }
-
-    //TODO: DEPRECATE
-    let conditions: Result<Vec<Expr>, _> = filters.iter()
-        .map(|condition: &&str| helpers::str_filter_to_expr(*condition))
-        .collect();
-
-    let combined_condition = conditions?.into_iter()
-        .reduce(|acc, cond| acc.and(cond))
-        .ok_or(""); // Handle case where no conditions are provided
-
-    match combined_condition {
-        Ok(_) => { Ok(combined_condition.unwrap()) },
-        Err(_) => { Err( "Couldnt parse queries".into() ) },
-    }
-}
-
-
-/// # Arguments
-///
-/// * `params` - The client request HashMap of parameters.
-/// * `lf` - A reference to a LazyFrame.
-///
-/// # Returns
-///
-/// A vector of Polars expressions representing the columns to exclude.
-pub fn parse_exclude_columns_from_params( params: &HashMap<String, String>, lf : &LazyFrame ) -> Option<Vec<Expr>> {
-    // Parse columns from params
-    if let Some(exclude_cols) = params.get("exclude_cols") {
-        let exclude_cols = exclude_cols.split(",").collect::<Vec<&str>>();
-        let exclude_cols = exclude_cols.iter().map(|&x| x).collect::<Vec<&str>>();
-
-        let cols = helpers::get_lazyframe_column_names(&lf);
-
-        let select_cols = cols.iter()
-            .filter(|&x| !exclude_cols.contains( &x.as_str() ))
-            .map(|x| col(x))
-            .collect::<Vec<Expr>>();
-
-        return Some(select_cols);
-    }
-    None
-}
diff --git a/src/routes.rs b/src/routes.rs
index a24f173..d0c27a9 100644
--- a/src/routes.rs
+++ b/src/routes.rs
@@ -25,7 +25,7 @@ pub async fn entry_route(uri: OriginalUri, Query(params): Query
diff --git a/tests/parquet.rs b/tests/parquet.rs
--- a/tests/parquet.rs
+++ b/tests/parquet.rs
     params.insert("filters".to_string(), "RA>=30.1241,DEC<=-30.3".to_string());
-    let result = parquet::process_and_return_parquet_file_lazy(
+    let result = parquet::parquet::process_and_return_parquet_file(
         file_path.to_str().unwrap(),
         &params
     ).await;
-
-    // Add assertions here to verify the result
 }
diff --git a/tests/parsers.rs b/tests/parsers.rs
index 1f2ccbe..fba86a4 100644
--- a/tests/parsers.rs
+++ b/tests/parsers.rs
@@ -1,22 +1,5 @@
 #[cfg(test)]
 mod parser {
-    use lsdb_server::loaders::parsers::helpers;
-    use polars::{lazy::dsl::col, prelude::*};

-    #[test]
-    fn test_parse_condition() {
-        let expr = helpers::str_filter_to_expr("ra>=30.1241").unwrap();
-        assert_eq!(expr, col("ra").gt_eq(lit(30.1241)));
-
-        let expr = helpers::str_filter_to_expr("dec<=-30.3").unwrap();
-        assert_eq!(expr, col("dec").lt_eq(lit(-30.3)));
-
-        let expr = helpers::str_filter_to_expr("dec>4").unwrap();
-        assert_eq!(expr, col("dec").gt(lit(4.0)));
-
-        let expr = helpers::str_filter_to_expr("dec=4").unwrap();
-        assert_eq!(expr, col("dec").eq(lit(4.0)));
-    }
-
 }
\ No newline at end of file
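For reference, a small standalone sketch (not part of the patch series) of the filter syntax the final parser accepts: a comma-separated "filters" query value like the one exercised in tests/parquet.rs, broken into (column, operator, value) tuples with the same regex used in src/loaders/parquet/parse_params.rs. It only assumes the regex crate:

    // Sketch only: split a "filters" query value and capture column, operator
    // and value with the regex from parse_params.rs.
    use regex::Regex;

    fn main() {
        let filters = "RA>=30.1241,DEC<=-30.3";
        let re = Regex::new(r"([a-zA-Z_]+)([<>=]+)([-+]?[0-9]*\.?[0-9]*)").unwrap();
        for part in filters.split(',') {
            let caps = re.captures(part).unwrap();
            println!("column={} op={} value={}", &caps[1], &caps[2], &caps[3]);
        }
        // Prints:
        // column=RA op=>= value=30.1241
        // column=DEC op=<= value=-30.3
    }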