Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ def test_valid_window_frame(units, start_bound, end_bound):
],
)
def test_invalid_window_frame(units, start_bound, end_bound):
with pytest.raises(RuntimeError):
with pytest.raises(NotImplementedError, match=f"(?i){units}"):
WindowFrame(units, start_bound, end_bound)


Expand Down
251 changes: 94 additions & 157 deletions src/common/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

use datafusion::arrow::array::Array;
use datafusion::arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
use datafusion::common::{DataFusionError, ScalarValue};
use datafusion::common::ScalarValue;
use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment;
use pyo3::exceptions::PyNotImplementedError;
use pyo3::{exceptions::PyValueError, prelude::*};

use crate::errors::py_datafusion_err;

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct PyScalarValue(pub ScalarValue);

Expand Down Expand Up @@ -171,9 +170,7 @@ impl DataTypeMap {
PythonType::Datetime,
SqlType::DATE,
)),
DataType::Duration(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{arrow_type:?}"),
))),
DataType::Duration(_) => Err(PyNotImplementedError::new_err(format!("{arrow_type:?}"))),
DataType::Interval(interval_unit) => Ok(DataTypeMap::new(
DataType::Interval(*interval_unit),
PythonType::Datetime,
Expand All @@ -188,9 +185,9 @@ impl DataTypeMap {
PythonType::Bytes,
SqlType::BINARY,
)),
DataType::FixedSizeBinary(_) => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{arrow_type:?}")),
)),
DataType::FixedSizeBinary(_) => {
Err(PyNotImplementedError::new_err(format!("{arrow_type:?}")))
}
DataType::LargeBinary => Ok(DataTypeMap::new(
DataType::LargeBinary,
PythonType::Bytes,
Expand All @@ -206,24 +203,18 @@ impl DataTypeMap {
PythonType::Str,
SqlType::VARCHAR,
)),
DataType::List(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{arrow_type:?}"
)))),
DataType::FixedSizeList(_, _) => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{arrow_type:?}")),
)),
DataType::LargeList(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{arrow_type:?}"),
))),
DataType::Struct(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{arrow_type:?}"),
))),
DataType::Union(_, _) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{arrow_type:?}"),
))),
DataType::Dictionary(_, _) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{arrow_type:?}"),
))),
DataType::List(_) => Err(PyNotImplementedError::new_err(format!("{arrow_type:?}"))),
DataType::FixedSizeList(_, _) => {
Err(PyNotImplementedError::new_err(format!("{arrow_type:?}")))
}
DataType::LargeList(_) => {
Err(PyNotImplementedError::new_err(format!("{arrow_type:?}")))
}
DataType::Struct(_) => Err(PyNotImplementedError::new_err(format!("{arrow_type:?}"))),
DataType::Union(_, _) => Err(PyNotImplementedError::new_err(format!("{arrow_type:?}"))),
DataType::Dictionary(_, _) => {
Err(PyNotImplementedError::new_err(format!("{arrow_type:?}")))
}
DataType::Decimal128(precision, scale) => Ok(DataTypeMap::new(
DataType::Decimal128(*precision, *scale),
PythonType::Float,
Expand All @@ -234,24 +225,16 @@ impl DataTypeMap {
PythonType::Float,
SqlType::DECIMAL,
)),
DataType::Map(_, _) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{arrow_type:?}"),
))),
DataType::RunEndEncoded(_, _) => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{arrow_type:?}")),
)),
DataType::BinaryView => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{arrow_type:?}"),
))),
DataType::Utf8View => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{arrow_type:?}"
)))),
DataType::ListView(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{arrow_type:?}"),
))),
DataType::LargeListView(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{arrow_type:?}"),
))),
DataType::Map(_, _) => Err(PyNotImplementedError::new_err(format!("{arrow_type:?}"))),
DataType::RunEndEncoded(_, _) => {
Err(PyNotImplementedError::new_err(format!("{arrow_type:?}")))
}
DataType::BinaryView => Err(PyNotImplementedError::new_err(format!("{arrow_type:?}"))),
DataType::Utf8View => Err(PyNotImplementedError::new_err(format!("{arrow_type:?}"))),
DataType::ListView(_) => Err(PyNotImplementedError::new_err(format!("{arrow_type:?}"))),
DataType::LargeListView(_) => {
Err(PyNotImplementedError::new_err(format!("{arrow_type:?}")))
}
}
}

