Skip to content

Commit

Permalink
async-llvm(4): Move work coordination to separate thread in order to …
Browse files Browse the repository at this point in the history
…free up the main thread for translation.
  • Loading branch information
michaelwoerister committed Jul 31, 2017
1 parent bac57cf commit df6be33
Showing 1 changed file with 61 additions and 63 deletions.
124 changes: 61 additions & 63 deletions src/librustc_trans/back/write.rs
Expand Up @@ -28,7 +28,6 @@ use syntax::ext::hygiene::Mark;
use syntax_pos::MultiSpan;
use context::{is_pie_binary, get_reloc_model};
use jobserver::{Client, Acquired};
use crossbeam::{scope, Scope};
use rustc_demangle;

use std::cmp;
Expand All @@ -38,8 +37,10 @@ use std::io;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::str;
use std::sync::Arc;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::slice;
use std::thread;
use libc::{c_uint, c_void, c_char, size_t};

pub const RELOC_MODEL_ARGS : [(&'static str, llvm::RelocMode); 7] = [
Expand Down Expand Up @@ -283,13 +284,13 @@ impl ModuleConfig {

/// Additional resources used by optimize_and_codegen (not module specific)
#[derive(Clone)]
pub struct CodegenContext<'a> {
pub struct CodegenContext {
// Resouces needed when running LTO
pub time_passes: bool,
pub lto: bool,
pub no_landing_pads: bool,
pub exported_symbols: &'a ExportedSymbols,
pub opts: &'a config::Options,
pub exported_symbols: Arc<ExportedSymbols>,
pub opts: Arc<config::Options>,
pub crate_types: Vec<config::CrateType>,
pub each_linked_rlib_for_lto: Vec<(CrateNum, PathBuf)>,
// Handler to use for diagnostics produced during codegen.
Expand All @@ -307,18 +308,18 @@ pub struct CodegenContext<'a> {
pub coordinator_send: Sender<Message>,
}

impl<'a> CodegenContext<'a> {
impl CodegenContext {
fn create_diag_handler(&self) -> Handler {
Handler::with_emitter(true, false, Box::new(self.diag_emitter.clone()))
}
}

struct HandlerFreeVars<'a> {
cgcx: &'a CodegenContext<'a>,
cgcx: &'a CodegenContext,
diag_handler: &'a Handler,
}

unsafe extern "C" fn report_inline_asm<'a, 'b>(cgcx: &'a CodegenContext<'a>,
unsafe extern "C" fn report_inline_asm<'a, 'b>(cgcx: &'a CodegenContext,
msg: &'b str,
cookie: c_uint) {
cgcx.diag_emitter.inline_asm_error(cookie as u32, msg.to_string());
Expand Down Expand Up @@ -775,9 +776,8 @@ pub fn run_passes(sess: &Session,
let num_workers = cmp::min(work_items.len() - 1, 32);
Client::new(num_workers).expect("failed to create jobserver")
});
scope(|scope| {
execute_work(sess, work_items, client, &trans.exported_symbols, scope);
});

execute_work(sess, work_items, client, trans.exported_symbols.clone());

// If in incr. comp. mode, preserve the `.o` files for potential re-use
for mtrans in modules.iter() {
Expand Down Expand Up @@ -1052,11 +1052,10 @@ pub struct Diagnostic {
lvl: Level,
}

fn execute_work<'a>(sess: &'a Session,
mut work_items: Vec<WorkItem>,
jobserver: Client,
exported_symbols: &'a ExportedSymbols,
scope: &Scope<'a>) {
fn execute_work(sess: &Session,
mut work_items: Vec<WorkItem>,
jobserver: Client,
exported_symbols: Arc<ExportedSymbols>) {
let (tx, rx) = channel();
let tx2 = tx.clone();

Expand Down Expand Up @@ -1092,7 +1091,7 @@ fn execute_work<'a>(sess: &'a Session,
each_linked_rlib_for_lto: each_linked_rlib_for_lto,
lto: sess.lto(),
no_landing_pads: sess.no_landing_pads(),
opts: &sess.opts,
opts: Arc::new(sess.opts.clone()),
time_passes: sess.time_passes(),
exported_symbols: exported_symbols,
plugin_passes: sess.plugin_llvm_passes.borrow().clone(),
Expand Down Expand Up @@ -1158,68 +1157,67 @@ fn execute_work<'a>(sess: &'a Session,
// Before that work finishes, however, we may acquire a token. In that case
// we actually wastefully acquired the token, so we relinquish it back to
// the jobserver.
let mut tokens = Vec::new();
let mut running = 0;
while work_items.len() > 0 || running > 0 {

// Spin up what work we can, only doing this while we've got available
// parallelism slots and work left to spawn.
while work_items.len() > 0 && running < tokens.len() + 1 {
let item = work_items.pop().unwrap();
let worker_index = work_items.len();

let cgcx = CodegenContext {
worker: worker_index,
.. cgcx.clone()
};

spawn_work(cgcx,
scope,
item);
running += 1;
}
thread::spawn(move || {
let mut tokens = Vec::new();
let mut running = 0;
while work_items.len() > 0 || running > 0 {

// Relinquish accidentally acquired extra tokens
tokens.truncate(running.saturating_sub(1));
// Spin up what work we can, only doing this while we've got available
// parallelism slots and work left to spawn.
while work_items.len() > 0 && running < tokens.len() + 1 {
let item = work_items.pop().unwrap();
let worker_index = work_items.len();

match rx.recv().unwrap() {
// Save the token locally and the next turn of the loop will use
// this to spawn a new unit of work, or it may get dropped
// immediately if we have no more work to spawn.
Message::Token(token) => {
tokens.push(token.expect("failed to acquire jobserver token"));
}
let cgcx = CodegenContext {
worker: worker_index,
.. cgcx.clone()
};

// If a thread exits successfully then we drop a token associated
// with that worker and update our `running` count. We may later
// re-acquire a token to continue running more work. We may also not
// actually drop a token here if the worker was running with an
// "ephemeral token"
//
// Note that if the thread failed that means it panicked, so we
// abort immediately.
Message::Done { success: true } => {
drop(tokens.pop());
running -= 1;
spawn_work(cgcx, item);
running += 1;
}
Message::Done { success: false } => {
shared_emitter.fatal("aborting due to worker thread panic".to_string());

// Relinquish accidentally acquired extra tokens
tokens.truncate(running.saturating_sub(1));

match rx.recv().unwrap() {
// Save the token locally and the next turn of the loop will use
// this to spawn a new unit of work, or it may get dropped
// immediately if we have no more work to spawn.
Message::Token(token) => {
tokens.push(token.expect("failed to acquire jobserver token"));
}

// If a thread exits successfully then we drop a token associated
// with that worker and update our `running` count. We may later
// re-acquire a token to continue running more work. We may also not
// actually drop a token here if the worker was running with an
// "ephemeral token"
//
// Note that if the thread failed that means it panicked, so we
// abort immediately.
Message::Done { success: true } => {
drop(tokens.pop());
running -= 1;
}
Message::Done { success: false } => {
shared_emitter.fatal("aborting due to worker thread panic".to_string());
}
}
}
}).join().unwrap();

shared_emitter_main.check(sess);
}
shared_emitter_main.check(sess);

// Just in case, check this on the way out.
sess.diagnostic().abort_if_errors();
}

fn spawn_work<'a>(cgcx: CodegenContext<'a>,
scope: &Scope<'a>,
work: WorkItem) {
fn spawn_work(cgcx: CodegenContext, work: WorkItem) {
let depth = time_depth();

scope.spawn(move || {
thread::spawn(move || {
set_time_depth(depth);

// Set up a destructor which will fire off a message that we're done as
Expand Down

0 comments on commit df6be33

Please sign in to comment.