From ef504624af17230e060a663d2bdcb218c30578c1 Mon Sep 17 00:00:00 2001 From: Julio Gonzalez Date: Thu, 18 Jul 2024 15:07:31 +0200 Subject: [PATCH 01/10] Add basic decoding for v0.4 msgpack traces. * Add methods to get string and &str. * Add Number abstraction layer in order to decode integer and floats. * Implement decoder for Span. * Implement decoder for meta attributes. * Implement decoder for metrics attributes. * Implement decoder for SpanLinks. * Add tests. --- trace-utils/src/lib.rs | 1 + trace-utils/src/msgpack/decoder.rs | 302 +++++++++++++++++++++++++++++ trace-utils/src/msgpack/error.rs | 23 +++ trace-utils/src/msgpack/mod.rs | 6 + trace-utils/src/msgpack/number.rs | 202 +++++++++++++++++++ trace-utils/src/tracer_payload.rs | 53 ++++- 6 files changed, 583 insertions(+), 4 deletions(-) create mode 100644 trace-utils/src/msgpack/decoder.rs create mode 100644 trace-utils/src/msgpack/error.rs create mode 100644 trace-utils/src/msgpack/mod.rs create mode 100644 trace-utils/src/msgpack/number.rs diff --git a/trace-utils/src/lib.rs b/trace-utils/src/lib.rs index 70d845a08c..568b2c5d99 100644 --- a/trace-utils/src/lib.rs +++ b/trace-utils/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 pub mod config_utils; +pub mod msgpack; pub mod send_data; pub mod stats_utils; #[cfg(any(test, feature = "test-utils"))] diff --git a/trace-utils/src/msgpack/decoder.rs b/trace-utils/src/msgpack/decoder.rs new file mode 100644 index 0000000000..c799121a8c --- /dev/null +++ b/trace-utils/src/msgpack/decoder.rs @@ -0,0 +1,302 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::msgpack::error::DecodeError; +use crate::msgpack::number::read_number; +use crate::tracer_payload::TracerPayloadV04; +use datadog_trace_protobuf::pb::{Span, SpanLink}; +use rmp::{ + decode::{read_array_len, RmpRead}, + Marker, +}; +use std::{collections::HashMap, f64, str::FromStr}; + +#[inline] +pub fn read_string_ref(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { + rmp::decode::read_str_from_slice(buf).map_err(|_| DecodeError::WrongFormat) +} + +#[inline] +pub fn read_string(buf: &mut &[u8]) -> Result { + let value_len: usize = rmp::decode::read_str_len(buf) + .map_err(|_| DecodeError::WrongFormat)? + .try_into() + .map_err(|_| DecodeError::WrongConversion)?; + + let mut vec = vec![0; value_len]; + buf.read_exact_buf(vec.as_mut_slice()) + .map_err(|_| DecodeError::IOError)?; + + let str = String::from_utf8(vec).map_err(|_| DecodeError::Utf8Error)?; + Ok(str) +} + +#[inline] +fn read_str_pair(buf: &mut &[u8]) -> Result<(String, String), DecodeError> { + let k = read_string(buf)?; + let v = read_string(buf)?; + + Ok((k, v)) +} + +#[inline] +fn read_metric_pair(buf: &mut &[u8]) -> Result<(String, f64), DecodeError> { + let k = read_string(buf)?; + let v = read_number(buf)?.try_into()?; + + Ok((k, v)) +} + +fn read_map_strs(buf: &mut &[u8]) -> Result, DecodeError> { + match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { + Marker::FixMap(len) => { + let mut map = HashMap::new(); + for _ in 0..len { + let (k, v) = read_str_pair(buf)?; + map.insert(k, v); + } + Ok(map) + } + _ => Err(DecodeError::WrongType), + } +} + +fn read_metrics(buf: &mut &[u8]) -> Result, DecodeError> { + match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { + Marker::FixMap(len) => { + let mut metrics = HashMap::new(); + for _ in 0..len { + let (k, v) = read_metric_pair(buf)?; + metrics.insert(k, v); + } + Ok(metrics) + } + _ => Err(DecodeError::WrongType), + } +} + +fn read_meta_struct(buf: &mut &[u8]) -> Result>, DecodeError> { + match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { + Marker::FixMap(len) => { + let mut meta_struct = HashMap::new(); + for _ in 0..len { + let k = read_string(buf)?; + let mut v = vec![]; + let array_len = + rmp::decode::read_array_len(buf).map_err(|_| DecodeError::WrongFormat)?; + for _ in 0..array_len { + let value = read_number(buf)?.try_into()?; + v.push(value); + } + meta_struct.insert(k, v); + } + Ok(meta_struct) + } + _ => Err(DecodeError::WrongType), + } +} + +#[allow(clippy::explicit_auto_deref)] +fn decode_span_link(buf: &mut &[u8]) -> Result { + let mut span = SpanLink::default(); + let span_size = rmp::decode::read_map_len(buf).map_err(|_| DecodeError::WrongType)?; + + for _ in 0..span_size { + let (key, value) = read_string_ref(*buf)?; + *buf = value; + if key == "trace_id" { + span.trace_id = read_number(buf)?.try_into()?; + } else if key == "trace_id_high" { + span.trace_id_high = read_number(buf)?.try_into()?; + } else if key == "span_id" { + span.span_id = read_number(buf)?.try_into()?; + } else if key == "attributes" { + span.attributes = read_map_strs(buf)?; + } else if key == "tracestate" { + let (value, next) = read_string_ref(*buf)?; + span.tracestate = String::from_str(value).unwrap(); + *buf = next; + } else if key == "flags" { + span.flags = read_number(buf)?.try_into()?; + } else { + return Err(DecodeError::WrongFormat); + } + } + + Ok(span) +} + +fn read_span_links(buf: &mut &[u8]) -> Result, DecodeError> { + match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { + Marker::FixArray(len) => { + let mut vec: Vec = Vec::new(); + for _ in 0..len { + vec.push(decode_span_link(buf)?); + } + Ok(vec) + } + _ => Err(DecodeError::WrongType), + } +} + +// Disabling explicit_auto_deref warning because passing buf instead of *buf to read_string_ref +// leads to borrow checker errors. +#[allow(clippy::explicit_auto_deref)] +fn fill_span(span: &mut Span, buf: &mut &[u8]) -> Result<(), DecodeError> { + // field's key won't be held so no need to copy it in a buffer. + let (key, value) = read_string_ref(*buf)?; + + // Go to the value + *buf = value; + if key == "service" { + let (value, next) = read_string_ref(*buf)?; + span.service = String::from_str(value).unwrap(); + *buf = next; + } else if key == "name" { + let (value, next) = read_string_ref(*buf)?; + span.name = String::from_str(value).unwrap(); + *buf = next; + } else if key == "resource" { + let (value, next) = read_string_ref(*buf)?; + span.resource = String::from_str(value).unwrap(); + *buf = next; + } else if key == "trace_id" { + span.trace_id = read_number(buf)?.try_into()?; + } else if key == "span_id" { + span.span_id = read_number(buf)?.try_into()?; + } else if key == "parent_id" { + span.parent_id = read_number(buf)?.try_into()?; + } else if key == "start" { + span.start = read_number(buf)?.try_into()?; + } else if key == "duration" { + span.duration = read_number(buf)?.try_into()?; + } else if key == "error" { + span.error = read_number(buf)?.try_into()?; + } else if key == "meta" { + span.meta = read_map_strs(buf)?; + } else if key == "metrics" { + span.metrics = read_metrics(buf)?; + } else if key == "type" { + let (value, next) = read_string_ref(*buf)?; + span.r#type = String::from_str(value).unwrap(); + *buf = next; + } else if key == "meta_struct" { + span.meta_struct = read_meta_struct(buf)?; + } else if key == "span_links" { + span.span_links = read_span_links(buf)?; + } else { + return Err(DecodeError::WrongFormat); + } + Ok(()) +} + +#[inline] +fn decode_span_v04(buf: &mut &[u8]) -> Result { + let mut span = Span::default(); + + let span_size = rmp::decode::read_map_len(buf).unwrap(); + + for _ in 0..span_size { + fill_span(&mut span, buf)?; + } + Ok(span) +} + +pub fn from_slice(data: &mut &[u8]) -> Result, DecodeError> { + let trace_count = read_array_len(data).map_err(|_| DecodeError::WrongFormat)?; + + let mut traces: Vec = Default::default(); + + for _ in 0..trace_count { + let span_count = read_array_len(data).unwrap(); + let mut trace: Vec = Default::default(); + + for _ in 0..span_count { + let span = decode_span_v04(data)?; + trace.push(span); + } + traces.push(trace); + } + + Ok(traces) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn decoder_read_string_succes() { + let expected = "foobar".to_string(); + let payload = rmp_serde::to_vec(&expected).unwrap(); + + assert_eq!(expected, read_string(&mut payload.as_ref()).unwrap()); + } + + #[test] + fn decoder_read_string_wrong_format() { + let input: [u8; 2] = [255; 2]; + + assert_eq!( + Err(DecodeError::WrongFormat), + read_string(&mut input.as_ref()) + ); + } + + #[test] + fn decoder_read_string_utf8_error() { + let invalid_seq = vec![0, 159, 146, 150]; + let str = unsafe { String::from_utf8_unchecked(invalid_seq) }; + let payload = rmp_serde::to_vec(&str).unwrap(); + assert_eq!( + Err(DecodeError::Utf8Error), + read_string(&mut payload.as_ref()) + ); + } + + #[test] + fn decoder_span_link_success() { + let span_links = vec![SpanLink { + trace_id: 1, + trace_id_high: 0, + span_id: 1, + attributes: HashMap::from([ + ("attr1".to_string(), "test_value".to_string()), + ("attr2".to_string(), "test_value".to_string()), + ]), + tracestate: "state_test".to_string(), + flags: 0b101, + }]; + + let payload = rmp_serde::to_vec_named(&span_links).unwrap(); + + assert_eq!(span_links, read_span_links(&mut payload.as_ref()).unwrap()) + } + + #[test] + fn decoder_meta_struct_success() { + let meta_struct = HashMap::from([ + ("key".to_string(), vec![1, 2, 3]), + ("key2".to_string(), vec![4, 5, 6]), + ]); + + let payload = rmp_serde::to_vec_named(&meta_struct).unwrap(); + + assert_eq!( + meta_struct, + read_meta_struct(&mut payload.as_ref()).unwrap() + ) + } + + #[test] + fn decoder_meta_success() { + let meta = HashMap::from([ + ("key1".to_string(), "value1".to_string()), + ("key2".to_string(), "value2".to_string()), + ]); + + let payload = rmp_serde::to_vec_named(&meta).unwrap(); + + assert_eq!(meta, read_map_strs(&mut payload.as_ref()).unwrap()) + } +} diff --git a/trace-utils/src/msgpack/error.rs b/trace-utils/src/msgpack/error.rs new file mode 100644 index 0000000000..d66531b81b --- /dev/null +++ b/trace-utils/src/msgpack/error.rs @@ -0,0 +1,23 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#[derive(Debug, PartialEq)] +pub enum DecodeError { + WrongConversion, + WrongType, + WrongFormat, + IOError, + Utf8Error, +} + +impl std::fmt::Display for DecodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DecodeError::WrongConversion => write!(f, "Failed to cast value"), + DecodeError::IOError => write!(f, "Failed to read from buffer"), + DecodeError::WrongType => write!(f, "Invalid type encountered"), + DecodeError::WrongFormat => write!(f, "Invalid format"), + DecodeError::Utf8Error => write!(f, "Failed to read utf8 value"), + } + } +} diff --git a/trace-utils/src/msgpack/mod.rs b/trace-utils/src/msgpack/mod.rs new file mode 100644 index 0000000000..bdbadd27e9 --- /dev/null +++ b/trace-utils/src/msgpack/mod.rs @@ -0,0 +1,6 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +pub mod decoder; +pub mod error; +mod number; diff --git a/trace-utils/src/msgpack/number.rs b/trace-utils/src/msgpack/number.rs new file mode 100644 index 0000000000..3db839bf62 --- /dev/null +++ b/trace-utils/src/msgpack/number.rs @@ -0,0 +1,202 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::msgpack::error::DecodeError; +use rmp::{decode::RmpRead, Marker}; + +pub enum Number { + U8(u8), + U32(u32), + U64(u64), + I8(i8), + I32(i32), + I64(i64), + F64(f64), +} + +impl TryFrom for u8 { + type Error = DecodeError; + fn try_from(value: Number) -> Result { + match value { + Number::U8(val) => Ok(val), + Number::I8(val) => Ok(val as u8), + _ => Err(DecodeError::WrongConversion), + } + } +} + +impl TryFrom for i8 { + type Error = DecodeError; + fn try_from(value: Number) -> Result { + match value { + Number::U8(val) => Ok(val as i8), + Number::I8(val) => Ok(val), + _ => Err(DecodeError::WrongConversion), + } + } +} + +impl TryFrom for u32 { + type Error = DecodeError; + fn try_from(value: Number) -> Result { + match value { + Number::U8(val) => Ok(val as u32), + Number::U32(val) => Ok(val), + _ => Err(DecodeError::WrongConversion), + } + } +} + +impl TryFrom for u64 { + type Error = DecodeError; + fn try_from(value: Number) -> Result { + match value { + Number::U8(val) => Ok(val as u64), + Number::U32(val) => Ok(val as u64), + Number::U64(val) => Ok(val), + _ => Err(DecodeError::WrongConversion), + } + } +} + +impl TryFrom for i64 { + type Error = DecodeError; + fn try_from(value: Number) -> Result { + match value { + Number::U8(val) => Ok(val as i64), + Number::U32(val) => Ok(val as i64), + Number::I8(val) => Ok(val as i64), + Number::I32(val) => Ok(val as i64), + Number::I64(val) => Ok(val), + _ => Err(DecodeError::WrongConversion), + } + } +} + +impl TryFrom for i32 { + type Error = DecodeError; + fn try_from(value: Number) -> Result { + match value { + Number::U8(val) => Ok(val as i32), + Number::I8(val) => Ok(val as i32), + Number::U32(val) => Ok(val as i32), + Number::I32(val) => Ok(val), + _ => Err(DecodeError::WrongConversion), + } + } +} + +impl TryFrom for f64 { + type Error = DecodeError; + fn try_from(value: Number) -> Result { + match value { + Number::U8(val) => Ok(val as f64), + Number::U32(val) => Ok(val as f64), + Number::U64(val) => Ok(val as f64), + Number::I8(val) => Ok(val as f64), + Number::I32(val) => Ok(val as f64), + Number::I64(val) => Ok(val as f64), + Number::F64(val) => Ok(val), + } + } +} + +pub fn read_number(buf: &mut &[u8]) -> Result { + match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { + Marker::FixPos(val) => Ok(Number::U8(val)), + Marker::FixNeg(val) => Ok(Number::I8(val)), + Marker::U8 => Ok(Number::U8( + buf.read_data_u8().map_err(|_| DecodeError::IOError)?, + )), + Marker::U16 => Ok(Number::U32( + buf.read_data_u16().map_err(|_| DecodeError::IOError)? as u32, + )), + Marker::U32 => Ok(Number::U32( + buf.read_data_u32().map_err(|_| DecodeError::IOError)?, + )), + Marker::U64 => Ok(Number::U64( + buf.read_data_u64().map_err(|_| DecodeError::IOError)?, + )), + Marker::I8 => Ok(Number::I32( + buf.read_data_i8().map_err(|_| DecodeError::IOError)? as i32, + )), + Marker::I16 => Ok(Number::I32( + buf.read_data_i16().map_err(|_| DecodeError::IOError)? as i32, + )), + Marker::I32 => Ok(Number::I32( + buf.read_data_i32().map_err(|_| DecodeError::IOError)?, + )), + Marker::I64 => Ok(Number::I64( + buf.read_data_i64().map_err(|_| DecodeError::IOError)?, + )), + Marker::F32 => Ok(Number::F64( + buf.read_data_f32().map_err(|_| DecodeError::IOError)? as f64, + )), + Marker::F64 => Ok(Number::F64( + buf.read_data_f64().map_err(|_| DecodeError::IOError)?, + )), + _ => Err(DecodeError::WrongType), + } +} + +#[cfg(test)] +mod tests { + use std::f64; + + use super::*; + + #[test] + fn read_number_success() { + let values = (1, -1_i8, i32::MIN, u32::MAX, i64::MIN, u64::MAX, f64::MAX); + + assert_eq!( + values.0, + TryInto::::try_into( + read_number(&mut rmp_serde::to_vec_named(&values.0).unwrap().as_ref()).unwrap() + ) + .unwrap() + ); + assert_eq!( + values.1, + TryInto::::try_into( + read_number(&mut rmp_serde::to_vec_named(&values.1).unwrap().as_ref()).unwrap() + ) + .unwrap() + ); + assert_eq!( + values.2, + TryInto::::try_into( + read_number(&mut rmp_serde::to_vec_named(&values.2).unwrap().as_ref()).unwrap() + ) + .unwrap() + ); + assert_eq!( + values.3, + TryInto::::try_into( + read_number(&mut rmp_serde::to_vec_named(&values.3).unwrap().as_ref()).unwrap() + ) + .unwrap() + ); + assert_eq!( + values.4, + TryInto::::try_into( + read_number(&mut rmp_serde::to_vec_named(&values.4).unwrap().as_ref()).unwrap() + ) + .unwrap() + ); + assert_eq!( + values.5, + TryInto::::try_into( + read_number(&mut rmp_serde::to_vec_named(&values.5).unwrap().as_ref()).unwrap() + ) + .unwrap() + ); + assert_eq!( + values.6, + TryInto::::try_into( + read_number(&mut rmp_serde::to_vec_named(&values.6).unwrap().as_ref()).unwrap() + ) + .unwrap() + ); + } +} diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 5e2be722ef..1c1f3e230f 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -1,11 +1,14 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::trace_utils::{cmp_send_data_payloads, collect_trace_chunks, TracerHeaderTags}; +use crate::{ + msgpack, + trace_utils::{cmp_send_data_payloads, collect_trace_chunks, TracerHeaderTags}, +}; use datadog_trace_protobuf::pb; use std::cmp::Ordering; -type TracerPayloadV04 = Vec; +pub type TracerPayloadV04 = Vec; #[derive(Debug, Clone)] /// Enumerates the different encoding types. @@ -238,10 +241,14 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto /// Err(e) => println!("Failed to convert: {:?}", e), /// } /// ``` + // Allowing useless_asref because otherwise self should me passed as mutable. + #[allow(clippy::useless_asref)] fn try_into(self) -> Result { match self.encoding_type { TraceEncoding::V04 => { - let traces: Vec> = match rmp_serde::from_slice(self.data) { + // TODO: investigate why as_ref() did the trick. + let data: &mut &[u8] = &mut self.data.as_ref(); + let traces: Vec> = match msgpack::decoder::from_slice(data) { Ok(res) => res, Err(e) => { anyhow::bail!("Error deserializing trace from request body: {e}") @@ -294,6 +301,15 @@ mod tests { }]) } + fn create_trace() -> Vec { + vec![ + // create a root span with metrics + create_test_span(1234, 12341, 0, 1, true), + create_test_span(1234, 12342, 12341, 1, false), + create_test_span(1234, 12343, 12342, 1, false), + ] + } + #[test] fn test_append_traces_v07() { let mut trace = create_dummy_collection_v07(); @@ -357,6 +373,7 @@ mod tests { "error": 0, "meta": {}, "metrics": {}, + "type": "serverless", }]); let expected_serialized_span_data1 = vec![pb::Span { @@ -372,7 +389,7 @@ mod tests { meta: HashMap::new(), metrics: HashMap::new(), meta_struct: HashMap::new(), - r#type: "".to_string(), + r#type: "serverless".to_string(), span_links: vec![], }]; @@ -388,6 +405,7 @@ mod tests { "error": 1, "meta": {}, "metrics": {}, + "type": "", }]); let expected_serialized_span_data2 = vec![pb::Span { @@ -433,4 +451,31 @@ mod tests { panic!("Invalid collection type returned for try_into"); } } + + #[test] + fn test_try_into_meta_metrics_success() { + let dummy_trace = create_trace(); + let expected = vec![dummy_trace.clone()]; + let payload = rmp_serde::to_vec_named(&expected).unwrap(); + let tracer_header_tags = &TracerHeaderTags::default(); + + let result: anyhow::Result = TracerPayloadParams::new( + &payload, + tracer_header_tags, + &mut DefaultTraceChunkProcessor, + false, + TraceEncoding::V04, + ) + .try_into(); + + assert!(result.is_ok()); + + let collection = result.unwrap(); + assert_eq!(1, collection.size()); + if let TracerPayloadCollection::V04(traces) = collection { + assert_eq!(dummy_trace, traces[0]); + } else { + panic!("Invalid collection type returned for try_into"); + } + } } From 32b8fbc091bd30c55a67b64b5e72fe798c2fc6bd Mon Sep 17 00:00:00 2001 From: Edmund Kump Date: Mon, 22 Jul 2024 14:56:51 -0400 Subject: [PATCH 02/10] FIXUP: Address clippy warnings and remove ignores --- trace-utils/src/msgpack/decoder.rs | 16 +++++++--------- trace-utils/src/tracer_payload.rs | 10 ++++++---- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/trace-utils/src/msgpack/decoder.rs b/trace-utils/src/msgpack/decoder.rs index c799121a8c..ffda327ac3 100644 --- a/trace-utils/src/msgpack/decoder.rs +++ b/trace-utils/src/msgpack/decoder.rs @@ -96,13 +96,12 @@ fn read_meta_struct(buf: &mut &[u8]) -> Result>, DecodeE } } -#[allow(clippy::explicit_auto_deref)] fn decode_span_link(buf: &mut &[u8]) -> Result { let mut span = SpanLink::default(); let span_size = rmp::decode::read_map_len(buf).map_err(|_| DecodeError::WrongType)?; for _ in 0..span_size { - let (key, value) = read_string_ref(*buf)?; + let (key, value) = read_string_ref(buf)?; *buf = value; if key == "trace_id" { span.trace_id = read_number(buf)?.try_into()?; @@ -113,7 +112,7 @@ fn decode_span_link(buf: &mut &[u8]) -> Result { } else if key == "attributes" { span.attributes = read_map_strs(buf)?; } else if key == "tracestate" { - let (value, next) = read_string_ref(*buf)?; + let (value, next) = read_string_ref(buf)?; span.tracestate = String::from_str(value).unwrap(); *buf = next; } else if key == "flags" { @@ -141,23 +140,22 @@ fn read_span_links(buf: &mut &[u8]) -> Result, DecodeError> { // Disabling explicit_auto_deref warning because passing buf instead of *buf to read_string_ref // leads to borrow checker errors. -#[allow(clippy::explicit_auto_deref)] fn fill_span(span: &mut Span, buf: &mut &[u8]) -> Result<(), DecodeError> { // field's key won't be held so no need to copy it in a buffer. - let (key, value) = read_string_ref(*buf)?; + let (key, value) = read_string_ref(buf)?; // Go to the value *buf = value; if key == "service" { - let (value, next) = read_string_ref(*buf)?; + let (value, next) = read_string_ref(buf)?; span.service = String::from_str(value).unwrap(); *buf = next; } else if key == "name" { - let (value, next) = read_string_ref(*buf)?; + let (value, next) = read_string_ref(buf)?; span.name = String::from_str(value).unwrap(); *buf = next; } else if key == "resource" { - let (value, next) = read_string_ref(*buf)?; + let (value, next) = read_string_ref(buf)?; span.resource = String::from_str(value).unwrap(); *buf = next; } else if key == "trace_id" { @@ -177,7 +175,7 @@ fn fill_span(span: &mut Span, buf: &mut &[u8]) -> Result<(), DecodeError> { } else if key == "metrics" { span.metrics = read_metrics(buf)?; } else if key == "type" { - let (value, next) = read_string_ref(*buf)?; + let (value, next) = read_string_ref(buf)?; span.r#type = String::from_str(value).unwrap(); *buf = next; } else if key == "meta_struct" { diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 1c1f3e230f..db6cd55ebd 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -8,6 +8,7 @@ use crate::{ use datadog_trace_protobuf::pb; use std::cmp::Ordering; +// TODO: EK - Should this really be public? pub type TracerPayloadV04 = Vec; #[derive(Debug, Clone)] @@ -241,13 +242,14 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto /// Err(e) => println!("Failed to convert: {:?}", e), /// } /// ``` - // Allowing useless_asref because otherwise self should me passed as mutable. - #[allow(clippy::useless_asref)] fn try_into(self) -> Result { match self.encoding_type { TraceEncoding::V04 => { - // TODO: investigate why as_ref() did the trick. - let data: &mut &[u8] = &mut self.data.as_ref(); + // msgpack::decoder::from_slice requires a mutable ref to self.data, so we need to + // create a local copy of the ref to make the ref mutable + let mut data_slice: &[u8] = self.data; + let data: &mut &[u8] = &mut data_slice; + let traces: Vec> = match msgpack::decoder::from_slice(data) { Ok(res) => res, Err(e) => { From 557f1d1e9850f8cd4061f1bdcf4d08fe134f319b Mon Sep 17 00:00:00 2001 From: Edmund Kump Date: Tue, 23 Jul 2024 13:42:36 -0400 Subject: [PATCH 03/10] FIXUP: Refactor decoder to be more readable and maintainable - introduce enums for span_link and span keys and switch from long if statements to matches to codify all possible enumerations. - include trace payload schema version in namespace for decoder. - break up decoder into multiple files to be easier to follow. --- trace-utils/src/lib.rs | 2 +- trace-utils/src/msgpack_decoder/mod.rs | 1 + .../v04/decoder/mod.rs} | 162 ++++-------------- .../src/msgpack_decoder/v04/decoder/span.rs | 107 ++++++++++++ .../msgpack_decoder/v04/decoder/span_link.rs | 71 ++++++++ .../{msgpack => msgpack_decoder/v04}/error.rs | 0 .../{msgpack => msgpack_decoder/v04}/mod.rs | 2 +- .../v04}/number.rs | 2 +- trace-utils/src/tracer_payload.rs | 15 +- 9 files changed, 222 insertions(+), 140 deletions(-) create mode 100644 trace-utils/src/msgpack_decoder/mod.rs rename trace-utils/src/{msgpack/decoder.rs => msgpack_decoder/v04/decoder/mod.rs} (57%) create mode 100644 trace-utils/src/msgpack_decoder/v04/decoder/span.rs create mode 100644 trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs rename trace-utils/src/{msgpack => msgpack_decoder/v04}/error.rs (100%) rename trace-utils/src/{msgpack => msgpack_decoder/v04}/mod.rs (89%) rename trace-utils/src/{msgpack => msgpack_decoder/v04}/number.rs (99%) diff --git a/trace-utils/src/lib.rs b/trace-utils/src/lib.rs index 568b2c5d99..fffe4f8c19 100644 --- a/trace-utils/src/lib.rs +++ b/trace-utils/src/lib.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 pub mod config_utils; -pub mod msgpack; +pub mod msgpack_decoder; pub mod send_data; pub mod stats_utils; #[cfg(any(test, feature = "test-utils"))] diff --git a/trace-utils/src/msgpack_decoder/mod.rs b/trace-utils/src/msgpack_decoder/mod.rs new file mode 100644 index 0000000000..c8a710e920 --- /dev/null +++ b/trace-utils/src/msgpack_decoder/mod.rs @@ -0,0 +1 @@ +pub mod v04; diff --git a/trace-utils/src/msgpack/decoder.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs similarity index 57% rename from trace-utils/src/msgpack/decoder.rs rename to trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index ffda327ac3..ca53b3394f 100644 --- a/trace-utils/src/msgpack/decoder.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -1,23 +1,46 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::msgpack::error::DecodeError; -use crate::msgpack::number::read_number; +mod span; +mod span_link; + +use self::span::decode_span; +use super::error::DecodeError; +use super::number::read_number; use crate::tracer_payload::TracerPayloadV04; -use datadog_trace_protobuf::pb::{Span, SpanLink}; +use datadog_trace_protobuf::pb::Span; use rmp::{ decode::{read_array_len, RmpRead}, Marker, }; -use std::{collections::HashMap, f64, str::FromStr}; +use std::{collections::HashMap, f64}; + +pub fn from_slice(data: &mut &[u8]) -> Result, DecodeError> { + let trace_count = read_array_len(data).map_err(|_| DecodeError::WrongFormat)?; + + let mut traces: Vec = Default::default(); + + for _ in 0..trace_count { + let span_count = read_array_len(data).unwrap(); + let mut trace: Vec = Default::default(); + + for _ in 0..span_count { + let span = decode_span(data)?; + trace.push(span); + } + traces.push(trace); + } + + Ok(traces) +} #[inline] -pub fn read_string_ref(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { +fn read_string_ref(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { rmp::decode::read_str_from_slice(buf).map_err(|_| DecodeError::WrongFormat) } #[inline] -pub fn read_string(buf: &mut &[u8]) -> Result { +fn read_string(buf: &mut &[u8]) -> Result { let value_len: usize = rmp::decode::read_str_len(buf) .map_err(|_| DecodeError::WrongFormat)? .try_into() @@ -96,135 +119,14 @@ fn read_meta_struct(buf: &mut &[u8]) -> Result>, DecodeE } } -fn decode_span_link(buf: &mut &[u8]) -> Result { - let mut span = SpanLink::default(); - let span_size = rmp::decode::read_map_len(buf).map_err(|_| DecodeError::WrongType)?; - - for _ in 0..span_size { - let (key, value) = read_string_ref(buf)?; - *buf = value; - if key == "trace_id" { - span.trace_id = read_number(buf)?.try_into()?; - } else if key == "trace_id_high" { - span.trace_id_high = read_number(buf)?.try_into()?; - } else if key == "span_id" { - span.span_id = read_number(buf)?.try_into()?; - } else if key == "attributes" { - span.attributes = read_map_strs(buf)?; - } else if key == "tracestate" { - let (value, next) = read_string_ref(buf)?; - span.tracestate = String::from_str(value).unwrap(); - *buf = next; - } else if key == "flags" { - span.flags = read_number(buf)?.try_into()?; - } else { - return Err(DecodeError::WrongFormat); - } - } - - Ok(span) -} - -fn read_span_links(buf: &mut &[u8]) -> Result, DecodeError> { - match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { - Marker::FixArray(len) => { - let mut vec: Vec = Vec::new(); - for _ in 0..len { - vec.push(decode_span_link(buf)?); - } - Ok(vec) - } - _ => Err(DecodeError::WrongType), - } -} - -// Disabling explicit_auto_deref warning because passing buf instead of *buf to read_string_ref -// leads to borrow checker errors. -fn fill_span(span: &mut Span, buf: &mut &[u8]) -> Result<(), DecodeError> { - // field's key won't be held so no need to copy it in a buffer. - let (key, value) = read_string_ref(buf)?; - - // Go to the value - *buf = value; - if key == "service" { - let (value, next) = read_string_ref(buf)?; - span.service = String::from_str(value).unwrap(); - *buf = next; - } else if key == "name" { - let (value, next) = read_string_ref(buf)?; - span.name = String::from_str(value).unwrap(); - *buf = next; - } else if key == "resource" { - let (value, next) = read_string_ref(buf)?; - span.resource = String::from_str(value).unwrap(); - *buf = next; - } else if key == "trace_id" { - span.trace_id = read_number(buf)?.try_into()?; - } else if key == "span_id" { - span.span_id = read_number(buf)?.try_into()?; - } else if key == "parent_id" { - span.parent_id = read_number(buf)?.try_into()?; - } else if key == "start" { - span.start = read_number(buf)?.try_into()?; - } else if key == "duration" { - span.duration = read_number(buf)?.try_into()?; - } else if key == "error" { - span.error = read_number(buf)?.try_into()?; - } else if key == "meta" { - span.meta = read_map_strs(buf)?; - } else if key == "metrics" { - span.metrics = read_metrics(buf)?; - } else if key == "type" { - let (value, next) = read_string_ref(buf)?; - span.r#type = String::from_str(value).unwrap(); - *buf = next; - } else if key == "meta_struct" { - span.meta_struct = read_meta_struct(buf)?; - } else if key == "span_links" { - span.span_links = read_span_links(buf)?; - } else { - return Err(DecodeError::WrongFormat); - } - Ok(()) -} - -#[inline] -fn decode_span_v04(buf: &mut &[u8]) -> Result { - let mut span = Span::default(); - - let span_size = rmp::decode::read_map_len(buf).unwrap(); - - for _ in 0..span_size { - fill_span(&mut span, buf)?; - } - Ok(span) -} - -pub fn from_slice(data: &mut &[u8]) -> Result, DecodeError> { - let trace_count = read_array_len(data).map_err(|_| DecodeError::WrongFormat)?; - - let mut traces: Vec = Default::default(); - - for _ in 0..trace_count { - let span_count = read_array_len(data).unwrap(); - let mut trace: Vec = Default::default(); - - for _ in 0..span_count { - let span = decode_span_v04(data)?; - trace.push(span); - } - traces.push(trace); - } - - Ok(traces) -} - #[cfg(test)] mod tests { use super::*; + use crate::msgpack_decoder::v04::decoder::span_link::read_span_links; + use datadog_trace_protobuf::pb::SpanLink; #[test] - fn decoder_read_string_succes() { + fn decoder_read_string_success() { let expected = "foobar".to_string(); let payload = rmp_serde::to_vec(&expected).unwrap(); diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs new file mode 100644 index 0000000000..9d77fa98c7 --- /dev/null +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs @@ -0,0 +1,107 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use super::{ + read_map_strs, read_meta_struct, read_metrics, read_string_ref, span_link::read_span_links, +}; +use crate::msgpack_decoder::v04::error::DecodeError; +use crate::msgpack_decoder::v04::number::read_number; +use datadog_trace_protobuf::pb::Span; +use std::str::FromStr; + +#[inline] +pub(crate) fn decode_span(buf: &mut &[u8]) -> Result { + let mut span = Span::default(); + + let span_size = rmp::decode::read_map_len(buf).unwrap(); + + for _ in 0..span_size { + fill_span(&mut span, buf)?; + } + Ok(span) +} + +enum SpanKey { + Service, + Name, + Resource, + TraceId, + SpanId, + ParentId, + Start, + Duration, + Error, + Meta, + Metrics, + Type, + MetaStruct, + SpanLinks, +} + +impl FromStr for SpanKey { + type Err = DecodeError; + + fn from_str(s: &str) -> Result { + match s { + "service" => Ok(SpanKey::Service), + "name" => Ok(SpanKey::Name), + "resource" => Ok(SpanKey::Resource), + "trace_id" => Ok(SpanKey::TraceId), + "span_id" => Ok(SpanKey::SpanId), + "parent_id" => Ok(SpanKey::ParentId), + "start" => Ok(SpanKey::Start), + "duration" => Ok(SpanKey::Duration), + "error" => Ok(SpanKey::Error), + "meta" => Ok(SpanKey::Meta), + "metrics" => Ok(SpanKey::Metrics), + "type" => Ok(SpanKey::Type), + "meta_struct" => Ok(SpanKey::MetaStruct), + "span_links" => Ok(SpanKey::SpanLinks), + _ => Err(DecodeError::WrongFormat), + } + } +} + +fn fill_span(span: &mut Span, buf: &mut &[u8]) -> Result<(), DecodeError> { + // field's key won't be held so no need to copy it in a buffer. + let (key, value) = read_string_ref(buf)?; + + // Go to the value + *buf = value; + + let key = key.parse::()?; + + match key { + SpanKey::Service => { + let (value, next) = read_string_ref(buf)?; + span.service = String::from_str(value).unwrap(); + *buf = next; + } + SpanKey::Name => { + let (value, next) = read_string_ref(buf)?; + span.name = String::from_str(value).unwrap(); + *buf = next; + } + SpanKey::Resource => { + let (value, next) = read_string_ref(buf)?; + span.resource = String::from_str(value).unwrap(); + *buf = next; + } + SpanKey::TraceId => span.trace_id = read_number(buf)?.try_into()?, + SpanKey::SpanId => span.span_id = read_number(buf)?.try_into()?, + SpanKey::ParentId => span.parent_id = read_number(buf)?.try_into()?, + SpanKey::Start => span.start = read_number(buf)?.try_into()?, + SpanKey::Duration => span.duration = read_number(buf)?.try_into()?, + SpanKey::Error => span.error = read_number(buf)?.try_into()?, + SpanKey::Meta => span.meta = read_map_strs(buf)?, + SpanKey::Metrics => span.metrics = read_metrics(buf)?, + SpanKey::Type => { + let (value, next) = read_string_ref(buf)?; + span.r#type = String::from_str(value).unwrap(); + *buf = next; + } + SpanKey::MetaStruct => span.meta_struct = read_meta_struct(buf)?, + SpanKey::SpanLinks => span.span_links = read_span_links(buf)?, + } + Ok(()) +} diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs new file mode 100644 index 0000000000..43e77e7bed --- /dev/null +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs @@ -0,0 +1,71 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::msgpack_decoder::v04::decoder::{read_map_strs, read_string_ref}; +use crate::msgpack_decoder::v04::error::DecodeError; +use crate::msgpack_decoder::v04::number::read_number; +use datadog_trace_protobuf::pb::SpanLink; +use rmp::Marker; +use std::str::FromStr; + +pub(crate) fn read_span_links(buf: &mut &[u8]) -> Result, DecodeError> { + match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { + Marker::FixArray(len) => { + let mut vec: Vec = Vec::new(); + for _ in 0..len { + vec.push(decode_span_link(buf)?); + } + Ok(vec) + } + _ => Err(DecodeError::WrongType), + } +} +enum SpanLinkKey { + TraceId, + TraceIdHigh, + SpanId, + Attributes, + Tracestate, + Flags, +} +impl FromStr for SpanLinkKey { + type Err = DecodeError; + + fn from_str(s: &str) -> Result { + match s { + "trace_id" => Ok(SpanLinkKey::TraceId), + "trace_id_high" => Ok(SpanLinkKey::TraceIdHigh), + "span_id" => Ok(SpanLinkKey::SpanId), + "attributes" => Ok(SpanLinkKey::Attributes), + "tracestate" => Ok(SpanLinkKey::Tracestate), + "flags" => Ok(SpanLinkKey::Flags), + _ => Err(DecodeError::WrongFormat), + } + } +} + +fn decode_span_link(buf: &mut &[u8]) -> Result { + let mut span = SpanLink::default(); + let span_size = rmp::decode::read_map_len(buf).map_err(|_| DecodeError::WrongType)?; + + for _ in 0..span_size { + let (key, value) = read_string_ref(buf)?; + *buf = value; + let key = key.parse::()?; + + match key { + SpanLinkKey::TraceId => span.trace_id = read_number(buf)?.try_into()?, + SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number(buf)?.try_into()?, + SpanLinkKey::SpanId => span.span_id = read_number(buf)?.try_into()?, + SpanLinkKey::Attributes => span.attributes = read_map_strs(buf)?, + SpanLinkKey::Tracestate => { + let (value, next) = read_string_ref(buf)?; + span.tracestate = String::from_str(value).unwrap(); + *buf = next; + } + SpanLinkKey::Flags => span.flags = read_number(buf)?.try_into()?, + } + } + + Ok(span) +} diff --git a/trace-utils/src/msgpack/error.rs b/trace-utils/src/msgpack_decoder/v04/error.rs similarity index 100% rename from trace-utils/src/msgpack/error.rs rename to trace-utils/src/msgpack_decoder/v04/error.rs diff --git a/trace-utils/src/msgpack/mod.rs b/trace-utils/src/msgpack_decoder/v04/mod.rs similarity index 89% rename from trace-utils/src/msgpack/mod.rs rename to trace-utils/src/msgpack_decoder/v04/mod.rs index bdbadd27e9..5b789e7c18 100644 --- a/trace-utils/src/msgpack/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/mod.rs @@ -3,4 +3,4 @@ pub mod decoder; pub mod error; -mod number; +pub mod number; diff --git a/trace-utils/src/msgpack/number.rs b/trace-utils/src/msgpack_decoder/v04/number.rs similarity index 99% rename from trace-utils/src/msgpack/number.rs rename to trace-utils/src/msgpack_decoder/v04/number.rs index 3db839bf62..277012f2c3 100644 --- a/trace-utils/src/msgpack/number.rs +++ b/trace-utils/src/msgpack_decoder/v04/number.rs @@ -1,7 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::msgpack::error::DecodeError; +use super::error::DecodeError; use rmp::{decode::RmpRead, Marker}; pub enum Number { diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index db6cd55ebd..5fddbd63f1 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - msgpack, + msgpack_decoder, trace_utils::{cmp_send_data_payloads, collect_trace_chunks, TracerHeaderTags}, }; use datadog_trace_protobuf::pb; @@ -250,12 +250,13 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto let mut data_slice: &[u8] = self.data; let data: &mut &[u8] = &mut data_slice; - let traces: Vec> = match msgpack::decoder::from_slice(data) { - Ok(res) => res, - Err(e) => { - anyhow::bail!("Error deserializing trace from request body: {e}") - } - }; + let traces: Vec> = + match msgpack_decoder::v04::decoder::from_slice(data) { + Ok(res) => res, + Err(e) => { + anyhow::bail!("Error deserializing trace from request body: {e}") + } + }; if traces.is_empty() { anyhow::bail!("No traces deserialized from the request body."); From 7fd3d7e79a3a6012ecd2b53f63e35d23603600ae Mon Sep 17 00:00:00 2001 From: Edmund Kump Date: Thu, 25 Jul 2024 16:44:13 -0400 Subject: [PATCH 04/10] improve messages in decoding errors and fix tests to use public functions --- trace-utils/src/msgpack_decoder/mod.rs | 3 + .../src/msgpack_decoder/v04/decoder/mod.rs | 315 +++++++++++++++--- .../src/msgpack_decoder/v04/decoder/span.rs | 58 +++- .../msgpack_decoder/v04/decoder/span_link.rs | 72 +++- trace-utils/src/msgpack_decoder/v04/error.rs | 16 +- trace-utils/src/msgpack_decoder/v04/number.rs | 50 ++- trace-utils/src/tracer_payload.rs | 1 - 7 files changed, 436 insertions(+), 79 deletions(-) diff --git a/trace-utils/src/msgpack_decoder/mod.rs b/trace-utils/src/msgpack_decoder/mod.rs index c8a710e920..f4e980a0ae 100644 --- a/trace-utils/src/msgpack_decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/mod.rs @@ -1 +1,4 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + pub mod v04; diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index ca53b3394f..9e6cda4e1e 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -9,19 +9,70 @@ use super::error::DecodeError; use super::number::read_number; use crate::tracer_payload::TracerPayloadV04; use datadog_trace_protobuf::pb::Span; +use rmp::decode::DecodeStringError; use rmp::{ + decode, decode::{read_array_len, RmpRead}, Marker, }; use std::{collections::HashMap, f64}; +/// Decodes a slice of bytes into a vector of `TracerPayloadV04` objects. +/// +/// +/// +/// # Arguments +/// +/// * `data` - A mutable reference to a slice of bytes containing the encoded data.Bytes are +/// expected to be encoded msgpack data containing a list of a list of v04 spans. +/// +/// # Returns +/// +/// * `Ok(Vec)` - A vector of decoded `TracerPayloadV04` objects if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The array length for trace count or span count cannot be read. +/// - Any span cannot be decoded. +/// +/// # Examples +/// +/// ``` +/// use datadog_trace_protobuf::pb::Span; +/// use datadog_trace_utils::msgpack_decoder::v04::decoder::from_slice; +/// use rmp_serde::to_vec_named; +/// +/// let span = Span { +/// name: "test-span".to_owned(), +/// ..Default::default() +/// }; +/// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); +/// let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); +/// +/// assert_eq!(1, decoded_traces.len()); +/// assert_eq!(1, decoded_traces[0].len()); +/// let decoded_span = &decoded_traces[0][0]; +/// assert_eq!("test-span", decoded_span.name); +/// ``` pub fn from_slice(data: &mut &[u8]) -> Result, DecodeError> { - let trace_count = read_array_len(data).map_err(|_| DecodeError::WrongFormat)?; + let trace_count = read_array_len(data).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) + })?; let mut traces: Vec = Default::default(); for _ in 0..trace_count { - let span_count = read_array_len(data).unwrap(); + let span_count = match read_array_len(data) { + Ok(count) => count, + Err(_) => { + return Err(DecodeError::InvalidFormat( + "Unable to read array len for span count".to_owned(), + )) + } + }; + let mut trace: Vec = Default::default(); for _ in 0..span_count { @@ -36,21 +87,31 @@ pub fn from_slice(data: &mut &[u8]) -> Result, DecodeError #[inline] fn read_string_ref(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { - rmp::decode::read_str_from_slice(buf).map_err(|_| DecodeError::WrongFormat) + decode::read_str_from_slice(buf).map_err(|e| match e { + DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), + DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), + DecodeStringError::TypeMismatch(marker) => { + DecodeError::InvalidType(format!("Type mismatch at marker {:?}", marker)) + } + DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()), + _ => DecodeError::IOError, + }) } #[inline] fn read_string(buf: &mut &[u8]) -> Result { - let value_len: usize = rmp::decode::read_str_len(buf) - .map_err(|_| DecodeError::WrongFormat)? + let value_len: usize = decode::read_str_len(buf) + .map_err(|e| DecodeError::InvalidFormat(e.to_string()))? .try_into() - .map_err(|_| DecodeError::WrongConversion)?; + .map_err(|_| { + DecodeError::InvalidConversion("unable to get len of string buffer".to_owned()) + })?; let mut vec = vec![0; value_len]; buf.read_exact_buf(vec.as_mut_slice()) .map_err(|_| DecodeError::IOError)?; - let str = String::from_utf8(vec).map_err(|_| DecodeError::Utf8Error)?; + let str = String::from_utf8(vec).map_err(|e| DecodeError::Utf8Error(e.to_string()))?; Ok(str) } @@ -71,7 +132,9 @@ fn read_metric_pair(buf: &mut &[u8]) -> Result<(String, f64), DecodeError> { } fn read_map_strs(buf: &mut &[u8]) -> Result, DecodeError> { - match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { + match decode::read_marker(buf) + .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))? + { Marker::FixMap(len) => { let mut map = HashMap::new(); for _ in 0..len { @@ -80,12 +143,16 @@ fn read_map_strs(buf: &mut &[u8]) -> Result, DecodeError } Ok(map) } - _ => Err(DecodeError::WrongType), + _ => Err(DecodeError::InvalidType( + "Unable to read map from buffer".to_owned(), + )), } } fn read_metrics(buf: &mut &[u8]) -> Result, DecodeError> { - match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { + match decode::read_marker(buf) + .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for metrics".to_owned()))? + { Marker::FixMap(len) => { let mut metrics = HashMap::new(); for _ in 0..len { @@ -94,19 +161,26 @@ fn read_metrics(buf: &mut &[u8]) -> Result, DecodeError> { } Ok(metrics) } - _ => Err(DecodeError::WrongType), + _ => Err(DecodeError::InvalidType( + "Unable to read metrics from buffer".to_owned(), + )), } } fn read_meta_struct(buf: &mut &[u8]) -> Result>, DecodeError> { - match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { + match decode::read_marker(buf).map_err(|_| { + DecodeError::InvalidFormat("Unable to read marker for meta_struct".to_owned()) + })? { Marker::FixMap(len) => { let mut meta_struct = HashMap::new(); for _ in 0..len { let k = read_string(buf)?; let mut v = vec![]; - let array_len = - rmp::decode::read_array_len(buf).map_err(|_| DecodeError::WrongFormat)?; + let array_len = decode::read_array_len(buf).map_err(|_| { + DecodeError::InvalidFormat( + "Unable to read array len for meta_struct".to_owned(), + ) + })?; for _ in 0..array_len { let value = read_number(buf)?.try_into()?; v.push(value); @@ -115,48 +189,89 @@ fn read_meta_struct(buf: &mut &[u8]) -> Result>, DecodeE } Ok(meta_struct) } - _ => Err(DecodeError::WrongType), + _ => Err(DecodeError::InvalidType( + "Unable to read meta_struct from buffer".to_owned(), + )), } } #[cfg(test)] mod tests { use super::*; - use crate::msgpack_decoder::v04::decoder::span_link::read_span_links; use datadog_trace_protobuf::pb::SpanLink; #[test] fn decoder_read_string_success() { - let expected = "foobar".to_string(); - let payload = rmp_serde::to_vec(&expected).unwrap(); - - assert_eq!(expected, read_string(&mut payload.as_ref()).unwrap()); + let expected_string = "test-service-name"; + let span = Span { + name: expected_string.to_owned(), + ..Default::default() + }; + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!(expected_string, decoded_span.name); } #[test] - fn decoder_read_string_wrong_format() { - let input: [u8; 2] = [255; 2]; + fn test_decoder_meta_struct_success() { + let expected_meta_struct = HashMap::from([ + ("key1".to_string(), vec![1, 2, 3]), + ("key2".to_string(), vec![4, 5, 6]), + ]); + let span = Span { + meta_struct: expected_meta_struct.clone(), + ..Default::default() + }; + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!(expected_meta_struct, decoded_span.meta_struct); + } - assert_eq!( - Err(DecodeError::WrongFormat), - read_string(&mut input.as_ref()) - ); + #[test] + fn test_decoder_meta_success() { + let expected_meta = HashMap::from([ + ("key1".to_string(), "value1".to_string()), + ("key2".to_string(), "value2".to_string()), + ]); + let span = Span { + meta: expected_meta.clone(), + ..Default::default() + }; + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!(expected_meta, decoded_span.meta); } #[test] - fn decoder_read_string_utf8_error() { - let invalid_seq = vec![0, 159, 146, 150]; - let str = unsafe { String::from_utf8_unchecked(invalid_seq) }; - let payload = rmp_serde::to_vec(&str).unwrap(); - assert_eq!( - Err(DecodeError::Utf8Error), - read_string(&mut payload.as_ref()) - ); + fn test_decoder_metrics_success() { + let mut span = Span::default(); + let expected_metrics = + HashMap::from([("metric1".to_string(), 1.23), ("metric2".to_string(), 4.56)]); + span.metrics = expected_metrics.clone(); + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!(expected_metrics, decoded_span.metrics); } #[test] - fn decoder_span_link_success() { - let span_links = vec![SpanLink { + fn test_decoder_span_link_success() { + let expected_span_links = vec![SpanLink { trace_id: 1, trace_id_high: 0, span_id: 1, @@ -168,35 +283,125 @@ mod tests { flags: 0b101, }]; - let payload = rmp_serde::to_vec_named(&span_links).unwrap(); - - assert_eq!(span_links, read_span_links(&mut payload.as_ref()).unwrap()) + let span = Span { + span_links: expected_span_links.clone(), + ..Default::default() + }; + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let decoded_traces = + from_slice(&mut encoded_data.as_slice()).expect("unable to decode span"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!(expected_span_links, decoded_span.span_links); } #[test] - fn decoder_meta_struct_success() { - let meta_struct = HashMap::from([ - ("key".to_string(), vec![1, 2, 3]), - ("key2".to_string(), vec![4, 5, 6]), - ]); + fn test_decoder_read_string_wrong_format() { + let span = Span { + service: "my_service".to_owned(), + ..Default::default() + }; + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + // This changes the map size from 11 to 12 to trigger an InvalidMarkerRead error. + encoded_data[2] = 0x8c; + + let result = from_slice(&mut encoded_data.as_slice()); + assert_eq!( + Err(DecodeError::InvalidFormat( + "Expected at least bytes 1, but only got 0 (pos 0)".to_owned() + )), + result + ); + } - let payload = rmp_serde::to_vec_named(&meta_struct).unwrap(); + #[test] + fn test_decoder_read_string_utf8_error() { + let invalid_seq = vec![0, 159, 146, 150]; + let invalid_str = unsafe { String::from_utf8_unchecked(invalid_seq) }; + let span = Span { + name: invalid_str.to_owned(), + ..Default::default() + }; + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + + let result = from_slice(&mut encoded_data.as_slice()); + assert_eq!( + Err(DecodeError::Utf8Error( + "invalid utf-8 sequence of 1 bytes from index 1".to_owned() + )), + result + ); + } + #[test] + fn test_decoder_invalid_marker_for_trace_count_read() { + let span = Span::default(); + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + // This changes the entire payload to a map with 12 keys in order to trigger an error when + // reading the array len of traces + encoded_data[0] = 0x8c; + + let result = from_slice(&mut encoded_data.as_ref()); assert_eq!( - meta_struct, - read_meta_struct(&mut payload.as_ref()).unwrap() - ) + Err(DecodeError::InvalidFormat( + "Unable to read array len for trace count".to_string() + )), + result + ); } #[test] - fn decoder_meta_success() { - let meta = HashMap::from([ - ("key1".to_string(), "value1".to_string()), - ("key2".to_string(), "value2".to_string()), - ]); + fn test_decoder_invalid_marker_for_span_count_read() { + let span = Span::default(); + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + // This changes the entire payload to a map with 12 keys in order to trigger an error when + // reading the array len of spans + encoded_data[1] = 0x8c; + + let result = from_slice(&mut encoded_data.as_ref()); + assert_eq!( + Err(DecodeError::InvalidFormat( + "Unable to read array len for span count".to_owned() + )), + result + ); + } - let payload = rmp_serde::to_vec_named(&meta).unwrap(); + #[test] + fn test_decoder_read_string_invalid_data_read() { + let span = Span { + name: "test-span".to_owned(), + ..Default::default() + }; + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + // This changes the marker for the empty metrics map to a str8 marker + encoded_data[104] = 0xD9; + + let result = from_slice(&mut encoded_data.as_slice()); + assert_eq!( + Err(DecodeError::InvalidConversion( + "Expected at least bytes 1, but only got 0 (pos 1)".to_owned() + )), + result + ); + } - assert_eq!(meta, read_map_strs(&mut payload.as_ref()).unwrap()) + #[test] + fn test_decoder_read_string_type_mismatch() { + let span = Span::default(); + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + // Modify the encoded data to cause a type mismatch by changing the marker for the `name` + // field to an integer marker + encoded_data[3] = 0x01; + + let result = from_slice(&mut encoded_data.as_slice()); + assert_eq!( + Err(DecodeError::InvalidType( + "Type mismatch at marker FixPos(1)".to_owned() + )), + result + ); } } diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs index 9d77fa98c7..49ca4e9858 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs @@ -9,11 +9,28 @@ use crate::msgpack_decoder::v04::number::read_number; use datadog_trace_protobuf::pb::Span; use std::str::FromStr; +/// Decodes a slice of bytes into a `Span` object. +/// +/// # Arguments +/// +/// * `buf` - A mutable reference to a slice of bytes containing the encoded data. +/// +/// # Returns +/// +/// * `Ok(Span)` - A decoded `Span` object if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The map length cannot be read. +/// - Any key or value cannot be decoded. #[inline] pub(crate) fn decode_span(buf: &mut &[u8]) -> Result { let mut span = Span::default(); - - let span_size = rmp::decode::read_map_len(buf).unwrap(); + let span_size = rmp::decode::read_map_len(buf).map_err(|_| { + DecodeError::InvalidFormat("Unable to get map len for span size".to_owned()) + })?; for _ in 0..span_size { fill_span(&mut span, buf)?; @@ -21,6 +38,7 @@ pub(crate) fn decode_span(buf: &mut &[u8]) -> Result { Ok(span) } +#[derive(Debug, PartialEq)] enum SpanKey { Service, Name, @@ -57,7 +75,9 @@ impl FromStr for SpanKey { "type" => Ok(SpanKey::Type), "meta_struct" => Ok(SpanKey::MetaStruct), "span_links" => Ok(SpanKey::SpanLinks), - _ => Err(DecodeError::WrongFormat), + _ => Err(DecodeError::InvalidFormat( + format!("Invalid span key: {}", s).to_owned(), + )), } } } @@ -105,3 +125,35 @@ fn fill_span(span: &mut Span, buf: &mut &[u8]) -> Result<(), DecodeError> { } Ok(()) } +#[cfg(test)] +mod tests { + use super::SpanKey; + use crate::msgpack_decoder::v04::error::DecodeError; + use std::str::FromStr; + + #[test] + fn test_span_key_from_str() { + assert_eq!(SpanKey::from_str("service").unwrap(), SpanKey::Service); + assert_eq!(SpanKey::from_str("name").unwrap(), SpanKey::Name); + assert_eq!(SpanKey::from_str("resource").unwrap(), SpanKey::Resource); + assert_eq!(SpanKey::from_str("trace_id").unwrap(), SpanKey::TraceId); + assert_eq!(SpanKey::from_str("span_id").unwrap(), SpanKey::SpanId); + assert_eq!(SpanKey::from_str("parent_id").unwrap(), SpanKey::ParentId); + assert_eq!(SpanKey::from_str("start").unwrap(), SpanKey::Start); + assert_eq!(SpanKey::from_str("duration").unwrap(), SpanKey::Duration); + assert_eq!(SpanKey::from_str("error").unwrap(), SpanKey::Error); + assert_eq!(SpanKey::from_str("meta").unwrap(), SpanKey::Meta); + assert_eq!(SpanKey::from_str("metrics").unwrap(), SpanKey::Metrics); + assert_eq!(SpanKey::from_str("type").unwrap(), SpanKey::Type); + assert_eq!( + SpanKey::from_str("meta_struct").unwrap(), + SpanKey::MetaStruct + ); + assert_eq!(SpanKey::from_str("span_links").unwrap(), SpanKey::SpanLinks); + + assert!(matches!( + SpanKey::from_str("invalid_key"), + Err(DecodeError::InvalidFormat(_)) + )); + } +} diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs index 43e77e7bed..b490d7626e 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs @@ -8,8 +8,27 @@ use datadog_trace_protobuf::pb::SpanLink; use rmp::Marker; use std::str::FromStr; +/// Reads a slice of bytes and decodes it into a vector of `SpanLink` objects. +/// +/// # Arguments +/// +/// * `buf` - A mutable reference to a slice of bytes containing the encoded data. +/// +/// # Returns +/// +/// * `Ok(Vec)` - A vector of decoded `SpanLink` objects if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The marker for the array length cannot be read. +/// - Any `SpanLink` cannot be decoded. +/// ``` pub(crate) fn read_span_links(buf: &mut &[u8]) -> Result, DecodeError> { - match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { + match rmp::decode::read_marker(buf).map_err(|_| { + DecodeError::InvalidFormat("Unable to read marker for span links".to_owned()) + })? { Marker::FixArray(len) => { let mut vec: Vec = Vec::new(); for _ in 0..len { @@ -17,9 +36,12 @@ pub(crate) fn read_span_links(buf: &mut &[u8]) -> Result, DecodeEr } Ok(vec) } - _ => Err(DecodeError::WrongType), + _ => Err(DecodeError::InvalidType( + "Unable to read span link from buffer".to_owned(), + )), } } +#[derive(Debug, PartialEq)] enum SpanLinkKey { TraceId, TraceIdHigh, @@ -39,14 +61,17 @@ impl FromStr for SpanLinkKey { "attributes" => Ok(SpanLinkKey::Attributes), "tracestate" => Ok(SpanLinkKey::Tracestate), "flags" => Ok(SpanLinkKey::Flags), - _ => Err(DecodeError::WrongFormat), + _ => Err(DecodeError::InvalidFormat( + format!("Invalid span link key: {}", s).to_owned(), + )), } } } fn decode_span_link(buf: &mut &[u8]) -> Result { let mut span = SpanLink::default(); - let span_size = rmp::decode::read_map_len(buf).map_err(|_| DecodeError::WrongType)?; + let span_size = rmp::decode::read_map_len(buf) + .map_err(|_| DecodeError::InvalidType("Unable to get map len for span size".to_owned()))?; for _ in 0..span_size { let (key, value) = read_string_ref(buf)?; @@ -69,3 +94,42 @@ fn decode_span_link(buf: &mut &[u8]) -> Result { Ok(span) } + +#[cfg(test)] +mod tests { + use super::SpanLinkKey; + use crate::msgpack_decoder::v04::error::DecodeError; + use std::str::FromStr; + + #[test] + fn test_span_link_key_from_str() { + // Valid cases + assert_eq!( + SpanLinkKey::from_str("trace_id").unwrap(), + SpanLinkKey::TraceId + ); + assert_eq!( + SpanLinkKey::from_str("trace_id_high").unwrap(), + SpanLinkKey::TraceIdHigh + ); + assert_eq!( + SpanLinkKey::from_str("span_id").unwrap(), + SpanLinkKey::SpanId + ); + assert_eq!( + SpanLinkKey::from_str("attributes").unwrap(), + SpanLinkKey::Attributes + ); + assert_eq!( + SpanLinkKey::from_str("tracestate").unwrap(), + SpanLinkKey::Tracestate + ); + assert_eq!(SpanLinkKey::from_str("flags").unwrap(), SpanLinkKey::Flags); + + // Invalid case + assert!(matches!( + SpanLinkKey::from_str("invalid_key"), + Err(DecodeError::InvalidFormat(_)) + )); + } +} diff --git a/trace-utils/src/msgpack_decoder/v04/error.rs b/trace-utils/src/msgpack_decoder/v04/error.rs index d66531b81b..ff74819f57 100644 --- a/trace-utils/src/msgpack_decoder/v04/error.rs +++ b/trace-utils/src/msgpack_decoder/v04/error.rs @@ -3,21 +3,21 @@ #[derive(Debug, PartialEq)] pub enum DecodeError { - WrongConversion, - WrongType, - WrongFormat, + InvalidConversion(String), + InvalidType(String), + InvalidFormat(String), IOError, - Utf8Error, + Utf8Error(String), } impl std::fmt::Display for DecodeError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - DecodeError::WrongConversion => write!(f, "Failed to cast value"), + DecodeError::InvalidConversion(msg) => write!(f, "Failed to convert value: {}", msg), DecodeError::IOError => write!(f, "Failed to read from buffer"), - DecodeError::WrongType => write!(f, "Invalid type encountered"), - DecodeError::WrongFormat => write!(f, "Invalid format"), - DecodeError::Utf8Error => write!(f, "Failed to read utf8 value"), + DecodeError::InvalidType(msg) => write!(f, "Invalid type encountered: {}", msg), + DecodeError::InvalidFormat(msg) => write!(f, "Invalid format: {}", msg), + DecodeError::Utf8Error(msg) => write!(f, "Failed to read utf8 value: {}", msg), } } } diff --git a/trace-utils/src/msgpack_decoder/v04/number.rs b/trace-utils/src/msgpack_decoder/v04/number.rs index 277012f2c3..7f62b18af5 100644 --- a/trace-utils/src/msgpack_decoder/v04/number.rs +++ b/trace-utils/src/msgpack_decoder/v04/number.rs @@ -3,6 +3,7 @@ use super::error::DecodeError; use rmp::{decode::RmpRead, Marker}; +use std::fmt; pub enum Number { U8(u8), @@ -13,6 +14,19 @@ pub enum Number { I64(i64), F64(f64), } +impl fmt::Display for Number { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Number::U8(val) => write!(f, "{}", val), + Number::U32(val) => write!(f, "{}", val), + Number::U64(val) => write!(f, "{}", val), + Number::I8(val) => write!(f, "{}", val), + Number::I32(val) => write!(f, "{}", val), + Number::I64(val) => write!(f, "{}", val), + Number::F64(val) => write!(f, "{}", val), + } + } +} impl TryFrom for u8 { type Error = DecodeError; @@ -20,7 +34,10 @@ impl TryFrom for u8 { match value { Number::U8(val) => Ok(val), Number::I8(val) => Ok(val as u8), - _ => Err(DecodeError::WrongConversion), + _ => Err(DecodeError::InvalidConversion(format!( + "unable to convert {} to u8", + value + ))), } } } @@ -31,7 +48,10 @@ impl TryFrom for i8 { match value { Number::U8(val) => Ok(val as i8), Number::I8(val) => Ok(val), - _ => Err(DecodeError::WrongConversion), + _ => Err(DecodeError::InvalidConversion(format!( + "unable to convert {} to i8", + value + ))), } } } @@ -42,7 +62,10 @@ impl TryFrom for u32 { match value { Number::U8(val) => Ok(val as u32), Number::U32(val) => Ok(val), - _ => Err(DecodeError::WrongConversion), + _ => Err(DecodeError::InvalidConversion(format!( + "unable to convert {} to u32", + value + ))), } } } @@ -54,7 +77,10 @@ impl TryFrom for u64 { Number::U8(val) => Ok(val as u64), Number::U32(val) => Ok(val as u64), Number::U64(val) => Ok(val), - _ => Err(DecodeError::WrongConversion), + _ => Err(DecodeError::InvalidConversion(format!( + "unable to convert {} to u64", + value + ))), } } } @@ -68,7 +94,10 @@ impl TryFrom for i64 { Number::I8(val) => Ok(val as i64), Number::I32(val) => Ok(val as i64), Number::I64(val) => Ok(val), - _ => Err(DecodeError::WrongConversion), + _ => Err(DecodeError::InvalidConversion(format!( + "unable to convert {} to i64", + value + ))), } } } @@ -81,7 +110,10 @@ impl TryFrom for i32 { Number::I8(val) => Ok(val as i32), Number::U32(val) => Ok(val as i32), Number::I32(val) => Ok(val), - _ => Err(DecodeError::WrongConversion), + _ => Err(DecodeError::InvalidConversion(format!( + "unable to convert {} to i32", + value + ))), } } } @@ -102,7 +134,9 @@ impl TryFrom for f64 { } pub fn read_number(buf: &mut &[u8]) -> Result { - match rmp::decode::read_marker(buf).map_err(|_| DecodeError::WrongFormat)? { + match rmp::decode::read_marker(buf) + .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for number".to_owned()))? + { Marker::FixPos(val) => Ok(Number::U8(val)), Marker::FixNeg(val) => Ok(Number::I8(val)), Marker::U8 => Ok(Number::U8( @@ -135,7 +169,7 @@ pub fn read_number(buf: &mut &[u8]) -> Result { Marker::F64 => Ok(Number::F64( buf.read_data_f64().map_err(|_| DecodeError::IOError)?, )), - _ => Err(DecodeError::WrongType), + _ => Err(DecodeError::InvalidType("Invalid number type".to_owned())), } } diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 5fddbd63f1..8ae0a963fc 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -8,7 +8,6 @@ use crate::{ use datadog_trace_protobuf::pb; use std::cmp::Ordering; -// TODO: EK - Should this really be public? pub type TracerPayloadV04 = Vec; #[derive(Debug, Clone)] From 804e512cfb05e53366cd03abf3f5a12386c7f53c Mon Sep 17 00:00:00 2001 From: Edmund Kump Date: Mon, 29 Jul 2024 15:20:33 -0400 Subject: [PATCH 05/10] add support for map16 and map32 decoding --- .../src/msgpack_decoder/v04/decoder/mod.rs | 190 +++++++++++++----- 1 file changed, 136 insertions(+), 54 deletions(-) diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index 9e6cda4e1e..624727ec88 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -132,65 +132,92 @@ fn read_metric_pair(buf: &mut &[u8]) -> Result<(String, f64), DecodeError> { } fn read_map_strs(buf: &mut &[u8]) -> Result, DecodeError> { - match decode::read_marker(buf) - .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))? - { - Marker::FixMap(len) => { - let mut map = HashMap::new(); - for _ in 0..len { - let (k, v) = read_str_pair(buf)?; - map.insert(k, v); - } - Ok(map) - } - _ => Err(DecodeError::InvalidType( - "Unable to read map from buffer".to_owned(), - )), - } + let len = read_map_len(buf)?; + read_map(len, buf, read_str_pair) } fn read_metrics(buf: &mut &[u8]) -> Result, DecodeError> { - match decode::read_marker(buf) - .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for metrics".to_owned()))? - { - Marker::FixMap(len) => { - let mut metrics = HashMap::new(); - for _ in 0..len { - let (k, v) = read_metric_pair(buf)?; - metrics.insert(k, v); - } - Ok(metrics) - } - _ => Err(DecodeError::InvalidType( - "Unable to read metrics from buffer".to_owned(), - )), - } + let len = read_map_len(buf)?; + read_map(len, buf, read_metric_pair) } fn read_meta_struct(buf: &mut &[u8]) -> Result>, DecodeError> { - match decode::read_marker(buf).map_err(|_| { - DecodeError::InvalidFormat("Unable to read marker for meta_struct".to_owned()) - })? { - Marker::FixMap(len) => { - let mut meta_struct = HashMap::new(); - for _ in 0..len { - let k = read_string(buf)?; - let mut v = vec![]; - let array_len = decode::read_array_len(buf).map_err(|_| { - DecodeError::InvalidFormat( - "Unable to read array len for meta_struct".to_owned(), - ) - })?; - for _ in 0..array_len { - let value = read_number(buf)?.try_into()?; - v.push(value); - } - meta_struct.insert(k, v); - } - Ok(meta_struct) + fn read_meta_struct_pair(buf: &mut &[u8]) -> Result<(String, Vec), DecodeError> { + let k = read_string(buf)?; + let mut v = vec![]; + let array_len = decode::read_array_len(buf).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned()) + })?; + for _ in 0..array_len { + let value = read_number(buf)?.try_into()?; + v.push(value); } + Ok((k, v)) + } + + let len = read_map_len(buf)?; + read_map(len, buf, read_meta_struct_pair) +} + +/// Reads a map from the buffer and returns it as a `HashMap`. +/// +/// This function is generic over the key and value types of the map, and it uses a provided +/// function to read key-value pairs from the buffer. +/// +/// # Arguments +/// +/// * `len` - The number of key-value pairs to read from the buffer. +/// * `buf` - A mutable reference to the buffer containing the encoded map data. +/// * `read_pair` - A function that reads a key-value pair from the buffer and returns it as a +/// `Result<(K, V), DecodeError>`. +/// +/// # Returns +/// +/// * `Ok(HashMap)` - A `HashMap` containing the decoded key-value pairs if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The `read_pair` function returns an error while reading a key-value pair. +/// +/// # Type Parameters +/// +/// * `K` - The type of the keys in the map. Must implement `std::hash::Hash` and `Eq`. +/// * `V` - The type of the values in the map. +/// * `F` - The type of the function used to read key-value pairs from the buffer. +fn read_map( + len: usize, + buf: &mut &[u8], + read_pair: F, +) -> Result, DecodeError> +where + K: std::hash::Hash + Eq, + F: Fn(&mut &[u8]) -> Result<(K, V), DecodeError>, +{ + let mut map = HashMap::new(); + for _ in 0..len { + let (k, v) = read_pair(buf)?; + map.insert(k, v); + } + Ok(map) +} + +fn read_map_len(buf: &mut &[u8]) -> Result { + match decode::read_marker(buf) + .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))? + { + Marker::FixMap(len) => Ok(len as usize), + Marker::Map16 => buf + .read_data_u16() + .map_err(|_| DecodeError::IOError) + .map(|len| len as usize), + Marker::Map32 => buf + .read_data_u32() + .map_err(|_| DecodeError::IOError) + .map(|len| len as usize), _ => Err(DecodeError::InvalidType( - "Unable to read meta_struct from buffer".to_owned(), + "Unable to read map from buffer".to_owned(), )), } } @@ -217,7 +244,7 @@ mod tests { } #[test] - fn test_decoder_meta_struct_success() { + fn test_decoder_meta_struct_fixed_map_success() { let expected_meta_struct = HashMap::from([ ("key1".to_string(), vec![1, 2, 3]), ("key2".to_string(), vec![4, 5, 6]), @@ -236,7 +263,26 @@ mod tests { } #[test] - fn test_decoder_meta_success() { + fn test_decoder_meta_struct_map_16_success() { + let expected_meta_struct: HashMap> = (0..20) + .map(|i| (format!("key {}", i), vec![1 + i, 2 + i, 3 + i])) + .collect(); + + let span = Span { + meta_struct: expected_meta_struct.clone(), + ..Default::default() + }; + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!(expected_meta_struct, decoded_span.meta_struct); + } + + #[test] + fn test_decoder_meta_fixed_map_success() { let expected_meta = HashMap::from([ ("key1".to_string(), "value1".to_string()), ("key2".to_string(), "value2".to_string()), @@ -255,7 +301,26 @@ mod tests { } #[test] - fn test_decoder_metrics_success() { + fn test_decoder_meta_map_16_success() { + let expected_meta: HashMap = (0..20) + .map(|i| (format!("key {}", i), format!("value {}", i))) + .collect(); + + let span = Span { + meta: expected_meta.clone(), + ..Default::default() + }; + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!(expected_meta, decoded_span.meta); + } + + #[test] + fn test_decoder_metrics_fixed_map_success() { let mut span = Span::default(); let expected_metrics = HashMap::from([("metric1".to_string(), 1.23), ("metric2".to_string(), 4.56)]); @@ -269,6 +334,23 @@ mod tests { assert_eq!(expected_metrics, decoded_span.metrics); } + #[test] + fn test_decoder_metrics_map16_success() { + let mut span = Span::default(); + let expected_metrics: HashMap = (0..20) + .map(|i| (format!("metric{}", i), i as f64)) + .collect(); + + span.metrics = expected_metrics.clone(); + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!(expected_metrics, decoded_span.metrics); + } + #[test] fn test_decoder_span_link_success() { let expected_span_links = vec![SpanLink { From 97cb6b6e1585dcf650ec84bb18db1d066e58df19 Mon Sep 17 00:00:00 2001 From: Edmund Kump Date: Thu, 1 Aug 2024 19:15:46 -0400 Subject: [PATCH 06/10] make number decoding and casting more flexible --- trace-utils/src/msgpack_decoder/v04/number.rs | 588 ++++++++++++++---- 1 file changed, 451 insertions(+), 137 deletions(-) diff --git a/trace-utils/src/msgpack_decoder/v04/number.rs b/trace-utils/src/msgpack_decoder/v04/number.rs index 7f62b18af5..a0709c8038 100644 --- a/trace-utils/src/msgpack_decoder/v04/number.rs +++ b/trace-utils/src/msgpack_decoder/v04/number.rs @@ -5,39 +5,75 @@ use super::error::DecodeError; use rmp::{decode::RmpRead, Marker}; use std::fmt; +#[derive(Debug, PartialEq)] pub enum Number { - U8(u8), - U32(u32), - U64(u64), - I8(i8), - I32(i32), - I64(i64), - F64(f64), + Unsigned(u64), + Signed(i64), + Float(f64), } + impl fmt::Display for Number { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Number::U8(val) => write!(f, "{}", val), - Number::U32(val) => write!(f, "{}", val), - Number::U64(val) => write!(f, "{}", val), - Number::I8(val) => write!(f, "{}", val), - Number::I32(val) => write!(f, "{}", val), - Number::I64(val) => write!(f, "{}", val), - Number::F64(val) => write!(f, "{}", val), + Number::Signed(val) => write!(f, "{}", val), + Number::Unsigned(val) => write!(f, "{}", val), + Number::Float(val) => write!(f, "{}", val), } } } -impl TryFrom for u8 { - type Error = DecodeError; - fn try_from(value: Number) -> Result { - match value { - Number::U8(val) => Ok(val), - Number::I8(val) => Ok(val as u8), - _ => Err(DecodeError::InvalidConversion(format!( - "unable to convert {} to u8", - value - ))), +impl Number { + pub fn bounded_int_conversion( + self, + lower_bound: T, + upper_bound: Option, + ) -> Result + where + T: TryInto + TryInto + TryInto + Copy + fmt::Display, + i64: TryInto, + u64: TryInto, + >::Error: fmt::Debug, + >::Error: fmt::Debug, + >::Error: fmt::Debug, + >::Error: fmt::Debug, + { + match self { + Number::Signed(val) => { + let upper_bound_check = if let Some(upper_bound) = upper_bound { + val <= upper_bound.try_into().unwrap() + } else { + true + }; + if val >= lower_bound.try_into().unwrap() && upper_bound_check { + val.try_into() + .map_err(|e| DecodeError::InvalidConversion(format!("{:?}", e))) + } else { + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + val + ))) + } + } + Number::Unsigned(val) => { + let upper_bound_check = if let Some(upper_bound) = upper_bound { + val <= upper_bound.try_into().unwrap() + } else { + true + }; + + if upper_bound_check { + val.try_into() + .map_err(|e| DecodeError::InvalidConversion(format!("{:?}", e))) + } else { + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + val + ))) + } + } + _ => Err(DecodeError::InvalidConversion( + "Cannot convert float to int".to_owned(), + )), } } } @@ -45,76 +81,42 @@ impl TryFrom for u8 { impl TryFrom for i8 { type Error = DecodeError; fn try_from(value: Number) -> Result { - match value { - Number::U8(val) => Ok(val as i8), - Number::I8(val) => Ok(val), - _ => Err(DecodeError::InvalidConversion(format!( - "unable to convert {} to i8", - value - ))), - } + value.bounded_int_conversion(i8::MIN, Some(i8::MAX)) } } -impl TryFrom for u32 { +impl TryFrom for i32 { type Error = DecodeError; fn try_from(value: Number) -> Result { - match value { - Number::U8(val) => Ok(val as u32), - Number::U32(val) => Ok(val), - _ => Err(DecodeError::InvalidConversion(format!( - "unable to convert {} to u32", - value - ))), - } + value.bounded_int_conversion(i32::MIN, Some(i32::MAX)) } } -impl TryFrom for u64 { +impl TryFrom for i64 { type Error = DecodeError; fn try_from(value: Number) -> Result { - match value { - Number::U8(val) => Ok(val as u64), - Number::U32(val) => Ok(val as u64), - Number::U64(val) => Ok(val), - _ => Err(DecodeError::InvalidConversion(format!( - "unable to convert {} to u64", - value - ))), - } + value.bounded_int_conversion(i64::MIN, Some(i64::MAX)) } } -impl TryFrom for i64 { +impl TryFrom for u8 { type Error = DecodeError; fn try_from(value: Number) -> Result { - match value { - Number::U8(val) => Ok(val as i64), - Number::U32(val) => Ok(val as i64), - Number::I8(val) => Ok(val as i64), - Number::I32(val) => Ok(val as i64), - Number::I64(val) => Ok(val), - _ => Err(DecodeError::InvalidConversion(format!( - "unable to convert {} to i64", - value - ))), - } + value.bounded_int_conversion(u8::MIN, Some(u8::MAX)) } } -impl TryFrom for i32 { +impl TryFrom for u32 { type Error = DecodeError; fn try_from(value: Number) -> Result { - match value { - Number::U8(val) => Ok(val as i32), - Number::I8(val) => Ok(val as i32), - Number::U32(val) => Ok(val as i32), - Number::I32(val) => Ok(val), - _ => Err(DecodeError::InvalidConversion(format!( - "unable to convert {} to i32", - value - ))), - } + value.bounded_int_conversion(u32::MIN, Some(u32::MAX)) + } +} + +impl TryFrom for u64 { + type Error = DecodeError; + fn try_from(value: Number) -> Result { + value.bounded_int_conversion(u64::MIN, None) } } @@ -122,13 +124,27 @@ impl TryFrom for f64 { type Error = DecodeError; fn try_from(value: Number) -> Result { match value { - Number::U8(val) => Ok(val as f64), - Number::U32(val) => Ok(val as f64), - Number::U64(val) => Ok(val as f64), - Number::I8(val) => Ok(val as f64), - Number::I32(val) => Ok(val as f64), - Number::I64(val) => Ok(val as f64), - Number::F64(val) => Ok(val), + Number::Unsigned(val) => { + if val <= f64::MAX as u64 { + Ok(val as f64) + } else { + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + val + ))) + } + } + Number::Signed(val) => { + if val >= f64::MIN as i64 && val <= f64::MAX as i64 { + Ok(val as f64) + } else { + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + val + ))) + } + } + Number::Float(val) => Ok(val), } } } @@ -137,36 +153,36 @@ pub fn read_number(buf: &mut &[u8]) -> Result { match rmp::decode::read_marker(buf) .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for number".to_owned()))? { - Marker::FixPos(val) => Ok(Number::U8(val)), - Marker::FixNeg(val) => Ok(Number::I8(val)), - Marker::U8 => Ok(Number::U8( - buf.read_data_u8().map_err(|_| DecodeError::IOError)?, + Marker::FixPos(val) => Ok(Number::Unsigned(val as u64)), + Marker::FixNeg(val) => Ok(Number::Signed(val as i64)), + Marker::U8 => Ok(Number::Unsigned( + buf.read_data_u8().map_err(|_| DecodeError::IOError)? as u64, )), - Marker::U16 => Ok(Number::U32( - buf.read_data_u16().map_err(|_| DecodeError::IOError)? as u32, + Marker::U16 => Ok(Number::Unsigned( + buf.read_data_u16().map_err(|_| DecodeError::IOError)? as u64, )), - Marker::U32 => Ok(Number::U32( - buf.read_data_u32().map_err(|_| DecodeError::IOError)?, + Marker::U32 => Ok(Number::Unsigned( + buf.read_data_u32().map_err(|_| DecodeError::IOError)? as u64, )), - Marker::U64 => Ok(Number::U64( + Marker::U64 => Ok(Number::Unsigned( buf.read_data_u64().map_err(|_| DecodeError::IOError)?, )), - Marker::I8 => Ok(Number::I32( - buf.read_data_i8().map_err(|_| DecodeError::IOError)? as i32, + Marker::I8 => Ok(Number::Signed( + buf.read_data_i8().map_err(|_| DecodeError::IOError)? as i64, )), - Marker::I16 => Ok(Number::I32( - buf.read_data_i16().map_err(|_| DecodeError::IOError)? as i32, + Marker::I16 => Ok(Number::Signed( + buf.read_data_i16().map_err(|_| DecodeError::IOError)? as i64, )), - Marker::I32 => Ok(Number::I32( - buf.read_data_i32().map_err(|_| DecodeError::IOError)?, + Marker::I32 => Ok(Number::Signed( + buf.read_data_i32().map_err(|_| DecodeError::IOError)? as i64, )), - Marker::I64 => Ok(Number::I64( + Marker::I64 => Ok(Number::Signed( buf.read_data_i64().map_err(|_| DecodeError::IOError)?, )), - Marker::F32 => Ok(Number::F64( + Marker::F32 => Ok(Number::Float( buf.read_data_f32().map_err(|_| DecodeError::IOError)? as f64, )), - Marker::F64 => Ok(Number::F64( + Marker::F64 => Ok(Number::Float( buf.read_data_f64().map_err(|_| DecodeError::IOError)?, )), _ => Err(DecodeError::InvalidType("Invalid number type".to_owned())), @@ -175,62 +191,360 @@ pub fn read_number(buf: &mut &[u8]) -> Result { #[cfg(test)] mod tests { + use super::*; use std::f64; - use super::*; + #[test] + fn test_i64_conversions() { + let valid_max = i64::MAX; + let valid_unsigned_number = Number::Unsigned(valid_max as u64); + let zero_unsigned = Number::Unsigned(0u64); + let zero_signed = Number::Unsigned(0u64); + let valid_signed_number = Number::Signed(valid_max); + let invalid_float_number = Number::Float(3.14); + let invalid_unsigned = u64::MAX; + let invalid_unsigned_number = Number::Unsigned(invalid_unsigned); + assert_eq!( + valid_max, + TryInto::::try_into(valid_unsigned_number).unwrap() + ); + assert_eq!( + valid_max, + TryInto::::try_into(valid_signed_number).unwrap() + ); + assert_eq!(0, TryInto::::try_into(zero_signed).unwrap()); + assert_eq!(0, TryInto::::try_into(zero_unsigned).unwrap()); + assert_eq!( + Err(DecodeError::InvalidConversion( + "Cannot convert float to int".to_owned() + )), + TryInto::::try_into(invalid_float_number) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_unsigned + ))), + TryInto::::try_into(invalid_unsigned_number) + ); + } #[test] - fn read_number_success() { - let values = (1, -1_i8, i32::MIN, u32::MAX, i64::MIN, u64::MAX, f64::MAX); + fn test_i32_conversions() { + let valid_signed_upper = i32::MAX; + let valid_unsigned_number = Number::Unsigned(valid_signed_upper as u64); + let zero_unsigned = Number::Unsigned(0u64); + let zero_signed = Number::Unsigned(0u64); + let valid_signed_number_upper = Number::Signed(valid_signed_upper as i64); + let valid_signed_lower = i32::MIN; + let valid_signed_number_lower = Number::Signed(valid_signed_lower as i64); + let invalid_float_number = Number::Float(3.14); + let invalid_unsigned = u64::MAX; + let invalid_unsigned_number = Number::Unsigned(invalid_unsigned); + let invalid_signed_upper = i32::MAX as i64 + 1; + let invalid_signed_number_upper = Number::Signed(invalid_signed_upper); + let invalid_signed_lower = i32::MIN as i64 - 1; + let invalid_signed_number_lower = Number::Signed(invalid_signed_lower); assert_eq!( - values.0, - TryInto::::try_into( - read_number(&mut rmp_serde::to_vec_named(&values.0).unwrap().as_ref()).unwrap() - ) - .unwrap() + valid_signed_upper, + TryInto::::try_into(valid_unsigned_number).unwrap() + ); + assert_eq!( + valid_signed_upper, + TryInto::::try_into(valid_signed_number_upper).unwrap() + ); + assert_eq!( + valid_signed_lower, + TryInto::::try_into(valid_signed_number_lower).unwrap() ); + assert_eq!(0, TryInto::::try_into(zero_signed).unwrap()); + assert_eq!(0, TryInto::::try_into(zero_unsigned).unwrap()); assert_eq!( - values.1, - TryInto::::try_into( - read_number(&mut rmp_serde::to_vec_named(&values.1).unwrap().as_ref()).unwrap() - ) - .unwrap() + Err(DecodeError::InvalidConversion( + "Cannot convert float to int".to_owned() + )), + TryInto::::try_into(invalid_float_number) ); assert_eq!( - values.2, - TryInto::::try_into( - read_number(&mut rmp_serde::to_vec_named(&values.2).unwrap().as_ref()).unwrap() - ) - .unwrap() + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_unsigned + ))), + TryInto::::try_into(invalid_unsigned_number) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_signed_upper + ))), + TryInto::::try_into(invalid_signed_number_upper) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_signed_lower + ))), + TryInto::::try_into(invalid_signed_number_lower) + ); + } + + #[test] + fn test_i8_conversions() { + let valid_signed_upper = i8::MAX; + let valid_unsigned_number = Number::Unsigned(valid_signed_upper as u64); + let zero_unsigned = Number::Unsigned(0u64); + let zero_signed = Number::Unsigned(0u64); + let valid_signed_number_upper = Number::Signed(valid_signed_upper as i64); + let valid_signed_lower = i8::MIN; + let valid_signed_number_lower = Number::Signed(valid_signed_lower as i64); + let invalid_float_number = Number::Float(3.14); + let invalid_unsigned = u8::MAX; + let invalid_unsigned_number = Number::Unsigned(invalid_unsigned as u64); + let invalid_signed_upper = i8::MAX as i64 + 1; + let invalid_signed_number_upper = Number::Signed(invalid_signed_upper); + let invalid_signed_lower = i8::MIN as i64 - 1; + let invalid_signed_number_lower = Number::Signed(invalid_signed_lower); + + assert_eq!( + valid_signed_upper, + TryInto::::try_into(valid_unsigned_number).unwrap() ); assert_eq!( - values.3, - TryInto::::try_into( - read_number(&mut rmp_serde::to_vec_named(&values.3).unwrap().as_ref()).unwrap() - ) - .unwrap() + valid_signed_upper, + TryInto::::try_into(valid_signed_number_upper).unwrap() ); assert_eq!( - values.4, - TryInto::::try_into( - read_number(&mut rmp_serde::to_vec_named(&values.4).unwrap().as_ref()).unwrap() - ) - .unwrap() + valid_signed_lower, + TryInto::::try_into(valid_signed_number_lower).unwrap() ); + assert_eq!(0, TryInto::::try_into(zero_signed).unwrap()); + assert_eq!(0, TryInto::::try_into(zero_unsigned).unwrap()); assert_eq!( - values.5, - TryInto::::try_into( - read_number(&mut rmp_serde::to_vec_named(&values.5).unwrap().as_ref()).unwrap() - ) - .unwrap() + Err(DecodeError::InvalidConversion( + "Cannot convert float to int".to_owned() + )), + TryInto::::try_into(invalid_float_number) ); assert_eq!( - values.6, - TryInto::::try_into( - read_number(&mut rmp_serde::to_vec_named(&values.6).unwrap().as_ref()).unwrap() - ) - .unwrap() + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_unsigned + ))), + TryInto::::try_into(invalid_unsigned_number) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_signed_upper + ))), + TryInto::::try_into(invalid_signed_number_upper) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_signed_lower + ))), + TryInto::::try_into(invalid_signed_number_lower) + ); + } + + #[test] + fn test_u8_conversions() { + let valid_signed_upper = u8::MAX; + let valid_unsigned_number = Number::Unsigned(valid_signed_upper as u64); + let zero_unsigned = Number::Unsigned(0u64); + let zero_signed = Number::Unsigned(0u64); + let valid_signed_number_upper = Number::Signed(valid_signed_upper as i64); + let valid_signed_lower = u8::MIN; + let valid_signed_number_lower = Number::Signed(valid_signed_lower as i64); + let invalid_float_number = Number::Float(3.14); + let invalid_unsigned = (u8::MAX as u64) + 1; + let invalid_unsigned_number = Number::Unsigned(invalid_unsigned); + let invalid_signed_upper = i32::MAX as i64 + 1; + let invalid_signed_number_upper = Number::Signed(invalid_signed_upper); + let invalid_signed_lower = i8::MIN as i64; + let invalid_signed_number_lower = Number::Signed(invalid_signed_lower); + + assert_eq!( + valid_signed_upper, + TryInto::::try_into(valid_unsigned_number).unwrap() + ); + assert_eq!( + valid_signed_upper, + TryInto::::try_into(valid_signed_number_upper).unwrap() + ); + assert_eq!( + valid_signed_lower, + TryInto::::try_into(valid_signed_number_lower).unwrap() + ); + assert_eq!(0, TryInto::::try_into(zero_signed).unwrap()); + assert_eq!(0, TryInto::::try_into(zero_unsigned).unwrap()); + assert_eq!( + Err(DecodeError::InvalidConversion( + "Cannot convert float to int".to_owned() + )), + TryInto::::try_into(invalid_float_number) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_unsigned + ))), + TryInto::::try_into(invalid_unsigned_number) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_signed_upper + ))), + TryInto::::try_into(invalid_signed_number_upper) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_signed_lower + ))), + TryInto::::try_into(invalid_signed_number_lower) + ); + } + + #[test] + fn test_u32_conversions() { + let valid_signed_upper = u32::MAX; + let valid_unsigned_number = Number::Unsigned(valid_signed_upper as u64); + let zero_unsigned = Number::Unsigned(0u64); + let zero_signed = Number::Unsigned(0u64); + let valid_signed_number_upper = Number::Signed(valid_signed_upper as i64); + let valid_signed_lower = u32::MIN; + let valid_signed_number_lower = Number::Signed(valid_signed_lower as i64); + let invalid_float_number = Number::Float(3.14); + let invalid_unsigned = (u32::MAX as u64) + 1; + let invalid_unsigned_number = Number::Unsigned(invalid_unsigned); + let invalid_signed_upper = i64::MAX; + let invalid_signed_number_upper = Number::Signed(invalid_signed_upper); + let invalid_signed_lower = i8::MIN as i64; + let invalid_signed_number_lower = Number::Signed(invalid_signed_lower); + + assert_eq!( + valid_signed_upper, + TryInto::::try_into(valid_unsigned_number).unwrap() + ); + assert_eq!( + valid_signed_upper, + TryInto::::try_into(valid_signed_number_upper).unwrap() + ); + assert_eq!( + valid_signed_lower, + TryInto::::try_into(valid_signed_number_lower).unwrap() + ); + assert_eq!(0, TryInto::::try_into(zero_signed).unwrap()); + assert_eq!(0, TryInto::::try_into(zero_unsigned).unwrap()); + assert_eq!( + Err(DecodeError::InvalidConversion( + "Cannot convert float to int".to_owned() + )), + TryInto::::try_into(invalid_float_number) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_unsigned + ))), + TryInto::::try_into(invalid_unsigned_number) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_signed_upper + ))), + TryInto::::try_into(invalid_signed_number_upper) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_signed_lower + ))), + TryInto::::try_into(invalid_signed_number_lower) + ); + } + + #[test] + fn test_u64_conversions() { + let valid_unsigned = u64::MAX; + let valid_unsigned_number = Number::Unsigned(valid_unsigned); + let zero_unsigned = Number::Unsigned(0u64); + let zero_signed = Number::Unsigned(0u64); + let valid_signed_upper = i64::MAX as u64; + let valid_signed_number_upper = Number::Signed(valid_signed_upper as i64); + let valid_signed_lower = u32::MIN as u64; + let valid_signed_number_lower = Number::Signed(valid_signed_lower as i64); + let invalid_float_number = Number::Float(3.14); + let invalid_signed_lower = i8::MIN as i64; + let invalid_signed_number_lower = Number::Signed(invalid_signed_lower); + + assert_eq!( + valid_unsigned, + TryInto::::try_into(valid_unsigned_number).unwrap() + ); + assert_eq!( + valid_signed_upper, + TryInto::::try_into(valid_signed_number_upper).unwrap() + ); + assert_eq!( + valid_signed_lower, + TryInto::::try_into(valid_signed_number_lower).unwrap() + ); + assert_eq!(0, TryInto::::try_into(zero_signed).unwrap()); + assert_eq!(0, TryInto::::try_into(zero_unsigned).unwrap()); + assert_eq!( + Err(DecodeError::InvalidConversion( + "Cannot convert float to int".to_owned() + )), + TryInto::::try_into(invalid_float_number) + ); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_signed_lower + ))), + TryInto::::try_into(invalid_signed_number_lower) + ); + } + + #[test] + fn test_f64_conversions() { + let valid_signed_upper = i64::MAX; + let valid_unsigned_upper = u64::MAX; + let valid_signed_number_upper = Number::Signed(valid_signed_upper); + let valid_signed_lower = i64::MIN; + let valid_signed_number_lower = Number::Signed(valid_signed_lower); + let valid_unsigned_number = Number::Unsigned(valid_unsigned_upper); + let zero_unsigned = Number::Unsigned(0u64); + let zero_signed = Number::Unsigned(0u64); + let invalid_unsigned = u64::MAX; + let invalid_unsigned_number = Number::Unsigned(invalid_unsigned); + + assert_eq!( + valid_unsigned_upper as f64, + TryInto::::try_into(valid_unsigned_number).unwrap() + ); + assert_eq!( + valid_signed_upper as f64, + TryInto::::try_into(valid_signed_number_upper).unwrap() + ); + assert_eq!( + valid_signed_lower as f64, + TryInto::::try_into(valid_signed_number_lower).unwrap() + ); + assert_eq!(0f64, TryInto::::try_into(zero_signed).unwrap()); + assert_eq!(0f64, TryInto::::try_into(zero_unsigned).unwrap()); + assert_eq!( + Err(DecodeError::InvalidConversion(format!( + "{} is out of bounds for conversion", + invalid_unsigned + ))), + TryInto::::try_into(invalid_unsigned_number) ); } } From a445b81ecde83cb7f1eb401026df2ba0b5407068 Mon Sep 17 00:00:00 2001 From: Edmund Kump Date: Thu, 1 Aug 2024 19:35:44 -0400 Subject: [PATCH 07/10] add some basic fuzz tests for span decoding --- Cargo.lock | 2 + trace-utils/Cargo.toml | 2 + .../src/msgpack_decoder/v04/decoder/mod.rs | 62 +++++++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 93f5cd9303..7ca6f5cef4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1644,6 +1644,8 @@ name = "datadog-trace-utils" version = "12.0.0" dependencies = [ "anyhow", + "bolero", + "bolero-generator", "bytes", "cargo_metadata", "criterion", diff --git a/trace-utils/Cargo.toml b/trace-utils/Cargo.toml index 51b1f0a733..5e4c34fcdb 100644 --- a/trace-utils/Cargo.toml +++ b/trace-utils/Cargo.toml @@ -39,6 +39,8 @@ testcontainers = { version = "0.17.0", optional = true } cargo_metadata = { version = "0.18.1", optional = true } [dev-dependencies] +bolero = "0.10.1" +bolero-generator = "0.10.2" criterion = "0.5.1" httpmock = { version = "0.7.0"} serde_json = "1.0" diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index 624727ec88..8f397e931e 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -486,4 +486,66 @@ mod tests { result ); } + + use bolero::check; + use datadog_trace_protobuf::pb::Span; + use rmp_serde::to_vec_named; + + #[test] + fn fuzz_from_slice() { + check!() + .with_type::<( + String, + String, + String, + String, + String, + String, + String, + String, + u64, + u64, + u64, + i64, + )>() + .cloned() + .for_each( + |( + name, + service, + resource, + span_type, + meta_key, + meta_value, + metric_key, + metric_value, + trace_id, + span_id, + parent_id, + start, + )| { + let span = Span { + name, + service, + resource, + r#type: span_type, + meta: HashMap::from([(meta_key, meta_value)]), + metrics: HashMap::from([( + metric_key, + metric_value.parse::().unwrap_or_default(), + )]), + trace_id, + span_id, + parent_id, + start, + ..Default::default() + }; + let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); + let result = from_slice(&mut encoded_data.as_slice()); + println!("result: {:?}", result); + + assert!(result.is_ok()); + }, + ); + } } From 134e442bd28f33edf3d65f2f42641a282fd29b16 Mon Sep 17 00:00:00 2001 From: Edmund Kump Date: Fri, 2 Aug 2024 10:02:37 -0400 Subject: [PATCH 08/10] address comments and clippy errors --- .../src/msgpack_decoder/v04/decoder/mod.rs | 16 +++------------- .../src/msgpack_decoder/v04/decoder/span.rs | 9 +++------ trace-utils/src/msgpack_decoder/v04/number.rs | 12 ++++++------ 3 files changed, 12 insertions(+), 25 deletions(-) diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index 8f397e931e..76a7c7a413 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -100,19 +100,9 @@ fn read_string_ref(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { #[inline] fn read_string(buf: &mut &[u8]) -> Result { - let value_len: usize = decode::read_str_len(buf) - .map_err(|e| DecodeError::InvalidFormat(e.to_string()))? - .try_into() - .map_err(|_| { - DecodeError::InvalidConversion("unable to get len of string buffer".to_owned()) - })?; - - let mut vec = vec![0; value_len]; - buf.read_exact_buf(vec.as_mut_slice()) - .map_err(|_| DecodeError::IOError)?; - - let str = String::from_utf8(vec).map_err(|e| DecodeError::Utf8Error(e.to_string()))?; - Ok(str) + let (str_ref, remaining_buf) = read_string_ref(buf)?; + *buf = remaining_buf; + Ok(str_ref.to_string()) } #[inline] diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs index 49ca4e9858..36fc75a3b6 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs @@ -2,7 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use super::{ - read_map_strs, read_meta_struct, read_metrics, read_string_ref, span_link::read_span_links, + read_map_strs, read_meta_struct, read_metrics, read_string, read_string_ref, + span_link::read_span_links, }; use crate::msgpack_decoder::v04::error::DecodeError; use crate::msgpack_decoder::v04::number::read_number; @@ -84,11 +85,7 @@ impl FromStr for SpanKey { fn fill_span(span: &mut Span, buf: &mut &[u8]) -> Result<(), DecodeError> { // field's key won't be held so no need to copy it in a buffer. - let (key, value) = read_string_ref(buf)?; - - // Go to the value - *buf = value; - + let key = read_string(buf)?; let key = key.parse::()?; match key { diff --git a/trace-utils/src/msgpack_decoder/v04/number.rs b/trace-utils/src/msgpack_decoder/v04/number.rs index a0709c8038..9a4f39cfc8 100644 --- a/trace-utils/src/msgpack_decoder/v04/number.rs +++ b/trace-utils/src/msgpack_decoder/v04/number.rs @@ -201,7 +201,7 @@ mod tests { let zero_unsigned = Number::Unsigned(0u64); let zero_signed = Number::Unsigned(0u64); let valid_signed_number = Number::Signed(valid_max); - let invalid_float_number = Number::Float(3.14); + let invalid_float_number = Number::Float(4.14); let invalid_unsigned = u64::MAX; let invalid_unsigned_number = Number::Unsigned(invalid_unsigned); @@ -238,7 +238,7 @@ mod tests { let valid_signed_number_upper = Number::Signed(valid_signed_upper as i64); let valid_signed_lower = i32::MIN; let valid_signed_number_lower = Number::Signed(valid_signed_lower as i64); - let invalid_float_number = Number::Float(3.14); + let invalid_float_number = Number::Float(4.14); let invalid_unsigned = u64::MAX; let invalid_unsigned_number = Number::Unsigned(invalid_unsigned); let invalid_signed_upper = i32::MAX as i64 + 1; @@ -298,7 +298,7 @@ mod tests { let valid_signed_number_upper = Number::Signed(valid_signed_upper as i64); let valid_signed_lower = i8::MIN; let valid_signed_number_lower = Number::Signed(valid_signed_lower as i64); - let invalid_float_number = Number::Float(3.14); + let invalid_float_number = Number::Float(4.14); let invalid_unsigned = u8::MAX; let invalid_unsigned_number = Number::Unsigned(invalid_unsigned as u64); let invalid_signed_upper = i8::MAX as i64 + 1; @@ -358,7 +358,7 @@ mod tests { let valid_signed_number_upper = Number::Signed(valid_signed_upper as i64); let valid_signed_lower = u8::MIN; let valid_signed_number_lower = Number::Signed(valid_signed_lower as i64); - let invalid_float_number = Number::Float(3.14); + let invalid_float_number = Number::Float(4.14); let invalid_unsigned = (u8::MAX as u64) + 1; let invalid_unsigned_number = Number::Unsigned(invalid_unsigned); let invalid_signed_upper = i32::MAX as i64 + 1; @@ -418,7 +418,7 @@ mod tests { let valid_signed_number_upper = Number::Signed(valid_signed_upper as i64); let valid_signed_lower = u32::MIN; let valid_signed_number_lower = Number::Signed(valid_signed_lower as i64); - let invalid_float_number = Number::Float(3.14); + let invalid_float_number = Number::Float(4.14); let invalid_unsigned = (u32::MAX as u64) + 1; let invalid_unsigned_number = Number::Unsigned(invalid_unsigned); let invalid_signed_upper = i64::MAX; @@ -479,7 +479,7 @@ mod tests { let valid_signed_number_upper = Number::Signed(valid_signed_upper as i64); let valid_signed_lower = u32::MIN as u64; let valid_signed_number_lower = Number::Signed(valid_signed_lower as i64); - let invalid_float_number = Number::Float(3.14); + let invalid_float_number = Number::Float(4.14); let invalid_signed_lower = i8::MIN as i64; let invalid_signed_number_lower = Number::Signed(invalid_signed_lower); From 2025f3caed6fd643fb7a52948ba9c32aa78b64d0 Mon Sep 17 00:00:00 2001 From: Edmund Kump Date: Mon, 5 Aug 2024 09:53:10 -0400 Subject: [PATCH 09/10] Add trace-utils to fuzz testing on CI --- .github/workflows/fuzz.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/fuzz.yml b/.github/workflows/fuzz.yml index 2d864a6dab..d8ff841a35 100644 --- a/.github/workflows/fuzz.yml +++ b/.github/workflows/fuzz.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - directory: [alloc, profiling, ddcommon-ffi] + directory: [alloc, profiling, ddcommon-ffi, trace-utils] env: CARGO_TERM_COLOR: always steps: From 87c03a4d41459a895319b08eeaebf218851c5485 Mon Sep 17 00:00:00 2001 From: Edmund Kump Date: Tue, 6 Aug 2024 08:35:31 -0400 Subject: [PATCH 10/10] addressing benchmark regression suggestions --- .../src/msgpack_decoder/v04/decoder/span.rs | 25 ++++++++----------- .../msgpack_decoder/v04/decoder/span_link.rs | 7 +++--- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs index 36fc75a3b6..4bd9c3d61a 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs @@ -84,25 +84,23 @@ impl FromStr for SpanKey { } fn fill_span(span: &mut Span, buf: &mut &[u8]) -> Result<(), DecodeError> { - // field's key won't be held so no need to copy it in a buffer. - let key = read_string(buf)?; + let (key, value) = read_string_ref(buf)?; let key = key.parse::()?; + *buf = value; + match key { SpanKey::Service => { - let (value, next) = read_string_ref(buf)?; - span.service = String::from_str(value).unwrap(); - *buf = next; + let value = read_string(buf)?; + span.service = value; } SpanKey::Name => { - let (value, next) = read_string_ref(buf)?; - span.name = String::from_str(value).unwrap(); - *buf = next; + let value = read_string(buf)?; + span.name = value; } SpanKey::Resource => { - let (value, next) = read_string_ref(buf)?; - span.resource = String::from_str(value).unwrap(); - *buf = next; + let value = read_string(buf)?; + span.resource = value; } SpanKey::TraceId => span.trace_id = read_number(buf)?.try_into()?, SpanKey::SpanId => span.span_id = read_number(buf)?.try_into()?, @@ -113,9 +111,8 @@ fn fill_span(span: &mut Span, buf: &mut &[u8]) -> Result<(), DecodeError> { SpanKey::Meta => span.meta = read_map_strs(buf)?, SpanKey::Metrics => span.metrics = read_metrics(buf)?, SpanKey::Type => { - let (value, next) = read_string_ref(buf)?; - span.r#type = String::from_str(value).unwrap(); - *buf = next; + let value = read_string(buf)?; + span.r#type = value; } SpanKey::MetaStruct => span.meta_struct = read_meta_struct(buf)?, SpanKey::SpanLinks => span.span_links = read_span_links(buf)?, diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs index b490d7626e..07a48d7233 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs @@ -1,7 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::msgpack_decoder::v04::decoder::{read_map_strs, read_string_ref}; +use crate::msgpack_decoder::v04::decoder::{read_map_strs, read_string, read_string_ref}; use crate::msgpack_decoder::v04::error::DecodeError; use crate::msgpack_decoder::v04::number::read_number; use datadog_trace_protobuf::pb::SpanLink; @@ -84,9 +84,8 @@ fn decode_span_link(buf: &mut &[u8]) -> Result { SpanLinkKey::SpanId => span.span_id = read_number(buf)?.try_into()?, SpanLinkKey::Attributes => span.attributes = read_map_strs(buf)?, SpanLinkKey::Tracestate => { - let (value, next) = read_string_ref(buf)?; - span.tracestate = String::from_str(value).unwrap(); - *buf = next; + let value = read_string(buf)?; + span.tracestate = value; } SpanLinkKey::Flags => span.flags = read_number(buf)?.try_into()?, }