Merged
53 changes: 0 additions & 53 deletions Cargo.lock

Some generated files are not rendered by default.

16 changes: 0 additions & 16 deletions crates/roboflow-dataset/Cargo.toml
@@ -10,36 +10,20 @@ description = "Dataset formats and conversion for robotics training data"
[dependencies]
roboflow-core = { workspace = true }
roboflow-storage = { workspace = true }
roboflow-executor = { workspace = true }
roboflow-media = { workspace = true }
robocodec = { workspace = true }

polars = { version = "0.41", features = ["parquet", "lazy", "diagonal_concat"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml_ng = "0.10"
toml = "0.8"
thiserror = "1.0"
anyhow = "1.0"
tracing = "0.1"
crossbeam-channel = "0.5"
crossbeam-deque = "0.8"
rayon = "1.10"
num_cpus = "1.16"
tokio = { workspace = true }
async-trait = { workspace = true }
futures = "0.3"
uuid = { version = "1.10", features = ["v4", "serde"] }
mcap = "0.24"
rosbag = "0.6"
chrono = { workspace = true }
zstd = "0.13"
lz4_flex = "0.11"
bzip2 = "0.4"
bytes = "1"
memmap2 = "0.9"
hex = "0.4"
regex = "1.10"
tempfile = "3.10"

[features]
1 change: 0 additions & 1 deletion crates/roboflow-dataset/src/formats/common/base.rs
@@ -21,7 +21,6 @@
//! - [`AudioData`] - Audio data with metadata
//! - [`CameraInfo`] - Camera calibration information

#[allow(unused_imports)]
use roboflow_core::Result;
use roboflow_media::{AudioData, CameraInfo, ImageData};
Comment on lines 24 to 25
`roboflow_media` is now optional but used unconditionally. Since the `common` module is used by multiple formats, either:

  1. Make `roboflow_media` always required (not optional), OR
  2. Feature-gate this entire module with `#[cfg(feature = "lerobot")]`

The current state will cause compilation errors when default features are disabled.
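
A minimal sketch of option 2, assuming a Cargo feature named `lerobot` that turns on the optional dependency. The module name `media_types` and the functions `describe` and `media_enabled` are hypothetical and do not come from the crate; the fallback module is there only so the sketch compiles with the feature on or off.

```rust
// Hypothetical stand-in for a module that needs an optional dependency.
// It is compiled only when the `lerobot` feature is enabled.
#[cfg(feature = "lerobot")]
pub mod media_types {
    pub fn describe() -> &'static str {
        "media types: enabled"
    }
}

// Fallback with the same interface, so callers still compile when the
// feature is disabled (for example with `--no-default-features`).
#[cfg(not(feature = "lerobot"))]
pub mod media_types {
    pub fn describe() -> &'static str {
        "media types: disabled"
    }
}

/// Reports at runtime whether the gated module was compiled in.
pub fn media_enabled() -> bool {
    cfg!(feature = "lerobot")
}
```

Gating the module rather than individual imports keeps the `use roboflow_media::...` line and everything that depends on it behind a single condition, which is usually easier to audit than scattered `#[cfg]` attributes.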

use std::collections::HashMap;
3 changes: 3 additions & 0 deletions crates/roboflow-dataset/src/formats/lerobot/writer/parquet.rs
@@ -28,6 +28,9 @@ const DEFAULT_ACTION_DIMENSION: usize = 14;
///
/// This is a convenience wrapper that uses chunk index 0.
/// For distributed processing with dynamic chunk indices, use `write_episode_parquet_with_chunk`.
///
/// Note: This function is kept for testing and simple use cases.
/// Production code should use `write_episode_parquet_with_chunk` for full control.
#[allow(dead_code)]
pub fn write_episode_parquet(
frame_data: &[LerobotFrame],
2 changes: 0 additions & 2 deletions crates/roboflow-distributed/Cargo.toml
@@ -10,7 +10,6 @@ description = "Distributed coordination for roboflow - TiKV backend"
[dependencies]
roboflow-core = { workspace = true }
roboflow-storage = { workspace = true }
roboflow-executor = { workspace = true }

# TiKV
tikv-client = "0.3"
@@ -44,7 +43,6 @@ glob = "0.3"
uuid = { version = "1.10", features = ["v4", "serde"] }
sha2 = "0.10"
gethostname = "0.4"
lru = "0.12"

# Parquet (merge operations)
polars = { version = "0.41", features = ["parquet", "lazy", "diagonal_concat"] }
1 change: 0 additions & 1 deletion crates/roboflow-media/Cargo.toml
@@ -16,7 +16,6 @@ serde_json = { workspace = true }
# Image decoding
image = { version = "0.25", default-features = false, features = ["jpeg", "png"] }
zune-jpeg = "0.4"
png = "0.17"

# Video encoding
rsmpeg = { version = "0.18", features = ["link_system_ffmpeg", "link_vcpkg_ffmpeg"] }
28 changes: 28 additions & 0 deletions crates/roboflow-media/src/video/arena.rs
@@ -60,6 +60,9 @@ pub struct FramePool {
alloc_count: AtomicU64,
}

// SAFETY: FramePool uses atomic operations (AtomicU64, AtomicBool) for all
// concurrent access. The raw pointer (base_ptr) is managed internally with
// proper synchronization via atomic CAS on free_mask.
unsafe impl Send for FramePool {}
unsafe impl Sync for FramePool {}

@@ -97,6 +100,9 @@ impl FramePool {
let base_ptr = if total_size == 0 {
std::ptr::null_mut()
} else {
// SAFETY: Layout is valid (validated above). We allocate exactly `total_size` bytes
// with 64-byte alignment for SIMD operations. The pointer is stored in base_ptr
// and will be deallocated in Drop using the same layout.
let ptr = unsafe { alloc(layout) };
if ptr.is_null() {
return Err(FramePoolError::AllocationFailed(
@@ -162,6 +168,8 @@ impl FramePool {
pub fn acquire(&self) -> Option<OwnedSlot<'_>> {
let slot_index = self.try_acquire_slot()?;
Some(OwnedSlot {
// SAFETY: slot_index is guaranteed to be < slot_count by try_acquire_slot.
// base_ptr is valid and aligned, and the offset calculation is within bounds.
data_ptr: unsafe {
NonNull::new_unchecked(self.base_ptr.add((slot_index as usize) * self.slot_size))
},
@@ -217,6 +225,9 @@ impl FramePool {
impl Drop for FramePool {
fn drop(&mut self) {
if !self.base_ptr.is_null() && self.layout.size() > 0 {
// SAFETY: base_ptr was allocated with the same layout in new(),
// and is still valid. All slots must be released before drop
// (enforced by RAII via OwnedSlot).
unsafe {
dealloc(self.base_ptr, self.layout);
}
@@ -247,6 +258,8 @@ pub struct OwnedSlot<'a> {
height: u32,
}

// SAFETY: OwnedSlot points to memory in FramePool which is Send + Sync.
// The slot is exclusively owned, so no data races are possible.
unsafe impl Send for OwnedSlot<'_> {}

impl OwnedSlot<'_> {
@@ -260,10 +273,14 @@ impl OwnedSlot<'_> {
}
#[inline]
pub fn data(&self) -> &[u8] {
// SAFETY: data_ptr is valid for data_size bytes, acquired from FramePool
// which guarantees proper alignment and size.
unsafe { std::slice::from_raw_parts(self.data_ptr.as_ptr(), self.data_size) }
}
#[inline]
pub fn data_mut(&mut self) -> &mut [u8] {
// SAFETY: data_ptr is valid for data_size bytes, acquired from FramePool.
// We have exclusive access via &mut self, so aliasing is not possible.
unsafe { std::slice::from_raw_parts_mut(self.data_ptr.as_ptr(), self.data_size) }
}
#[inline]
@@ -315,6 +332,9 @@ pub struct AtomicFramePool {
inner: FramePool,
}

// SAFETY: AtomicFramePool wraps FramePool and uses atomic operations for slot
// acquisition/release. All mutable operations use atomic compare_exchange,
// making it safe for concurrent access from multiple threads.
unsafe impl Send for AtomicFramePool {}
unsafe impl Sync for AtomicFramePool {}

@@ -328,6 +348,8 @@ impl AtomicFramePool {
pub fn acquire(self: &Arc<Self>) -> Option<ArcSlot> {
let slot_index = self.inner.try_acquire_slot()?;
Some(ArcSlot {
// SAFETY: slot_index is guaranteed to be < slot_count by try_acquire_slot.
// base_ptr is valid and aligned, and the offset calculation is within bounds.
data_ptr: unsafe {
NonNull::new_unchecked(
self.inner
@@ -388,6 +410,8 @@ pub struct ArcSlot {
_guard: SlotGuard,
}

// SAFETY: ArcSlot points to memory in AtomicFramePool which is Send + Sync.
// The slot is exclusively owned via SlotGuard, so no data races are possible.
unsafe impl Send for ArcSlot {}

impl ArcSlot {
@@ -401,10 +425,14 @@ impl ArcSlot {
}
#[inline]
pub fn data(&self) -> &[u8] {
// SAFETY: data_ptr is valid for data_size bytes, acquired from AtomicFramePool
// which guarantees proper alignment and size.
unsafe { std::slice::from_raw_parts(self.data_ptr.as_ptr(), self.data_size) }
}
#[inline]
pub fn data_mut(&mut self) -> &mut [u8] {
// SAFETY: data_ptr is valid for data_size bytes, acquired from AtomicFramePool.
// We have exclusive access via &mut self, so aliasing is not possible.
unsafe { std::slice::from_raw_parts_mut(self.data_ptr.as_ptr(), self.data_size) }
}
#[inline]
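
The SAFETY comments in this file rely on slot ownership being decided by an atomic CAS on `free_mask`. The following is a minimal sketch of that technique, not the crate's implementation: the `SlotMask` type, its 64-slot limit, the bit convention (a set bit means the slot is free), and `release_slot` are all assumptions made for illustration. Only the names `free_mask` and `try_acquire_slot` appear in the diff.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical bitmask allocator: bit `i` set means slot `i` is free.
pub struct SlotMask {
    free_mask: AtomicU64,
}

impl SlotMask {
    /// Creates a mask with `slot_count` free slots (at most 64).
    pub fn new(slot_count: u32) -> Self {
        assert!(slot_count <= 64, "one AtomicU64 tracks at most 64 slots");
        let mask = if slot_count == 64 {
            u64::MAX
        } else {
            (1u64 << slot_count) - 1
        };
        Self { free_mask: AtomicU64::new(mask) }
    }

    /// Claims the lowest free slot, or returns `None` when none is free.
    pub fn try_acquire_slot(&self) -> Option<u32> {
        let mut current = self.free_mask.load(Ordering::Acquire);
        loop {
            if current == 0 {
                return None;
            }
            let index = current.trailing_zeros();
            let next = current & !(1u64 << index);
            // The CAS succeeds only if no other thread changed the mask
            // since it was read, so each slot has exactly one winner.
            match self.free_mask.compare_exchange_weak(
                current,
                next,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(index),
                Err(observed) => current = observed,
            }
        }
    }

    /// Marks a previously acquired slot as free again.
    pub fn release_slot(&self, index: u32) {
        debug_assert!(index < 64);
        self.free_mask.fetch_or(1u64 << index, Ordering::Release);
    }
}
```

Because an index returned by `try_acquire_slot` always corresponds to a bit that was set in the mask, it stays below the slot count, which is the bound the pointer-offset SAFETY comments depend on.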
2 changes: 2 additions & 0 deletions crates/roboflow-media/src/video/codec.rs
@@ -243,6 +243,8 @@ fn find_and_create_context(name: &str) -> Result<(AVCodecContext, String, bool),
.ok_or_else(|| "No H.264 encoder available".to_string())?;

let actual_name = codec.name().to_str().unwrap_or("unknown").to_string();
// SAFETY: codec.as_ptr() returns a valid pointer to an AVCodec struct
// obtained from FFmpeg's find_encoder. We only read the capabilities field.
let codec_caps = unsafe { (*codec.as_ptr()).capabilities };
let supports_flush = (codec_caps & ffi::AV_CODEC_CAP_ENCODER_FLUSH as i32) != 0;

24 changes: 20 additions & 4 deletions crates/roboflow-media/src/video/composer.rs
@@ -94,12 +94,28 @@ impl VideoComposer for RsmpegVideoComposer {
let mut stream_mapping: Vec<Option<usize>> = Vec::new();
for stream in first_input.streams().iter() {
let mut out_stream = output_ctx.new_stream();
// SAFETY: avcodec_parameters_alloc allocates a new parameters struct.
// We check for null before calling avcodec_parameters_copy.
// avcodec_parameters_copy returns 0 on success, negative on error.
// On error, we free the allocated struct before returning.
// The from_raw conversion is safe because we verified the pointer is non-null.
let codecpar = unsafe {
let new_par = ffi::avcodec_parameters_alloc();
ffi::avcodec_parameters_copy(new_par, stream.codecpar().as_ptr() as *const _);
rsmpeg::avcodec::AVCodecParameters::from_raw(
std::ptr::NonNull::new(new_par).unwrap(),
)
let new_par = std::ptr::NonNull::new(new_par)
.ok_or_else(|| RoboflowError::other("failed to allocate codec parameters"))?;
let ret = ffi::avcodec_parameters_copy(
new_par.as_ptr(),
stream.codecpar().as_ptr() as *const _,
);
if ret < 0 {
let mut ptr = new_par.as_ptr();
ffi::avcodec_parameters_free(&mut ptr);
return Err(RoboflowError::other(format!(
"avcodec_parameters_copy failed: error code {}",
ret
)));
}
Comment on lines +106 to +117
Memory leak on the error path: if `avcodec_parameters_copy` returns a negative error code, `new_par` was allocated by `avcodec_parameters_alloc` but is never freed before the early return. `NonNull` is just a pointer wrapper with no `Drop` impl, so the `AVCodecParameters` struct leaks. This only triggers when the copy fails (e.g. OOM), but it should still be cleaned up. You need to call `ffi::avcodec_parameters_free` on the error path (passing `&mut new_par.as_ptr()`).

Suggested change
let ret = ffi::avcodec_parameters_copy(
new_par.as_ptr(),
stream.codecpar().as_ptr() as *const _,
);
if ret < 0 {
return Err(RoboflowError::other(format!(
"avcodec_parameters_copy failed: error code {}",
ret
)));
}
let ret = ffi::avcodec_parameters_copy(
new_par.as_ptr(),
stream.codecpar().as_ptr() as *const _,
);
if ret < 0 {
ffi::avcodec_parameters_free(&mut new_par.as_ptr());
return Err(RoboflowError::other(format!(
"avcodec_parameters_copy failed: error code {}",
ret
)));
}
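
An alternative to freeing by hand on each error path is a small RAII guard, sketched below under stated assumptions. Nothing here uses rsmpeg or FFmpeg: `RawParams`, `params_alloc`, `params_free`, `ParamsGuard`, and `copy_params` are hypothetical stand-ins, and the `FREED` counter exists only so the behaviour can be checked.

```rust
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Counts calls to `params_free`; present only to make the sketch checkable.
pub static FREED: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical stand-in for an FFI-allocated struct.
pub struct RawParams {
    pub codec_id: i32,
}

/// Stand-in for a C allocator that hands back a raw pointer.
pub fn params_alloc() -> *mut RawParams {
    Box::into_raw(Box::new(RawParams { codec_id: 0 }))
}

/// Stand-in for the matching C free function.
///
/// # Safety
/// `ptr` must come from `params_alloc` and must not have been freed yet.
pub unsafe fn params_free(ptr: *mut RawParams) {
    unsafe { drop(Box::from_raw(ptr)) };
    FREED.fetch_add(1, Ordering::SeqCst);
}

/// Frees the allocation on drop unless ownership was handed off.
pub struct ParamsGuard(Option<NonNull<RawParams>>);

impl ParamsGuard {
    pub fn alloc() -> Option<Self> {
        NonNull::new(params_alloc()).map(|ptr| ParamsGuard(Some(ptr)))
    }

    /// Hands ownership to the caller; the guard no longer frees the pointer.
    pub fn into_raw(mut self) -> NonNull<RawParams> {
        self.0.take().expect("guard always holds a pointer until into_raw")
    }
}

impl Drop for ParamsGuard {
    fn drop(&mut self) {
        if let Some(ptr) = self.0.take() {
            // SAFETY: the pointer came from `params_alloc` and was neither
            // freed nor handed off, because `into_raw` clears the option.
            unsafe { params_free(ptr.as_ptr()) };
        }
    }
}

/// Simulates a copy step that may fail; any early return frees the allocation.
pub fn copy_params(simulated_ret: i32) -> Result<NonNull<RawParams>, String> {
    let guard = ParamsGuard::alloc().ok_or_else(|| "allocation failed".to_string())?;
    if simulated_ret < 0 {
        return Err(format!("copy failed: error code {}", simulated_ret));
    }
    Ok(guard.into_raw())
}
```

With a guard, adding another fallible step between allocation and hand-off cannot reintroduce the leak, since the cleanup no longer depends on each `return` remembering to free.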


rsmpeg::avcodec::AVCodecParameters::from_raw(new_par)
};
out_stream.set_codecpar(codecpar);
out_stream.set_time_base(AVRational {