Skip to content

Commit

Permalink
Add support for tracking thread causality.
Browse files Browse the repository at this point in the history
This change adds support for tracking causality among thread operations.
We do this in the standard way, by associating a vector clock with each thread.
The i'th element of a thread's vector clock denotes its knowledge of the clock
of thread i. Clocks are partially ordered using a pointwise ordering <.
The main property we want is that for any pair of events p, q:
    (p causally precedes q)  iff  (clock at p  <  clock at q).

We update the code for thread spawn and join, as well as the various synchronization
objects (Atomics, Barriers, CondVars, Mutexes, RwLocks and mpsc channels) to track
causality by updating vector clocks appropriately.

This change does not currently properly track causality for async interactions;
those will be done in a subsequent PR.
  • Loading branch information
Rajeev Joshi authored and Rajeev Joshi committed Jun 29, 2021
1 parent 74fb737 commit 544f247
Show file tree
Hide file tree
Showing 16 changed files with 1,037 additions and 67 deletions.
53 changes: 48 additions & 5 deletions src/runtime/execution.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::runtime::failure::persist_failure;
use crate::runtime::task::{Task, TaskId, TaskState, DEFAULT_INLINE_TASKS};
use crate::runtime::task::{Task, TaskId, TaskState, VectorClock, DEFAULT_INLINE_TASKS};
use crate::scheduler::{Schedule, Scheduler};
use crate::{Config, MaxSteps};
use futures::Future;
Expand Down Expand Up @@ -57,7 +57,7 @@ impl Execution {

EXECUTION_STATE.set(&state, move || {
// Spawn `f` as the first task
ExecutionState::spawn_thread(f, config.stack_size, None);
ExecutionState::spawn_thread(f, config.stack_size, None, Some(VectorClock::new()));

// Run the test to completion
while self.step(config) {}
Expand Down Expand Up @@ -234,19 +234,33 @@ impl ExecutionState {
{
Self::with(|state| {
let task_id = TaskId(state.tasks.len());
let task = Task::from_future(future, stack_size, task_id, name);
let clock = state.increment_clock_mut(); // Increment the parent's clock
clock.extend(task_id); // and extend it with an entry for the new task
let task = Task::from_future(future, stack_size, task_id, name, clock.clone());
state.tasks.push(task);
task_id
})
}

pub(crate) fn spawn_thread<F>(f: F, stack_size: usize, name: Option<String>) -> TaskId
pub(crate) fn spawn_thread<F>(
f: F,
stack_size: usize,
name: Option<String>,
mut initial_clock: Option<VectorClock>,
) -> TaskId
where
F: FnOnce() + Send + 'static,
{
Self::with(|state| {
let task_id = TaskId(state.tasks.len());
let task = Task::from_closure(f, stack_size, task_id, name);
let clock = if let Some(ref mut clock) = initial_clock {
clock
} else {
// Inherit the clock of the parent thread (which spawned this task)
state.increment_clock_mut()
};
clock.extend(task_id); // and extend it with an entry for the new thread
let task = Task::from_closure(f, stack_size, task_id, name, clock.clone());
state.tasks.push(task);
task_id
})
Expand Down Expand Up @@ -353,6 +367,35 @@ impl ExecutionState {
Self::with(|state| state.context_switches)
}

pub(crate) fn get_clock(&self, id: TaskId) -> &VectorClock {
&self.tasks.get(id.0).unwrap().clock
}

pub(crate) fn get_clock_mut(&mut self, id: TaskId) -> &mut VectorClock {
&mut self.tasks.get_mut(id.0).unwrap().clock
}

// Increment the current thread's clock entry and update its clock with the one provided.
pub(crate) fn update_clock(&mut self, clock: &VectorClock) {
let task = self.current_mut();
task.clock.increment(task.id);
task.clock.update(clock);
}

// Increment the current thread's clock and return a shared reference to it
pub(crate) fn increment_clock(&mut self) -> &VectorClock {
let task = self.current_mut();
task.clock.increment(task.id);
&task.clock
}

// Increment the current thread's clock and return a mutable reference to it
pub(crate) fn increment_clock_mut(&mut self) -> &mut VectorClock {
let task = self.current_mut();
task.clock.increment(task.id);
&mut task.clock
}

/// Run the scheduler to choose the next task to run. `has_yielded` should be false if the
/// scheduler is being invoked from within a running task, in which case `schedule` should not
/// panic.
Expand Down
173 changes: 169 additions & 4 deletions src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ use crate::runtime::thread::continuation::{ContinuationPool, PooledContinuation}
use bitvec::prelude::*;
use bitvec::vec::BitVec;
use futures::{task::Waker, Future};
use smallvec::SmallVec;
use std::cell::RefCell;
use std::cmp::{Ordering, PartialOrd};
use std::fmt::Debug;
use std::iter::Iterator;
use std::rc::Rc;
use std::slice::Iter;
use std::task::Context;

pub(crate) mod waker;
Expand Down Expand Up @@ -42,6 +46,8 @@ pub(crate) struct Task {

pub(super) continuation: Rc<RefCell<PooledContinuation>>,

pub(crate) clock: VectorClock,

waiter: Option<TaskId>,

waker: Waker,
Expand All @@ -53,34 +59,49 @@ pub(crate) struct Task {

impl Task {
/// Create a task from a continuation
fn new<F>(f: F, stack_size: usize, id: TaskId, task_type: TaskType, name: Option<String>) -> Self
fn new<F>(
f: F,
stack_size: usize,
id: TaskId,
task_type: TaskType,
name: Option<String>,
clock: VectorClock,
) -> Self
where
F: FnOnce() + Send + 'static,
{
let mut continuation = ContinuationPool::acquire(stack_size);
continuation.initialize(Box::new(f));
let waker = make_waker(id);
let continuation = Rc::new(RefCell::new(continuation));
assert!(id.0 < clock.time.len());
Self {
id,
state: TaskState::Runnable,
task_type,
continuation,
clock,
waiter: None,
waker,
woken_by_self: false,
name,
}
}

pub(crate) fn from_closure<F>(f: F, stack_size: usize, id: TaskId, name: Option<String>) -> Self
pub(crate) fn from_closure<F>(f: F, stack_size: usize, id: TaskId, name: Option<String>, clock: VectorClock) -> Self
where
F: FnOnce() + Send + 'static,
{
Self::new(f, stack_size, id, TaskType::Thread, name)
Self::new(f, stack_size, id, TaskType::Thread, name, clock)
}

pub(crate) fn from_future<F>(future: F, stack_size: usize, id: TaskId, name: Option<String>) -> Self
pub(crate) fn from_future<F>(
future: F,
stack_size: usize,
id: TaskId,
name: Option<String>,
clock: VectorClock,
) -> Self
where
F: Future<Output = ()> + Send + 'static,
{
Expand All @@ -101,6 +122,7 @@ impl Task {
id,
TaskType::Future,
name,
clock,
)
}

Expand Down Expand Up @@ -265,3 +287,146 @@ impl Debug for TaskSet {
write!(f, "}}")
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct VectorClock {
pub(crate) time: SmallVec<[u32; DEFAULT_INLINE_TASKS]>,
}

impl VectorClock {
pub(crate) fn new() -> Self {
Self { time: SmallVec::new() }
}

#[cfg(test)]
pub(crate) fn new_from(v: &[u32]) -> Self {
let time: SmallVec<[u32; DEFAULT_INLINE_TASKS]> = SmallVec::from(v);
Self { time }
}

// Zero extend clock to accommodate `task_id` tasks.
pub(crate) fn extend(&mut self, task_id: TaskId) {
let d = 1 + task_id.0 - self.time.len();
self.time.extend_from_slice(vec![0u32; d].as_slice());
}

pub(crate) fn increment(&mut self, task_id: TaskId) {
assert!(task_id.0 < self.time.len());
self.time[task_id.0] += 1;
}

// Update the clock of `self` with the clock from `other`
pub(crate) fn update(&mut self, other: &Self) {
let n1 = self.time.len();
let n2 = other.time.len();
for i in 0..std::cmp::min(n1, n2) {
self.time[i] = std::cmp::max(self.time[i], other.time[i])
}
for i in n1..n2 {
// could be empty
self.time.push(other.time[i]);
}
}

pub fn get(&self, i: usize) -> u32 {
assert!(i < self.time.len());
self.time[i]
}
}

impl<'a> IntoIterator for &'a VectorClock {
type Item = &'a u32;
type IntoIter = Iter<'a, u32>;

fn into_iter(self) -> Self::IntoIter {
self.time.iter()
}
}

impl From<VectorClock> for SmallVec<[u32; DEFAULT_INLINE_TASKS]> {
fn from(v: VectorClock) -> SmallVec<[u32; DEFAULT_INLINE_TASKS]> {
v.time
}
}

impl<'a> From<&'a VectorClock> for &'a SmallVec<[u32; DEFAULT_INLINE_TASKS]> {
fn from(v: &'a VectorClock) -> &'a SmallVec<[u32; DEFAULT_INLINE_TASKS]> {
&v.time
}
}

fn unify(a: Ordering, b: Ordering) -> Option<Ordering> {
use Ordering::*;

match (a, b) {
(Equal, Equal) => Some(Equal),
(Less, Greater) | (Greater, Less) => None,
(Less, _) | (_, Less) => Some(Less),
(Greater, _) | (_, Greater) => Some(Greater),
}
}

impl PartialOrd for VectorClock {
// Compare vector clocks
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
let n1 = self.time.len();
let n2 = other.time.len();
// if (n1<n2), then other can't have happened before self, similarly for (n1>n2)
let mut ord = n1.cmp(&n2);
for i in 0..std::cmp::min(n1, n2) {
ord = unify(ord, self.time[i].cmp(&other.time[i]))?; // return if incomparable
}
Some(ord)
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn vector_clock() {
let v1 = VectorClock::new_from(&[1, 2, 3, 4]);
let v2 = VectorClock::new_from(&[1, 2, 4, 5]);
let v3 = VectorClock::new_from(&[1, 2, 3, 1]);
let v4 = VectorClock::new_from(&[1, 2, 4, 1]);
let v5 = VectorClock::new_from(&[1, 2, 3, 4]);
assert!(v1 < v2 && v1 > v3 && v1 == v5);
assert!(v2 > v3 && v2 > v4);
assert!(v3 < v4);
assert_eq!(v1.partial_cmp(&v4), None);

let v1 = VectorClock::new_from(&[1, 2, 3, 4]);
let v2 = VectorClock::new_from(&[1, 2, 2]);
let v3 = VectorClock::new_from(&[1, 2, 3]);
let v4 = VectorClock::new_from(&[1, 2, 4]);
assert!(v1 > v2);
assert!(v1 > v3);
assert_eq!(v1.partial_cmp(&v4), None);

let v1 = VectorClock::new_from(&[]);
let v2 = VectorClock::new_from(&[1]);
assert!(v1 < v2);

let v1 = VectorClock::new_from(&[1, 2, 1]);
let v2 = VectorClock::new_from(&[1, 3]);
let v3 = VectorClock::new_from(&[1, 1, 1, 2]);
let v4 = VectorClock::new_from(&[1, 1, 2]);

let mut v = v1.clone();
v.update(&v2);
assert_eq!(v, VectorClock::new_from(&[1, 3, 1]));

let mut v = v1.clone();
v.update(&v3);
assert_eq!(v, VectorClock::new_from(&[1, 2, 1, 2]));

let mut v = v1.clone();
v.update(&v4);
assert_eq!(v, VectorClock::new_from(&[1, 2, 2]));

let mut v = v1.clone();
v.update(&VectorClock::new());
assert_eq!(v, v1);
}
}
Loading

0 comments on commit 544f247

Please sign in to comment.