Upgrading to polars 0.35 (nushell#11241)
Co-authored-by: Jack Wright <jack.wright@disqo.com>
2 people authored and dmatos2012 committed Feb 20, 2024
1 parent f8cbbfc commit 0cda1fd
Showing 14 changed files with 529 additions and 349 deletions.
750 changes: 448 additions & 302 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions crates/nu-cmd-dataframe/Cargo.toml
@@ -19,12 +19,16 @@ nu-protocol = { path = "../nu-protocol", version = "0.87.2" }

# Potential dependencies for extras
chrono = { version = "0.4", features = ["std", "unstable-locales"], default-features = false }
fancy-regex = "0.11"
chrono-tz = "0.8"
fancy-regex = "0.12"
indexmap = { version = "2.1" }
num = { version = "0.4", optional = true }
serde = { version = "1.0", features = ["derive"] }
sqlparser = { version = "0.36.1", optional = true }
polars-io = { version = "0.33", features = ["avro"], optional = true }
sqlparser = { version = "0.39", optional = true }
polars-io = { version = "0.35", features = ["avro"], optional = true }
polars-arrow = "0.35"
polars-ops = "0.35"
polars-plan = "0.35"

[dependencies.polars]
features = [
@@ -58,7 +62,7 @@ features = [
"to_dummies",
]
optional = true
version = "0.33"
version = "0.35"

[features]
dataframe = ["num", "polars", "polars-io", "sqlparser"]
5 changes: 3 additions & 2 deletions crates/nu-cmd-dataframe/src/dataframe/eager/open.rs
@@ -150,6 +150,7 @@ fn from_parquet(
low_memory: false,
cloud_options: None,
use_statistics: false,
hive_partitioning: false,
};

let df: NuLazyFrame = LazyFrame::scan_parquet(file, args)
@@ -411,7 +412,7 @@ fn from_csv(
Some(d) => d as u8,
None => unreachable!(),
};
csv_reader.with_delimiter(delimiter)
csv_reader.with_separator(delimiter)
}
}
};
@@ -472,7 +473,7 @@ fn from_csv(
Some(d) => d as u8,
None => unreachable!(),
};
csv_reader.with_delimiter(delimiter)
csv_reader.with_separator(delimiter)
}
}
};
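Note on the open.rs hunks above: polars 0.35 renames CsvReader's with_delimiter to with_separator, and ScanArgsParquet gains a hive_partitioning field. A minimal standalone sketch of the reader-side rename, not code from this commit (the path and separator are placeholders; assumes the CSV support of polars is enabled):

    use polars::prelude::*;

    fn read_csv_sketch(path: &str) -> PolarsResult<DataFrame> {
        // polars 0.33 spelled this `.with_delimiter(b';')`
        CsvReader::from_path(path)?
            .with_separator(b';')
            .finish()
    }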
8 changes: 5 additions & 3 deletions crates/nu-cmd-dataframe/src/dataframe/eager/sample.rs
@@ -4,6 +4,8 @@ use nu_protocol::{
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type,
};
use polars::prelude::NamedFrom;
use polars::series::Series;

use super::super::values::NuDataFrame;

@@ -81,7 +83,7 @@
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let rows: Option<Spanned<usize>> = call.get_flag(engine_state, stack, "n-rows")?;
let rows: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "n-rows")?;
let fraction: Option<Spanned<f64>> = call.get_flag(engine_state, stack, "fraction")?;
let seed: Option<u64> = call
.get_flag::<i64>(engine_state, stack, "seed")?
@@ -94,7 +96,7 @@
match (rows, fraction) {
(Some(rows), None) => df
.as_ref()
.sample_n(rows.item, replace, shuffle, seed)
.sample_n(&Series::new("s", &[rows.item]), replace, shuffle, seed)
.map_err(|e| {
ShellError::GenericError(
"Error creating sample".into(),
@@ -106,7 +108,7 @@
}),
(None, Some(frac)) => df
.as_ref()
.sample_frac(frac.item, replace, shuffle, seed)
.sample_frac(&Series::new("frac", &[frac.item]), replace, shuffle, seed)
.map_err(|e| {
ShellError::GenericError(
"Error creating sample".into(),
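Note on the sample.rs hunks above: in polars 0.35, DataFrame::sample_n and sample_frac take the sample size as a &Series rather than a scalar, which is why the command now reads the flag as i64 and wraps it in a Series. A minimal standalone sketch, not code from this commit (assumes polars' sampling support is enabled):

    use polars::prelude::*;

    fn sample_sketch(df: &DataFrame) -> PolarsResult<DataFrame> {
        // polars 0.33 accepted a plain count: df.sample_n(5, false, false, None)
        df.sample_n(&Series::new("n", &[5i64]), false, false, None)
    }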
12 changes: 9 additions & 3 deletions crates/nu-cmd-dataframe/src/dataframe/eager/sql_context.rs
@@ -2,7 +2,8 @@ use crate::dataframe::eager::sql_expr::parse_sql_expr;
use polars::error::{ErrString, PolarsError};
use polars::prelude::{col, DataFrame, DataType, IntoLazy, LazyFrame};
use sqlparser::ast::{
Expr as SqlExpr, Select, SelectItem, SetExpr, Statement, TableFactor, Value as SQLValue,
Expr as SqlExpr, GroupByExpr, Select, SelectItem, SetExpr, Statement, TableFactor,
Value as SQLValue,
};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
@@ -96,8 +97,13 @@ impl SQLContext {
.collect::<Result<Vec<_>, PolarsError>>()?;
// Check for group by
// After projection since there might be number.
let group_by = select_stmt
.group_by
let group_by = match &select_stmt.group_by {
GroupByExpr::All =>
Err(
PolarsError::ComputeError("Group-By Error: Only positive number or expression are supported, not all".into())
)?,
GroupByExpr::Expressions(expressions) => expressions
}
.iter()
.map(
|e|match e {
21 changes: 14 additions & 7 deletions crates/nu-cmd-dataframe/src/dataframe/eager/sql_expr.rs
@@ -2,8 +2,8 @@ use polars::error::PolarsError;
use polars::prelude::{col, lit, DataType, Expr, LiteralValue, PolarsResult as Result, TimeUnit};

use sqlparser::ast::{
BinaryOperator as SQLBinaryOperator, DataType as SQLDataType, Expr as SqlExpr,
Function as SQLFunction, Value as SqlValue, WindowType,
ArrayElemTypeDef, BinaryOperator as SQLBinaryOperator, DataType as SQLDataType,
Expr as SqlExpr, Function as SQLFunction, Value as SqlValue, WindowType,
};

fn map_sql_polars_datatype(data_type: &SQLDataType) -> Result<DataType> {
@@ -13,7 +13,7 @@ fn map_sql_polars_datatype(data_type: &SQLDataType) -> Result<DataType> {
| SQLDataType::Uuid
| SQLDataType::Clob(_)
| SQLDataType::Text
| SQLDataType::String => DataType::Utf8,
| SQLDataType::String(_) => DataType::Utf8,
SQLDataType::Float(_) => DataType::Float32,
SQLDataType::Real => DataType::Float32,
SQLDataType::Double => DataType::Float64,
@@ -31,9 +31,12 @@
SQLDataType::Time(_, _) => DataType::Time,
SQLDataType::Timestamp(_, _) => DataType::Datetime(TimeUnit::Microseconds, None),
SQLDataType::Interval => DataType::Duration(TimeUnit::Microseconds),
SQLDataType::Array(inner_type) => match inner_type {
Some(inner_type) => DataType::List(Box::new(map_sql_polars_datatype(inner_type)?)),
None => {
SQLDataType::Array(array_type_def) => match array_type_def {
ArrayElemTypeDef::AngleBracket(inner_type)
| ArrayElemTypeDef::SquareBracket(inner_type) => {
DataType::List(Box::new(map_sql_polars_datatype(inner_type)?))
}
_ => {
return Err(PolarsError::ComputeError(
"SQL Datatype Array(None) was not supported in polars-sql yet!".into(),
))
@@ -114,7 +117,11 @@ pub fn parse_sql_expr(expr: &SqlExpr) -> Result<Expr> {
binary_op_(left, right, op)?
}
SqlExpr::Function(sql_function) => parse_sql_function(sql_function)?,
SqlExpr::Cast { expr, data_type } => cast_(parse_sql_expr(expr)?, data_type)?,
SqlExpr::Cast {
expr,
data_type,
format: _,
} => cast_(parse_sql_expr(expr)?, data_type)?,
SqlExpr::Nested(expr) => parse_sql_expr(expr)?,
SqlExpr::Value(value) => literal_expr(value)?,
_ => {
6 changes: 3 additions & 3 deletions crates/nu-cmd-dataframe/src/dataframe/eager/to_csv.rs
@@ -87,9 +87,9 @@ fn command(
let writer = CsvWriter::new(&mut file);

let writer = if no_header {
writer.has_header(false)
writer.include_header(false)
} else {
writer.has_header(true)
writer.include_header(true)
};

let mut writer = match delimiter {
@@ -109,7 +109,7 @@ fn command(
None => unreachable!(),
};

writer.with_delimiter(delimiter)
writer.with_separator(delimiter)
}
}
};
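Note on the to_csv.rs hunks above: polars 0.35 renames CsvWriter's has_header to include_header and with_delimiter to with_separator. A minimal standalone sketch, not code from this commit (the output path and separator are placeholders):

    use polars::prelude::*;
    use std::fs::File;

    fn write_csv_sketch(df: &mut DataFrame, path: &str) -> Result<(), Box<dyn std::error::Error>> {
        let mut file = File::create(path)?;
        CsvWriter::new(&mut file)
            .include_header(true) // was .has_header(true)
            .with_separator(b',') // was .with_delimiter(b',')
            .finish(df)?;
        Ok(())
    }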
3 changes: 2 additions & 1 deletion crates/nu-cmd-dataframe/src/dataframe/lazy/aggregate.rs
@@ -171,7 +171,7 @@ fn get_col_name(expr: &Expr) -> Option<String> {
| Expr::Slice { input: expr, .. }
| Expr::Cast { expr, .. }
| Expr::Sort { expr, .. }
| Expr::Take { expr, .. }
| Expr::Gather { expr, .. }
| Expr::SortBy { expr, .. }
| Expr::Exclude(expr, _)
| Expr::Alias(expr, _)
@@ -189,6 +189,7 @@
| Expr::RenameAlias { .. }
| Expr::Count
| Expr::Nth(_)
| Expr::SubPlan(_, _)
| Expr::Selector(_) => None,
}
}
18 changes: 14 additions & 4 deletions crates/nu-cmd-dataframe/src/dataframe/series/cumulative.rs
@@ -8,6 +8,7 @@ use nu_protocol::{
Value,
};
use polars::prelude::{DataType, IntoSeries};
use polars_ops::prelude::{cum_max, cum_min, cum_sum};

enum CumType {
Min,
@@ -119,10 +120,19 @@ fn command(

let cum_type = CumType::from_str(&cum_type.item, cum_type.span)?;
let mut res = match cum_type {
CumType::Max => series.cummax(reverse),
CumType::Min => series.cummin(reverse),
CumType::Sum => series.cumsum(reverse),
};
CumType::Max => cum_max(&series, reverse),
CumType::Min => cum_min(&series, reverse),
CumType::Sum => cum_sum(&series, reverse),
}
.map_err(|e| {
ShellError::GenericError(
"Error creating cumulative".into(),
e.to_string(),
Some(call.head),
None,
Vec::new(),
)
})?;

let name = format!("{}_{}", series.name(), cum_type.to_str());
res.rename(&name);
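Note on the cumulative.rs hunks above: the Series methods cummax/cummin/cumsum are replaced in polars 0.35 by the fallible free functions cum_max/cum_min/cum_sum from polars-ops, hence the new error handling. A minimal standalone sketch, not code from this commit (assumes polars-ops is built with its cumulative-aggregation support):

    use polars::prelude::*;
    use polars_ops::prelude::{cum_max, cum_min, cum_sum};

    fn cumulative_sketch(s: &Series, reverse: bool) -> PolarsResult<(Series, Series, Series)> {
        // each function now returns PolarsResult<Series> instead of a plain Series
        Ok((cum_max(s, reverse)?, cum_min(s, reverse)?, cum_sum(s, reverse)?))
    }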
4 changes: 3 additions & 1 deletion crates/nu-cmd-dataframe/src/dataframe/series/shift.rs
@@ -9,6 +9,8 @@ use nu_protocol::{
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Type, Value,
};

use polars_plan::prelude::lit;

#[derive(Clone)]
pub struct Shift;

@@ -98,7 +100,7 @@ fn command_lazy(
let lazy: NuLazyFrame = match fill {
Some(fill) => {
let expr = NuExpression::try_from_value(fill)?.into_polars();
lazy.shift_and_fill(shift, expr).into()
lazy.shift_and_fill(lit(shift), expr).into()
}
None => lazy.shift(shift).into(),
};
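Note on the shift.rs hunk above: LazyFrame::shift_and_fill in polars 0.35 takes the shift amount as an expression, so a plain integer has to be wrapped in lit. A minimal standalone sketch, not code from this commit:

    use polars::prelude::*;

    fn shift_sketch(lf: LazyFrame) -> LazyFrame {
        // polars 0.33 accepted the integer directly: lf.shift_and_fill(2, lit(0))
        lf.shift_and_fill(lit(2), lit(0))
    }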
@@ -73,7 +73,7 @@ fn command(
)
})?;

let res = chunked.as_ref().str_lengths().into_series();
let res = chunked.as_ref().str_len_bytes().into_series();

NuDataFrame::try_from_series(vec![res], call.head)
.map(|df| PipelineData::Value(NuDataFrame::into_value(df, call.head), None))
@@ -85,15 +85,7 @@ fn command(
)
})?;

let mut res = chunked.str_slice(start, length).map_err(|e| {
ShellError::GenericError(
"Error slicing series".into(),
e.to_string(),
Some(call.head),
None,
Vec::new(),
)
})?;
let mut res = chunked.str_slice(start, length);
res.rename(series.name());

NuDataFrame::try_from_series(vec![res.into_series()], call.head)
@@ -9,6 +9,7 @@ pub use operations::Axis;
use indexmap::map::IndexMap;
use nu_protocol::{did_you_mean, PipelineData, Record, ShellError, Span, Value};
use polars::prelude::{DataFrame, DataType, IntoLazy, LazyFrame, PolarsObject, Series};
use polars_arrow::util::total_ord::TotalEq;
use serde::{Deserialize, Serialize};
use std::{cmp::Ordering, fmt::Display, hash::Hasher};

@@ -61,6 +62,12 @@ impl std::hash::Hash for DataFrameValue {
}
}

impl TotalEq for DataFrameValue {
fn tot_eq(&self, other: &Self) -> bool {
self == other
}
}

impl PolarsObject for DataFrameValue {
fn type_name() -> &'static str {
"object"
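Note on the nu_dataframe hunks above: the commit adds an impl of polars-arrow's TotalEq trait for the object value type, apparently required by polars 0.35's object support; delegating to PartialEq is enough. A minimal standalone sketch with a hypothetical type, not code from this commit:

    use polars_arrow::util::total_ord::TotalEq;

    #[derive(Clone, PartialEq)]
    struct MyObject(i64);

    impl TotalEq for MyObject {
        fn tot_eq(&self, other: &Self) -> bool {
            self == other
        }
    }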
20 changes: 11 additions & 9 deletions crates/nu-cmd-dataframe/src/dataframe/values/nu_expression/mod.rs
@@ -299,7 +299,11 @@ pub fn expr_to_value(expr: &Expr, span: Span) -> Result<Value, ShellError> {
},
span,
)),
Expr::Take { expr, idx } => Ok(Value::record(
Expr::Gather {
expr,
idx,
returns_scalar: _,
} => Ok(Value::record(
record! {
"expr" => expr_to_value(expr.as_ref(), span)?,
"idx" => expr_to_value(idx.as_ref(), span)?,
@@ -401,30 +405,28 @@ pub fn expr_to_value(expr: &Expr, span: Span) -> Result<Value, ShellError> {
Expr::Window {
function,
partition_by,
order_by,
options,
} => {
let partition_by: Result<Vec<Value>, ShellError> = partition_by
.iter()
.map(|e| expr_to_value(e, span))
.collect();

let order_by = order_by
.as_ref()
.map(|e| expr_to_value(e.as_ref(), span))
.transpose()?
.unwrap_or_else(|| Value::nothing(span));

Ok(Value::record(
record! {
"function" => expr_to_value(function, span)?,
"partition_by" => Value::list(partition_by?, span),
"order_by" => order_by,
"options" => Value::string(format!("{options:?}"), span),
},
span,
))
}
Expr::SubPlan(_, _) => Err(ShellError::UnsupportedInput {
msg: "Expressions of type SubPlan are not yet supported".to_string(),
input: format!("Expression is {expr:?}"),
msg_span: span,
input_span: Span::unknown(),
}),
// the parameter polars_plan::dsl::selector::Selector is not publicly exposed.
// I am not sure what we can meaningfully do with this at this time.
Expr::Selector(_) => Err(ShellError::UnsupportedInput {
