Add documentation, slightly reorganize collector code
Should make the collector a little easier to follow
Others committed Jun 15, 2020
1 parent d5a61b0 commit 8a2003e
Showing 4 changed files with 58 additions and 20 deletions.
2 changes: 2 additions & 0 deletions src/collector/alloc.rs
@@ -6,12 +6,14 @@ use std::ptr;
use crate::collector::InternalGcRef;
use crate::{Finalize, Scan, Scanner};

/// Represents a piece of data allocated by shredder
#[derive(Copy, Clone, Debug, Hash)]
pub struct GcAllocation {
scan_ptr: *const dyn Scan,
deallocation_action: DeallocationAction,
}

/// What additional action should we run before deallocating?
#[derive(Copy, Clone, Debug, Hash)]
pub enum DeallocationAction {
DoNothing,
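The two doc comments above describe a small but central pairing: each allocation records a type-erased pointer that can be scanned, plus whatever extra action must run before the memory is freed. Below is a minimal, self-contained sketch of that pattern — an illustration only, not shredder's real implementation; the `Scan` trait, the single enum variant, and `deallocate` are simplified stand-ins, and the rest of the real enum is not shown in this excerpt.

```rust
/// Simplified stand-in for shredder's `Scan` trait.
trait Scan {
    fn scan(&self);
}

/// What additional action should run before deallocating? (Other variants elided.)
#[derive(Copy, Clone)]
enum DeallocationAction {
    DoNothing,
}

/// A piece of allocated data: a type-erased pointer to the value,
/// plus what extra work must happen before it is freed.
#[derive(Copy, Clone)]
struct GcAllocation {
    scan_ptr: *const dyn Scan,
    deallocation_action: DeallocationAction,
}

impl GcAllocation {
    /// Run the pre-deallocation action; the real crate would then drop and free
    /// the value behind `scan_ptr`.
    unsafe fn deallocate(self) {
        match self.deallocation_action {
            DeallocationAction::DoNothing => {}
        }
        // ... drop the value behind `self.scan_ptr` and release its memory ...
    }
}
```

Bundling the action with the pointer presumably means whoever eventually frees the allocation (in shredder, the background dropper seen below) has everything it needs in one value.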
66 changes: 50 additions & 16 deletions src/collector/mod.rs
@@ -18,10 +18,12 @@ use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};

use crate::collector::alloc::GcAllocation;
use crate::collector::dropper::{BackgroundDropper, DropMessage};
use crate::collector::trigger::Trigger;
use crate::collector::trigger::GcTrigger;
use crate::lockout::{ExclusiveWarrant, Lockout, Warrant};
use crate::{Finalize, Scan};

/// Intermediate struct. `Gc<T>` holds an `InternalGcRef`, which references a `GcHandle`.
/// There should be one `GcHandle` per `Gc<T>`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct InternalGcRef {
handle_ref: Arc<GcHandle>,
@@ -37,26 +39,43 @@ impl InternalGcRef {
}

pub struct Collector {
/// Just a monotonic counter, used to assign unique ids
monotonic_counter: AtomicU64,
/// shredder only allows one collection to proceed at a time
gc_lock: Mutex<()>,
trigger: Trigger,
/// trigger decides when we should run a collection
trigger: GcTrigger,
/// dropping happens in a background thread. This struct lets us communicate with that thread
dropper: BackgroundDropper,
/// we run automatic gc in a background thread
/// sending to this channel signals that thread to check the trigger, then collect if the
/// trigger indicates it should
async_gc_notifier: Sender<()>,
/// all the data we are managing plus metadata about what `Gc<T>`s exist
tracked_data: TrackedData,
}

/// Stores metadata about each piece of tracked data, plus metadata about each handle
#[derive(Debug)]
struct TrackedData {
// TODO: Could we reuse the monotonic counter?
/// we increment this whenever we collect
current_collection_number: AtomicU64,
/// a set storing metadata on the live data the collector is managing
data: DashMap<Arc<GcData>, ()>,
/// a set storing metadata on each live handle (`Gc<T>`) the collector is managing
handles: DashMap<Arc<GcHandle>, ()>,
}

/// Represents a piece of data tracked by the collector
#[derive(Debug)]
pub(crate) struct GcData {
unique_id: u64,
/// a wrapper to manage (i.e. deallocate) the underlying allocation
underlying_allocation: GcAllocation,
/// lockout to prevent scanning the underlying data while it may be changing
lockout: Arc<Lockout>,
/// have we started deallocating this piece of data yet?
deallocated: AtomicBool,
// During what collection was this last marked?
// 0 if this is a new piece of data
@@ -89,10 +108,13 @@ impl PartialOrd for GcData {
}
}

/// There is one `GcHandle` per `Gc<T>`. We need this metadata for collection
#[derive(Debug)]
pub(crate) struct GcHandle {
unique_id: u64,
/// what data is backing this handle
underlying_data: Arc<GcData>,
/// lockout to prevent scanning the underlying data while it may be changing
lockout: Arc<Lockout>,
// During what collection was this last found in a piece of GcData?
// 0 if this is a new piece of data
@@ -134,7 +156,7 @@ impl Collector {
let res = Arc::new(Self {
monotonic_counter: AtomicU64::new(1),
gc_lock: Mutex::default(),
trigger: Trigger::default(),
trigger: GcTrigger::default(),
dropper: BackgroundDropper::new(),
async_gc_notifier,
tracked_data: TrackedData {
@@ -288,6 +310,9 @@ impl Collector {
}

pub fn synchronize_destructors(&self) {
// We send the drop thread a message containing a channel, then wait for it to respond
// This has the effect of synchronizing this thread with the drop thread

let (sender, receiver) = crossbeam::bounded(1);
let drop_msg = DropMessage::SyncUp(sender);
{
@@ -349,10 +374,13 @@ impl Collector {

// eprintln!("tracked data {:?}", tracked_data);
// eprintln!("tracked handles {:?}", tracked_handles);

// In this step we calculate what's not rooted by marking every handle found inside tracked data (i.e. definitively inside a `Gc`)
self.tracked_data.data.iter().par_bridge().for_each(|ele| {
let data = ele.key();

// If data.last_marked == 0, then it is new data. Update that we've seen this data
// (this step helps synchronize what data is valid to be deallocated)
if data.last_marked.load(Ordering::SeqCst) == 0 {
data.last_marked
.store(current_collection - 1, Ordering::SeqCst);
@@ -375,6 +403,7 @@
}
});

// The handles that were not just marked need to be treated as roots
let mut roots = Vec::new();
for ele in self.tracked_data.handles.iter() {
let handle = ele.key();
@@ -386,6 +415,8 @@

// eprintln!("roots {:?}", roots);

// This step is a DFS through the object graph (starting with the roots)
// We mark each object we find
let dfs_stack = DynQueue::new(roots);
dfs_stack.into_par_iter().for_each(|(queue, handle)| {
let data = &handle.underlying_data;
@@ -422,6 +453,7 @@
// Now cleanup by removing all the data that is done for
par_retain(&self.tracked_data.data, |data, _| {
// Mark the new data as in use for now
// This stops us from deallocating data that was allocated during collection
if data.last_marked.load(Ordering::SeqCst) == 0 {
data.last_marked.store(current_collection, Ordering::SeqCst);
}
@@ -447,9 +479,11 @@
}
});

// update the trigger based on the new baseline
self.trigger
.set_data_count_after_collection(self.tracked_data_count());

// update collection number
self.tracked_data
.current_collection_number
.fetch_add(1, Ordering::SeqCst);
@@ -462,6 +496,19 @@

pub static COLLECTOR: Lazy<Arc<Collector>> = Lazy::new(Collector::new);

// Helper function! Lives here because it has nowhere else to go ;-;
fn par_retain<K, V, F: Fn(&K, &V) -> bool>(map: &DashMap<K, V>, retain_fn: F)
where
K: Eq + Hash + Send + Sync,
V: Send + Sync,
F: Send + Sync,
{
map.shards()
.iter()
.par_bridge()
.for_each(|s| s.write().retain(|k, v| retain_fn(k, v.get())));
}

#[cfg(test)]
pub(crate) fn get_mock_handle() -> InternalGcRef {
use crate::{GcSafe, Scanner};
@@ -490,16 +537,3 @@ pub(crate) fn get_mock_handle() -> InternalGcRef {
last_non_rooted: AtomicU64::new(0),
}))
}

// Helper function! Lives here because it has nowhere else to go ;-;
fn par_retain<K, V, F: Fn(&K, &V) -> bool>(map: &DashMap<K, V>, retain_fn: F)
where
K: Eq + Hash + Send + Sync,
V: Send + Sync,
F: Send + Sync,
{
map.shards()
.iter()
.par_bridge()
.for_each(|s| s.write().retain(|k, v| retain_fn(k, v.get())));
}
9 changes: 5 additions & 4 deletions src/collector/trigger.rs
@@ -5,7 +5,8 @@ const DEFAULT_ALLOCATION_TRIGGER_PERCENT: f32 = 0.75;
const DEFAULT_HANDLE_DEFICIT_TRIGGER_PERCENT: f32 = 0.9;
const MIN_ALLOCATIONS_FOR_COLLECTION: f32 = 512.0 * 1.3;

pub struct Trigger {
/// Deals with deciding when we need to run a collection
pub struct GcTrigger {
data: Mutex<InternalTriggerData>,
}

@@ -17,7 +18,7 @@ struct InternalTriggerData {
data_count_at_last_collection: usize,
}

impl Trigger {
impl GcTrigger {
pub fn set_trigger_percent(&self, p: f32) {
self.data.lock().allocations_trigger_percent = p;
}
@@ -56,9 +57,9 @@ impl Trigger {
}
}

impl Default for Trigger {
impl Default for GcTrigger {
fn default() -> Self {
Trigger {
GcTrigger {
data: Mutex::new(InternalTriggerData {
allocations_trigger_percent: DEFAULT_ALLOCATION_TRIGGER_PERCENT,
handle_deficit_trigger_percent: DEFAULT_HANDLE_DEFICIT_TRIGGER_PERCENT,
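The constants at the top of this file hint at the policy: the trigger compares the current number of tracked allocations against the count recorded after the previous collection, with a floor so that very small heaps never trigger automatic collection (the handle-deficit percentage is left out here). Since the body of the check is not part of this diff, the following is only a hypothetical sketch of a percent-over-baseline rule, not necessarily the formula shredder uses.

```rust
// Hypothetical trigger check; the real `GcTrigger` logic may differ.
const DEFAULT_ALLOCATION_TRIGGER_PERCENT: f32 = 0.75;
const MIN_ALLOCATIONS_FOR_COLLECTION: f32 = 512.0 * 1.3;

fn should_collect(current_data_count: usize, data_count_at_last_collection: usize) -> bool {
    // Collect once the heap has grown by the configured percentage over the
    // post-collection baseline, but never for trivially small heaps.
    let baseline = data_count_at_last_collection as f32;
    let threshold = (baseline * (1.0 + DEFAULT_ALLOCATION_TRIGGER_PERCENT))
        .max(MIN_ALLOCATIONS_FOR_COLLECTION);
    current_data_count as f32 >= threshold
}
```

Under this made-up rule, a baseline of 1,000 tracked allocations would trigger the next collection at 1,750, while for a fresh heap the 512 × 1.3 ≈ 666 floor dominates.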
1 change: 1 addition & 0 deletions src/scan/mod.rs
@@ -71,6 +71,7 @@ pub use r::{RMut, R};
/// ```
///
/// In emergencies, you can break out `#[shredder(unsafe_skip)]`, but this is potentially unsafe
/// (the field you're skipping MUST uphold invariants as if it were `GcSafe`)
/// ```
/// use std::sync::Arc;
/// use shredder::{Scan, GcSafeWrapper};
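The new doc line makes the contract explicit: a field marked `#[shredder(unsafe_skip)]` is simply not scanned, so the programmer takes on the obligation that it behaves as if it were `GcSafe`. A hypothetical sketch of what that looks like in practice — `NotKnownGcSafe` is invented for illustration, and the precise bounds the derive places on skipped fields are defined by the crate, not by this example:

```rust
// Hypothetical example of skipping a field; `NotKnownGcSafe` is invented for illustration.
use shredder::Scan;

struct NotKnownGcSafe {
    // Something the derive cannot prove safe, e.g. a raw pointer.
    raw: *const u8,
}

#[derive(Scan)]
struct Holder {
    // SAFETY: we promise this field upholds the `GcSafe` invariants
    // (it never aliases Gc-managed data and is safe to ignore during scanning).
    #[shredder(unsafe_skip)]
    skipped: NotKnownGcSafe,
}
```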
