Skip to content

Commit

Permalink
Update serializer interface in a way that makes buffer reuse more nat…
Browse files Browse the repository at this point in the history
…ural
  • Loading branch information
Ten0 committed Apr 22, 2023
1 parent b1358bc commit 25384fa
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 168 deletions.
41 changes: 31 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,28 +102,49 @@ where
/// Serialize an avro "datum" (raw data, no headers...)
///
/// to the provided writer
pub fn to_datum<T, W>(value: &T, writer: W, schema: &Schema) -> Result<(), ser::SerError>
///
/// [`SerializerConfig`](ser::SerializerConfig) can be built from a schema:
/// ```
/// # use serde_avro_fast::{ser, Schema};
/// let schema: Schema = r#""int""#.parse().unwrap();
/// let serializer_config = &mut ser::SerializerConfig::new(&schema);
///
/// let mut serialized: Vec<u8> = serde_avro_fast::to_datum_vec(&3, serializer_config).unwrap();
/// assert_eq!(serialized, &[6]);
///
/// // reuse config and output buffer across serializations for ideal performance (~40% perf gain)
/// serialized.clear();
/// let serialized = serde_avro_fast::to_datum(&4, serialized, serializer_config).unwrap();
/// assert_eq!(serialized, &[8]);
/// ```
pub fn to_datum<T, W>(
value: &T,
writer: W,
serializer_config: &mut ser::SerializerConfig<'_>,
) -> Result<W, ser::SerError>
where
T: serde::Serialize + ?Sized,
W: std::io::Write,
{
serde::Serialize::serialize(
value,
ser::SerializerState::from_writer(writer, schema).serializer(),
)
let mut serializer_state = ser::SerializerState::from_writer(writer, serializer_config);
serde::Serialize::serialize(value, serializer_state.serializer())?;
Ok(serializer_state.into_writer())
}

/// Serialize an avro "datum" (raw data, no headers...)
///
/// to a newly allocated Vec
///
/// Note that unless you would otherwise allocate a `Vec` anyway, it will be
/// Note that unless you would otherwise allocate a new `Vec` anyway, it will be
/// more efficient to use [`to_datum`] instead.
pub fn to_datum_vec<T>(value: &T, schema: &Schema) -> Result<Vec<u8>, ser::SerError>
///
/// See [`to_datum`] for more details.
pub fn to_datum_vec<T>(
value: &T,
serializer_config: &mut ser::SerializerConfig<'_>,
) -> Result<Vec<u8>, ser::SerError>
where
T: serde::Serialize + ?Sized,
{
let mut serializer_state = ser::SerializerState::from_writer(Vec::new(), schema);
serde::Serialize::serialize(value, serializer_state.serializer())?;
Ok(serializer_state.into_writer())
to_datum(value, Vec::new(), serializer_config)
}
133 changes: 56 additions & 77 deletions src/ser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
//! necessary for more advanced usage.
//!
//! This gives manual access to the type that implements
//! [`serde::Serializer`], as well as its building blocks in order to set
//! configuration parameters that may enable you to increase performance
//! [`serde::Serializer`]
//!
//! Such usage would go as follows:
//! ```
Expand Down Expand Up @@ -36,24 +35,35 @@
//! }
//!
//! // Build the struct that will generally serve through serialization
//! let serializer_config = &mut serde_avro_fast::ser::SerializerConfig::new(&schema);
//! let mut serializer_state =
//! serde_avro_fast::ser::SerializerState::from_writer(Vec::new(), &schema);
//!
//! // Provide buffers from previous serialization to avoid allocating if field reordering
//! // is necessary
//! # let buffers_from_previous_serialization = serde_avro_fast::ser::Buffers::default();
//! serializer_state.add_buffers(buffers_from_previous_serialization);
//! serde_avro_fast::ser::SerializerState::from_writer(Vec::new(), serializer_config);
//!
//! // It's not the `&mut SerializerState` that implements `serde::Serializer` directly, instead
//! // it is `DatumSerializer` (which is essentially an `&mut SerializerState` but not exactly
//! // because it also keeps track of the current schema node)
//! // We build it through `SerializerState::serializer`
//! serde::Serialize::serialize(&Test { field: "foo" }, serializer_state.serializer())
//! .expect("Failed to serialize");
//!
//! let (serialized, buffers_for_next_serialization) = serializer_state.into_writer_and_buffers();
//! let serialized = serializer_state.into_writer();
//!
//! assert_eq!(serialized, &[6, 102, 111, 111]);
//!
//! // reuse config & output buffer across serializations for ideal performance (~40% perf gain)
//! let mut serializer_state = serde_avro_fast::ser::SerializerState::from_writer(
//! {
//! let mut buf = serialized;
//! buf.clear();
//! buf
//! },
//! serializer_config,
//! );
//!
//! serde::Serialize::serialize(&Test { field: "bar" }, serializer_state.serializer())
//! .expect("Failed to serialize");
//! let serialized = serializer_state.into_writer();
//!
//! assert_eq!(serialized, &[6, b'b', b'a', b'r']);
//! ```

