Skip to content

Commit

Permalink
m
Browse files Browse the repository at this point in the history
  • Loading branch information
Crispy13 committed Feb 19, 2024
1 parent 2b88add commit 05f6db9
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 58 deletions.
4 changes: 2 additions & 2 deletions src/core/fasta_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::aux::he::{
make_custom_error, make_custom_error3, make_custom_error4, ErrorExplained, OrExaplain,
};
use crate::aux::limited_bufreader::LimitedBufReader;
use crate::aux::pbar::prepare_pbar;
use crate::aux::pbar::{prepare_pbar, prepare_pbar_force};

use super::fusion_scan::Error;

Expand Down Expand Up @@ -191,7 +191,7 @@ impl FastaReader {
}

pub(crate) fn read_all(&mut self) {
let pbar = prepare_pbar(0);
let pbar = prepare_pbar_force(0);
pbar.set_message("Reading references...");
while self.read_next() {
pbar.inc(1);
Expand Down
4 changes: 2 additions & 2 deletions src/core/fusion_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl<'s> FusionMapper<'s> {
),
}
}
pub(crate) fn filter_matches(&mut self, inner_thread_pool:&ThreadPool) -> () {
pub(crate) fn filter_matches(&mut self, inner_thread_pool:Option<&ThreadPool>) -> () {
// calc the sequence number before any filtering
// let mut total = 0;
// for fm in self.fusion_matches.lock().unwrap().iter() {
Expand Down Expand Up @@ -485,7 +485,7 @@ impl<'s> FusionMapper<'s> {
log::info!("found {} fusions", self.m_fusion_results.len(),);
}

fn remove_alignables(&mut self, inner_thread_pool:&ThreadPool) -> () {
fn remove_alignables(&mut self, inner_thread_pool:Option<&ThreadPool>) -> () {
if self.get_ref().is_none() {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/fusion_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl FusionScan {
// exit(0);
// }

let mut scanner_m_ref = ScannerFastaReader::new(m_reference);
let scanner_m_ref = ScannerFastaReader::new(m_reference);

let fusion_csv_paths = self.get_fusion_csv_vec_from_input()?;
let (html_file_paths, json_file_paths) =
Expand Down
52 changes: 30 additions & 22 deletions src/core/matcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Matcher {
pub(crate) fn from_ref_and_seqs(
fasta_ref: Arc<FastaReader>,
seqs: &[Sequence],
inner_thread_pool:&ThreadPool,
inner_thread_pool:Option<&ThreadPool>,
) -> Self {
let mut matcher = Self {
m_kmer_positions: HashMap::with_hasher(GFHasherBuilder::new()),
Expand Down Expand Up @@ -117,7 +117,7 @@ impl Matcher {

// }

fn make_index(&mut self, itp:&ThreadPool) {
fn make_index(&mut self, itp:Option<&ThreadPool>) {
if self.m_reference.is_none() {
return;
}
Expand All @@ -131,26 +131,34 @@ impl Matcher {

log::debug!("indexing contig per ctg_name...");
// let mut seq_cv= Vec::new();
itp.install(|| {
contig_ref.iter().enumerate().par_bridge().for_each(|(ctg, e)| {
let (ctg_name, s) = (e.0.as_str(), e.1.as_str());
let seq_cv = s
.as_bytes()
.iter()
.copied()
.map(|b| b.to_ascii_uppercase())
.collect::<Vec<u8>>();
// let s = s.to_uppercase();
m_contig_names.lock().unwrap().push(ctg_name.to_owned());

//index forward
self.index_contig_bytes(ctg as i32, &seq_cv, 0, &m_kmer_positions);

//index reverse complement
// ctg.fetch_add(1, Ordering::Relaxed);
// seq_cv.clear();
});
});

let do_index_contig = |(ctg, e):(usize, (&String, &String))| {
let (ctg_name, s) = (e.0.as_str(), e.1.as_str());
let seq_cv = s
.as_bytes()
.iter()
.copied()
.map(|b| b.to_ascii_uppercase())
.collect::<Vec<u8>>();
// let s = s.to_uppercase();
m_contig_names.lock().unwrap().push(ctg_name.to_owned());

//index forward
self.index_contig_bytes(ctg as i32, &seq_cv, 0, &m_kmer_positions);

//index reverse complement
// ctg.fetch_add(1, Ordering::Relaxed);
// seq_cv.clear();
};

match itp{
Some(itp) => {
itp.install(|| {
contig_ref.iter().enumerate().par_bridge().for_each(do_index_contig);
});
},
None => contig_ref.iter().enumerate().for_each(do_index_contig),
};


log::info!("matcher indexing done");
Expand Down
40 changes: 26 additions & 14 deletions src/core/pescanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub(crate) struct PairEndScanner<'s> {
m_produce_finished: AtomicBool,
m_thread_num: i32,
m_fusion_mapper_o: Option<FusionMapper<'s>>,
m_thread_pool: ThreadPool,
m_thread_pool: Option<ThreadPool>,
input_seq_pairs: Option<&'s [SequenceReadPair]>,
}

Expand All @@ -75,10 +75,16 @@ impl<'s> PairEndScanner<'s> {
thread_num: i32,
input_seq_pairs: Option<&'s [SequenceReadPair]>,
) -> Self {
let itp = ThreadPoolBuilder::new()
.num_threads(thread_num as usize)
.build()
.unwrap();
let itp = if thread_num > 1 {
let tp = ThreadPoolBuilder::new()
.num_threads(thread_num as usize)
.build()
.unwrap();

Some(tp)
} else {
None
};

Self {
m_fusion_file: fusion_file,
Expand Down Expand Up @@ -288,15 +294,21 @@ impl<'s> PairEndScanner<'s> {
}

fn _scan(&mut self) -> Result<bool, Error> {
self.m_thread_pool.scope(|tps| {
tps.spawn(|tps| self.producer_task().unwrap());
match self.m_thread_pool {
Some(ref itp) => {
itp.scope(|tps| {
tps.spawn(|tps| self.producer_task().unwrap());

for t in (0..(rayon::current_num_threads() - 1)) {
tps.spawn(|tps| {
self.consumer_task().unwrap();
})
}
});

for t in (0..(rayon::current_num_threads() - 1)) {
tps.spawn(|tps| {
self.consumer_task().unwrap();
})
}
});
},
None => self.producer_task().unwrap(),
}

log::debug!("Produced and consumed all the tasks.");
let m_fusion_mapper = self.m_fusion_mapper_o.as_mut().unwrap();
Expand All @@ -320,7 +332,7 @@ impl<'s> PairEndScanner<'s> {
// exit(0);

log::debug!("run matches methods...");
m_fusion_mapper.filter_matches(&self.m_thread_pool);
m_fusion_mapper.filter_matches(self.m_thread_pool.as_ref());
m_fusion_mapper.sort_matches();
m_fusion_mapper.cluster_matches();

Expand Down
65 changes: 48 additions & 17 deletions src/core/sescanner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{
error,
sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
thread::sleep,
time::Duration,
};
Expand All @@ -15,7 +18,13 @@ use crate::core::{
};

use super::{
fasta_reader::FastaReader, fusion_mapper::FusionMapper, fusion_scan::Error, html_reporter::HtmlReporter, json_reporter::JsonReporter, read::{SequenceRead, SequenceReadCow}, read_match::ReadMatch
fasta_reader::FastaReader,
fusion_mapper::FusionMapper,
fusion_scan::Error,
html_reporter::HtmlReporter,
json_reporter::JsonReporter,
read::{SequenceRead, SequenceReadCow},
read_match::ReadMatch,
};

#[derive(Debug)]
Expand Down Expand Up @@ -45,7 +54,7 @@ pub(crate) struct SingleEndScanner<'s> {
m_produce_finished: AtomicBool,
m_thread_num: i32,
m_fusion_mapper_o: Option<FusionMapper<'s>>,
m_thread_pool: ThreadPool,
m_thread_pool: Option<ThreadPool>,
input_seqs: Option<&'s [SequenceRead]>,
}

Expand All @@ -57,9 +66,18 @@ impl<'s> SingleEndScanner<'s> {
html: String,
json: String,
thread_num: i32,
input_seq_pairs:Option<&'s [SequenceRead]>,
input_seq_pairs: Option<&'s [SequenceRead]>,
) -> Self {
let itp = ThreadPoolBuilder::new().num_threads(thread_num as usize).build().unwrap();
let itp = if thread_num > 1 {
let tp = ThreadPoolBuilder::new()
.num_threads(thread_num as usize)
.build()
.unwrap();

Some(tp)
} else {
None
};

Self {
m_fusion_file: fusion_file,
Expand All @@ -73,7 +91,6 @@ impl<'s> SingleEndScanner<'s> {
m_fusion_mapper_o: None,
m_thread_pool: itp,
input_seqs: input_seq_pairs,

// repo_not_full: Condvar::new(),
// repo_not_empty: Condvar::new(),
}
Expand All @@ -84,15 +101,30 @@ impl<'s> SingleEndScanner<'s> {
}

fn _scan(&mut self) -> Result<bool, Error> {
self.m_thread_pool.scope(|tps| {
tps.spawn(|tps| self.producer_task().unwrap());
match self.m_thread_pool {
Some(ref itp) => {
itp.scope(|tps| {
tps.spawn(|tps| self.producer_task().unwrap());

for t in (0..(rayon::current_num_threads() - 1)) {
tps.spawn(|tps| {
self.consumer_task().unwrap();
})
}
});

for t in (0..(rayon::current_num_threads() - 1)) {
tps.spawn(|tps| {
self.consumer_task().unwrap();
})
}
});
},
None => self.producer_task().unwrap(),
}
// self.m_thread_pool.scope(|tps| {
// tps.spawn(|tps| self.producer_task().unwrap());

// for t in (0..(rayon::current_num_threads() - 1)) {
// tps.spawn(|tps| {
// self.consumer_task().unwrap();
// })
// }
// });

log::debug!("Produced and consumed all the tasks.");
let m_fusion_mapper = self.m_fusion_mapper_o.as_mut().unwrap();
Expand All @@ -116,7 +148,7 @@ impl<'s> SingleEndScanner<'s> {
// exit(0);

log::debug!("run matches methods...");
m_fusion_mapper.filter_matches(&self.m_thread_pool);
m_fusion_mapper.filter_matches(self.m_thread_pool.as_ref());
m_fusion_mapper.sort_matches();
m_fusion_mapper.cluster_matches();

Expand Down Expand Up @@ -410,7 +442,6 @@ impl<'s> SingleEndScanner<'s> {
// }
}


struct FastqReaderWrapper<'s> {
input_seq_pairs_iter: Option<std::slice::Iter<'s, SequenceRead>>,
fastq_reader_pair: Option<FastqReader>,
Expand Down Expand Up @@ -439,4 +470,4 @@ impl<'s> FastqReaderWrapper<'s> {
.map(SequenceReadCow::Owned),
}
}
}
}

0 comments on commit 05f6db9

Please sign in to comment.