diff --git a/Cargo.lock b/Cargo.lock index acb7a0bb..0e46264a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1440,12 +1440,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foldhash" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" - [[package]] name = "foreign-types" version = "0.3.2" @@ -1702,17 +1696,6 @@ dependencies = [ "serde", ] -[[package]] -name = "hashbrown" -version = "0.15.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" -dependencies = [ - "allocator-api2", - "equivalent", - "foldhash", -] - [[package]] name = "hashbrown" version = "0.16.1" @@ -2302,15 +2285,6 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" -[[package]] -name = "lru" -version = "0.12.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" -dependencies = [ - "hashbrown 0.15.5", -] - [[package]] name = "lru-slab" version = "0.1.2" @@ -4143,41 +4117,25 @@ dependencies = [ name = "roboflow-dataset" version = "0.2.0" dependencies = [ - "anyhow", "async-trait", - "bytes", - "bzip2", - "chrono", "criterion", "crossbeam-channel", - "crossbeam-deque", - "futures", - "hex", - "lz4_flex", "mcap", - "memmap2 0.9.9", - "num_cpus", "polars", "pretty_assertions", "proptest", - "rayon", - "regex", "robocodec", "roboflow-core", - "roboflow-executor", "roboflow-media", "roboflow-storage", - "rosbag", "serde", "serde_json", - "serde_yaml_ng", "tempfile", "thiserror 1.0.69", "tokio", "toml", "tracing", "uuid", - "zstd", ] [[package]] @@ -4191,11 +4149,9 @@ dependencies = [ "futures", "gethostname", "glob", - "lru", "polars", "pretty_assertions", "roboflow-core", - "roboflow-executor", "roboflow-storage", "serde", "serde_json", @@ -4236,7 +4192,6 @@ dependencies = [ "crossbeam-channel", "image", "num_cpus", - "png 0.17.16", "rayon", "robocodec", "roboflow-core", @@ -4254,19 +4209,11 @@ name = "roboflow-pipeline" version = "0.2.0" dependencies = [ "async-trait", - "chrono", - "crossbeam-channel", - "rayon", "robocodec", "roboflow-core", "roboflow-dataset", "roboflow-executor", "roboflow-media", - "roboflow-storage", - "serde", - "serde_json", - "thiserror 1.0.69", - "tokio", "tracing", ] diff --git a/crates/roboflow-dataset/Cargo.toml b/crates/roboflow-dataset/Cargo.toml index d2b1aff8..3547f7ec 100644 --- a/crates/roboflow-dataset/Cargo.toml +++ b/crates/roboflow-dataset/Cargo.toml @@ -10,36 +10,20 @@ description = "Dataset formats and conversion for robotics training data" [dependencies] roboflow-core = { workspace = true } roboflow-storage = { workspace = true } -roboflow-executor = { workspace = true } roboflow-media = { workspace = true } robocodec = { workspace = true } polars = { version = "0.41", features = ["parquet", "lazy", "diagonal_concat"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -serde_yaml_ng = "0.10" toml = "0.8" thiserror = "1.0" -anyhow = "1.0" tracing = "0.1" crossbeam-channel = "0.5" -crossbeam-deque = "0.8" -rayon = "1.10" -num_cpus = "1.16" tokio = { workspace = true } async-trait = { workspace = true } -futures = "0.3" uuid = { version = "1.10", features = ["v4", "serde"] } mcap = "0.24" -rosbag = "0.6" -chrono = { workspace = true } -zstd = "0.13" -lz4_flex = "0.11" -bzip2 = "0.4" -bytes = "1" -memmap2 = "0.9" -hex = "0.4" -regex = "1.10" tempfile = "3.10" [features] diff --git a/crates/roboflow-dataset/src/formats/common/base.rs b/crates/roboflow-dataset/src/formats/common/base.rs index afec3d35..d62602b9 100644 --- a/crates/roboflow-dataset/src/formats/common/base.rs +++ b/crates/roboflow-dataset/src/formats/common/base.rs @@ -21,7 +21,6 @@ //! - [`AudioData`] - Audio data with metadata //! - [`CameraInfo`] - Camera calibration information -#[allow(unused_imports)] use roboflow_core::Result; use roboflow_media::{AudioData, CameraInfo, ImageData}; use std::collections::HashMap; diff --git a/crates/roboflow-dataset/src/formats/lerobot/writer/parquet.rs b/crates/roboflow-dataset/src/formats/lerobot/writer/parquet.rs index 5eb96379..280cbea6 100644 --- a/crates/roboflow-dataset/src/formats/lerobot/writer/parquet.rs +++ b/crates/roboflow-dataset/src/formats/lerobot/writer/parquet.rs @@ -28,6 +28,9 @@ const DEFAULT_ACTION_DIMENSION: usize = 14; /// /// This is a convenience wrapper that uses chunk index 0. /// For distributed processing with dynamic chunk indices, use `write_episode_parquet_with_chunk`. +/// +/// Note: This function is kept for testing and simple use cases. +/// Production code should use `write_episode_parquet_with_chunk` for full control. #[allow(dead_code)] pub fn write_episode_parquet( frame_data: &[LerobotFrame], diff --git a/crates/roboflow-distributed/Cargo.toml b/crates/roboflow-distributed/Cargo.toml index 96701646..32fca63b 100644 --- a/crates/roboflow-distributed/Cargo.toml +++ b/crates/roboflow-distributed/Cargo.toml @@ -10,7 +10,6 @@ description = "Distributed coordination for roboflow - TiKV backend" [dependencies] roboflow-core = { workspace = true } roboflow-storage = { workspace = true } -roboflow-executor = { workspace = true } # TiKV tikv-client = "0.3" @@ -44,7 +43,6 @@ glob = "0.3" uuid = { version = "1.10", features = ["v4", "serde"] } sha2 = "0.10" gethostname = "0.4" -lru = "0.12" # Parquet (merge operations) polars = { version = "0.41", features = ["parquet", "lazy", "diagonal_concat"] } diff --git a/crates/roboflow-media/Cargo.toml b/crates/roboflow-media/Cargo.toml index 84d1f09f..b8ed0367 100644 --- a/crates/roboflow-media/Cargo.toml +++ b/crates/roboflow-media/Cargo.toml @@ -16,7 +16,6 @@ serde_json = { workspace = true } # Image decoding image = { version = "0.25", default-features = false, features = ["jpeg", "png"] } zune-jpeg = "0.4" -png = "0.17" # Video encoding rsmpeg = { version = "0.18", features = ["link_system_ffmpeg", "link_vcpkg_ffmpeg"] } diff --git a/crates/roboflow-media/src/video/arena.rs b/crates/roboflow-media/src/video/arena.rs index 114fa67d..86b5b53e 100644 --- a/crates/roboflow-media/src/video/arena.rs +++ b/crates/roboflow-media/src/video/arena.rs @@ -60,6 +60,9 @@ pub struct FramePool { alloc_count: AtomicU64, } +// SAFETY: FramePool uses atomic operations (AtomicU64, AtomicBool) for all +// concurrent access. The raw pointer (base_ptr) is managed internally with +// proper synchronization via atomic CAS on free_mask. unsafe impl Send for FramePool {} unsafe impl Sync for FramePool {} @@ -97,6 +100,9 @@ impl FramePool { let base_ptr = if total_size == 0 { std::ptr::null_mut() } else { + // SAFETY: Layout is valid (validated above). We allocate exactly `total_size` bytes + // with 64-byte alignment for SIMD operations. The pointer is stored in base_ptr + // and will be deallocated in Drop using the same layout. let ptr = unsafe { alloc(layout) }; if ptr.is_null() { return Err(FramePoolError::AllocationFailed( @@ -162,6 +168,8 @@ impl FramePool { pub fn acquire(&self) -> Option> { let slot_index = self.try_acquire_slot()?; Some(OwnedSlot { + // SAFETY: slot_index is guaranteed to be < slot_count by try_acquire_slot. + // base_ptr is valid and aligned, and the offset calculation is within bounds. data_ptr: unsafe { NonNull::new_unchecked(self.base_ptr.add((slot_index as usize) * self.slot_size)) }, @@ -217,6 +225,9 @@ impl FramePool { impl Drop for FramePool { fn drop(&mut self) { if !self.base_ptr.is_null() && self.layout.size() > 0 { + // SAFETY: base_ptr was allocated with the same layout in new(), + // and is still valid. All slots must be released before drop + // (enforced by RAII via OwnedSlot). unsafe { dealloc(self.base_ptr, self.layout); } @@ -247,6 +258,8 @@ pub struct OwnedSlot<'a> { height: u32, } +// SAFETY: OwnedSlot points to memory in FramePool which is Send + Sync. +// The slot is exclusively owned, so no data races are possible. unsafe impl Send for OwnedSlot<'_> {} impl OwnedSlot<'_> { @@ -260,10 +273,14 @@ impl OwnedSlot<'_> { } #[inline] pub fn data(&self) -> &[u8] { + // SAFETY: data_ptr is valid for data_size bytes, acquired from FramePool + // which guarantees proper alignment and size. unsafe { std::slice::from_raw_parts(self.data_ptr.as_ptr(), self.data_size) } } #[inline] pub fn data_mut(&mut self) -> &mut [u8] { + // SAFETY: data_ptr is valid for data_size bytes, acquired from FramePool. + // We have exclusive access via &mut self, so aliasing is not possible. unsafe { std::slice::from_raw_parts_mut(self.data_ptr.as_ptr(), self.data_size) } } #[inline] @@ -315,6 +332,9 @@ pub struct AtomicFramePool { inner: FramePool, } +// SAFETY: AtomicFramePool wraps FramePool and uses atomic operations for slot +// acquisition/release. All mutable operations use atomic compare_exchange, +// making it safe for concurrent access from multiple threads. unsafe impl Send for AtomicFramePool {} unsafe impl Sync for AtomicFramePool {} @@ -328,6 +348,8 @@ impl AtomicFramePool { pub fn acquire(self: &Arc) -> Option { let slot_index = self.inner.try_acquire_slot()?; Some(ArcSlot { + // SAFETY: slot_index is guaranteed to be < slot_count by try_acquire_slot. + // base_ptr is valid and aligned, and the offset calculation is within bounds. data_ptr: unsafe { NonNull::new_unchecked( self.inner @@ -388,6 +410,8 @@ pub struct ArcSlot { _guard: SlotGuard, } +// SAFETY: ArcSlot points to memory in AtomicFramePool which is Send + Sync. +// The slot is exclusively owned via SlotGuard, so no data races are possible. unsafe impl Send for ArcSlot {} impl ArcSlot { @@ -401,10 +425,14 @@ impl ArcSlot { } #[inline] pub fn data(&self) -> &[u8] { + // SAFETY: data_ptr is valid for data_size bytes, acquired from AtomicFramePool + // which guarantees proper alignment and size. unsafe { std::slice::from_raw_parts(self.data_ptr.as_ptr(), self.data_size) } } #[inline] pub fn data_mut(&mut self) -> &mut [u8] { + // SAFETY: data_ptr is valid for data_size bytes, acquired from AtomicFramePool. + // We have exclusive access via &mut self, so aliasing is not possible. unsafe { std::slice::from_raw_parts_mut(self.data_ptr.as_ptr(), self.data_size) } } #[inline] diff --git a/crates/roboflow-media/src/video/codec.rs b/crates/roboflow-media/src/video/codec.rs index a20e0d30..f51044ba 100644 --- a/crates/roboflow-media/src/video/codec.rs +++ b/crates/roboflow-media/src/video/codec.rs @@ -243,6 +243,8 @@ fn find_and_create_context(name: &str) -> Result<(AVCodecContext, String, bool), .ok_or_else(|| "No H.264 encoder available".to_string())?; let actual_name = codec.name().to_str().unwrap_or("unknown").to_string(); + // SAFETY: codec.as_ptr() returns a valid pointer to an AVCodec struct + // obtained from FFmpeg's find_encoder. We only read the capabilities field. let codec_caps = unsafe { (*codec.as_ptr()).capabilities }; let supports_flush = (codec_caps & ffi::AV_CODEC_CAP_ENCODER_FLUSH as i32) != 0; diff --git a/crates/roboflow-media/src/video/composer.rs b/crates/roboflow-media/src/video/composer.rs index cfa9740d..ffdd40ce 100644 --- a/crates/roboflow-media/src/video/composer.rs +++ b/crates/roboflow-media/src/video/composer.rs @@ -94,12 +94,28 @@ impl VideoComposer for RsmpegVideoComposer { let mut stream_mapping: Vec> = Vec::new(); for stream in first_input.streams().iter() { let mut out_stream = output_ctx.new_stream(); + // SAFETY: avcodec_parameters_alloc allocates a new parameters struct. + // We check for null before calling avcodec_parameters_copy. + // avcodec_parameters_copy returns 0 on success, negative on error. + // On error, we free the allocated struct before returning. + // The from_raw conversion is safe because we verified the pointer is non-null. let codecpar = unsafe { let new_par = ffi::avcodec_parameters_alloc(); - ffi::avcodec_parameters_copy(new_par, stream.codecpar().as_ptr() as *const _); - rsmpeg::avcodec::AVCodecParameters::from_raw( - std::ptr::NonNull::new(new_par).unwrap(), - ) + let new_par = std::ptr::NonNull::new(new_par) + .ok_or_else(|| RoboflowError::other("failed to allocate codec parameters"))?; + let ret = ffi::avcodec_parameters_copy( + new_par.as_ptr(), + stream.codecpar().as_ptr() as *const _, + ); + if ret < 0 { + let mut ptr = new_par.as_ptr(); + ffi::avcodec_parameters_free(&mut ptr); + return Err(RoboflowError::other(format!( + "avcodec_parameters_copy failed: error code {}", + ret + ))); + } + rsmpeg::avcodec::AVCodecParameters::from_raw(new_par) }; out_stream.set_codecpar(codecpar); out_stream.set_time_base(AVRational { diff --git a/crates/roboflow-media/src/video/encoder.rs b/crates/roboflow-media/src/video/encoder.rs index 29530db1..0c05c8b9 100644 --- a/crates/roboflow-media/src/video/encoder.rs +++ b/crates/roboflow-media/src/video/encoder.rs @@ -459,6 +459,9 @@ impl VideoEncoder { )); } + // SAFETY: frame_data was allocated by AVFrame::get_buffer and is valid for + // rgb_data.len() bytes. We verified it's not null above. The copy does not + // overlap with the source since rgb_data is a separate buffer. unsafe { let frame_data_slice = std::slice::from_raw_parts_mut(frame_data, rgb_data.len()); frame_data_slice.copy_from_slice(rgb_data); @@ -478,6 +481,10 @@ impl VideoEncoder { // Convert pixel format if let Some(ref sws) = self.sws_context { + // SAFETY: sws_context was initialized with valid source and destination + // parameters. input_frame and yuv_frame are valid AVFrame pointers with + // proper buffer allocations. The FFmpeg sws_scale function is safe to call + // with these validated parameters. unsafe { ffi::sws_scale( sws.as_ptr() as *mut _, @@ -721,6 +728,9 @@ impl VideoEncoder { let y_data = yuv_frame.data_mut(); let y_ptr = y_data[0]; if !y_ptr.is_null() { + // SAFETY: y_ptr is a valid pointer to Y plane data in yuv_frame, + // allocated by FFmpeg with sufficient size for y_size bytes. + // y_plane.as_ptr() points to valid source data. No overlap. unsafe { std::ptr::copy_nonoverlapping(y_plane.as_ptr(), y_ptr, y_size); } @@ -730,6 +740,9 @@ impl VideoEncoder { let uv_data = yuv_frame.data_mut(); let uv_ptr = uv_data[1]; if !uv_ptr.is_null() { + // SAFETY: uv_ptr is a valid pointer to UV plane data in yuv_frame, + // allocated by FFmpeg with sufficient size for uv_plane.len() bytes. + // uv_plane.as_ptr() points to valid source data. No overlap. unsafe { std::ptr::copy_nonoverlapping( uv_plane.as_ptr(), @@ -757,10 +770,24 @@ impl VideoEncoder { let frame_data_array = input_frame.data_mut(); let frame_data_ptr = frame_data_array[0]; let frame_data = frame.data(); + + if frame_data_ptr.is_null() { + return Err(RoboflowError::encode( + "VideoEncoder", + "Input frame data pointer is null", + )); + } + + // SAFETY: frame_data_ptr was allocated by input_frame.get_buffer and is + // valid for frame_data.len() bytes. We verified the pointer is non-null. + // copy_from_slice does not overlap. let frame_data_slice = unsafe { std::slice::from_raw_parts_mut(frame_data_ptr, frame_data.len()) }; frame_data_slice.copy_from_slice(frame_data); + // SAFETY: sws_context was initialized with valid source and destination + // parameters. Both input_frame and yuv_frame have valid buffer allocations. + // The FFmpeg sws_scale function is safe with these validated parameters. unsafe { ffi::sws_scale( sws_context.as_ptr() as *mut _, diff --git a/crates/roboflow-pipeline/Cargo.toml b/crates/roboflow-pipeline/Cargo.toml index bf180cdd..f001aaa4 100644 --- a/crates/roboflow-pipeline/Cargo.toml +++ b/crates/roboflow-pipeline/Cargo.toml @@ -9,21 +9,13 @@ description = "Pipeline execution and stages for dataset processing" [dependencies] roboflow-core = { workspace = true } -roboflow-storage = { workspace = true } roboflow-executor = { workspace = true } roboflow-dataset = { workspace = true } roboflow-media = { workspace = true } robocodec = { workspace = true } -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -thiserror = "1.0" tracing = "0.1" -tokio = { workspace = true } async-trait = { workspace = true } -chrono = { workspace = true } -rayon = "1.10" -crossbeam-channel = "0.5" [features] default = [] diff --git a/crates/roboflow-pipeline/src/stages/convert.rs b/crates/roboflow-pipeline/src/stages/convert.rs index 83a96c12..768bb631 100644 --- a/crates/roboflow-pipeline/src/stages/convert.rs +++ b/crates/roboflow-pipeline/src/stages/convert.rs @@ -34,16 +34,15 @@ impl Stage for ConvertStage { fn create_task(&self, partition: PartitionId) -> Box { Box::new(ConvertTask { - output_dir: self.output_dir.clone(), - partition, + _output_dir: self.output_dir.clone(), + _partition: partition, }) } } -#[allow(dead_code)] struct ConvertTask { - output_dir: PathBuf, - partition: PartitionId, + _output_dir: PathBuf, + _partition: PartitionId, } #[async_trait::async_trait] diff --git a/crates/roboflow-pipeline/src/stages/discover.rs b/crates/roboflow-pipeline/src/stages/discover.rs index c3e0829f..ea918c7c 100644 --- a/crates/roboflow-pipeline/src/stages/discover.rs +++ b/crates/roboflow-pipeline/src/stages/discover.rs @@ -32,16 +32,15 @@ impl Stage for DiscoverStage { fn create_task(&self, partition: PartitionId) -> Box { Box::new(DiscoverTask { - input_dir: self.input_dir.clone(), - partition, + _input_dir: self.input_dir.clone(), + _partition: partition, }) } } -#[allow(dead_code)] struct DiscoverTask { - input_dir: PathBuf, - partition: PartitionId, + _input_dir: PathBuf, + _partition: PartitionId, } #[async_trait::async_trait] diff --git a/crates/roboflow-pipeline/src/stages/merge.rs b/crates/roboflow-pipeline/src/stages/merge.rs index 2c178814..c5bf4dc6 100644 --- a/crates/roboflow-pipeline/src/stages/merge.rs +++ b/crates/roboflow-pipeline/src/stages/merge.rs @@ -32,16 +32,15 @@ impl Stage for MergeStage { fn create_task(&self, partition: PartitionId) -> Box { Box::new(MergeTask { - output_dir: self.output_dir.clone(), - partition, + _output_dir: self.output_dir.clone(), + _partition: partition, }) } } -#[allow(dead_code)] struct MergeTask { - output_dir: PathBuf, - partition: PartitionId, + _output_dir: PathBuf, + _partition: PartitionId, } #[async_trait::async_trait] diff --git a/crates/roboflow-storage/src/cached/eviction.rs b/crates/roboflow-storage/src/cached/eviction.rs index c5877a26..e088cdc0 100644 --- a/crates/roboflow-storage/src/cached/eviction.rs +++ b/crates/roboflow-storage/src/cached/eviction.rs @@ -36,6 +36,7 @@ impl std::fmt::Display for EvictionPolicy { /// Metadata for a cache entry used in eviction decisions. /// /// This is used by the `select_eviction_candidate` helper function. +/// Currently only used in tests; kept for future cache management integration. #[derive(Debug)] #[allow(dead_code)] pub struct CacheEntryMeta { @@ -55,6 +56,7 @@ pub struct CacheEntryMeta { /// /// Returns `Some((path, size))` of the entry to evict, or `None` if no /// suitable candidate exists (e.g., all entries have pending uploads). +/// Currently only used in tests; kept for future cache management integration. #[allow(dead_code)] pub fn select_eviction_candidate( entries: &[CacheEntryMeta], @@ -84,18 +86,6 @@ pub fn select_eviction_candidate( mod tests { use super::*; - #[allow(dead_code)] - fn create_entry(path: &str, size: u64, access_count: u64) -> CacheEntryMeta { - let now = SystemTime::now(); - CacheEntryMeta { - path: PathBuf::from(path), - size, - last_accessed: now - std::time::Duration::from_secs(access_count), - created_at: now - std::time::Duration::from_secs(access_count * 2), - access_count, - } - } - #[test] fn test_eviction_policy_default() { let policy = EvictionPolicy::default();