Expand Down Expand Up @@ -317,33 +300,33 @@ impl DataTypeMap {
Ok(DataType::Interval(IntervalUnit::MonthDayNano))
}
ScalarValue::List(arr) => Ok(arr.data_type().to_owned()),
ScalarValue::Struct(_fields) => Err(py_datafusion_err(
DataFusionError::NotImplemented("ScalarValue::Struct".to_string()),
ScalarValue::Struct(_fields) => Err(PyNotImplementedError::new_err(
"ScalarValue::Struct".to_string(),
)),
ScalarValue::FixedSizeBinary(size, _) => Ok(DataType::FixedSizeBinary(*size)),
ScalarValue::FixedSizeList(_array_ref) => {
// The FieldRef was removed from ScalarValue::FixedSizeList in
// https://github.com/apache/arrow-datafusion/pull/8221, so we can no
// longer convert back to a DataType here
Err(py_datafusion_err(DataFusionError::NotImplemented(
Err(PyNotImplementedError::new_err(
"ScalarValue::FixedSizeList".to_string(),
)))
))
}
ScalarValue::LargeList(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(
ScalarValue::LargeList(_) => Err(PyNotImplementedError::new_err(
"ScalarValue::LargeList".to_string(),
))),
)),
ScalarValue::DurationSecond(_) => Ok(DataType::Duration(TimeUnit::Second)),
ScalarValue::DurationMillisecond(_) => Ok(DataType::Duration(TimeUnit::Millisecond)),
ScalarValue::DurationMicrosecond(_) => Ok(DataType::Duration(TimeUnit::Microsecond)),
ScalarValue::DurationNanosecond(_) => Ok(DataType::Duration(TimeUnit::Nanosecond)),
ScalarValue::Union(_, _, _) => Err(py_datafusion_err(DataFusionError::NotImplemented(
ScalarValue::Union(_, _, _) => Err(PyNotImplementedError::new_err(
"ScalarValue::LargeList".to_string(),
))),
)),
ScalarValue::Utf8View(_) => Ok(DataType::Utf8View),
ScalarValue::BinaryView(_) => Ok(DataType::BinaryView),
ScalarValue::Map(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(
ScalarValue::Map(_) => Err(PyNotImplementedError::new_err(
"ScalarValue::Map".to_string(),
))),
)),
}
}
}
Expand Down Expand Up @@ -400,12 +383,8 @@ impl DataTypeMap {
#[pyo3(name = "sql")]
pub fn py_map_from_sql_type(sql_type: &SqlType) -> PyResult<DataTypeMap> {
match sql_type {
SqlType::ANY => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::ARRAY => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::ANY => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::ARRAY => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::BIGINT => Ok(DataTypeMap::new(
DataType::Int64,
PythonType::Int,
Expand All @@ -426,12 +405,8 @@ impl DataTypeMap {
PythonType::Int,
SqlType::CHAR,
)),
SqlType::COLUMN_LIST => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{sql_type:?}"),
))),
SqlType::CURSOR => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::COLUMN_LIST => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::CURSOR => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::DATE => Ok(DataTypeMap::new(
DataType::Date64,
PythonType::Datetime,
Expand All @@ -442,126 +417,88 @@ impl DataTypeMap {
PythonType::Float,
SqlType::DECIMAL,
)),
SqlType::DISTINCT => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::DISTINCT => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::DOUBLE => Ok(DataTypeMap::new(
DataType::Decimal256(1, 1),
PythonType::Float,
SqlType::DOUBLE,
)),
SqlType::DYNAMIC_STAR => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{sql_type:?}"),
))),
SqlType::DYNAMIC_STAR => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::FLOAT => Ok(DataTypeMap::new(
DataType::Decimal128(1, 1),
PythonType::Float,
SqlType::FLOAT,
)),
SqlType::GEOMETRY => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::GEOMETRY => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::INTEGER => Ok(DataTypeMap::new(
DataType::Int8,
PythonType::Int,
SqlType::INTEGER,
)),
SqlType::INTERVAL => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::INTERVAL_DAY => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{sql_type:?}"),
))),
SqlType::INTERVAL_DAY_HOUR => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{sql_type:?}"),
))),
SqlType::INTERVAL_DAY_MINUTE => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{sql_type:?}")),
)),
SqlType::INTERVAL_DAY_SECOND => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{sql_type:?}")),
)),
SqlType::INTERVAL_HOUR => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{sql_type:?}"),
))),
SqlType::INTERVAL_HOUR_MINUTE => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{sql_type:?}")),
)),
SqlType::INTERVAL_HOUR_SECOND => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{sql_type:?}")),
)),
SqlType::INTERVAL_MINUTE => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{sql_type:?}"),
))),
SqlType::INTERVAL_MINUTE_SECOND => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{sql_type:?}")),
)),
SqlType::INTERVAL_MONTH => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{sql_type:?}"),
))),
SqlType::INTERVAL_SECOND => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{sql_type:?}"),
))),
SqlType::INTERVAL_YEAR => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{sql_type:?}"),
))),
SqlType::INTERVAL_YEAR_MONTH => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{sql_type:?}")),
)),
SqlType::MAP => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::MULTISET => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::INTERVAL => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::INTERVAL_DAY => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::INTERVAL_DAY_HOUR => {
Err(PyNotImplementedError::new_err(format!("{sql_type:?}")))
}
SqlType::INTERVAL_DAY_MINUTE => {
Err(PyNotImplementedError::new_err(format!("{sql_type:?}")))
}
SqlType::INTERVAL_DAY_SECOND => {
Err(PyNotImplementedError::new_err(format!("{sql_type:?}")))
}
SqlType::INTERVAL_HOUR => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::INTERVAL_HOUR_MINUTE => {
Err(PyNotImplementedError::new_err(format!("{sql_type:?}")))
}
SqlType::INTERVAL_HOUR_SECOND => {
Err(PyNotImplementedError::new_err(format!("{sql_type:?}")))
}
SqlType::INTERVAL_MINUTE => {
Err(PyNotImplementedError::new_err(format!("{sql_type:?}")))
}
SqlType::INTERVAL_MINUTE_SECOND => {
Err(PyNotImplementedError::new_err(format!("{sql_type:?}")))
}
SqlType::INTERVAL_MONTH => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::INTERVAL_SECOND => {
Err(PyNotImplementedError::new_err(format!("{sql_type:?}")))
}
SqlType::INTERVAL_YEAR => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::INTERVAL_YEAR_MONTH => {
Err(PyNotImplementedError::new_err(format!("{sql_type:?}")))
}
SqlType::MAP => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::MULTISET => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::NULL => Ok(DataTypeMap::new(
DataType::Null,
PythonType::None,
SqlType::NULL,
)),
SqlType::OTHER => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::REAL => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::ROW => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::SARG => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::OTHER => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::REAL => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::ROW => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::SARG => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::SMALLINT => Ok(DataTypeMap::new(
DataType::Int16,
PythonType::Int,
SqlType::SMALLINT,
)),
SqlType::STRUCTURED => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{sql_type:?}"),
))),
SqlType::SYMBOL => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::TIME => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::TIME_WITH_LOCAL_TIME_ZONE => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{sql_type:?}")),
)),
SqlType::TIMESTAMP => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::TIMESTAMP_WITH_LOCAL_TIME_ZONE => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{sql_type:?}")),
)),
SqlType::STRUCTURED => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::SYMBOL => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::TIME => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::TIME_WITH_LOCAL_TIME_ZONE => {
Err(PyNotImplementedError::new_err(format!("{sql_type:?}")))
}
SqlType::TIMESTAMP => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::TIMESTAMP_WITH_LOCAL_TIME_ZONE => {
Err(PyNotImplementedError::new_err(format!("{sql_type:?}")))
}
SqlType::TINYINT => Ok(DataTypeMap::new(
DataType::Int8,
PythonType::Int,
SqlType::TINYINT,
)),
SqlType::UNKNOWN => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{sql_type:?}"
)))),
SqlType::UNKNOWN => Err(PyNotImplementedError::new_err(format!("{sql_type:?}"))),
SqlType::VARBINARY => Ok(DataTypeMap::new(
DataType::LargeBinary,
PythonType::Bytes,
Expand Down
Loading
Loading