mod error;
Expand All @@ -69,104 +79,73 @@ use {integer_encoding::VarIntWriter, serde::ser::*, std::io::Write};

/// All configuration and state necessary for the serialization to run
///
/// Notably holds the writer and a [`SerializerConfig`].
/// Notably holds the writer and a `&mut` [`SerializerConfig`].
///
/// Does not implement [`Serializer`] directly (use
/// [`.serializer`](Self::serializer) to obtain that).
pub struct SerializerState<'s, W> {
pub struct SerializerState<'c, 's, W> {
pub(crate) writer: W,
/// Storing these here for reuse so that we can bypass the allocation,
/// and statistically obtain buffers that are already the proper length
/// (since we have used them for previous records)
buffers: Buffers,
config: SerializerConfig<'s>,
config: &'c mut SerializerConfig<'s>,
}
/// Schema + other configs for serialization
#[derive(Clone)]

/// Schema + serialization buffers
///
/// It can be built from a schema:
/// ```
/// # use serde_avro_fast::{ser, Schema};
/// let schema: Schema = r#""int""#.parse().unwrap();
/// let serializer_config = &mut ser::SerializerConfig::new(&schema);
///
/// let mut serialized: Vec<u8> = serde_avro_fast::to_datum_vec(&3, serializer_config).unwrap();
/// assert_eq!(serialized, &[6]);
///
/// // reuse config & output buffer across serializations for ideal performance (~40% perf gain)
/// serialized.clear();
/// let serialized = serde_avro_fast::to_datum(&4, serialized, serializer_config).unwrap();
/// assert_eq!(serialized, &[8]);
/// ```
pub struct SerializerConfig<'s> {
schema_root: &'s SchemaNode<'s>,
buffers: Buffers,
schema: &'s Schema,
}

impl<'s> SerializerConfig<'s> {
pub fn new(schema: &'s Schema) -> Self {
Self::from_schema_node(schema.root())
}
pub fn from_schema_node(schema_root: &'s SchemaNode<'s>) -> Self {
Self { schema_root }
}
}

impl<'s, W: std::io::Write> SerializerState<'s, W> {
pub fn from_writer(writer: W, schema: &'s Schema) -> Self {
Self {
writer,
config: SerializerConfig {
schema_root: schema.root(),
},
schema,
buffers: Buffers::default(),
}
}

pub fn with_config(writer: W, config: SerializerConfig<'s>) -> Self {
SerializerState {
pub fn schema(&self) -> &'s Schema {
self.schema
}
}

impl<'c, 's, W: std::io::Write> SerializerState<'c, 's, W> {
pub fn from_writer(writer: W, serializer_config: &'c mut SerializerConfig<'s>) -> Self {
Self {
writer,
config,
buffers: Buffers::default(),
config: serializer_config,
}
}

pub fn serializer<'r>(&'r mut self) -> DatumSerializer<'r, 's, W> {
pub fn serializer<'r>(&'r mut self) -> DatumSerializer<'r, 'c, 's, W> {
DatumSerializer {
schema_node: self.config.schema_root,
schema_node: self.config.schema.root(),
state: self,
}
}

/// Reuse buffers from a previous serializer
///
/// In order to avoid allocating even when field reordering is necessary we
/// can preserve the necessary allocations from one record to another (even
/// across deserializations).
///
/// This brings ~40% perf improvement
pub fn add_buffers(&mut self, buffers: Buffers) {
if self.buffers.field_reordering_buffers.is_empty() {
self.buffers.field_reordering_buffers = buffers.field_reordering_buffers;
} else {
self.buffers
.field_reordering_buffers
.extend(buffers.field_reordering_buffers);
}
if self.buffers.field_reordering_super_buffers.is_empty() {
self.buffers.field_reordering_super_buffers = buffers.field_reordering_super_buffers;
} else {
self.buffers
.field_reordering_super_buffers
.extend(buffers.field_reordering_super_buffers);
}
}
}

impl<W> SerializerState<'_, W> {
impl<W> SerializerState<'_, '_, W> {
/// Get writer back
pub fn into_writer(self) -> W {
self.writer
}

/// Get writer back, as well as buffers
///
/// You may reuse these buffers with another serializer for increased
/// performance
///
/// These are used when it is required to buffer for field reordering
/// (when the fields of a record are provided by serde not in the same order
/// as they have to be serialized according to the schema)
///
/// If you are in a such scenario, reusing those should lead to about ~40%
/// perf improvement.
pub fn into_writer_and_buffers(self) -> (W, Buffers) {
(self.writer, self.buffers)
}
}

