Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ name = "lsdb_server"
path = "src/bin.rs"

[dependencies]
futures-util = "0.3.30"
arrow = "52.0.0"
parquet = { version = "52.0.0", features = ["arrow", "async"] }
axum = "0.7.5"
polars = { version = "0.40.0", features = ["lazy", "parquet", "dtype-u8"] }
tokio = { version = "1.37.0", features = ["full"] }
hyper = { version="1.3.1", features = ["full"] }
tower = "0.4.13"
Expand Down
3 changes: 1 addition & 2 deletions src/loaders/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod parquet;
pub mod parsers;
pub mod parquet;
81 changes: 0 additions & 81 deletions src/loaders/parquet.rs

This file was deleted.

96 changes: 96 additions & 0 deletions src/loaders/parquet/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use arrow::array::{Float64Array, Float32Array, Int16Array, Int32Array, Int64Array, Int8Array, BooleanArray};
use arrow::record_batch::RecordBatch;
use arrow::array::BooleanBuilder;
use arrow::datatypes::Schema;
use std::sync::Arc;

/// Create a boolean mask based on the filters provided.
///
/// # Arguments
///
/// * `batch` - A reference to a RecordBatch that will be filtered.
/// * `original_schema` - A reference to the original schema of the RecordBatch.
/// * `filters` - A vector of tuples containing the column name, the comparison operator and the value to compare.
///
/// # Returns
///
/// This function returns an Arrow Result with the boolean mask.
pub fn create_boolean_mask(batch: &RecordBatch, original_schema: &Arc<Schema>, filters: Vec<(&str, &str, &str)>) -> arrow::error::Result<Arc<BooleanArray>> {
let num_rows = batch.num_rows();
let mut boolean_builder = BooleanBuilder::new();

// Initialize all rows as true
for _ in 0..num_rows {
boolean_builder.append_value(true);
}
let mut boolean_mask = boolean_builder.finish();

for filter in filters.iter() {
let column = batch.column(original_schema.index_of(filter.0).unwrap());

if column.data_type() == &arrow::datatypes::DataType::Float32 {
let column = column.as_any().downcast_ref::<Float32Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Float64 {
let column = column.as_any().downcast_ref::<Float64Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int16 {
let column = column.as_any().downcast_ref::<Int16Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int32 {
let column = column.as_any().downcast_ref::<Int32Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int64 {
let column = column.as_any().downcast_ref::<Int64Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int8 {
let column = column.as_any().downcast_ref::<Int8Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Boolean {
let column = column.as_any().downcast_ref::<Int16Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else {
return Err(arrow::error::ArrowError::NotYetImplemented(format!("Data type {:?} not yet implemented", column.data_type())));
}
}
Ok(Arc::new(boolean_mask))
}

/// Apply a filter to a column and update the boolean mask.
///
/// # Arguments
///
/// * `boolean_mask` - A mutable reference to a BooleanArray that will be updated with the filter results.
/// * `column` - A reference to a PrimitiveArray that will be filtered.
/// * `filter` - A tuple containing the column name, the comparison operator and the value to compare.
///
/// # Returns
///
/// This function returns an Arrow Result.
fn apply_filter<T>(boolean_mask: &mut BooleanArray, column: &arrow::array::PrimitiveArray<T>, filter: &(&str, &str, &str)) -> arrow::error::Result<()>
where
T: arrow::datatypes::ArrowPrimitiveType,
T::Native: std::cmp::PartialOrd + std::str::FromStr,
<T::Native as std::str::FromStr>::Err: std::fmt::Debug,
{
let filter_value = filter.2.parse::<T::Native>().unwrap();
let mut new_mask = BooleanBuilder::new();

for (index, value) in column.iter().enumerate() {
let current_mask = boolean_mask.value(index);
let result = match filter.1 {
">" => value.map_or(false, |v| v > filter_value),
"<" => value.map_or(false, |v| v < filter_value),
"=" => value.map_or(false, |v| v == filter_value),
"!=" => value.map_or(false, |v| v != filter_value),
">=" => value.map_or(false, |v| v >= filter_value),
"<=" => value.map_or(false, |v| v <= filter_value),
"==" => value.map_or(false, |v| v == filter_value),
_ => false,
};
new_mask.append_value(current_mask && result);
}

*boolean_mask = new_mask.finish();
Ok(())
}
3 changes: 3 additions & 0 deletions src/loaders/parquet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Query-parameter parsing ('columns' and 'filters' keys).
pub mod parse_params;
// Boolean-mask helpers used to filter record batches.
pub mod helpers;
// Async Parquet read / filter / re-encode pipeline.
pub mod parquet;
125 changes: 125 additions & 0 deletions src/loaders/parquet/parquet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@

use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;

use arrow::array::{ArrayRef, NullArray};
use arrow::array::new_null_array;
use arrow::record_batch::RecordBatch;

use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::ArrowReaderMetadata;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;

use futures_util::stream::StreamExt;
use tokio::fs::File;

use crate::loaders::parquet::parse_params;
use crate::loaders::parquet::helpers::create_boolean_mask;

/// Process a Parquet file and return the content as a byte stream.
///
/// Reads the file in 8192-row batches, optionally drops rows that fail the
/// requested filters, nulls out unselected columns (keeping the schema
/// intact), and re-encodes the result as a Parquet byte buffer, preserving
/// the source file's key/value metadata.
///
/// # Arguments
///
/// * `file_path` - A reference to a string containing the path to the Parquet file.
/// * `params` - A reference to a HashMap of parameters containing 'columns' and 'filters' keys.
///
/// # Returns
///
/// This function returns a byte stream that can be directly used as an HTTP response body.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read, if a filter is
/// invalid for the file's schema, or if re-encoding a batch fails.
pub async fn process_and_return_parquet_file(
    file_path: &str,
    params: &HashMap<String, String>
) -> Result<Vec<u8>, Box<dyn Error>> {
    // Open async file containing parquet data
    let std_file = std::fs::File::open(file_path)?;
    let mut file = File::from_std(std_file);

    let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
    let stream_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
        file.try_clone().await?,
        meta.clone()
    );

    // Preserve the original key/value metadata if present. A Parquet file is
    // not required to carry any, so this must not unwrap (the previous code
    // panicked on files without key/value metadata).
    let metadata_keys = meta
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .cloned();

    let original_schema = stream_builder
        .schema()
        .clone();

    let all_columns = original_schema
        .fields()
        .iter()
        .map(|field| field.name().to_string())
        .collect::<Vec<_>>();

    // Parse selected columns from params; default to every column.
    let columns = parse_params::parse_columns_from_params_to_str(&params)
        .unwrap_or(all_columns);

    let filters = parse_params::parse_filters(&params);

    // Construct the reader stream
    let mut stream = stream_builder
        .with_batch_size(8192)
        .build()?;

    // Set writer properties with the original metadata
    let writer_properties = WriterProperties::builder()
        .set_key_value_metadata(metadata_keys)
        .build();

    let mut out_buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(
        &mut out_buffer,
        original_schema.clone(),
        Some(writer_properties)
    )?;

    // Collect all batches and write them to the buffer
    while let Some(batch) = stream.next().await {
        let mut batch = batch?;

        // Drop rows that fail any filter before selecting columns.
        // Errors are propagated instead of unwrapped.
        if let Some(filter_list) = &filters {
            let filter_mask = create_boolean_mask(
                &batch,
                &original_schema,
                filter_list.clone()
            )?;
            batch = arrow::compute::filter_record_batch(
                &batch,
                &filter_mask
            )?;
        }

        // Keep requested columns (plus the mandatory `_hipscat_index`);
        // replace everything else with null arrays so the output schema
        // matches the original.
        let selected_arrays = original_schema.fields().iter()
            .map(|field| {
                if let Ok(index) = batch.schema().index_of(field.name()) {
                    if columns.contains(&field.name().to_string()) || field.name() == "_hipscat_index" {
                        batch.column(index).clone()
                    } else {
                        new_null_array(
                            field.data_type(),
                            batch.num_rows()
                        )
                    }
                } else {
                    Arc::new(NullArray::new(batch.num_rows())) as ArrayRef
                }
            })
            .collect::<Vec<_>>();

        let selected_batch = RecordBatch::try_new(original_schema.clone(), selected_arrays)?;
        writer.write(&selected_batch)?;
    }

    // `close` flushes any buffered rows and writes the footer exactly once.
    // The previous `finish()?` followed by a discarded `close()` wrote the
    // footer and then silently swallowed any error from the second call.
    writer.close()?;
    Ok(out_buffer)
}
46 changes: 46 additions & 0 deletions src/loaders/parquet/parse_params.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::collections::HashMap;
use regex::Regex;

/// Parse the comma-separated 'columns' query parameter into column names.
///
/// # Arguments
///
/// * `params` - A reference to a HashMap of parameters containing 'columns' key.
///
/// # Returns
///
/// `Some` vector of owned column-name strings (the previous doc incorrectly
/// said "a vector of Polars"), or `None` when the 'columns' parameter is absent.
pub fn parse_columns_from_params_to_str( params: &HashMap<String, String> ) -> Option<Vec<String>> {
    // Map over the optional parameter instead of branching manually.
    params
        .get("columns")
        .map(|cols| cols.split(',').map(str::to_string).collect())
}

/// Parse the comma-separated 'filters' query parameter into filter triples.
///
/// Each filter has the shape `<name><op><number>`, e.g. `mag<18.5` or
/// `id!=42`.
///
/// # Arguments
///
/// * `params` - A reference to a HashMap of parameters containing 'filters' key.
///
/// # Returns
///
/// A vector of tuples containing the column name, the comparison operator and
/// the value to compare, or `None` when no usable filter was supplied.
pub fn parse_filters(params: &HashMap<String, String>) -> Option<Vec<(&str, &str, &str)>> {
    let mut filters = Vec::new();
    if let Some(query) = params.get("filters") {
        filters = query.split(",").collect::<Vec<_>>();
    }

    if filters.is_empty() {
        return None
    }

    // The operator class includes '!' so that "!=" filters are recognized:
    // the downstream filter code supports "!=", but the previous pattern
    // `[<>=]+` could never match it, making `captures(...).unwrap()` panic.
    let re = Regex::new(r"([a-zA-Z_]+)([<>=!]+)([-+]?[0-9]*\.?[0-9]*)").unwrap();
    let mut filter_vec = Vec::new();
    for filter in filters {
        // Skip malformed entries instead of panicking on untrusted input.
        if let Some(caps) = re.captures(filter) {
            filter_vec.push((
                caps.get(1).unwrap().as_str(),
                caps.get(2).unwrap().as_str(),
                caps.get(3).unwrap().as_str(),
            ));
        }
    }

    // Treat "everything was malformed" the same as "no filters given".
    if filter_vec.is_empty() {
        None
    } else {
        Some(filter_vec)
    }
}
Loading