diff --git a/datafusion/functions/src/encoding/inner.rs b/datafusion/functions/src/encoding/inner.rs index e5314ad220c8..14778a3670dd 100644 --- a/datafusion/functions/src/encoding/inner.rs +++ b/datafusion/functions/src/encoding/inner.rs @@ -19,9 +19,10 @@ use arrow::{ array::{ - Array, ArrayRef, BinaryArray, GenericByteArray, OffsetSizeTrait, StringArray, + Array, ArrayRef, AsArray, BinaryArrayType, GenericBinaryArray, + GenericStringArray, OffsetSizeTrait, }, - datatypes::{ByteArrayType, DataType}, + datatypes::DataType, }; use arrow_buffer::{Buffer, OffsetBufferBuilder}; use base64::{ @@ -29,19 +30,20 @@ use base64::{ Engine as _, }; use datafusion_common::{ - cast::{as_generic_binary_array, as_generic_string_array}, - not_impl_err, plan_err, + exec_datafusion_err, exec_err, internal_datafusion_err, internal_err, not_impl_err, + plan_err, + types::{logical_binary, logical_string, NativeType}, utils::take_function_args, + DataFusionError, Result, ScalarValue, +}; +use datafusion_expr::{ + Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, + TypeSignatureClass, Volatility, }; -use datafusion_common::{exec_err, internal_datafusion_err, ScalarValue}; -use datafusion_common::{DataFusionError, Result}; -use datafusion_expr::{ColumnarValue, Documentation}; -use std::sync::Arc; -use std::{fmt, str::FromStr}; - -use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; use datafusion_macros::user_doc; use std::any::Any; +use std::fmt; +use std::sync::Arc; // Allow padding characters, but don't require them, and don't generate them. 
const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new( @@ -79,7 +81,17 @@ impl Default for EncodeFunc { impl EncodeFunc { pub fn new() -> Self { Self { - signature: Signature::user_defined(Volatility::Immutable), + signature: Signature::coercible( + vec![ + Coercion::new_implicit( + TypeSignatureClass::Native(logical_binary()), + vec![TypeSignatureClass::Native(logical_string())], + NativeType::Binary, + ), + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + ], + Volatility::Immutable, + ), } } } @@ -88,6 +100,7 @@ impl ScalarUDFImpl for EncodeFunc { fn as_any(&self) -> &dyn Any { self } + fn name(&self) -> &str { "encode" } @@ -97,48 +110,21 @@ impl ScalarUDFImpl for EncodeFunc { } fn return_type(&self, arg_types: &[DataType]) -> Result { - use DataType::*; - - Ok(match arg_types[0] { - Utf8 => Utf8, - LargeUtf8 => LargeUtf8, - Utf8View => Utf8, - Binary => Utf8, - LargeBinary => LargeUtf8, - Null => Null, - _ => { - return plan_err!( - "The encode function can only accept Utf8 or Binary or Null." 
- ); - } - }) - } - - fn invoke_with_args( - &self, - args: datafusion_expr::ScalarFunctionArgs, - ) -> Result { - encode(&args.args) - } - - fn coerce_types(&self, arg_types: &[DataType]) -> Result> { - let [expression, format] = take_function_args(self.name(), arg_types)?; - - if format != &DataType::Utf8 { - return Err(DataFusionError::Plan("2nd argument should be Utf8".into())); + match &arg_types[0] { + DataType::LargeBinary => Ok(DataType::LargeUtf8), + _ => Ok(DataType::Utf8), } + } + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let [expression, encoding] = take_function_args("encode", &args.args)?; + let encoding = Encoding::try_from(encoding)?; match expression { - DataType::Utf8 | DataType::Utf8View | DataType::Null => { - Ok(vec![DataType::Utf8; 2]) + _ if expression.data_type().is_null() => { + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))) } - DataType::LargeUtf8 => Ok(vec![DataType::LargeUtf8, DataType::Utf8]), - DataType::Binary => Ok(vec![DataType::Binary, DataType::Utf8]), - DataType::LargeBinary => Ok(vec![DataType::LargeBinary, DataType::Utf8]), - _ => plan_err!( - "1st argument should be Utf8 or Binary or Null, got {:?}", - arg_types[0] - ), + ColumnarValue::Array(array) => encode_array(array, encoding), + ColumnarValue::Scalar(scalar) => encode_scalar(scalar, encoding), } } @@ -172,7 +158,17 @@ impl Default for DecodeFunc { impl DecodeFunc { pub fn new() -> Self { Self { - signature: Signature::user_defined(Volatility::Immutable), + signature: Signature::coercible( + vec![ + Coercion::new_implicit( + TypeSignatureClass::Native(logical_binary()), + vec![TypeSignatureClass::Native(logical_string())], + NativeType::Binary, + ), + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + ], + Volatility::Immutable, + ), } } } @@ -181,6 +177,7 @@ impl ScalarUDFImpl for DecodeFunc { fn as_any(&self) -> &dyn Any { self } + fn name(&self) -> &str { "decode" } @@ -190,40 +187,21 @@ impl ScalarUDFImpl for DecodeFunc { 
} fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(arg_types[0].to_owned()) - } - - fn invoke_with_args( - &self, - args: datafusion_expr::ScalarFunctionArgs, - ) -> Result { - decode(&args.args) - } - - fn coerce_types(&self, arg_types: &[DataType]) -> Result> { - if arg_types.len() != 2 { - return plan_err!( - "{} expects to get 2 arguments, but got {}", - self.name(), - arg_types.len() - ); - } - - if arg_types[1] != DataType::Utf8 { - return plan_err!("2nd argument should be Utf8"); + match &arg_types[0] { + DataType::LargeBinary => Ok(DataType::LargeBinary), + _ => Ok(DataType::Binary), } + } - match arg_types[0] { - DataType::Utf8 | DataType::Utf8View | DataType::Binary | DataType::Null => { - Ok(vec![DataType::Binary, DataType::Utf8]) - } - DataType::LargeUtf8 | DataType::LargeBinary => { - Ok(vec![DataType::LargeBinary, DataType::Utf8]) + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let [expression, encoding] = take_function_args("decode", &args.args)?; + let encoding = Encoding::try_from(encoding)?; + match expression { + _ if expression.data_type().is_null() => { + Ok(ColumnarValue::Scalar(ScalarValue::Binary(None))) } - _ => plan_err!( - "1st argument should be Utf8 or Binary or Null, got {:?}", - arg_types[0] - ), + ColumnarValue::Array(array) => decode_array(array, encoding), + ColumnarValue::Scalar(scalar) => decode_scalar(scalar, encoding), } } @@ -232,324 +210,286 @@ impl ScalarUDFImpl for DecodeFunc { } } -#[derive(Debug, Copy, Clone)] -enum Encoding { - Base64, - Hex, -} - -fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result { +fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result { match value { - ColumnarValue::Array(a) => match a.data_type() { - DataType::Utf8 => encoding.encode_utf8_array::(a.as_ref()), - DataType::LargeUtf8 => encoding.encode_utf8_array::(a.as_ref()), - DataType::Utf8View => encoding.encode_utf8_array::(a.as_ref()), - DataType::Binary => 
encoding.encode_binary_array::(a.as_ref()), - DataType::LargeBinary => encoding.encode_binary_array::(a.as_ref()), - other => exec_err!( - "Unsupported data type {other:?} for function encode({encoding})" - ), - }, - ColumnarValue::Scalar(scalar) => { - match scalar { - ScalarValue::Utf8(a) => { - Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))) - } - ScalarValue::LargeUtf8(a) => Ok(encoding - .encode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes()))), - ScalarValue::Utf8View(a) => { - Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))) - } - ScalarValue::Binary(a) => Ok( - encoding.encode_scalar(a.as_ref().map(|v: &Vec| v.as_slice())) - ), - ScalarValue::LargeBinary(a) => Ok(encoding - .encode_large_scalar(a.as_ref().map(|v: &Vec| v.as_slice()))), - other => exec_err!( - "Unsupported data type {other:?} for function encode({encoding})" - ), - } + ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => { + Ok(ColumnarValue::Scalar(ScalarValue::Utf8( + maybe_bytes + .as_ref() + .map(|bytes| encoding.encode_bytes(bytes)), + ))) } - } -} - -fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result { - match value { - ColumnarValue::Array(a) => match a.data_type() { - DataType::Utf8 => encoding.decode_utf8_array::(a.as_ref()), - DataType::LargeUtf8 => encoding.decode_utf8_array::(a.as_ref()), - DataType::Utf8View => encoding.decode_utf8_array::(a.as_ref()), - DataType::Binary => encoding.decode_binary_array::(a.as_ref()), - DataType::LargeBinary => encoding.decode_binary_array::(a.as_ref()), - other => exec_err!( - "Unsupported data type {other:?} for function decode({encoding})" - ), - }, - ColumnarValue::Scalar(scalar) => { - match scalar { - ScalarValue::Utf8(a) => { - encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes())) - } - ScalarValue::LargeUtf8(a) => encoding - .decode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes())), - ScalarValue::Utf8View(a) => { - 
encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes())) - } - ScalarValue::Binary(a) => { - encoding.decode_scalar(a.as_ref().map(|v: &Vec| v.as_slice())) - } - ScalarValue::LargeBinary(a) => encoding - .decode_large_scalar(a.as_ref().map(|v: &Vec| v.as_slice())), - other => exec_err!( - "Unsupported data type {other:?} for function decode({encoding})" - ), - } + ScalarValue::LargeBinary(maybe_bytes) => { + Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8( + maybe_bytes + .as_ref() + .map(|bytes| encoding.encode_bytes(bytes)), + ))) } + v => internal_err!("Unexpected value for encode: {v}"), } } -fn hex_encode(input: &[u8]) -> String { - hex::encode(input) +fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result { + let array = match array.data_type() { + DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::()), + DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()), + DataType::LargeBinary => { + encoding.encode_array::<_, i64>(&array.as_binary::()) + } + dt => { + internal_err!("Unexpected data type for encode: {dt}") + } + }; + array.map(ColumnarValue::Array) } -fn base64_encode(input: &[u8]) -> String { - BASE64_ENGINE.encode(input) +fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result { + match value { + ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => { + Ok(ColumnarValue::Scalar(ScalarValue::Binary( + maybe_bytes + .as_ref() + .map(|x| encoding.decode_bytes(x)) + .transpose()?, + ))) + } + ScalarValue::LargeBinary(maybe_bytes) => { + Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary( + maybe_bytes + .as_ref() + .map(|x| encoding.decode_bytes(x)) + .transpose()?, + ))) + } + v => internal_err!("Unexpected value for decode: {v}"), + } } -fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result { - // only write input / 2 bytes to buf - let out_len = input.len() / 2; - let buf = &mut buf[..out_len]; - hex::decode_to_slice(input, buf) - .map_err(|e| 
internal_datafusion_err!("Failed to decode from hex: {e}"))?; - Ok(out_len) +/// Estimate how many bytes are actually represented by the array; in case +/// the array slices its internal buffer, this returns the byte size of that slice +/// but not the byte size of the entire buffer. +/// +/// This is only an estimate: it can be higher than the real size if null slots are non-zero +/// sized. +fn estimate_byte_data_size(array: &GenericBinaryArray) -> usize { + let offsets = array.value_offsets(); + // Unwraps are safe as the offset buffer should always have at least 1 element + let start = *offsets.first().unwrap(); + let end = *offsets.last().unwrap(); + let data_size = end - start; + data_size.as_usize() } -fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result { - BASE64_ENGINE - .decode_slice(input, buf) - .map_err(|e| internal_datafusion_err!("Failed to decode from base64: {e}")) +fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result { + let array = match array.data_type() { + DataType::Binary => { + let array = array.as_binary::(); + encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array)) + } + DataType::BinaryView => { + let array = array.as_binary_view(); + // Don't know if there is a stricter upper bound we can infer + // for a view array's byte data size. + encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size()) + } + DataType::LargeBinary => { + let array = array.as_binary::(); + encoding.decode_array::<_, i64>(&array, estimate_byte_data_size(array)) + } + dt => { + internal_err!("Unexpected data type for decode: {dt}") + } + }; + array.map(ColumnarValue::Array) } -macro_rules! 
encode_to_array { - ($METHOD: ident, $INPUT:expr) => {{ - let utf8_array: StringArray = $INPUT - .iter() - .map(|x| x.map(|x| $METHOD(x.as_ref()))) - .collect(); - Arc::new(utf8_array) - }}; +#[derive(Debug, Copy, Clone)] +enum Encoding { + Base64, + Hex, } -fn decode_to_array( - method: F, - input: &GenericByteArray, - conservative_upper_bound_size: usize, -) -> Result -where - F: Fn(&[u8], &mut [u8]) -> Result, -{ - let mut values = vec![0; conservative_upper_bound_size]; - let mut offsets = OffsetBufferBuilder::new(input.len()); - let mut total_bytes_decoded = 0; - for v in input { - if let Some(v) = v { - let cursor = &mut values[total_bytes_decoded..]; - let decoded = method(v.as_ref(), cursor)?; - total_bytes_decoded += decoded; - offsets.push_length(decoded); - } else { - offsets.push_length(0); - } +impl fmt::Display for Encoding { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", format!("{self:?}").to_lowercase()) } - // We reserved an upper bound size for the values buffer, but we only use the actual size - values.truncate(total_bytes_decoded); - let binary_array = BinaryArray::try_new( - offsets.finish(), - Buffer::from_vec(values), - input.nulls().cloned(), - )?; - Ok(Arc::new(binary_array)) } -impl Encoding { - fn encode_scalar(self, value: Option<&[u8]>) -> ColumnarValue { - ColumnarValue::Scalar(match self { - Self::Base64 => ScalarValue::Utf8(value.map(|v| BASE64_ENGINE.encode(v))), - Self::Hex => ScalarValue::Utf8(value.map(hex::encode)), - }) - } - - fn encode_large_scalar(self, value: Option<&[u8]>) -> ColumnarValue { - ColumnarValue::Scalar(match self { - Self::Base64 => { - ScalarValue::LargeUtf8(value.map(|v| BASE64_ENGINE.encode(v))) +impl TryFrom<&ColumnarValue> for Encoding { + type Error = DataFusionError; + + fn try_from(encoding: &ColumnarValue) -> Result { + let encoding = match encoding { + ColumnarValue::Scalar(encoding) => match encoding.try_as_str().flatten() { + Some(encoding) => encoding, + _ => return 
exec_err!("Encoding must be a non-null string"), + }, + ColumnarValue::Array(_) => { + return not_impl_err!("Encoding must be a scalar; array specified encoding is not yet supported") } - Self::Hex => ScalarValue::LargeUtf8(value.map(hex::encode)), - }) - } - - fn encode_binary_array(self, value: &dyn Array) -> Result - where - T: OffsetSizeTrait, - { - let input_value = as_generic_binary_array::(value)?; - let array: ArrayRef = match self { - Self::Base64 => encode_to_array!(base64_encode, input_value), - Self::Hex => encode_to_array!(hex_encode, input_value), }; - Ok(ColumnarValue::Array(array)) - } - - fn encode_utf8_array(self, value: &dyn Array) -> Result - where - T: OffsetSizeTrait, - { - let input_value = as_generic_string_array::(value)?; - let array: ArrayRef = match self { - Self::Base64 => encode_to_array!(base64_encode, input_value), - Self::Hex => encode_to_array!(hex_encode, input_value), - }; - Ok(ColumnarValue::Array(array)) + match encoding { + "base64" => Ok(Self::Base64), + "hex" => Ok(Self::Hex), + _ => { + let options = [Self::Base64, Self::Hex] + .iter() + .map(|i| i.to_string()) + .collect::>() + .join(", "); + plan_err!( + "There is no built-in encoding named '{encoding}', currently supported encodings are: {options}" + ) + } + } } +} - fn decode_scalar(self, value: Option<&[u8]>) -> Result { - let value = match value { - Some(value) => value, - None => return Ok(ColumnarValue::Scalar(ScalarValue::Binary(None))), - }; - - let out = match self { - Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| { - internal_datafusion_err!("Failed to decode value using base64: {e}") - })?, - Self::Hex => hex::decode(value).map_err(|e| { - internal_datafusion_err!("Failed to decode value using hex: {e}") - })?, - }; - - Ok(ColumnarValue::Scalar(ScalarValue::Binary(Some(out)))) +impl Encoding { + fn encode_bytes(self, value: &[u8]) -> String { + match self { + Self::Base64 => BASE64_ENGINE.encode(value), + Self::Hex => hex::encode(value), + } } - fn 
decode_large_scalar(self, value: Option<&[u8]>) -> Result { - let value = match value { - Some(value) => value, - None => return Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(None))), - }; - - let out = match self { + fn decode_bytes(self, value: &[u8]) -> Result> { + match self { Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| { - internal_datafusion_err!("Failed to decode value using base64: {e}") - })?, + exec_datafusion_err!("Failed to decode value using base64: {e}") + }), Self::Hex => hex::decode(value).map_err(|e| { - internal_datafusion_err!("Failed to decode value using hex: {e}") - })?, - }; - - Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(out)))) + exec_datafusion_err!("Failed to decode value using hex: {e}") + }), + } } - fn decode_binary_array(self, value: &dyn Array) -> Result + // OutputOffset important to ensure Large types output Large arrays + fn encode_array<'a, InputBinaryArray, OutputOffset>( + self, + array: &InputBinaryArray, + ) -> Result where - T: OffsetSizeTrait, + InputBinaryArray: BinaryArrayType<'a>, + OutputOffset: OffsetSizeTrait, { - let input_value = as_generic_binary_array::(value)?; - let array = self.decode_byte_array(input_value)?; - Ok(ColumnarValue::Array(array)) + match self { + Self::Base64 => { + let array: GenericStringArray = array + .iter() + .map(|x| x.map(|x| BASE64_ENGINE.encode(x))) + .collect(); + Ok(Arc::new(array)) + } + Self::Hex => { + let array: GenericStringArray = + array.iter().map(|x| x.map(hex::encode)).collect(); + Ok(Arc::new(array)) + } + } } - fn decode_utf8_array(self, value: &dyn Array) -> Result + // OutputOffset important to ensure Large types output Large arrays + fn decode_array<'a, InputBinaryArray, OutputOffset>( + self, + value: &InputBinaryArray, + approx_data_size: usize, + ) -> Result where - T: OffsetSizeTrait, + InputBinaryArray: BinaryArrayType<'a>, + OutputOffset: OffsetSizeTrait, { - let input_value = as_generic_string_array::(value)?; - let array = 
self.decode_byte_array(input_value)?; - Ok(ColumnarValue::Array(array)) - } + fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result { + // only write input / 2 bytes to buf + let out_len = input.len() / 2; + let buf = &mut buf[..out_len]; + hex::decode_to_slice(input, buf).map_err(|e| { + internal_datafusion_err!("Failed to decode from hex: {e}") + })?; + Ok(out_len) + } + + fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result { + BASE64_ENGINE.decode_slice(input, buf).map_err(|e| { + internal_datafusion_err!("Failed to decode from base64: {e}") + }) + } - fn decode_byte_array( - &self, - input_value: &GenericByteArray, - ) -> Result { match self { Self::Base64 => { - let upper_bound = - base64::decoded_len_estimate(input_value.values().len()); - decode_to_array(base64_decode, input_value, upper_bound) + let upper_bound = base64::decoded_len_estimate(approx_data_size); + delegated_decode::<_, _, OutputOffset>(base64_decode, value, upper_bound) } Self::Hex => { // Calculate the upper bound for decoded byte size // For hex encoding, each pair of hex characters (2 bytes) represents 1 byte when decoded // So the upper bound is half the length of the input values. 
- let upper_bound = input_value.values().len() / 2; - decode_to_array(hex_decode, input_value, upper_bound) + let upper_bound = approx_data_size / 2; + delegated_decode::<_, _, OutputOffset>(hex_decode, value, upper_bound) } } } } -impl fmt::Display for Encoding { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", format!("{self:?}").to_lowercase()) +fn delegated_decode<'a, DecodeFunction, InputBinaryArray, OutputOffset>( + decode: DecodeFunction, + input: &InputBinaryArray, + conservative_upper_bound_size: usize, +) -> Result +where + DecodeFunction: Fn(&[u8], &mut [u8]) -> Result, + InputBinaryArray: BinaryArrayType<'a>, + OutputOffset: OffsetSizeTrait, +{ + let mut values = vec![0; conservative_upper_bound_size]; + let mut offsets = OffsetBufferBuilder::new(input.len()); + let mut total_bytes_decoded = 0; + for v in input.iter() { + if let Some(v) = v { + let cursor = &mut values[total_bytes_decoded..]; + let decoded = decode(v, cursor)?; + total_bytes_decoded += decoded; + offsets.push_length(decoded); + } else { + offsets.push_length(0); + } } + // We reserved an upper bound size for the values buffer, but we only use the actual size + values.truncate(total_bytes_decoded); + let binary_array = GenericBinaryArray::::try_new( + offsets.finish(), + Buffer::from_vec(values), + input.nulls().cloned(), + )?; + Ok(Arc::new(binary_array)) } -impl FromStr for Encoding { - type Err = DataFusionError; - fn from_str(name: &str) -> Result { - Ok(match name { - "base64" => Self::Base64, - "hex" => Self::Hex, - _ => { - let options = [Self::Base64, Self::Hex] - .iter() - .map(|i| i.to_string()) - .collect::>() - .join(", "); - return plan_err!( - "There is no built-in encoding named '{name}', currently supported encodings are: {options}" - ); - } - }) +#[cfg(test)] +mod tests { + use arrow::array::BinaryArray; + use arrow_buffer::OffsetBuffer; + + use super::*; + + #[test] + fn test_estimate_byte_data_size() { + // Offsets starting at 0, but don't 
count entire data buffer size + let array = BinaryArray::new( + OffsetBuffer::new(vec![0, 5, 10, 15].into()), + vec![0; 100].into(), + None, + ); + let size = estimate_byte_data_size(&array); + assert_eq!(size, 15); + + // Offsets starting at 50 (not 0): only the span from first to last offset is counted, null slots included + let array = BinaryArray::new( + OffsetBuffer::new(vec![50, 51, 51, 60, 80, 81].into()), + vec![0; 100].into(), + Some(vec![true, false, false, true, true].into()), + ); + let size = estimate_byte_data_size(&array); + assert_eq!(size, 31); } } - -/// Encodes the given data, accepts Binary, LargeBinary, Utf8, Utf8View or LargeUtf8 and returns a [`ColumnarValue`]. -/// Second argument is the encoding to use. -/// Standard encodings are base64 and hex. -fn encode(args: &[ColumnarValue]) -> Result { - let [expression, format] = take_function_args("encode", args)?; - - let encoding = match format { - ColumnarValue::Scalar(scalar) => match scalar.try_as_str() { - Some(Some(method)) => method.parse::(), - _ => not_impl_err!( - "Second argument to encode must be non null constant string: Encode using dynamically decided method is not yet supported. Got {scalar:?}" - ), - }, - ColumnarValue::Array(_) => not_impl_err!( - "Second argument to encode must be a constant: Encode using dynamically decided method is not yet supported" - ), - }?; - encode_process(expression, encoding) -} - -/// Decodes the given data, accepts Binary, LargeBinary, Utf8, Utf8View or LargeUtf8 and returns a [`ColumnarValue`]. -/// Second argument is the encoding to use. -/// Standard encodings are base64 and hex. -fn decode(args: &[ColumnarValue]) -> Result { - let [expression, format] = take_function_args("decode", args)?; - - let encoding = match format { - ColumnarValue::Scalar(scalar) => match scalar.try_as_str() { - Some(Some(method))=> method.parse::(), - _ => not_impl_err!( - "Second argument to decode must be a non null constant string: Decode using dynamically decided method is not yet supported. 
Got {scalar:?}" - ), - }, - ColumnarValue::Array(_) => not_impl_err!( - "Second argument to decode must be a utf8 constant: Decode using dynamically decided method is not yet supported" - ), - }?; - decode_process(expression, encoding) -} diff --git a/datafusion/sqllogictest/test_files/encoding.slt b/datafusion/sqllogictest/test_files/encoding.slt index 300294f6e115..e10287dcf124 100644 --- a/datafusion/sqllogictest/test_files/encoding.slt +++ b/datafusion/sqllogictest/test_files/encoding.slt @@ -15,6 +15,32 @@ # specific language governing permissions and limitations # under the License. +query T +SELECT encode(arrow_cast('tom', 'Utf8View'),'base64'); +---- +dG9t + +query T +SELECT arrow_cast(decode(arrow_cast('dG9t', 'Utf8View'),'base64'), 'Utf8'); +---- +tom + +query T +SELECT encode(arrow_cast('tom', 'BinaryView'),'base64'); +---- +dG9t + +query T +SELECT arrow_cast(decode(arrow_cast('dG9t', 'BinaryView'),'base64'), 'Utf8'); +---- +tom + +# test for hex digest +query T +select encode(digest('hello', 'sha256'), 'hex'); +---- +2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824 + statement ok CREATE TABLE test( num INT, @@ -29,23 +55,29 @@ CREATE TABLE test( ; # errors -query error 1st argument should be Utf8 or Binary or Null, got Int64 +query error DataFusion error: Error during planning: Internal error: Expect TypeSignatureClass::Native\(LogicalType\(Native\(Binary\), Binary\)\) but received NativeType::Int64, DataType: Int64 select encode(12, 'hex'); -query error DataFusion error: Error during planning: There is no built\-in encoding named 'non_encoding', currently supported encodings are: base64, hex -select encode(bin_field, 'non_encoding') from test; - -query error 1st argument should be Utf8 or Binary or Null, got Int64 +query error DataFusion error: Error during planning: Internal error: Expect TypeSignatureClass::Native\(LogicalType\(Native\(Binary\), Binary\)\) but received NativeType::Int64, DataType: Int64 select decode(12, 'hex'); query 
error DataFusion error: Error during planning: There is no built\-in encoding named 'non_encoding', currently supported encodings are: base64, hex -select decode(hex_field, 'non_encoding') from test; +select encode('', 'non_encoding'); -query error +query error DataFusion error: Error during planning: There is no built\-in encoding named 'non_encoding', currently supported encodings are: base64, hex +select decode('', 'non_encoding'); + +query error DataFusion error: Execution error: Encoding must be a non-null string +select decode('', null) from test; + +query error DataFusion error: This feature is not implemented: Encoding must be a scalar; array specified encoding is not yet supported +select decode('', hex_field) from test; + +query error DataFusion error: Error during planning: Failed to coerce arguments to satisfy a call to 'to_hex' function select to_hex(hex_field) from test; -query error -select arrow_cast(decode(X'8f50d3f60eae370ddbf85c86219c55108a350165', 'base64'), 'Utf8'); +query error DataFusion error: Execution error: Failed to decode value using base64 +select decode(X'8f50d3f60eae370ddbf85c86219c55108a350165', 'base64'); # Arrays tests query T @@ -56,13 +88,20 @@ SELECT encode(bin_field, 'hex') FROM test ORDER BY num; NULL 8f50d3f60eae370ddbf85c86219c55108a350165 -query T -SELECT arrow_cast(decode(base64_field, 'base64'), 'Utf8') FROM test ORDER BY num; +query TTTTTT +SELECT + arrow_cast(decode(arrow_cast(base64_field, 'Utf8'), 'base64'), 'Utf8'), + arrow_cast(decode(arrow_cast(base64_field, 'LargeUtf8'), 'base64'), 'Utf8'), + arrow_cast(decode(arrow_cast(base64_field, 'Utf8View'), 'base64'), 'Utf8'), + arrow_cast(decode(arrow_cast(base64_field, 'Binary'), 'base64'), 'Utf8'), + arrow_cast(decode(arrow_cast(base64_field, 'LargeBinary'), 'base64'), 'Utf8'), + arrow_cast(decode(arrow_cast(base64_field, 'BinaryView'), 'base64'), 'Utf8') +FROM test ORDER BY num; ---- -abc -qweqw -NULL -8f50d3f60eae370ddbf85c86219c55108a350165 +abc abc abc abc abc abc 
+qweqw qweqw qweqw qweqw qweqw qweqw +NULL NULL NULL NULL NULL NULL +8f50d3f60eae370ddbf85c86219c55108a350165 8f50d3f60eae370ddbf85c86219c55108a350165 8f50d3f60eae370ddbf85c86219c55108a350165 8f50d3f60eae370ddbf85c86219c55108a350165 8f50d3f60eae370ddbf85c86219c55108a350165 8f50d3f60eae370ddbf85c86219c55108a350165 query T SELECT arrow_cast(decode(hex_field, 'hex'), 'Utf8') FROM test ORDER BY num; @@ -90,38 +129,48 @@ select decode(encode(bin_field, 'base64'), 'base64') = X'8f50d3f60eae370ddbf85c8 ---- true -# test for Utf8View support for encode statement ok -CREATE TABLE test_source AS VALUES - ('Andrew', 'X'), - ('Xiangpeng', 'Xiangpeng'), - ('Raphael', 'R'), - (NULL, 'R'); +drop table test +# test for Utf8View support for encode statement ok CREATE TABLE test_utf8view AS select arrow_cast(column1, 'Utf8View') AS column1_utf8view, arrow_cast(column2, 'Utf8View') AS column2_utf8view -FROM test_source; +FROM VALUES + ('Andrew', 'X'), + ('Xiangpeng', 'Xiangpeng'), + ('Raphael', 'R'), + (NULL, 'R'); -query TTTTTT +query TTTT SELECT - column1_utf8view, encode(column1_utf8view, 'base64') AS column1_base64, encode(column1_utf8view, 'hex') AS column1_hex, - column2_utf8view, encode(column2_utf8view, 'base64') AS column2_base64, encode(column2_utf8view, 'hex') AS column2_hex FROM test_utf8view; ---- -Andrew QW5kcmV3 416e64726577 X WA 58 -Xiangpeng WGlhbmdwZW5n 5869616e6770656e67 Xiangpeng WGlhbmdwZW5n 5869616e6770656e67 -Raphael UmFwaGFlbA 5261706861656c R Ug 52 -NULL NULL NULL R Ug 52 +QW5kcmV3 416e64726577 WA 58 +WGlhbmdwZW5n 5869616e6770656e67 WGlhbmdwZW5n 5869616e6770656e67 +UmFwaGFlbA 5261706861656c Ug 52 +NULL NULL Ug 52 -# test for hex digest -query T -select encode(digest('hello', 'sha256'), 'hex'); +query TTTTTT +SELECT + encode(arrow_cast(column1_utf8view, 'Utf8'), 'base64'), + encode(arrow_cast(column1_utf8view, 'LargeUtf8'), 'base64'), + encode(arrow_cast(column1_utf8view, 'Utf8View'), 'base64'), + encode(arrow_cast(column1_utf8view, 'Binary'), 'base64'), + 
encode(arrow_cast(column1_utf8view, 'LargeBinary'), 'base64'), + encode(arrow_cast(column1_utf8view, 'BinaryView'), 'base64') +FROM test_utf8view; ---- -2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824 +QW5kcmV3 QW5kcmV3 QW5kcmV3 QW5kcmV3 QW5kcmV3 QW5kcmV3 +WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n +UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA +NULL NULL NULL NULL NULL NULL + +statement ok +drop table test_utf8view