diff --git a/Cargo.lock b/Cargo.lock index 6acab5e3..8a13212e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1305,6 +1305,7 @@ dependencies = [ "config", "const_format", "derivative", + "encoding_rs", "env_logger", "expect-test", "futures", diff --git a/Cargo.toml b/Cargo.toml index 70167b80..684a8e8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,3 +152,4 @@ azure_storage_blobs = { version = "0.21.0", default-features = false, features = ] } serde_path_to_error = "0.1.17" expect-test = "1.5.0" +encoding_rs = "0.8.35" diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs index e81ed772..ff1613f6 100644 --- a/src/ops/sources/amazon_s3.rs +++ b/src/ops/sources/amazon_s3.rs @@ -152,7 +152,8 @@ impl SourceExecutor for Executor { Some(SourceValue::Existence(if self.binary { fields_value!(bytes.to_vec()) } else { - fields_value!(String::from_utf8_lossy(&bytes).to_string()) + let (s, _) = utils::bytes_decode::bytes_to_string(&bytes); + fields_value!(s) })) } else { None diff --git a/src/ops/sources/azure_blob.rs b/src/ops/sources/azure_blob.rs index 304ed68b..4f56b719 100644 --- a/src/ops/sources/azure_blob.rs +++ b/src/ops/sources/azure_blob.rs @@ -147,7 +147,8 @@ impl SourceExecutor for Executor { Some(SourceValue::Existence(if self.binary { fields_value!(bytes) } else { - fields_value!(String::from_utf8_lossy(&bytes).to_string()) + let (s, _) = utils::bytes_decode::bytes_to_string(&bytes); + fields_value!(s) })) } else { None diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs index 4415c852..1fa293a0 100644 --- a/src/ops/sources/google_drive.rs +++ b/src/ops/sources/google_drive.rs @@ -400,9 +400,8 @@ impl SourceExecutor for Executor { if self.binary { content.to_bytes().to_vec().into() } else { - String::from_utf8_lossy(&content.to_bytes()) - .to_string() - .into() + let (s, _) = utils::bytes_decode::bytes_to_string(&content.to_bytes()); + s.into() }, ]; Some(SourceValue::Existence(FieldValues { fields })) diff --git 
a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs
index 3f48e7db..6701f99e 100644
--- a/src/ops/sources/local_file.rs
+++ b/src/ops/sources/local_file.rs
@@ -97,7 +97,8 @@ impl SourceExecutor for Executor {
                     let content = if self.binary {
                         fields_value!(content)
                     } else {
-                        fields_value!(String::from_utf8_lossy(&content).to_string())
+                        let (s, _) = utils::bytes_decode::bytes_to_string(&content);
+                        fields_value!(s)
                     };
                     Some(SourceValue::Existence(content))
                 }
diff --git a/src/utils/bytes_decode.rs b/src/utils/bytes_decode.rs
new file mode 100644
index 00000000..892b1f2e
--- /dev/null
+++ b/src/utils/bytes_decode.rs
@@ -0,0 +1,20 @@
+use encoding_rs::Encoding;
+
+/// Decodes raw bytes into a `String`, honoring a leading byte-order mark.
+///
+/// When the input starts with a BOM recognized by `Encoding::for_bom`, the BOM is
+/// stripped and the remainder is decoded with the encoding it announces (definitive
+/// for UTF-8/16; UTF-32 is not supported here). Input without a recognized BOM is
+/// decoded as UTF-8.
+///
+/// Returns the decoded text together with the decoder's `had_errors` flag.
+pub fn bytes_to_string(bytes: &[u8]) -> (String, bool) {
+    // Pick the decoder and the BOM-free payload in one step. Input that reaches the
+    // UTF-8 fallback has no recognized BOM, so there is nothing left to strip there.
+    let (encoding, payload) = match Encoding::for_bom(bytes) {
+        Some((detected, bom_len)) => (detected, &bytes[bom_len..]),
+        None => (encoding_rs::UTF_8, bytes),
+    };
+    let (text, had_errors) = encoding.decode_without_bom_handling(payload);
+    (text.into_owned(), had_errors)
+}
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index 9aa1655f..212c3432 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -1,3 +1,4 @@
+pub mod bytes_decode;
 pub mod concur_control;
 pub mod db;
 pub mod deser;