Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bd-api/src/api_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use bd_client_stats_store::Collector;
use bd_client_stats_store::test::StatsHelper;
use bd_grpc_codec::code::Code;
use bd_grpc_codec::{Decompression, Encoder, OptimizeFor};
use bd_internal_logging::{LogFields, LogLevel, LogType};
use bd_internal_logging::{LogFields, LogLevel};
use bd_key_value::Store;
use bd_metadata::{Metadata, Platform};
use bd_network_quality::{NetworkQuality, NetworkQualityResolver as _};
Expand All @@ -37,6 +37,7 @@ use bd_proto::protos::client::api::{
RuntimeUpdate,
StatsUploadRequest,
};
use bd_proto::protos::logging::payload::LogType;
use bd_runtime::runtime::{ConfigLoader, FeatureFlag};
use bd_stats_common::labels;
use bd_test_helpers::make_mut;
Expand Down
1 change: 1 addition & 0 deletions bd-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bd-client-common.path = "../bd-client-common"
bd-client-stats-store.path = "../bd-client-stats-store"
bd-completion.path = "../bd-completion"
bd-error-reporter.path = "../bd-error-reporter"
bd-log-primitives.path = "../bd-log-primitives"
bd-proto.path = "../bd-proto"
bd-runtime.path = "../bd-runtime"
bd-stats-common.path = "../bd-stats-common"
Expand Down
8 changes: 4 additions & 4 deletions bd-buffer/src/buffer/aggregate_ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ use super::{
RingBufferProducer,
RingBufferStats,
VolatileRingBuffer,
to_u32,
};
#[cfg(test)]
use crate::buffer::test::thread_synchronizer::ThreadSynchronizer;
use crate::{AbslCode, Error, Result};
use bd_log_primitives::LossyIntToU32;
use parking_lot::Mutex;
#[cfg(test)]
use std::any::Any;
Expand Down Expand Up @@ -76,7 +76,7 @@ impl SharedData {
// the volatile buffer.
match producer
.as_mut()
.reserve(to_u32(read_reservation.len()), true)
.reserve(read_reservation.len().to_u32_lossy(), true)
{
Ok(write_reservation) => {
write_reservation.copy_from_slice(read_reservation);
Expand Down Expand Up @@ -138,8 +138,8 @@ impl RingBufferImpl {
// For aggregate buffers, the size of the file (after subtracting header space) must be >= the
// size of RAM. This is to avoid situations in which we accept a record into RAM but cannot ever
// write it to disk.
if non_volatile_size < to_u32(std::mem::size_of::<FileHeader>())
|| volatile_size > (non_volatile_size - to_u32(std::mem::size_of::<FileHeader>()))
if non_volatile_size < std::mem::size_of::<FileHeader>().to_u32_lossy()
|| volatile_size > (non_volatile_size - std::mem::size_of::<FileHeader>().to_u32_lossy())
{
log::error!(
"file size '{}' not big enough for header size '{}' or file size (minus header) not \
Expand Down
7 changes: 4 additions & 3 deletions bd-buffer/src/buffer/aggregate_ring_buffer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use crate::buffer::test::{
read_and_verify,
start_read_and_verify,
};
use crate::buffer::{RingBuffer, RingBufferStats, StatsTestHelper, to_u32};
use crate::buffer::{RingBuffer, RingBufferStats, StatsTestHelper};
use bd_client_stats_store::Collector;
use bd_log_primitives::LossyIntToU32;
use futures::poll;
use std::sync::Arc;
use tempfile::TempDir;
Expand Down Expand Up @@ -48,7 +49,7 @@ impl Helper {
"test",
volatile_size,
temp_dir.path().join("buffer"),
non_volatile_size + to_u32(std::mem::size_of::<FileHeader>()),
non_volatile_size + std::mem::size_of::<FileHeader>().to_u32_lossy(),
PerRecordCrc32Check::Yes,
allow_overwrite,
Arc::new(RingBufferStats::default()),
Expand Down Expand Up @@ -80,7 +81,7 @@ impl Helper {
"test",
self.volatile_size,
self.temp_dir.path().join("buffer"),
self.non_volatile_size + to_u32(std::mem::size_of::<FileHeader>()),
self.non_volatile_size + std::mem::size_of::<FileHeader>().to_u32_lossy(),
PerRecordCrc32Check::Yes,
self.allow_overwrite,
Arc::new(RingBufferStats::default()),
Expand Down
23 changes: 13 additions & 10 deletions bd-buffer/src/buffer/common_ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
#[path = "./common_ring_buffer_test.rs"]
mod common_ring_buffer_test;

use super::RingBufferStats;
#[cfg(test)]
use super::test::thread_synchronizer::ThreadSynchronizer;
use super::{RingBufferStats, to_u32};
use crate::{AbslCode, Error, Result};
use bd_client_common::error::InvariantError;
use bd_log_primitives::LossyIntToU32;
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::fmt::Display;
use std::ptr::NonNull;
Expand Down Expand Up @@ -229,14 +230,14 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
// part of the record. The extra space is always fixed and is the value stored in
// extra_bytes_per_record_.
fn record_size_offset(&self, start: u32) -> u32 {
debug_assert!(self.extra_bytes_per_record >= to_u32(std::mem::size_of::<u32>()));
start + (self.extra_bytes_per_record - to_u32(std::mem::size_of::<u32>()))
debug_assert!(self.extra_bytes_per_record >= std::mem::size_of::<u32>().to_u32_lossy());
start + (self.extra_bytes_per_record - std::mem::size_of::<u32>().to_u32_lossy())
}

// Returns any extra space at the beginning of the reservation. See recordSizeOffset() for more
// information on the record layout.
fn extra_data(&mut self, start: u32) -> &mut [u8] {
debug_assert!(self.extra_bytes_per_record >= to_u32(std::mem::size_of::<u32>()));
debug_assert!(self.extra_bytes_per_record >= std::mem::size_of::<u32>().to_u32_lossy());
let start = start as usize;
let extra_bytes_per_record = self.extra_bytes_per_record as usize;
&mut self.memory()[start .. start + extra_bytes_per_record - std::mem::size_of::<u32>()]
Expand Down Expand Up @@ -299,7 +300,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
// space. This can overflow but as long as we are within the memory space crc checks will catch
// further corruption.
let size_index = self.record_size_offset(next_read_start_to_use);
if size_index + to_u32(std::mem::size_of::<u32>()) > to_u32(self.memory.0.len()) {
if size_index + std::mem::size_of::<u32>().to_u32_lossy() > self.memory.0.len().to_u32_lossy() {
return Err(Error::AbslStatus(
AbslCode::DataLoss,
"corrupted record size index".to_string(),
Expand All @@ -318,7 +319,9 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
// space crc checks will catch further corruption.
if size == 0
|| Self::overflow_add(&[next_read_start_to_use, self.extra_bytes_per_record, size])
.is_none_or(|next_read_start_index| next_read_start_index > to_u32(self.memory.0.len()))
.is_none_or(|next_read_start_index| {
next_read_start_index > self.memory.0.len().to_u32_lossy()
})
{
return Err(Error::AbslStatus(
AbslCode::DataLoss,
Expand All @@ -343,7 +346,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
.last_write_end_before_wrap
.ok_or(InvariantError::Invariant)?
+ 1,
size: to_u32(self.memory.0.len())
size: self.memory.0.len().to_u32_lossy()
- (reservation_data
.last_write_end_before_wrap
.ok_or(InvariantError::Invariant)?
Expand Down Expand Up @@ -469,7 +472,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
let mut reservation_data = TempReservationData::default();
let next_write_start = *self.next_write_start();

if next_write_start + write_size <= to_u32(self.memory.0.len()) {
if next_write_start + write_size <= self.memory.0.len().to_u32_lossy() {
// It fits in the remainder without wrapping.
reservation_data.range.start = next_write_start;
reservation_data.next_write_start = next_write_start + write_size;
Expand Down Expand Up @@ -529,7 +532,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
};

// If size is 0 or > then the ring buffer size we can't do anything.
if size == 0 || actual_size > to_u32(self.memory.0.len()) {
if size == 0 || actual_size > self.memory.0.len().to_u32_lossy() {
log::trace!("({}) invalid reservation size: {}", self.name, size);
// Note that we don't record the bytes lost in this case, since a very large number is likely
// to cause overflow.
Expand Down Expand Up @@ -996,7 +999,7 @@ impl<ExtraLockedData> CommonRingBuffer<ExtraLockedData> {
lock_count: AtomicU32::default().into(),
shutdown_lock: None,
stats,
extra_bytes_per_record: extra_bytes_per_record + to_u32(std::mem::size_of::<u32>()),
extra_bytes_per_record: extra_bytes_per_record + std::mem::size_of::<u32>().to_u32_lossy(),
extra_locked_data,
allow_overwrite,
on_total_data_loss_cb: Box::new(on_total_data_loss_cb),
Expand Down
6 changes: 3 additions & 3 deletions bd-buffer/src/buffer/common_ring_buffer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ use crate::buffer::{
RingBufferStats,
StatsTestHelper,
VolatileRingBuffer,
to_u32,
};
use crate::{AbslCode, Error};
use assert_matches::assert_matches;
use bd_client_stats_store::Collector;
use bd_log_primitives::LossyIntToU32;
use parameterized::parameterized;
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Helper {
TestType::NonVolatile => NonVolatileRingBuffer::new(
"test".to_string(),
temp_dir.path().join("buffer"),
size + to_u32(std::mem::size_of::<FileHeader>()),
size + std::mem::size_of::<FileHeader>().to_u32_lossy(),
AllowOverwrite::Yes,
BlockWhenReservingIntoConcurrentRead::No,
PerRecordCrc32Check::No,
Expand All @@ -89,7 +89,7 @@ impl Helper {
"test",
size,
temp_dir.path().join("buffer"),
size + to_u32(std::mem::size_of::<FileHeader>()),
size + std::mem::size_of::<FileHeader>().to_u32_lossy(),
PerRecordCrc32Check::No,
AllowOverwrite::Yes,
Arc::new(RingBufferStats::default()),
Expand Down
11 changes: 2 additions & 9 deletions bd-buffer/src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,13 @@ mod volatile_ring_buffer;
mod test;

use bd_client_stats_store::Counter;
use bd_log_primitives::LossyIntToU32;
#[cfg(test)]
use std::any::Any;
use std::sync::Arc;
#[cfg(test)]
use test::thread_synchronizer::ThreadSynchronizer;

// Wrapper that allows converting a usize to a u32. This avoids verbosity in places we know this is
// safe.
#[allow(clippy::cast_possible_truncation)]
#[must_use]
pub const fn to_u32(value: usize) -> u32 {
value as u32
}

//
// RingBufferStats
//
Expand Down Expand Up @@ -101,7 +94,7 @@ pub trait RingBufferProducer: Send {

// Writes a single record to the buffer by copying the provided data into the record reservation.
fn write(&mut self, data: &[u8]) -> Result<()> {
let reserved = self.reserve(to_u32(data.len()), true)?;
let reserved = self.reserve(data.len().to_u32_lossy(), true)?;
reserved.copy_from_slice(data);
self.commit()
}
Expand Down
8 changes: 5 additions & 3 deletions bd-buffer/src/buffer/non_volatile_ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ use super::{
RingBufferCursorConsumer,
RingBufferProducer,
RingBufferStats,
to_u32,
};
use crate::{AbslCode, Error, Result};
use bd_client_common::error::InvariantError;
use bd_log_primitives::LossyIntToU32;
use crc32fast::Hasher;
use fs2::FileExt;
use intrusive_collections::offset_of;
Expand Down Expand Up @@ -632,7 +632,9 @@ pub struct FileHeader {
crc32: u32,
}

const FILE_HEADER_CURRENT_VERSION: u32 = 1;
// Version 1: Original version.
// Version 2: Switched to protobuf encoding for records.
const FILE_HEADER_CURRENT_VERSION: u32 = 2;

//
// ConsumerType
Expand Down Expand Up @@ -738,7 +740,7 @@ impl RingBufferImpl {
const_assert_eq!(offset_of!(FileHeader, next_read_start), 28);
const_assert_eq!(offset_of!(FileHeader, crc32), 36);

if size < to_u32(std::mem::size_of::<FileHeader>()) {
if size < std::mem::size_of::<FileHeader>().to_u32_lossy() {
log::error!(
"({name}) file size '{}' not big enough for header size '{}'",
size,
Expand Down
7 changes: 4 additions & 3 deletions bd-buffer/src/buffer/non_volatile_ring_buffer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
use super::{FileHeader, RingBufferImpl};
use crate::buffer::common_ring_buffer::{AllowOverwrite, Cursor};
use crate::buffer::test::{Helper as CommonHelper, reserve_and_commit};
use crate::buffer::{OptionalStatGetter, StatsTestHelper, to_u32};
use crate::buffer::{OptionalStatGetter, StatsTestHelper};
use crate::{AbslCode, Error, Result};
use assert_matches::assert_matches;
use bd_client_stats_store::Collector;
use bd_log_primitives::LossyIntToU32;
use intrusive_collections::offset_of;
use std::fs::File;
use std::io::{Read, Write};
Expand All @@ -34,7 +35,7 @@ impl Helper {
let buffer = RingBufferImpl::new(
"test".to_string(),
temp_dir.path().join("buffer"),
size + to_u32(std::mem::size_of::<FileHeader>()),
size + std::mem::size_of::<FileHeader>().to_u32_lossy(),
allow_overwrite,
super::BlockWhenReservingIntoConcurrentRead::No,
super::PerRecordCrc32Check::Yes,
Expand Down Expand Up @@ -76,7 +77,7 @@ impl Helper {
RingBufferImpl::new(
"test".to_string(),
self.temp_dir.path().join("buffer"),
self.size + to_u32(std::mem::size_of::<FileHeader>()),
self.size + std::mem::size_of::<FileHeader>().to_u32_lossy(),
self.allow_overwrite,
super::BlockWhenReservingIntoConcurrentRead::No,
super::PerRecordCrc32Check::Yes,
Expand Down
11 changes: 3 additions & 8 deletions bd-buffer/src/buffer/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,8 @@
pub mod thread_synchronizer;

use crate::buffer::common_ring_buffer::Cursor;
use crate::buffer::{
RingBuffer,
RingBufferConsumer,
RingBufferCursorConsumer,
RingBufferProducer,
to_u32,
};
use crate::buffer::{RingBuffer, RingBufferConsumer, RingBufferCursorConsumer, RingBufferProducer};
use bd_log_primitives::LossyIntToU32;
use std::sync::Arc;

#[ctor::ctor]
Expand Down Expand Up @@ -42,7 +37,7 @@ impl StartRead {
}

pub fn reserve_no_commit(producer: &mut dyn RingBufferProducer, data: &str) {
let reserved = producer.reserve(to_u32(data.len()), true).unwrap();
let reserved = producer.reserve(data.len().to_u32_lossy(), true).unwrap();
assert_eq!(reserved.len(), data.len());
reserved.copy_from_slice(data.as_bytes());
}
Expand Down
5 changes: 3 additions & 2 deletions bd-buffer/src/buffer/volatile_ring_buffer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
use crate::buffer::common_ring_buffer::Cursor;
use crate::buffer::test::{Helper, reserve_no_commit};
use crate::buffer::volatile_ring_buffer::RingBufferImpl;
use crate::buffer::{RingBufferProducer, RingBufferStats, to_u32};
use crate::buffer::{RingBufferProducer, RingBufferStats};
use crate::{AbslCode, Error};
use assert_matches::assert_matches;
use bd_log_primitives::LossyIntToU32;
use itertools::Itertools;

fn make_helper(size: u32) -> Helper {
Expand Down Expand Up @@ -377,7 +378,7 @@ fn write_into_concurrent_reader() {
// Start reading 0-9 and then reserve and commit what should go into 10-19.
let reserved = helper.start_read_and_verify("aaaaaa");
assert_matches!(
helper.producer().reserve(to_u32("dddddd".len()), true),
helper.producer().reserve("dddddd".len().to_u32_lossy(), true),
Err(Error::AbslStatus(code, message))
if code == AbslCode::ResourceExhausted && message == "writing into concurrent read"
);
Expand Down
Loading
Loading