Commit

new parallelization strategy

rob-p committed Aug 5, 2020
1 parent f3a64fc commit 0742dc0
Showing 4 changed files with 183 additions and 114 deletions.
1 change: 1 addition & 0 deletions libradicl/Cargo.toml
@@ -16,6 +16,7 @@ bincode = "1.3.1"
csv = "1.1.3"
indicatif = "0.15.0"
crossbeam-channel = "0.4.3"
crossbeam-queue = "0.2.3"
petgraph = "0.5.1"
executors = "0.7"
bio-types = "0.6.0"
2 changes: 1 addition & 1 deletion libradicl/src/cellfilter.rs
@@ -285,7 +285,7 @@ pub fn generate_permit_list(
CellFilterMethod::ExplicitList(valid_bc_file) => {
valid_bc = libradicl::permit_list_from_file(valid_bc_file, ft_vals.bclen);
}
CellFilterMethod::ExpectCells(expected_num_cells) => {
CellFilterMethod::ExpectCells(_expected_num_cells) => {
unimplemented!();
}
}
292 changes: 180 additions & 112 deletions libradicl/src/quant.rs
@@ -9,22 +9,26 @@ extern crate needletail;
extern crate petgraph;
extern crate serde;
extern crate slog;
extern crate crossbeam_queue;

use executors::crossbeam_channel_pool;
use executors::*;
//use executors::crossbeam_channel_pool;
//use executors::*;
use sprs::TriMatI;
use std::sync::mpsc::channel;
//use std::sync::mpsc::channel;

use self::indicatif::{ProgressBar, ProgressStyle};
use self::petgraph::prelude::*;
#[allow(unused_imports)]
use self::slog::crit;
use self::slog::info;
use crate as libradicl;
use crossbeam_channel::bounded;
use crossbeam_queue::{ArrayQueue};
//use crossbeam_channel::bounded;
use fasthash::sea;
use needletail::bitkmer::*;
use scroll::Pwrite;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::fs::File;
@@ -321,128 +325,182 @@ pub fn quantify(
.progress_chars("╢▌▌░╟"),
);

//let mut eq_map = EqMap::new(hdr.ref_count as u32);

let mut _global_distinct_umis = 0usize;

let n_workers = num_threads as usize;
let pool = crossbeam_channel_pool::ThreadPool::new(n_workers);
// Trying this parallelization strategy to avoid
// many temporary data structures.

// We have a work queue that contains collated chunks
// (i.e. data at the level of each cell). One thread
// populates the queue, and the remaining worker threads
// pop a chunk, perform the quantification, and update the
// output. The updating of the output requires acquiring
// two locks (1) to update the data in the matrix and
// (2) to write to the barcode file. We also have to
// decrement an atomic counter for the number of cells that
// remain to be processed.

// create a thread-safe queue based on the number of worker threads
let n_workers = if num_threads > 1 { (num_threads - 1) as usize } else { 1 };
let q = Arc::new(ArrayQueue::<(usize, u32, u32, Vec<u8>)>::new(4*n_workers));

// the number of cells left to process
let cells_to_process = Arc::new(AtomicUsize::new(hdr.num_chunks as usize));
// each thread needs a *read-only* copy of this transcript <-> gene map
let tid_to_gid_shared = std::sync::Arc::new(tid_to_gid);
// the number of reference sequences
let ref_count = hdr.ref_count as u32;
let (tx, rx) = bounded(n_workers);

// the types for the barcodes and umis
let bc_type = libradicl::decode_int_type_tag(bct).expect("unsupported barcode type id.");
let umi_type = libradicl::decode_int_type_tag(umit).expect("unsupported umi type id.");
// the number of genes (different than the number of reference sequences, which are transcripts)
let num_genes = gene_name_to_id.len();

for _cell_num in 0..(hdr.num_chunks as usize) {
let tx = tx.clone();
//eq_map.clear();
let (nbytes_chunk, nrec_chunk) = libradicl::Chunk::read_header(&mut br);
let mut buf = vec![0u8; nbytes_chunk as usize + 8];
buf.pwrite::<u32>(nbytes_chunk, 0)?;
buf.pwrite::<u32>(nrec_chunk, 4)?;
br.read_exact(&mut buf[8..]).unwrap();
let tid_to_gid = tid_to_gid.clone();
let log = log.clone();
let num_genes = gene_name_to_id.len();
let bc_type = bc_type.clone();
let umi_type = umi_type.clone();
//let resolution = resolution.clone();

pool.execute(move || {
let mut eq_map = EqMap::new(ref_count);
let mut nbr = BufReader::new(&buf[..]);
let mut unique_evidence = vec![false; num_genes];
let mut no_ambiguity = vec![false; num_genes];
let mut c = libradicl::Chunk::from_bytes(&mut nbr, &bc_type, &umi_type);
let bc = c.reads.first().expect("chunk with no reads").bc;
eq_map.init_from_chunk(&mut c);
let counts: Vec<f32>;
match resolution {
ResolutionStrategy::CellRangerLike => {
counts = pugutils::get_num_molecules_cell_ranger_like(
&eq_map,
&tid_to_gid,
num_genes,
&log,
);
}
ResolutionStrategy::Trivial => {
counts = pugutils::get_num_molecules_trivial_discard_all_ambig(
&eq_map,
&tid_to_gid,
num_genes,
&log,
);
}
ResolutionStrategy::Parsimony => {
let g = extract_graph(&eq_map, &log);
let gene_eqc = pugutils::get_num_molecules(&g, &eq_map, &tid_to_gid, &log);
counts = em_optimize(
&gene_eqc,
&mut unique_evidence,
&mut no_ambiguity,
num_genes,
true,
&log,
);
}
ResolutionStrategy::Full => {
let g = extract_graph(&eq_map, &log);
let gene_eqc = pugutils::get_num_molecules(&g, &eq_map, &tid_to_gid, &log);
counts = em_optimize(
&gene_eqc,
&mut unique_evidence,
&mut no_ambiguity,
num_genes,
false,
&log,
);
}
}
tx.send((bc, counts))
.expect("failed to send cell result over channel");
});
}
// create our output directory
let output_path = std::path::Path::new(&output_dir);
fs::create_dir_all(output_path)?;

let num_genes = gene_name_to_id.len();
// we'll need a protected handle to write out the barcodes
let bc_path = output_path.join("barcodes.txt");
let bc_file = fs::File::create(bc_path)?;
let bc_writer = Arc::new(Mutex::new(BufWriter::new(bc_file)));

// TODO: guess capacity better
// TODO: in the future, we may not want to hold the
// entire triplet matrix in memory at once?
// @k3yavi: Changing this to usize for testing
let mut omat = TriMatI::<f32, usize>::new((num_genes, hdr.num_chunks as usize));
// this handle is protected since threads will need to update it
let omat = Arc::new(Mutex::new(TriMatI::<f32, usize>::new((num_genes, hdr.num_chunks as usize))));

let output_path = std::path::Path::new(&output_dir);
fs::create_dir_all(output_path)?;
// for each worker, spawn off a thread
for _worker in 0..n_workers {

let bc_path = output_path.join("barcodes.txt");
let bc_file = fs::File::create(bc_path)?;
let mut bc_writer = BufWriter::new(bc_file);

let mut c = 0usize;
rx.iter().take(hdr.num_chunks as usize).for_each(|x| {
pbar.inc(1);
for (i, v) in x.1.iter().enumerate() {
if *v > 0.0 {
//&mat_writer.write(format!("{}\t{}\t{}", i, c, *v).as_bytes()).expect("can't write to output file");
omat.add_triplet(i, c, *v);
// each thread will need to access the work queue
let in_q = q.clone();
// and the logger
let log = log.clone();
// the shared tid_to_gid map
let tid_to_gid = tid_to_gid_shared.clone();
// and the atomic counter of remaining work
let cells_remaining = cells_to_process.clone();
// they will need to know the bc and umi type
let bc_type = bc_type.clone();
let umi_type = umi_type.clone();
// will need a shared handle to the count matrix
let omatrix = Arc::clone(&omat);
// and the barcode file
let bcout = bc_writer.clone();
// and will need to know the barcode length
let bclen = ft_vals.bclen;

// now, make the worker thread
std::thread::spawn( move || {
// these can be created once and cleared after processing
// each cell.
let mut unique_evidence = vec![false; num_genes];
let mut no_ambiguity = vec![false; num_genes];
let mut eq_map = EqMap::new(ref_count);
// pop from the work queue until everything is
// processed
while cells_remaining.load(Ordering::SeqCst) > 0 {
match in_q.pop() {
Ok((cell_num, _nbyte, _nrec, buf)) => {
let mut nbr = BufReader::new(&buf[..]);
let mut c = libradicl::Chunk::from_bytes(&mut nbr, &bc_type, &umi_type);
let bc = c.reads.first().expect("chunk with no reads").bc;
eq_map.init_from_chunk(&mut c);

let counts: Vec<f32>;
match resolution {
ResolutionStrategy::CellRangerLike => {
counts = pugutils::get_num_molecules_cell_ranger_like(
&eq_map,
&tid_to_gid,
num_genes,
&log,
);
}
ResolutionStrategy::Trivial => {
counts = pugutils::get_num_molecules_trivial_discard_all_ambig(
&eq_map,
&tid_to_gid,
num_genes,
&log,
);
}
ResolutionStrategy::Parsimony => {
let g = extract_graph(&eq_map, &log);
let gene_eqc = pugutils::get_num_molecules(&g, &eq_map, &tid_to_gid, &log);
counts = em_optimize(
&gene_eqc,
&mut unique_evidence,
&mut no_ambiguity,
num_genes,
true,
&log,
);
}
ResolutionStrategy::Full => {
let g = extract_graph(&eq_map, &log);
let gene_eqc = pugutils::get_num_molecules(&g, &eq_map, &tid_to_gid, &log);
counts = em_optimize(
&gene_eqc,
&mut unique_evidence,
&mut no_ambiguity,
num_genes,
false,
&log,
);
}
}
// clear our local variables
eq_map.clear();
// Note: there is a fill method, but it is only on
// the nightly branch. Use this for now:
unique_evidence.clear(); unique_evidence.resize(num_genes, false);
no_ambiguity.clear(); no_ambiguity.resize(num_genes, false);
// done clearing

// update the matrix
{
let mut omat = omatrix.lock().unwrap();
for (i, v) in counts.iter().enumerate() {
if *v > 0.0 {
omat.add_triplet(i, cell_num, *v);
}
}
}

// write to barcode file
{
let bc_mer: BitKmer = (bc, bclen as u8);
let mut bc_writer = bcout.lock().unwrap();
write!(&mut bc_writer, "{}\t{}\n", cell_num,
unsafe{ std::str::from_utf8_unchecked(&bitmer_to_bytes(bc_mer)[..]) }).expect("can't write to barcode file.");
}

cells_remaining.fetch_sub(1, Ordering::SeqCst);
},
Err(_) => {}
}
}
let bc_mer: BitKmer = (x.0, ft_vals.bclen as u8);
bc_writer
.write(&bitmer_to_bytes(bc_mer)[..])
.expect("can't write to barcode file.");
bc_writer
.write(&"\n".as_bytes())
.expect("can't write to barcode file.");
c += 1;
});

//let mat_path = output_path.join("counts.mtx");
//sprs::io::write_matrix_market(&mat_path, &omat)?;
});
}

let mat_path = output_path.join("counts.eds.gz");
sce::eds::writer(&mat_path, &omat.to_csr())?;
let qp = q.clone();
let mut buf = vec![0u8; 65536];
for cell_num in 0..(hdr.num_chunks as usize) {
let (nbytes_chunk, nrec_chunk) = libradicl::Chunk::read_header(&mut br);
buf.resize(nbytes_chunk as usize + 8, 0);
buf.pwrite::<u32>(nbytes_chunk, 0)?;
buf.pwrite::<u32>(nrec_chunk, 4)?;
br.read_exact(&mut buf[8..]).unwrap();
loop {
let r = qp.push((cell_num, nbytes_chunk, nrec_chunk, buf.clone()));
match r {
Ok(_) => { pbar.inc(1); break;},
_ => {}
}
}
}

let gn_path = output_path.join("gene_names.txt");
let gn_file = File::create(gn_path).expect("couldn't create gene name file.");
@@ -451,6 +509,16 @@ pub fn quantify(
gn_writer.write(format!("{}\n", g).as_bytes())?;
}

pbar.finish_with_message("processed all cells.");
//let mat_path = output_path.join("counts.mtx");
//sprs::io::write_matrix_market(&mat_path, &omat)?;
while cells_to_process.load(Ordering::SeqCst) > 0 {
// waiting
}

let mat_path = output_path.join("counts.eds.gz");
{
let omat = omat.lock().unwrap();
sce::eds::writer(&mat_path, &omat.to_csr())?;
}
Ok(())
}
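
For reference, the comment block near the top of quantify above describes the new strategy: one producer thread reads collated chunks and pushes them onto a bounded queue, while worker threads pop chunks, quantify them, update the shared output under locks, and decrement an atomic counter of remaining cells. Below is a minimal, self-contained sketch of that producer/consumer pattern, assuming crossbeam-queue 0.2 as pinned in Cargo.toml above; process_cell, the chunk payload, and the per-cell output vector are illustrative placeholders, not code from this commit.

extern crate crossbeam_queue;

use crossbeam_queue::{ArrayQueue, PushError};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for quantifying one cell's chunk: here it just sums the bytes.
fn process_cell(buf: &[u8]) -> f32 {
    buf.iter().map(|b| *b as f32).sum()
}

fn main() {
    let num_cells = 100usize;
    let n_workers = 4usize;

    // Bounded, lock-free queue of (cell index, chunk bytes).
    let q = Arc::new(ArrayQueue::<(usize, Vec<u8>)>::new(4 * n_workers));
    // Number of cells still to be processed; workers exit once it reaches zero.
    let cells_remaining = Arc::new(AtomicUsize::new(num_cells));
    // Shared output, protected by a mutex (one value per cell in this sketch).
    let out = Arc::new(Mutex::new(vec![0.0f32; num_cells]));

    let mut handles = Vec::new();
    for _ in 0..n_workers {
        let q = Arc::clone(&q);
        let cells_remaining = Arc::clone(&cells_remaining);
        let out = Arc::clone(&out);
        handles.push(thread::spawn(move || {
            while cells_remaining.load(Ordering::SeqCst) > 0 {
                match q.pop() {
                    Ok((cell_num, buf)) => {
                        let v = process_cell(&buf);
                        // Update the shared output under the lock.
                        out.lock().unwrap()[cell_num] = v;
                        cells_remaining.fetch_sub(1, Ordering::SeqCst);
                    }
                    // Queue is momentarily empty; yield and try again.
                    Err(_) => thread::yield_now(),
                }
            }
        }));
    }

    // Producer: push one chunk per cell, retrying while the queue is full.
    for cell_num in 0..num_cells {
        let mut item = (cell_num, vec![cell_num as u8; 16]);
        loop {
            match q.push(item) {
                Ok(()) => break,
                Err(PushError(it)) => {
                    item = it;
                    thread::yield_now();
                }
            }
        }
    }

    for h in handles {
        h.join().unwrap();
    }
    println!("first count: {}", out.lock().unwrap()[0]);
}

Unlike the commit, which spins on the atomic counter in the main thread before writing the matrix, this sketch joins its worker threads; in both cases the counter is what lets the workers know that every cell has been handled.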
2 changes: 1 addition & 1 deletion src/main.rs
@@ -16,7 +16,7 @@ use libradicl::cellfilter::{generate_permit_list, CellFilterMethod};
use libradicl::schema::ResolutionStrategy;
use mimalloc::MiMalloc;
use rand::Rng;
use slog::{crit, info, o, warn, Drain};
use slog::{crit, o, warn, Drain};
use std::unimplemented;

#[global_allocator]
