-
Notifications
You must be signed in to change notification settings - Fork 2k
Refactor crypto functions code #18664
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a592c18
9f718ce
1bf193e
6405b71
99581ce
1ca876a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,10 +18,8 @@ | |
| //! "crypto" DataFusion functions | ||
|
|
||
| use arrow::array::{ | ||
| Array, ArrayRef, BinaryArray, BinaryArrayType, BinaryViewArray, GenericBinaryArray, | ||
| OffsetSizeTrait, | ||
| Array, ArrayRef, AsArray, BinaryArray, BinaryArrayType, StringViewArray, | ||
| }; | ||
| use arrow::array::{AsArray, GenericStringArray, StringViewArray}; | ||
| use arrow::datatypes::DataType; | ||
| use blake2::{Blake2b512, Blake2s256, Digest}; | ||
| use blake3::Hasher as Blake3; | ||
|
|
@@ -84,18 +82,7 @@ define_digest_function!( | |
| "computes blake3 hash digest of the given input" | ||
| ); | ||
|
|
||
| macro_rules! digest_to_scalar { | ||
| ($METHOD: ident, $INPUT:expr) => {{ | ||
| ScalarValue::Binary($INPUT.as_ref().map(|v| { | ||
| let mut digest = $METHOD::default(); | ||
| digest.update(v); | ||
| #[allow(deprecated)] | ||
| digest.finalize().as_slice().to_vec() | ||
| })) | ||
| }}; | ||
| } | ||
|
|
||
| #[derive(Debug, Copy, Clone)] | ||
| #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] | ||
| pub enum DigestAlgorithm { | ||
| Md5, | ||
| Sha224, | ||
|
|
@@ -107,23 +94,6 @@ pub enum DigestAlgorithm { | |
| Blake3, | ||
| } | ||
|
|
||
| /// Digest computes a binary hash of the given data, accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]. | ||
| /// Second argument is the algorithm to use. | ||
| /// Standard algorithms are md5, sha1, sha224, sha256, sha384 and sha512. | ||
| pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved to |
||
| let [data, digest_algorithm] = take_function_args("digest", args)?; | ||
| let digest_algorithm = match digest_algorithm { | ||
| ColumnarValue::Scalar(scalar) => match scalar.try_as_str() { | ||
| Some(Some(method)) => method.parse::<DigestAlgorithm>(), | ||
| _ => exec_err!("Unsupported data type {scalar:?} for function digest"), | ||
| }, | ||
| ColumnarValue::Array(_) => { | ||
| internal_err!("Digest using dynamically decided method is not yet supported") | ||
| } | ||
| }?; | ||
| digest_process(data, digest_algorithm) | ||
| } | ||
|
|
||
| impl FromStr for DigestAlgorithm { | ||
| type Err = DataFusionError; | ||
| fn from_str(name: &str) -> Result<DigestAlgorithm> { | ||
|
|
@@ -183,7 +153,7 @@ pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> { | |
| ColumnarValue::Scalar(ScalarValue::Binary(opt)) => { | ||
| ColumnarValue::Scalar(ScalarValue::Utf8View(opt.map(hex_encode::<_>))) | ||
| } | ||
| _ => return exec_err!("Impossibly got invalid results from digest"), | ||
| _ => return internal_err!("Impossibly got invalid results from digest"), | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -198,25 +168,7 @@ fn hex_encode<T: AsRef<[u8]>>(data: T) -> String { | |
| } | ||
| s | ||
| } | ||
| pub fn utf8_or_binary_to_binary_type( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Refactored away, was only used for return types in sha/md5/digest (which are now simplified) |
||
| arg_type: &DataType, | ||
| name: &str, | ||
| ) -> Result<DataType> { | ||
| Ok(match arg_type { | ||
| DataType::Utf8View | ||
| | DataType::LargeUtf8 | ||
| | DataType::Utf8 | ||
| | DataType::Binary | ||
| | DataType::BinaryView | ||
| | DataType::LargeBinary => DataType::Binary, | ||
| DataType::Null => DataType::Null, | ||
| _ => { | ||
| return plan_err!( | ||
| "The {name:?} function can only accept strings or binary arrays." | ||
| ); | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| macro_rules! digest_to_array { | ||
| ($METHOD:ident, $INPUT:expr) => {{ | ||
| let binary_array: BinaryArray = $INPUT | ||
|
|
@@ -232,9 +184,20 @@ macro_rules! digest_to_array { | |
| Arc::new(binary_array) | ||
| }}; | ||
| } | ||
|
|
||
| macro_rules! digest_to_scalar { | ||
| ($METHOD: ident, $INPUT:expr) => {{ | ||
| ScalarValue::Binary($INPUT.as_ref().map(|v| { | ||
| let mut digest = $METHOD::default(); | ||
| digest.update(v); | ||
| digest.finalize().as_slice().to_vec() | ||
| })) | ||
| }}; | ||
| } | ||
|
|
||
| impl DigestAlgorithm { | ||
| /// digest an optional string to its hash value, null values are returned as is | ||
| pub fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue { | ||
| fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue { | ||
| ColumnarValue::Scalar(match self { | ||
| Self::Md5 => digest_to_scalar!(Md5, value), | ||
| Self::Sha224 => digest_to_scalar!(Sha224, value), | ||
|
|
@@ -251,49 +214,7 @@ impl DigestAlgorithm { | |
| }) | ||
| } | ||
|
|
||
| /// digest a binary array to their hash values | ||
| pub fn digest_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue> | ||
| where | ||
| T: OffsetSizeTrait, | ||
|
Comment on lines
-254
to
-257
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Essentially inlined these, as didn't see much benefit and they were adding more indirection |
||
| { | ||
| let array = match value.data_type() { | ||
| DataType::Binary | DataType::LargeBinary => { | ||
| let v = value.as_binary::<T>(); | ||
| self.digest_binary_array_impl::<&GenericBinaryArray<T>>(&v) | ||
| } | ||
| DataType::BinaryView => { | ||
| let v = value.as_binary_view(); | ||
| self.digest_binary_array_impl::<&BinaryViewArray>(&v) | ||
| } | ||
| other => { | ||
| return exec_err!("unsupported type for digest_utf_array: {other:?}") | ||
| } | ||
| }; | ||
| Ok(ColumnarValue::Array(array)) | ||
| } | ||
|
|
||
| /// digest a string array to their hash values | ||
| pub fn digest_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue> | ||
| where | ||
| T: OffsetSizeTrait, | ||
| { | ||
| let array = match value.data_type() { | ||
| DataType::Utf8 | DataType::LargeUtf8 => { | ||
| let v = value.as_string::<T>(); | ||
| self.digest_utf8_array_impl::<&GenericStringArray<T>>(&v) | ||
| } | ||
| DataType::Utf8View => { | ||
| let v = value.as_string_view(); | ||
| self.digest_utf8_array_impl::<&StringViewArray>(&v) | ||
| } | ||
| other => { | ||
| return exec_err!("unsupported type for digest_utf_array: {other:?}") | ||
| } | ||
| }; | ||
| Ok(ColumnarValue::Array(array)) | ||
| } | ||
|
|
||
| pub fn digest_utf8_array_impl<'a, StringArrType>( | ||
| fn digest_utf8_array_impl<'a, StringArrType>( | ||
| self, | ||
| input_value: &StringArrType, | ||
| ) -> ArrayRef | ||
|
|
@@ -324,7 +245,7 @@ impl DigestAlgorithm { | |
| } | ||
| } | ||
|
|
||
| pub fn digest_binary_array_impl<'a, BinaryArrType>( | ||
| fn digest_binary_array_impl<'a, BinaryArrType>( | ||
| self, | ||
| input_value: &BinaryArrType, | ||
| ) -> ArrayRef | ||
|
|
@@ -355,26 +276,40 @@ impl DigestAlgorithm { | |
| } | ||
| } | ||
| } | ||
|
|
||
| pub fn digest_process( | ||
| value: &ColumnarValue, | ||
| digest_algorithm: DigestAlgorithm, | ||
| ) -> Result<ColumnarValue> { | ||
| match value { | ||
| ColumnarValue::Array(a) => match a.data_type() { | ||
| DataType::Utf8View => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()), | ||
| DataType::Utf8 => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()), | ||
| DataType::LargeUtf8 => digest_algorithm.digest_utf8_array::<i64>(a.as_ref()), | ||
| DataType::Binary => digest_algorithm.digest_binary_array::<i32>(a.as_ref()), | ||
| DataType::LargeBinary => { | ||
| digest_algorithm.digest_binary_array::<i64>(a.as_ref()) | ||
| } | ||
| DataType::BinaryView => { | ||
| digest_algorithm.digest_binary_array::<i32>(a.as_ref()) | ||
| } | ||
| other => exec_err!( | ||
| "Unsupported data type {other:?} for function {digest_algorithm}" | ||
| ), | ||
| }, | ||
| ColumnarValue::Array(a) => { | ||
| let output = match a.data_type() { | ||
| DataType::Utf8View => { | ||
| digest_algorithm.digest_utf8_array_impl(&a.as_string_view()) | ||
| } | ||
| DataType::Utf8 => { | ||
| digest_algorithm.digest_utf8_array_impl(&a.as_string::<i32>()) | ||
| } | ||
| DataType::LargeUtf8 => { | ||
| digest_algorithm.digest_utf8_array_impl(&a.as_string::<i64>()) | ||
| } | ||
| DataType::Binary => { | ||
| digest_algorithm.digest_binary_array_impl(&a.as_binary::<i32>()) | ||
| } | ||
| DataType::LargeBinary => { | ||
| digest_algorithm.digest_binary_array_impl(&a.as_binary::<i64>()) | ||
| } | ||
| DataType::BinaryView => { | ||
| digest_algorithm.digest_binary_array_impl(&a.as_binary_view()) | ||
| } | ||
| other => { | ||
| return exec_err!( | ||
| "Unsupported data type {other:?} for function {digest_algorithm}" | ||
| ) | ||
| } | ||
| }; | ||
| Ok(ColumnarValue::Array(output)) | ||
| } | ||
| ColumnarValue::Scalar(scalar) => { | ||
| match scalar { | ||
| ScalarValue::Utf8View(a) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,11 +15,13 @@ | |
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! "crypto" DataFusion functions | ||
| use super::basic::{digest, utf8_or_binary_to_binary_type}; | ||
| use crate::crypto::basic::{digest_process, DigestAlgorithm}; | ||
|
|
||
| use arrow::datatypes::DataType; | ||
| use datafusion_common::{ | ||
| exec_err, not_impl_err, | ||
| types::{logical_binary, logical_string}, | ||
| utils::take_function_args, | ||
| Result, | ||
| }; | ||
| use datafusion_expr::{ | ||
|
|
@@ -36,16 +38,16 @@ use std::any::Any; | |
| syntax_example = "digest(expression, algorithm)", | ||
| sql_example = r#"```sql | ||
| > select digest('foo', 'sha256'); | ||
| +------------------------------------------+ | ||
| | digest(Utf8("foo"), Utf8("sha256")) | | ||
| +------------------------------------------+ | ||
| | <binary_hash_result> | | ||
| +------------------------------------------+ | ||
| +------------------------------------------------------------------+ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I double checked this is indeed the output DataFusion CLI v51.0.0
> select digest('foo', 'sha256');
+------------------------------------------------------------------+
| digest(Utf8("foo"),Utf8("sha256")) |
+------------------------------------------------------------------+
| 2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae |
+------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.030 seconds. |
||
| | digest(Utf8("foo"),Utf8("sha256")) | | ||
| +------------------------------------------------------------------+ | ||
| | 2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae | | ||
| +------------------------------------------------------------------+ | ||
| ```"#, | ||
| standard_argument(name = "expression", prefix = "String"), | ||
| argument( | ||
| name = "algorithm", | ||
| description = "String expression specifying algorithm to use. Must be one of: | ||
| description = "String expression specifying algorithm to use. Must be one of: | ||
| - md5 | ||
| - sha224 | ||
| - sha256 | ||
|
|
@@ -60,6 +62,7 @@ use std::any::Any; | |
| pub struct DigestFunc { | ||
| signature: Signature, | ||
| } | ||
|
|
||
| impl Default for DigestFunc { | ||
| fn default() -> Self { | ||
| Self::new() | ||
|
|
@@ -85,6 +88,7 @@ impl DigestFunc { | |
| } | ||
| } | ||
| } | ||
|
|
||
| impl ScalarUDFImpl for DigestFunc { | ||
| fn as_any(&self) -> &dyn Any { | ||
| self | ||
|
|
@@ -98,14 +102,35 @@ impl ScalarUDFImpl for DigestFunc { | |
| &self.signature | ||
| } | ||
|
|
||
| fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { | ||
| utf8_or_binary_to_binary_type(&arg_types[0], self.name()) | ||
| fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { | ||
| Ok(DataType::Binary) | ||
| } | ||
|
|
||
| fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { | ||
| digest(&args.args) | ||
| let [data, digest_algorithm] = take_function_args(self.name(), &args.args)?; | ||
| digest(data, digest_algorithm) | ||
| } | ||
|
|
||
| fn documentation(&self) -> Option<&Documentation> { | ||
| self.doc() | ||
| } | ||
| } | ||
|
|
||
| /// Compute binary hash of the given `data` (String or Binary array), according | ||
| /// to the specified `digest_algorithm`. See [`DigestAlgorithm`] for supported | ||
| /// algorithms. | ||
| fn digest( | ||
| data: &ColumnarValue, | ||
| digest_algorithm: &ColumnarValue, | ||
| ) -> Result<ColumnarValue> { | ||
| let digest_algorithm = match digest_algorithm { | ||
| ColumnarValue::Scalar(scalar) => match scalar.try_as_str() { | ||
| Some(Some(method)) => method.parse::<DigestAlgorithm>(), | ||
| _ => exec_err!("Unsupported data type {scalar:?} for function digest"), | ||
| }, | ||
| ColumnarValue::Array(_) => { | ||
| not_impl_err!("Digest using dynamically decided method is not yet supported") | ||
| } | ||
| }?; | ||
| digest_process(data, digest_algorithm) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved down, closer to where it's actually used