Skip to content

Commit

Permalink
not working yet to new branch
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-p committed Feb 14, 2022
1 parent d7b6ec4 commit db632bd
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 55 deletions.
223 changes: 172 additions & 51 deletions src/cellfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,36 @@
*
* License: 3-clause BSD, see https://opensource.org/licenses/BSD-3-Clause
*/
use crate::utils as afutils;

use slog::crit;
use slog::info;
#[allow(unused_imports)]
use slog::{crit, info, warn};

use crate::utils as afutils;
use crate::io_utils;
#[allow(unused_imports)]
use ahash::{AHasher, RandomState};
use bio_types::strand::Strand;
use bstr::io::BufReadExt;
use crossbeam_queue::ArrayQueue;
use itertools::Itertools;
use libradicl::exit_codes;
use libradicl::rad_types;
use libradicl::BarcodeLookupMap;
use needletail::bitkmer::*;
use num_format::{Locale, ToFormattedString};
use scroll::Pread;
use serde_json::json;
use std::cmp;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Read};
use std::io::{BufWriter, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;

#[derive(Clone)]
pub enum CellFilterMethod {
// cut off at this cell in
// the frequency sorted list
Expand Down Expand Up @@ -560,6 +567,16 @@ fn process_filtered(
num_corrected
}

/// Accumulated state for the first pass over the RAD file that builds the
/// barcode frequency information. Constructed in `generate_permit_list`,
/// wrapped in an `Arc<Mutex<..>>`, filled in by the worker thread, and read
/// back on the main thread once all chunks are processed.
struct FilterPassInfo {
// barcode -> count map for the unfiltered-external-list filtering method;
// `Some` only when `CellFilterMethod::UnfilteredExternalList` is in use,
// `None` for all other filter methods.
unfiltered_bc_counts: Option<HashMap<u64, u64, ahash::RandomState>>,
// barcode -> count map used by the non-unfiltered filter methods
// (populated via `update_barcode_hist`).
hm: HashMap<u64, u64, ahash::RandomState>,
// barcodes observed in the input that did not match the permit list;
// pre-sized to ~10M entries in the unfiltered branch.
unmatched_bc: Vec<u64>,
// largest number of reference mappings seen for any single read.
max_ambiguity_read: usize,
// total number of reads observed across all chunks.
num_reads: usize,
// number of reads whose orientation matched `expected_ori`.
num_orientation_compat_reads: usize,
// the library's expected read orientation (strand) used when
// tallying barcode counts.
expected_ori: Strand,
}

/// Given the input RAD file `input_file`, compute
/// and output (in `output_dir`) the list of valid
/// (i.e. "permitted") barcode values, as well as
Expand Down Expand Up @@ -662,6 +679,17 @@ pub fn generate_permit_list(
let umi_type = rad_types::decode_int_type_tag(umit.expect("no umi tag description present"))
.expect("unknown barcode type id.");

// one thread will read off the file and the
// other will run through and populate the maps
let n_workers = 1;
let q = Arc::new(ArrayQueue::<io_utils::MetaChunk>::new(4 * n_workers));

// the number of chunks left to process
let chunks_to_process = Arc::new(AtomicUsize::new(hdr.num_chunks as usize));

// the thread handles to hold the return values of the workers
let mut thread_handles: Vec<thread::JoinHandle<usize>> = Vec::with_capacity(n_workers);

// if dealing with filtered type
let s = ahash::RandomState::with_seeds(2u64, 7u64, 1u64, 8u64);
let mut hm = HashMap::with_hasher(s);
Expand All @@ -672,75 +700,168 @@ pub fn generate_permit_list(
let mut num_orientation_compat_reads = 0usize;
let mut max_ambiguity_read = 0usize;

match filter_meth {
CellFilterMethod::UnfilteredExternalList(_, _min_reads) => {
unmatched_bc = Vec::with_capacity(10000000);
// the unfiltered_bc_count map must be valid in this branch
if let Some(mut hmu) = unfiltered_bc_counts {
for _ in 0..(hdr.num_chunks as usize) {
let c = rad_types::Chunk::from_bytes(&mut br, &bc_type, &umi_type);
num_orientation_compat_reads += update_barcode_hist_unfiltered(
&mut hmu,
&mut unmatched_bc,
&mut max_ambiguity_read,
&c,
&expected_ori,
);
num_reads += c.reads.len();
// this will be the information filled out
// in our reader thread.
let fp_info = Arc::new(Mutex::new(FilterPassInfo {
unfiltered_bc_counts,
hm,
unmatched_bc,
max_ambiguity_read,
num_reads,
num_orientation_compat_reads,
expected_ori,
}));

// since we will use a move closure, we need clones of
// all of the Arcs we will move into the thread

// our main struct holding important variables
let local_fp_info = fp_info.clone();
// the filtering method
let local_filter_meth = filter_meth.clone();
// each thread will need to access the work queue
let in_q = q.clone();
// and the atomic counter of remaining work
let chunks_remaining = chunks_to_process.clone();
// the logger
let thread_log = log.clone();

// now, make the worker thread
let handle = std::thread::spawn(move || {
let mut local_nrec = 0usize;
// nobody else should be able to get this
// since there is only one worker thread.
if let Ok(mut fp) = local_fp_info.lock() {
let eo = fp.expected_ori.clone();

let mut hm = &mut fp.hm;
let mut ubc = &mut fp.unmatched_bc;
let mut mar = &mut fp.max_ambiguity_read;
let mut num_orientation_compat_reads = &mut fp.num_orientation_compat_reads;
let mut num_reads = &mut fp.num_reads;

match local_filter_meth {
CellFilterMethod::UnfilteredExternalList(_, _min_reads) => {
*ubc = Vec::with_capacity(10000000);
}
_ => {}
};

// pop MetaChunks from the work queue until everything is
// processed
while chunks_remaining.load(Ordering::SeqCst) > 0 {
if let Some((
first_cell_in_chunk,
cells_in_chunk,
_nbytes_total,
_nrec_total,
buf,
)) = in_q.pop()
{
// for every cell (chunk) within this meta-chunk
let mut byte_offset = 0usize;
for cn in 0..cells_in_chunk {
chunks_remaining.fetch_sub(1, Ordering::SeqCst);
let cell_num = first_cell_in_chunk + cn;
// nbytes for the current cell
let nbytes = buf[byte_offset..].pread::<u32>(0).unwrap();
let nrec = buf[byte_offset..].pread::<u32>(4).unwrap();
local_nrec += nrec as usize;
let mut nbr =
BufReader::new(&buf[byte_offset..(byte_offset + nbytes as usize)]);
byte_offset += nbytes as usize;

let c = rad_types::Chunk::from_bytes(&mut nbr, &bc_type, &umi_type);
if c.reads.is_empty() {
warn!(thread_log, "Discovered empty chunk; should not happen! cell_num = {}, nbytes = {}, nrec = {}", cell_num, nbytes, nrec);
}

match local_filter_meth {
CellFilterMethod::UnfilteredExternalList(_, _min_reads) => {
let mut hmu = fp.unfiltered_bc_counts.as_ref().unwrap();
*num_orientation_compat_reads +=
update_barcode_hist_unfiltered(&mut hmu, &mut ubc, &mut mar, &c, &eo);
*num_reads += c.reads.len();
}
_ => {
update_barcode_hist(&mut hm, &mut mar, &c, &eo);
*num_reads += c.reads.len();
}
}
}
}
info!(
log,
"observed {} reads ({} orientation consistent) in {} chunks --- max ambiguity read occurs in {} refs",
num_reads.to_formatted_string(&Locale::en),
num_orientation_compat_reads.to_formatted_string(&Locale::en),
hdr.num_chunks.to_formatted_string(&Locale::en),
max_ambiguity_read.to_formatted_string(&Locale::en)
);
Ok(process_unfiltered(
hmu,
unmatched_bc,
&ft_vals,
&filter_meth,
expected_ori,
&output_dir,
version,
max_ambiguity_read,
velo_mode,
cmdline,
log,
))
} else {
Ok(0)
}
}
_ => {
for _ in 0..(hdr.num_chunks as usize) {
let c = rad_types::Chunk::from_bytes(&mut br, &bc_type, &umi_type);
update_barcode_hist(&mut hm, &mut max_ambiguity_read, &c, &expected_ori);
num_reads += c.reads.len();
local_nrec
});
thread_handles.push(handle);

io_utils::fill_work_queue(q, br, hdr.num_chunks as usize, None)?;

let mut _total_records = 0usize;
for h in thread_handles {
match h.join() {
Ok(rc) => {
_total_records += rc;
}
Err(_e) => {
info!(log, "thread panicked");
}
}
}

let main_fp = fp_info.lock().unwrap();

let unmatched_bc = main_fp.unmatched_bc;

match filter_meth {
CellFilterMethod::UnfilteredExternalList(_, _min_reads) => {
info!(
log,
"observed {} reads ({} orientation consistent) in {} chunks --- max ambiguity read occurs in {} refs",
main_fp.num_reads.to_formatted_string(&Locale::en),
main_fp.num_orientation_compat_reads.to_formatted_string(&Locale::en),
hdr.num_chunks.to_formatted_string(&Locale::en),
main_fp.max_ambiguity_read.to_formatted_string(&Locale::en)
);

let hmu = main_fp.unfiltered_bc_counts.as_ref().unwrap();
Ok(process_unfiltered(
*hmu,
unmatched_bc,
&ft_vals,
&filter_meth,
expected_ori,
&output_dir,
version,
main_fp.max_ambiguity_read,
velo_mode,
cmdline,
log,
))
}
_ => {
info!(
log,
"observed {} reads in {} chunks --- max ambiguity read occurs in {} refs",
num_reads.to_formatted_string(&Locale::en),
main_fp.num_reads.to_formatted_string(&Locale::en),
hdr.num_chunks.to_formatted_string(&Locale::en),
max_ambiguity_read.to_formatted_string(&Locale::en)
main_fp.max_ambiguity_read.to_formatted_string(&Locale::en)
);
Ok(process_filtered(
&hm,
&main_fp.hm,
&ft_vals,
&filter_meth,
expected_ori,
&output_dir,
version,
max_ambiguity_read,
main_fp.max_ambiguity_read,
velo_mode,
cmdline,
log,
))
}
}

/*
let valid_bc: Vec<u64>;
let mut freq: Vec<u64> = hm.values().cloned().collect();
Expand Down
7 changes: 4 additions & 3 deletions src/io_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub(crate) fn fill_work_queue<T: Read>(
q: Arc<ArrayQueue<MetaChunk>>,
mut br: T,
num_chunks: usize,
pbar: &ProgressBar,
pbar: Option<&ProgressBar>,
) -> Result<(), Box<dyn std::error::Error>> {
const BUFSIZE: usize = 524208;
// the buffer that will hold our records
Expand Down Expand Up @@ -88,8 +88,9 @@ pub(crate) fn fill_work_queue<T: Read>(
// no point trying to push if the queue is full
while q.is_full() {}
}
pbar.inc(cells_in_chunk as u64);

if let Some(pb) = pbar {
pb.inc(cells_in_chunk as u64);
}
// offset of the first cell in the next chunk
first_cell += cells_in_chunk;
// reset the counters
Expand Down
2 changes: 1 addition & 1 deletion src/quant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,7 @@ pub fn do_quantify<T: Read>(
)?;
} else {
// we're quantifying everything
io_utils::fill_work_queue(q, br, hdr.num_chunks as usize, &pbar)?;
io_utils::fill_work_queue(q, br, hdr.num_chunks as usize, Some(&pbar))?;
}

let gn_path = output_matrix_path.join("quants_mat_cols.txt");
Expand Down

0 comments on commit db632bd

Please sign in to comment.