Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion datafusion/functions/src/encoding/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use base64::{
Engine as _,
};
use datafusion_common::{
cast::{as_generic_binary_array, as_generic_string_array},
cast::{
as_fixed_size_binary_array, as_generic_binary_array, as_generic_string_array,
},
not_impl_err, plan_err,
utils::take_function_args,
};
Expand Down Expand Up @@ -246,6 +248,9 @@ fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarV
DataType::Utf8View => encoding.encode_utf8_array::<i32>(a.as_ref()),
DataType::Binary => encoding.encode_binary_array::<i32>(a.as_ref()),
DataType::LargeBinary => encoding.encode_binary_array::<i64>(a.as_ref()),
DataType::FixedSizeBinary(_) => {
encoding.encode_fixed_size_binary_array(a.as_ref())
}
other => exec_err!(
"Unsupported data type {other:?} for function encode({encoding})"
),
Expand All @@ -265,6 +270,9 @@ fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarV
),
ScalarValue::LargeBinary(a) => Ok(encoding
.encode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
ScalarValue::FixedSizeBinary(_, a) => Ok(
encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
),
other => exec_err!(
"Unsupported data type {other:?} for function encode({encoding})"
),
Expand Down Expand Up @@ -401,6 +409,15 @@ impl Encoding {
Ok(ColumnarValue::Array(array))
}

fn encode_fixed_size_binary_array(self, value: &dyn Array) -> Result<ColumnarValue> {
let input_value = as_fixed_size_binary_array(value)?;
let array: ArrayRef = match self {
Self::Base64 => encode_to_array!(base64_encode, input_value),
Self::Hex => encode_to_array!(hex_encode, input_value),
};
Ok(ColumnarValue::Array(array))
}

fn encode_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
T: OffsetSizeTrait,
Expand Down Expand Up @@ -553,3 +570,29 @@ fn decode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
}?;
decode_process(expression, encoding)
}

#[cfg(test)]
mod tests {
#[test]
fn test_encode_fsb() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be added as an SLT test instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? SLT tests are quite unapproachable for those who don't work in datafusion regularly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case it is very necessary, because it reveals that the signature of the function doesn't actually accept FixedSizeBinary arrays, something calling the function directly in code like this wouldn't make obvious.

While SLTs can be harder to write, we prefer it because they are less verbose, and they can test functions with a more stable interface (SQL) rather than their internal APIs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this something you would be able to help out with?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look at the README and see some other tests for reference:

# Arrays tests
query T
SELECT encode(bin_field, 'hex') FROM test ORDER BY num;
----
616263
7177657177
NULL
8f50d3f60eae370ddbf85c86219c55108a350165

arrow_cast(column1, 'FixedSizeBinary(3)') as "column1",
arrow_cast(column2, 'FixedSizeBinary(3)') as "column2"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately I don't have the bandwidth to dig into SLT tests at this time. I have provided this MR, it contains tests, and in the past this has been sufficient for Datafusion. You may take it from here as you wish.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can tell this PR doesn't actually fix anything; have you tested this according to your use case for the issue you raised? (And I don't mean the unit tests you introduced here because those are internal tests)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can tell this PR doesn't actually fix anything

How so? Before, calling encode on fixedsizebinary types would have returned an unsupported data type error.

We've been using in production since April when we first made the change: urbanlogiq@9dfd561

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are some possible tests:

# test for FixedSizeBinary support for encode
statement ok
CREATE TABLE test_fsb AS 
SELECT arrow_cast(X'0123456789ABCDEF', 'FixedSizeBinary(8)') as fsb_col;

query TT
SELECT 
  encode(fsb_col, 'base64') AS fsb_base64,
  encode(fsb_col, 'hex') AS fsb_hex
FROM test_fsb;
----
ASNFZ4mrze8 0123456789abcdef

# Test with NULL
query T
SELECT encode(arrow_cast(NULL, 'FixedSizeBinary(8)'), 'base64');
----
NULL

use super::*;

let value = vec![0u8; 16];
let array = arrow::array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
vec![Some(value)].into_iter(),
16,
)
.unwrap();
let value = ColumnarValue::Array(Arc::new(array));

let ColumnarValue::Array(result) =
encode_process(&value, Encoding::Base64).unwrap()
else {
panic!("unexpected value");
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To cover the missing usage of return_type and coerce_types you need something like:

let encode_func = EncodeFunc::new();
    let args = vec![
        ColumnarValue::Array(Arc::new(array)),
        ColumnarValue::Scalar(ScalarValue::Utf8(Some("base64".to_string()))),
    ];
    
    // This will test the full path including type checking
    let result = encode_func.invoke_with_args(datafusion_expr::ScalarFunctionArgs {
        args,
        number_rows: 1,
        return_type: &DataType::Utf8,
    }).unwrap();
    
    let ColumnarValue::Array(result) = result else {
        panic!("unexpected value");
    };
    ...


let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
let result_value = string_array.value(0);
assert_eq!(result_value, "AAAAAAAAAAAAAAAAAAAAAA");
}
}