/// Buffers used during serialization, for reuse across serializations
Expand All @@ -177,7 +156,7 @@ impl<W> SerializerState<'_, W> {
///
/// This brings ~40% perf improvement
#[derive(Default)]
pub struct Buffers {
struct Buffers {
field_reordering_buffers: Vec<Vec<u8>>,
field_reordering_super_buffers: Vec<Vec<Option<Vec<u8>>>>,
}
8 changes: 4 additions & 4 deletions src/ser/serializer/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use super::*;

pub(super) struct BlockWriter<'r, 's, W> {
pub(super) state: &'r mut SerializerState<'s, W>,
pub(super) struct BlockWriter<'r, 'c, 's, W> {
pub(super) state: &'r mut SerializerState<'c, 's, W>,
current_block_len: usize,
}

impl<'r, 's, W> BlockWriter<'r, 's, W>
impl<'r, 'c, 's, W> BlockWriter<'r, 'c, 's, W>
where
W: std::io::Write,
{
pub(super) fn new(
state: &'r mut SerializerState<'s, W>,
state: &'r mut SerializerState<'c, 's, W>,
min_len: usize,
) -> Result<Self, SerError> {
if min_len > 0 {
Expand Down
4 changes: 2 additions & 2 deletions src/ser/serializer/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::schema::Decimal;

use super::*;

pub(super) fn serialize<'r, 's, W>(
state: &'r mut SerializerState<'s, W>,
pub(super) fn serialize<'r, 'c, 's, W>(
state: &'r mut SerializerState<'c, 's, W>,
decimal: &'s Decimal,
mut rust_decimal: rust_decimal::Decimal,
) -> Result<(), SerError>
Expand Down
26 changes: 13 additions & 13 deletions src/ser/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ use {

/// Can't be instantiated directly - has to be constructed from a
/// [`SerializerState`]
pub struct DatumSerializer<'r, 's, W> {
pub(super) state: &'r mut SerializerState<'s, W>,
pub struct DatumSerializer<'r, 'c, 's, W> {
pub(super) state: &'r mut SerializerState<'c, 's, W>,
pub(super) schema_node: &'s SchemaNode<'s>,
}

impl<'r, 's, W: Write> Serializer for DatumSerializer<'r, 's, W> {
impl<'r, 'c, 's, W: Write> Serializer for DatumSerializer<'r, 'c, 's, W> {
type Ok = ();
type Error = SerError;

type SerializeSeq = SerializeAsArrayOrDuration<'r, 's, W>;
type SerializeTuple = SerializeAsArrayOrDuration<'r, 's, W>;
type SerializeTupleStruct = SerializeAsArrayOrDuration<'r, 's, W>;
type SerializeTupleVariant = SerializeAsArrayOrDuration<'r, 's, W>;
type SerializeMap = SerializeMapAsRecordOrMapOrDuration<'r, 's, W>;
type SerializeStruct = SerializeStructAsRecordOrMapOrDuration<'r, 's, W>;
type SerializeStructVariant = SerializeStructAsRecordOrMapOrDuration<'r, 's, W>;
type SerializeSeq = SerializeAsArrayOrDuration<'r, 'c, 's, W>;
type SerializeTuple = SerializeAsArrayOrDuration<'r, 'c, 's, W>;
type SerializeTupleStruct = SerializeAsArrayOrDuration<'r, 'c, 's, W>;
type SerializeTupleVariant = SerializeAsArrayOrDuration<'r, 'c, 's, W>;
type SerializeMap = SerializeMapAsRecordOrMapOrDuration<'r, 'c, 's, W>;
type SerializeStruct = SerializeStructAsRecordOrMapOrDuration<'r, 'c, 's, W>;
type SerializeStructVariant = SerializeStructAsRecordOrMapOrDuration<'r, 'c, 's, W>;

fn serialize_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
match self.schema_node {
Expand Down Expand Up @@ -426,7 +426,7 @@ impl<'r, 's, W: Write> Serializer for DatumSerializer<'r, 's, W> {
}
}

impl<'r, 's, W: Write> DatumSerializer<'r, 's, W> {
impl<'r, 'c, 's, W: Write> DatumSerializer<'r, 'c, 's, W> {
fn serialize_union_unnamed<O>(
self,
union: &'s Union<'s>,
Expand Down Expand Up @@ -560,7 +560,7 @@ impl<'r, 's, W: Write> DatumSerializer<'r, 's, W> {
fn serialize_lookup_union_variant_by_name<O>(
self,
variant_name: &str,
f: impl FnOnce(DatumSerializer<'r, 's, W>) -> Result<O, SerError>,
f: impl FnOnce(DatumSerializer<'r, 'c, 's, W>) -> Result<O, SerError>,
) -> Result<O, SerError> {
match self.schema_node {
SchemaNode::Union(union) => match union.per_type_lookup.named(variant_name) {
Expand Down Expand Up @@ -588,7 +588,7 @@ impl<'r, 's, W: Write> DatumSerializer<'r, 's, W> {
self,
variant_or_struct_name: &str,
len: usize,
) -> Result<SerializeStructAsRecordOrMapOrDuration<'r, 's, W>, SerError> {
) -> Result<SerializeStructAsRecordOrMapOrDuration<'r, 'c, 's, W>, SerError> {
self.serialize_lookup_union_variant_by_name(variant_or_struct_name, |serializer| {
match *serializer.schema_node {
SchemaNode::Record(ref record) => Ok(
Expand Down
Loading

0 comments on commit 25384fa

Please sign in to comment.