Skip to content

Commit e9e59fa

Browse files
authored
Shm allocators (#2117)
* Implicit SHM optimization added * rustfmt * fix transport performance by optimizing its layout * optimize TransmissionPipelineProducer layout (that also optimizes layouts for transports) * code format * significantly improve TransmissionPipelineProducer layout (add 600k msgs/sec) * add comment * - fix bug with always-enabled SHM * Add collection of shm allocators with various strong sides * polish code * spelling * fix merge issues * make tests for implicit SHM optimization * review fixes * go back to Duration in Waits * Update Cargo.toml * fallback to buddy_system_allocator 0.10.0 as Rust 1.75-friendly * clippy fix * taplo fix * Fix port number in tests * fix port conflict in tests * Update unicast_shm.rs * review fixes
1 parent 07269d9 commit e9e59fa

File tree

12 files changed

+881
-300
lines changed

12 files changed

+881
-300
lines changed

Cargo.lock

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ async-std = { version = "1.6.5", features = ["tokio1"] }
8989
async-trait = "0.1.82"
9090
base64 = "0.22.1"
9191
bincode = "1.3.3"
92+
buddy_system_allocator = "=0.10.0"
9293
bytes = "1.7.1"
9394
clap = { version = "4.5.17", features = ["derive"] }
9495
console-subscriber = "0.4.0"
@@ -170,6 +171,7 @@ static_assertions = "1.1.0"
170171
static_init = "1.0.3"
171172
stop-token = "0.7.0"
172173
syn = "2.0"
174+
talc = { version = "4.4.3", default-features = false }
173175
test-case = "3.3.1"
174176
tide = "0.16.0"
175177
time = "0.3.36"

commons/zenoh-shm/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ test = ["num_cpus"]
3333

3434
[dependencies]
3535
async-trait = { workspace = true }
36+
buddy_system_allocator = { workspace = true }
3637
crossbeam-channel = { workspace = true }
3738
crossbeam-queue = { workspace = true }
3839
num-traits = { workspace = true }
@@ -41,6 +42,7 @@ rand = { workspace = true, features = ["std", "std_rng"] }
4142
stabby = { workspace = true }
4243
static_assertions = { workspace = true }
4344
static_init = { workspace = true }
45+
talc = { workspace = true }
4446
thread-priority = { workspace = true }
4547
tokio = { workspace = true }
4648
tracing = { workspace = true }

commons/zenoh-shm/src/api/protocol_implementations/posix/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@
1313
//
1414

1515
pub mod posix_shm_client;
16+
1617
pub mod posix_shm_provider_backend;
18+
pub mod posix_shm_provider_backend_binary_heap;
19+
pub mod posix_shm_provider_backend_buddy;
20+
pub mod posix_shm_provider_backend_talc;
21+
1722
pub mod protocol_id;
1823

1924
pub(crate) mod posix_shm_segment;

commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs

Lines changed: 4 additions & 283 deletions
Original file line numberDiff line numberDiff line change
@@ -12,289 +12,10 @@
1212
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
1313
//
1414

15-
use std::{
16-
cmp,
17-
collections::BinaryHeap,
18-
num::NonZeroUsize,
19-
sync::{
20-
atomic::{AtomicUsize, Ordering},
21-
Arc, Mutex,
22-
},
15+
use crate::api::protocol_implementations::posix::posix_shm_provider_backend_talc::{
16+
PosixShmProviderBackendTalc, PosixShmProviderBackendTalcBuilder,
2317
};
2418

25-
use zenoh_core::{zlock, Resolvable, Wait};
26-
use zenoh_result::ZResult;
19+
pub type PosixShmProviderBackendBuilder<What> = PosixShmProviderBackendTalcBuilder<What>;
2720

28-
use super::posix_shm_segment::PosixShmSegment;
29-
use crate::api::{
30-
common::{
31-
types::{ChunkID, ProtocolID, PtrInSegment},
32-
with_id::WithProtocolID,
33-
},
34-
protocol_implementations::posix::protocol_id::POSIX_PROTOCOL_ID,
35-
provider::{
36-
chunk::{AllocatedChunk, ChunkDescriptor},
37-
memory_layout::{MemLayout, MemoryLayout, StaticLayout, TryIntoMemoryLayout},
38-
shm_provider_backend::ShmProviderBackend,
39-
types::{AllocAlignment, ChunkAllocResult, ZAllocError, ZLayoutError},
40-
},
41-
};
42-
43-
#[derive(Eq, Copy, Clone, Debug)]
44-
struct Chunk {
45-
offset: ChunkID,
46-
size: NonZeroUsize,
47-
}
48-
49-
impl Ord for Chunk {
50-
fn cmp(&self, other: &Self) -> cmp::Ordering {
51-
self.size.cmp(&other.size)
52-
}
53-
}
54-
55-
impl PartialOrd for Chunk {
56-
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
57-
Some(self.cmp(other))
58-
}
59-
}
60-
61-
impl PartialEq for Chunk {
62-
fn eq(&self, other: &Self) -> bool {
63-
self.size == other.size
64-
}
65-
}
66-
67-
/// Builder to create posix SHM provider
68-
#[zenoh_macros::unstable_doc]
69-
pub struct PosixShmProviderBackendBuilder<Layout: MemLayout> {
70-
layout: Layout,
71-
}
72-
73-
#[zenoh_macros::unstable_doc]
74-
impl<Layout: TryIntoMemoryLayout> Resolvable for PosixShmProviderBackendBuilder<Layout> {
75-
type To = ZResult<PosixShmProviderBackend>;
76-
}
77-
78-
#[zenoh_macros::unstable_doc]
79-
impl<Layout: TryIntoMemoryLayout> Wait for PosixShmProviderBackendBuilder<Layout> {
80-
fn wait(self) -> <Self as Resolvable>::To {
81-
let layout: MemoryLayout = self.layout.try_into()?;
82-
PosixShmProviderBackend::new(&layout)
83-
}
84-
}
85-
86-
#[zenoh_macros::unstable_doc]
87-
impl Resolvable for PosixShmProviderBackendBuilder<&MemoryLayout> {
88-
type To = ZResult<PosixShmProviderBackend>;
89-
}
90-
91-
#[zenoh_macros::unstable_doc]
92-
impl Wait for PosixShmProviderBackendBuilder<&MemoryLayout> {
93-
fn wait(self) -> <Self as Resolvable>::To {
94-
PosixShmProviderBackend::new(self.layout)
95-
}
96-
}
97-
98-
#[zenoh_macros::unstable_doc]
99-
impl<T> Resolvable for PosixShmProviderBackendBuilder<StaticLayout<T>> {
100-
type To = ZResult<PosixShmProviderBackend>;
101-
}
102-
103-
#[zenoh_macros::unstable_doc]
104-
impl<T> Wait for PosixShmProviderBackendBuilder<StaticLayout<T>> {
105-
fn wait(self) -> <Self as Resolvable>::To {
106-
PosixShmProviderBackend::new(&self.layout.into())
107-
}
108-
}
109-
110-
/// A backend for ShmProvider based on POSIX shared memory.
111-
/// This is the default general-purpose backend shipped with Zenoh.
112-
#[zenoh_macros::unstable_doc]
113-
pub struct PosixShmProviderBackend {
114-
available: AtomicUsize,
115-
segment: Arc<PosixShmSegment>,
116-
free_list: Mutex<BinaryHeap<Chunk>>,
117-
alignment: AllocAlignment,
118-
}
119-
120-
impl PosixShmProviderBackend {
121-
/// Get the builder to construct a new instance
122-
#[zenoh_macros::unstable_doc]
123-
pub fn builder<Layout: MemLayout>(layout: Layout) -> PosixShmProviderBackendBuilder<Layout> {
124-
PosixShmProviderBackendBuilder { layout }
125-
}
126-
127-
fn new(layout: &MemoryLayout) -> ZResult<Self> {
128-
let segment = Arc::new(PosixShmSegment::create(layout.size())?);
129-
130-
// because of platform specific, our shm segment is >= requested size, so in order to utilize
131-
// additional memory we re-layout the size
132-
let real_size = segment.segment.elem_count().get();
133-
let aligned_size = (real_size
134-
- (real_size % layout.alignment().get_alignment_value().get()))
135-
.try_into()?;
136-
137-
let mut free_list = BinaryHeap::new();
138-
let root_chunk = Chunk {
139-
offset: 0,
140-
size: aligned_size,
141-
};
142-
free_list.push(root_chunk);
143-
144-
tracing::trace!(
145-
"Created PosixShmProviderBackend id {}, layout {:?}, aligned size {aligned_size}",
146-
segment.segment.id(),
147-
layout
148-
);
149-
150-
Ok(Self {
151-
available: AtomicUsize::new(aligned_size.get()),
152-
segment,
153-
free_list: Mutex::new(free_list),
154-
alignment: layout.alignment(),
155-
})
156-
}
157-
}
158-
159-
impl WithProtocolID for PosixShmProviderBackend {
160-
fn id(&self) -> ProtocolID {
161-
POSIX_PROTOCOL_ID
162-
}
163-
}
164-
165-
impl ShmProviderBackend for PosixShmProviderBackend {
166-
fn alloc(&self, layout: &MemoryLayout) -> ChunkAllocResult {
167-
tracing::trace!("PosixShmProviderBackend::alloc({:?})", layout);
168-
169-
let required_len = layout.size();
170-
171-
if self.available.load(Ordering::Relaxed) < required_len.get() {
172-
tracing::trace!( "PosixShmProviderBackend does not have sufficient free memory to allocate {:?}, try de-fragmenting!", layout);
173-
return Err(ZAllocError::OutOfMemory);
174-
}
175-
176-
let mut guard = zlock!(self.free_list);
177-
// The strategy taken is the same for some Unix System V implementations -- as described in the
178-
// famous Bach's book -- in essence keep an ordered list of free slots and always look for the
179-
// biggest as that will give the biggest left-over.
180-
match guard.pop() {
181-
Some(mut chunk) if chunk.size >= required_len => {
182-
// NOTE: don't lose any chunks here, as it will lead to a memory leak
183-
tracing::trace!("Allocator selected Chunk ({:?})", &chunk);
184-
185-
if chunk.size > required_len {
186-
let free_chunk = Chunk {
187-
offset: chunk.offset + required_len.get() as ChunkID,
188-
// SAFETY: this is safe because we always operate on a leftover, which is checked above!
189-
size: unsafe {
190-
NonZeroUsize::new_unchecked(chunk.size.get() - required_len.get())
191-
},
192-
};
193-
tracing::trace!("The allocation will leave a Free Chunk: {:?}", &free_chunk);
194-
guard.push(free_chunk);
195-
chunk.size = required_len;
196-
}
197-
198-
self.available
199-
.fetch_sub(chunk.size.get(), Ordering::Relaxed);
200-
201-
let descriptor =
202-
ChunkDescriptor::new(self.segment.segment.id(), chunk.offset, chunk.size);
203-
204-
let data = PtrInSegment::new(
205-
unsafe { self.segment.segment.elem_mut(chunk.offset) },
206-
self.segment.clone(),
207-
);
208-
209-
Ok(AllocatedChunk { descriptor, data })
210-
}
211-
Some(c) => {
212-
tracing::trace!("PosixShmProviderBackend::alloc({:?}) cannot find any big enough chunk\nShmManager::free_list = {:?}", layout, self.free_list);
213-
guard.push(c);
214-
Err(ZAllocError::NeedDefragment)
215-
}
216-
None => {
217-
// NOTE: that should never happen! If this happens - there is a critical bug somewhere around!
218-
let err = format!("PosixShmProviderBackend::alloc({:?}) cannot find any available chunk\nShmManager::free_list = {:?}", layout, self.free_list);
219-
#[cfg(feature = "test")]
220-
panic!("{err}");
221-
#[cfg(not(feature = "test"))]
222-
{
223-
tracing::error!("{err}");
224-
Err(ZAllocError::OutOfMemory)
225-
}
226-
}
227-
}
228-
}
229-
230-
fn free(&self, chunk: &ChunkDescriptor) {
231-
let free_chunk = Chunk {
232-
offset: chunk.chunk,
233-
size: chunk.len,
234-
};
235-
self.available
236-
.fetch_add(free_chunk.size.get(), Ordering::Relaxed);
237-
zlock!(self.free_list).push(free_chunk);
238-
}
239-
240-
fn defragment(&self) -> usize {
241-
fn try_merge_adjacent_chunks(a: &Chunk, b: &Chunk) -> Option<Chunk> {
242-
let end_offset = a.offset as usize + a.size.get();
243-
if end_offset == b.offset as usize {
244-
Some(Chunk {
245-
// SAFETY: this is safe because we operate on non-zero sizes and it will never overflow
246-
size: unsafe { NonZeroUsize::new_unchecked(a.size.get() + b.size.get()) },
247-
offset: a.offset,
248-
})
249-
} else {
250-
None
251-
}
252-
}
253-
254-
let mut largest = 0usize;
255-
256-
// TODO: optimize this!
257-
// this is an old legacy algo for merging adjacent chunks
258-
// we extract chunks to separate container, sort them by offset and then check each chunk for
259-
// adjacency with its neighbour. Adjacent chunks are joined and returned back to the temporary container.
260-
// If a chunk is not adjacent to its neighbour, it is placed back into self.free_list
261-
let mut guard = zlock!(self.free_list);
262-
if guard.len() > 1 {
263-
let mut fbs: Vec<Chunk> = guard.drain().collect();
264-
fbs.sort_by(|x, y| x.offset.cmp(&y.offset));
265-
let mut current = fbs.remove(0);
266-
let mut i = 0;
267-
let n = fbs.len();
268-
for chunk in fbs.iter() {
269-
i += 1;
270-
let next = *chunk;
271-
match try_merge_adjacent_chunks(&current, &next) {
272-
Some(c) => {
273-
current = c;
274-
largest = largest.max(current.size.get());
275-
if i == n {
276-
guard.push(current)
277-
}
278-
}
279-
None => {
280-
guard.push(current);
281-
if i == n {
282-
guard.push(next);
283-
} else {
284-
current = next;
285-
}
286-
}
287-
}
288-
}
289-
}
290-
largest
291-
}
292-
293-
fn available(&self) -> usize {
294-
self.available.load(Ordering::Relaxed)
295-
}
296-
297-
fn layout_for(&self, layout: MemoryLayout) -> Result<MemoryLayout, ZLayoutError> {
298-
layout.extend(self.alignment)
299-
}
300-
}
21+
pub type PosixShmProviderBackend = PosixShmProviderBackendTalc;

0 commit comments

Comments
 (0)