Skip to content

Commit

Permalink
Merge pull request #9 from COMBINE-lab/parallel-collate
Browse files Browse the repository at this point in the history
Parallel collate
  • Loading branch information
rob-p committed Aug 27, 2020
2 parents dc242ca + 6f9ee8c commit 74b2e2c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 19 deletions.
12 changes: 10 additions & 2 deletions libradicl/src/cellfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use libradicl::exit_codes;
use needletail::bitkmer::*;
use num_format::{Locale, ToFormattedString};
use serde_json::json;
use std::cmp;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
Expand Down Expand Up @@ -292,8 +293,15 @@ pub fn generate_permit_list(
CellFilterMethod::ExplicitList(valid_bc_file) => {
valid_bc = libradicl::permit_list_from_file(valid_bc_file, ft_vals.bclen);
}
CellFilterMethod::ExpectCells(_expected_num_cells) => {
unimplemented!();
CellFilterMethod::ExpectCells(expected_num_cells) => {
let robust_quantile = 0.99f64;
let robust_div = 10.0f64;
let robust_ind = (expected_num_cells as f64 * robust_quantile).round() as u64;
// the robust ind must be valid
let ind = cmp::min(freq.len() - 1, robust_ind as usize);
let robust_freq = freq[ind];
let min_freq = std::cmp::max(1u64, (robust_freq as f64 / robust_div).round() as u64);
valid_bc = libradicl::permit_list_from_threshold(&hm, min_freq);
}
}

Expand Down
68 changes: 56 additions & 12 deletions libradicl/src/quant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ pub fn quantify(
num_bootstraps: u32,
init_uniform: bool,
summary_stat: bool,
use_mtx: bool,
resolution: ResolutionStrategy,
//no_em: bool,
//naive: bool,
Expand Down Expand Up @@ -414,7 +415,7 @@ pub fn quantify(
let bootstrap_path = output_path.join("bootstraps.eds.gz");
let bootstrap_mean_path = output_path.join("bootstraps_mean.eds.gz");
let bootstrap_var_path = output_path.join("bootstraps_var.eds.gz");
let buffered = GzEncoder::new(fs::File::create(mat_path)?, Compression::default());
let buffered = GzEncoder::new(fs::File::create(&mat_path)?, Compression::default());

let ff_path = output_path.join("features.txt");
let mut ff_file = fs::File::create(ff_path)?;
Expand Down Expand Up @@ -451,10 +452,23 @@ pub fn quantify(
// BufWriter::new(bt_buffered),
// ));

let tmcap = if use_mtx {
(0.2f64 * num_genes as f64 * hdr.num_chunks as f64).round() as usize
} else {
0usize
};

let trimat = sprs::TriMatI::<f32, u32>::with_capacity(
(hdr.num_chunks as usize, num_genes as usize),
tmcap,
);

let bc_writer = Arc::new(Mutex::new((
BufWriter::new(bc_file),
BufWriter::new(buffered),
BufWriter::new(ff_file),
trimat,
0usize,
)));

let mut thread_handles: Vec<thread::JoinHandle<_>> = Vec::with_capacity(n_workers);
Expand Down Expand Up @@ -499,7 +513,8 @@ pub fn quantify(
let mut no_ambiguity = vec![false; num_genes];
let mut eq_map = EqMap::new(ref_count);
let mut expressed_vec = Vec::<f32>::with_capacity(num_genes);

let mut expressed_ind = Vec::<usize>::with_capacity(num_genes);
let mut eds_bytes = Vec::<u8>::new();
// pop from the work queue until everything is
// processed
while cells_remaining.load(Ordering::SeqCst) > 0 {
Expand Down Expand Up @@ -639,12 +654,14 @@ pub fn quantify(
let mut sum_umi = 0.0f32;
let mut num_expr: u32 = 0;
expressed_vec.clear();
for c in &counts {
expressed_ind.clear();
for (gn, c) in counts.iter().enumerate() {
max_umi = if *c > max_umi { *c } else { max_umi };
sum_umi += *c;
if *c > 0.0 {
num_expr += 1;
expressed_vec.push(*c);
expressed_ind.push(gn);
}
}

Expand All @@ -662,27 +679,42 @@ pub fn quantify(
{
// writing the files
let bc_mer: BitKmer = (bc, bclen as u8);
let eds_bytes: Vec<u8> = sce::eds::as_bytes(&counts, num_genes)
.expect("can't conver vector to eds");

if !use_mtx {
eds_bytes = sce::eds::as_bytes(&counts, num_genes)
.expect("can't conver vector to eds");
}

let writer_deref = bcout.lock();
let writer = &mut *writer_deref.unwrap();

// get the row index and then increment it
let row_index = writer.4;
writer.4 += 1;

// write to barcode file
writeln!(&mut writer.0, "{}\t{}", cell_num, unsafe {
writeln!(&mut writer.0, "{}", unsafe {
std::str::from_utf8_unchecked(&bitmer_to_bytes(bc_mer)[..])
})
.expect("can't write to barcode file.");

// write to matrix file
writer
.1
.write_all(&eds_bytes)
.expect("can't write to matrix file.");

if !use_mtx {
// write in eds format
writer
.1
.write_all(&eds_bytes)
.expect("can't write to matrix file.");
} else {
// fill out the triplet matrix in memory
for (ind, val) in expressed_ind.iter().zip(expressed_vec.iter()) {
writer.3.add_triplet(row_index as usize, *ind, *val);
}
}
writeln!(
&mut writer.2,
"{}\t{}\t{}\t{}\t{}\t{}\t{}",
cell_num,
row_index,
num_reads,
sum_umi,
dedup_rate,
Expand Down Expand Up @@ -779,6 +811,18 @@ pub fn quantify(
}
}

// write to matrix market if we are using it
if use_mtx {
let writer_deref = bc_writer.lock();
let writer = &mut *writer_deref.unwrap();
writer.1.flush().unwrap();
//drop(writer.1.into_inner().expect("couldn't unwrap"));
// now remove it
fs::remove_file(&mat_path)?;
let mtx_path = output_matrix_path.join("quants_mat.mtx");
sprs::io::write_matrix_market(&mtx_path, &writer.3)?;
}

let pb_msg = format!("finished quantifying {} cells.", hdr.num_chunks);
pbar.finish_with_message(&pb_msg);

Expand Down
9 changes: 4 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use libradicl::schema::ResolutionStrategy;
use mimalloc::MiMalloc;
use rand::Rng;
use slog::{crit, o, warn, Drain};
use std::unimplemented;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Expand Down Expand Up @@ -96,6 +95,7 @@ fn main() {
.arg(Arg::from("-b, --num-bootstraps 'number of bootstraps to use'").default_value("0"))
.arg(Arg::from("--init-uniform 'flag for uniform sampling'").requires("num-bootstraps").takes_value(false).required(false))
.arg(Arg::from("--summary-stat 'flag for storing only summary statistics'").requires("num-bootstraps").takes_value(false).required(false))
.arg(Arg::from("--use-mtx 'flag for writing output matrix in matrix market instead of EDS'").takes_value(false).required(false))
.arg(Arg::from("-r, --resolution 'the resolution strategy by which molecules will be counted'")
.possible_values(&["full", "trivial", "cr-like", "cr-like-em", "parsimony"])
.default_value("full")
Expand Down Expand Up @@ -166,16 +166,13 @@ fn main() {

let mut fmeth = CellFilterMethod::KneeFinding;

let expect_cells: Option<usize> = match t.value_of_t("expect-cells") {
let _expect_cells: Option<usize> = match t.value_of_t("expect-cells") {
Ok(v) => {
fmeth = CellFilterMethod::ExpectCells(v);
Some(v)
}
Err(_) => None,
};
if expect_cells.is_some() {
unimplemented!();
}

if t.is_present("knee-distance") {
fmeth = CellFilterMethod::KneeFinding;
Expand Down Expand Up @@ -216,6 +213,7 @@ fn main() {
let num_bootstraps = t.value_of_t("num-bootstraps").unwrap();
let init_uniform = t.is_present("init-uniform");
let summary_stat = t.is_present("summary-stat");
let use_mtx = t.is_present("use-mtx");
let input_dir = t.value_of_t("input-dir").unwrap();
let output_dir = t.value_of_t("output-dir").unwrap();
let tg_map = t.value_of_t("tg-map").unwrap();
Expand All @@ -228,6 +226,7 @@ fn main() {
num_bootstraps,
init_uniform,
summary_stat,
use_mtx,
resolution,
&log,
)
Expand Down

0 comments on commit 74b2e2c

Please sign in to comment.