Skip to content

Commit

Permalink
Reduce the amount of unsafe code and mark handle_deadlock as unsafe
Browse files Browse the repository at this point in the history
  • Loading branch information
Zoxc committed Jun 6, 2018
1 parent 3e83248 commit 131ef97
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 52 deletions.
98 changes: 49 additions & 49 deletions src/librustc/ty/maps/job.rs
Expand Up @@ -151,6 +151,10 @@ impl<'tcx> QueryJob<'tcx> {
#[cfg(parallel_queries)]
self.latch.set();
}

fn as_ptr(&self) -> *const QueryJob<'tcx> {
self as *const _
}
}

#[cfg(parallel_queries)]
Expand Down Expand Up @@ -233,13 +237,9 @@ impl<'tcx> QueryLatch<'tcx> {
}
}

/// A pointer to an active query job. This is used to give query jobs an identity.
#[cfg(parallel_queries)]
type Ref<'tcx> = *const QueryJob<'tcx>;

/// A resumable waiter of a query. The usize is the index into waiters in the query's latch
#[cfg(parallel_queries)]
type Waiter<'tcx> = (Ref<'tcx>, usize);
type Waiter<'tcx> = (Lrc<QueryJob<'tcx>>, usize);

/// Visits all the non-resumable and resumable waiters of a query.
/// Only waiters in a query are visited.
Expand All @@ -251,25 +251,23 @@ type Waiter<'tcx> = (Ref<'tcx>, usize);
/// required information to resume the waiter.
/// If all `visit` calls returns None, this function also returns None.
#[cfg(parallel_queries)]
fn visit_waiters<'tcx, F>(query_ref: Ref<'tcx>, mut visit: F) -> Option<Option<Waiter<'tcx>>>
fn visit_waiters<'tcx, F>(query: Lrc<QueryJob<'tcx>>, mut visit: F) -> Option<Option<Waiter<'tcx>>>
where
F: FnMut(Span, Ref<'tcx>) -> Option<Option<Waiter<'tcx>>>
F: FnMut(Span, Lrc<QueryJob<'tcx>>) -> Option<Option<Waiter<'tcx>>>
{
let query = unsafe { &*query_ref };

// Visit the parent query which is a non-resumable waiter since it's on the same stack
if let Some(ref parent) = query.parent {
if let Some(cycle) = visit(query.info.span, &**parent as Ref) {
if let Some(cycle) = visit(query.info.span, parent.clone()) {
return Some(cycle);
}
}

// Visit the explict waiters which use condvars and are resumable
for (i, waiter) in query.latch.info.lock().waiters.iter().enumerate() {
if let Some(ref waiter_query) = waiter.query {
if visit(waiter.span, &**waiter_query).is_some() {
if visit(waiter.span, waiter_query.clone()).is_some() {
// Return a value which indicates that this waiter can be resumed
return Some(Some((query_ref, i)));
return Some(Some((query.clone(), i)));
}
}
}
Expand All @@ -281,12 +279,13 @@ where
/// If a cycle is detected, this initial value is replaced with the span causing
/// the cycle.
#[cfg(parallel_queries)]
fn cycle_check<'tcx>(query: Ref<'tcx>,
fn cycle_check<'tcx>(query: Lrc<QueryJob<'tcx>>,
span: Span,
stack: &mut Vec<(Span, Ref<'tcx>)>,
visited: &mut HashSet<Ref<'tcx>>) -> Option<Option<Waiter<'tcx>>> {
if visited.contains(&query) {
return if let Some(p) = stack.iter().position(|q| q.1 == query) {
stack: &mut Vec<(Span, Lrc<QueryJob<'tcx>>)>,
visited: &mut HashSet<*const QueryJob<'tcx>>
) -> Option<Option<Waiter<'tcx>>> {
if visited.contains(&query.as_ptr()) {
return if let Some(p) = stack.iter().position(|q| q.1.as_ptr() == query.as_ptr()) {
// We detected a query cycle, fix up the initial span and return Some

// Remove previous stack entries
Expand All @@ -300,8 +299,8 @@ fn cycle_check<'tcx>(query: Ref<'tcx>,
}

// Mark this query is visited and add it to the stack
visited.insert(query);
stack.push((span, query));
visited.insert(query.as_ptr());
stack.push((span, query.clone()));

// Visit all the waiters
let r = visit_waiters(query, |span, successor| {
Expand All @@ -320,18 +319,21 @@ fn cycle_check<'tcx>(query: Ref<'tcx>,
/// from `query` without going through any of the queries in `visited`.
/// This is achieved with a depth first search.
#[cfg(parallel_queries)]
fn connected_to_root<'tcx>(query: Ref<'tcx>, visited: &mut HashSet<Ref<'tcx>>) -> bool {
fn connected_to_root<'tcx>(
query: Lrc<QueryJob<'tcx>>,
visited: &mut HashSet<*const QueryJob<'tcx>>
) -> bool {
// We already visited this or we're deliberately ignoring it
if visited.contains(&query) {
if visited.contains(&query.as_ptr()) {
return false;
}

// This query is connected to the root (it has no query parent), return true
if unsafe { (*query).parent.is_none() } {
if query.parent.is_none() {
return true;
}

visited.insert(query);
visited.insert(query.as_ptr());

let mut connected = false;

Expand All @@ -351,7 +353,7 @@ fn connected_to_root<'tcx>(query: Ref<'tcx>, visited: &mut HashSet<Ref<'tcx>>) -
/// the function returns false.
#[cfg(parallel_queries)]
fn remove_cycle<'tcx>(
jobs: &mut Vec<Ref<'tcx>>,
jobs: &mut Vec<Lrc<QueryJob<'tcx>>>,
wakelist: &mut Vec<Lrc<QueryWaiter<'tcx>>>,
tcx: TyCtxt<'_, 'tcx, '_>
) -> bool {
Expand All @@ -367,7 +369,7 @@ fn remove_cycle<'tcx>(

// Extract the spans and queries into separate arrays
let mut spans: Vec<_> = stack.iter().map(|e| e.0).collect();
let queries = stack.iter().map(|e| e.1);
let queries = stack.into_iter().map(|e| e.1);

// Shift the spans so that queries are matched with the span for their waitee
let last = spans.pop().unwrap();
Expand All @@ -378,23 +380,25 @@ fn remove_cycle<'tcx>(

// Remove the queries in our cycle from the list of jobs to look at
for r in &stack {
jobs.remove_item(&r.1);
if let Some(pos) = jobs.iter().position(|j| j.as_ptr() == r.1.as_ptr()) {
jobs.remove(pos);
}
}

// Find the queries in the cycle which are
// connected to queries outside the cycle
let entry_points: Vec<Ref<'_>> = stack.iter().filter_map(|query| {
let entry_points: Vec<Lrc<QueryJob<'tcx>>> = stack.iter().filter_map(|query| {
// Mark all the other queries in the cycle as already visited
let mut visited = HashSet::from_iter(stack.iter().filter_map(|q| {
if q.1 != query.1 {
Some(q.1)
if q.1.as_ptr() != query.1.as_ptr() {
Some(q.1.as_ptr())
} else {
None
}
}));

if connected_to_root(query.1, &mut visited) {
Some(query.1)
if connected_to_root(query.1.clone(), &mut visited) {
Some(query.1.clone())
} else {
None
}
Expand All @@ -403,39 +407,36 @@ fn remove_cycle<'tcx>(
// Deterministically pick an entry point
// FIXME: Sort this instead
let mut hcx = tcx.create_stable_hashing_context();
let entry_point = *entry_points.iter().min_by_key(|&&q| {
let entry_point = entry_points.iter().min_by_key(|q| {
let mut stable_hasher = StableHasher::<u64>::new();
unsafe { (*q).info.query.hash_stable(&mut hcx, &mut stable_hasher); }
q.info.query.hash_stable(&mut hcx, &mut stable_hasher);
stable_hasher.finish()
}).unwrap();
}).unwrap().as_ptr();

// Shift the stack until our entry point is first
while stack[0].1 != entry_point {
while stack[0].1.as_ptr() != entry_point {
let last = stack.pop().unwrap();
stack.insert(0, last);
}

// Create the cycle error
let mut error = CycleError {
usage: None,
cycle: stack.iter().map(|&(s, q)| QueryInfo {
cycle: stack.iter().map(|&(s, ref q)| QueryInfo {
span: s,
query: unsafe { (*q).info.query.clone() },
query: q.info.query.clone(),
} ).collect(),
};

// We unwrap `waiter` here since there must always be one
// edge which is resumeable / waited using a query latch
let (waitee_query, waiter_idx) = waiter.unwrap();
let waitee_query = unsafe { &*waitee_query };

// Extract the waiter we want to resume
let waiter = waitee_query.latch.extract_waiter(waiter_idx);

// Set the cycle error so it will be picked up when resumed
unsafe {
*waiter.cycle.lock() = Some(error);
}
*waiter.cycle.lock() = Some(error);

// Put the waiter on the list of things to resume
wakelist.push(waiter);
Expand All @@ -448,8 +449,9 @@ fn remove_cycle<'tcx>(

/// Creates a new thread and forwards information in thread locals to it.
/// The new thread runs the deadlock handler.
/// Must only be called when a deadlock is about to happen.
#[cfg(parallel_queries)]
pub fn handle_deadlock() {
pub unsafe fn handle_deadlock() {
use syntax;
use syntax_pos;

Expand All @@ -458,25 +460,23 @@ pub fn handle_deadlock() {
let gcx_ptr = tls::GCX_PTR.with(|gcx_ptr| {
gcx_ptr as *const _
});
let gcx_ptr = unsafe { &*gcx_ptr };
let gcx_ptr = &*gcx_ptr;

let syntax_globals = syntax::GLOBALS.with(|syntax_globals| {
syntax_globals as *const _
});
let syntax_globals = unsafe { &*syntax_globals };
let syntax_globals = &*syntax_globals;

let syntax_pos_globals = syntax_pos::GLOBALS.with(|syntax_pos_globals| {
syntax_pos_globals as *const _
});
let syntax_pos_globals = unsafe { &*syntax_pos_globals };
let syntax_pos_globals = &*syntax_pos_globals;
thread::spawn(move || {
tls::GCX_PTR.set(gcx_ptr, || {
syntax_pos::GLOBALS.set(syntax_pos_globals, || {
syntax_pos::GLOBALS.set(syntax_pos_globals, || {
tls::with_thread_locals(|| {
unsafe {
tls::with_global(|tcx| deadlock(tcx, &registry))
}
tls::with_global(|tcx| deadlock(tcx, &registry))
})
})
})
Expand All @@ -497,7 +497,7 @@ fn deadlock(tcx: TyCtxt<'_, '_, '_>, registry: &rayon_core::Registry) {
});

let mut wakelist = Vec::new();
let mut jobs: Vec<_> = tcx.maps.collect_active_jobs().iter().map(|j| &**j as Ref).collect();
let mut jobs: Vec<_> = tcx.maps.collect_active_jobs();

let mut found_cycle = false;

Expand Down
7 changes: 4 additions & 3 deletions src/librustc_driver/driver.rs
Expand Up @@ -85,9 +85,10 @@ pub fn spawn_thread_pool<F: FnOnce(config::Options) -> R + sync::Send, R: sync::

let gcx_ptr = &Lock::new(0);

let config = ThreadPoolBuilder::new().num_threads(Session::query_threads_from_opts(&opts))
.deadlock_handler(ty::maps::handle_deadlock)
.stack_size(16 * 1024 * 1024);
let config = ThreadPoolBuilder::new()
.num_threads(Session::query_threads_from_opts(&opts))
.deadlock_handler(|| unsafe { ty::maps::handle_deadlock() })
.stack_size(16 * 1024 * 1024);

let with_pool = move |pool: &ThreadPool| {
pool.install(move || f(opts))
Expand Down

0 comments on commit 131ef97

Please sign in to comment.