Skip to content

Commit

Permalink
Merge pull request #1 from Crispy13/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
Crispy13 committed Feb 18, 2024
2 parents bf710ad + 03b015a commit 0eb719f
Show file tree
Hide file tree
Showing 20 changed files with 403 additions and 102 deletions.
14 changes: 13 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,16 @@ hg38_fusion_csv_list.txt
hg19_fusion_csv_list.txt
genefuse.gz
genefuse
TODO.md
TODO.md
heaptrack.genefuse.3393.zst
heaptrack.genefuse.19481.zst
heaptrack.genefuse.24376.zst
heaptrack.genefuse.24844.zst
*.zst
*.fg
*.massif
hp2.fg
*.histo
*.fg
ht4.fg
ht5.fg
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "genefuse"
version = "0.1.0"
version = "0.1.2"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# GeneFuse vRust
This program is a Rust port of [**GeneFuse 0.8.0**](https://github.com/OpenGene/GeneFuse) (original language: C++)
This program is a Rust port of [**GeneFuse 0.8.0**](https://github.com/OpenGene/GeneFuse)


## How to use: Build from source
Expand Down
17 changes: 17 additions & 0 deletions src/aux/pbar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,24 @@ use std::fmt::Write;

use indicatif::{ProgressBar, ProgressDrawTarget, ProgressState, ProgressStyle};

use crate::core::fusion_scan::MULTI_CSV_MODE;

/// Returns a progress bar of length `len`, or a hidden bar when the program
/// runs in multi-csv mode.
///
/// # Panics
///
/// Panics if `MULTI_CSV_MODE` has not been initialised before this call.
pub fn prepare_pbar(len: u64) -> ProgressBar {
    // In multi-csv mode the per-job bars are suppressed.
    let suppress_bar = *MULTI_CSV_MODE.get().unwrap();

    if suppress_bar {
        ProgressBar::hidden()
    } else {
        _prepare_pbar(len)
    }
}

/// Makes a progress bar of length `len`, ignoring suppressing conditions
/// (e.g. multi-csv mode).
///
/// Unlike `prepare_pbar`, this does not read `MULTI_CSV_MODE`; it always
/// delegates straight to `_prepare_pbar`.
pub(crate) fn prepare_pbar_force(len: u64) -> ProgressBar {
_prepare_pbar(len)
}

fn _prepare_pbar(len: u64) -> ProgressBar {
let pb = ProgressBar::new(len);
pb.set_draw_target(ProgressDrawTarget::stderr_with_hz(8));

Expand Down
2 changes: 1 addition & 1 deletion src/core/fastq_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl FastqReader
let mut s = String::new();
let name = if let Ok(true) = buf_reader.read_line(&mut s).and_then(|rl| Ok(rl > 0)) {
let mut c = s.clone();
c.pop().and_then(|ch| {
c.pop().and_then(|ch| { // remove newline character if it exists
if ch != '\n' {
c.push(ch);
}
Expand Down
24 changes: 15 additions & 9 deletions src/core/fusion_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@ use crate::{
global_settings::global_settings,
pbar::{prepare_pbar, PBSummary},
},
core::{fusion, pescanner::DBT, sequence::Sequence},
core::{fusion, fusion_scan::MULTI_CSV_MODE, pescanner::DBT, sequence::Sequence},
utils::{dis_connected_count, StringCPP},
};

use super::{
edit_distance::edit_distance, fasta_reader::FastaReader, fusion::Fusion, fusion_result::FusionResult, fusion_scan::Error, indexer::{Indexer, SeqMatch}, matcher::Matcher, read::SequenceRead, read_match::ReadMatch, sequence::reverse_complement
};

pub(crate) struct FusionMapper {
pub(crate) struct FusionMapper<'s> {
pub(crate) m_ref_file: String,
pub(crate) m_fusion_match_size: i32,
pub(crate) m_indexer: Indexer,
pub(crate) fusion_list: Vec<Fusion>,
pub(crate) fusion_matches: Mutex<Vec<Vec<ReadMatch>>>,
pub(crate) m_fusion_results: Vec<FusionResult>,
pub(crate) fusion_matches: Mutex<Vec<Vec<ReadMatch<'s>>>>,
pub(crate) m_fusion_results: Vec<FusionResult<'s>>,
}

impl FusionMapper {
impl<'s> FusionMapper<'s> {
pub(crate) fn from_ref_and_fusion_files(
ref_file: &str,
fusion_file: &str,
Expand Down Expand Up @@ -96,7 +96,7 @@ impl FusionMapper {
mapable: &mut bool,
distance_req: i32,
qual_req: i32,
) -> Result<Option<ReadMatch>, Error> {
) -> Result<Option<ReadMatch<'s>>, Error> {
let mut mapping = self.m_indexer.map_read(r);

// if r.m_name.contains(DBT) {
Expand Down Expand Up @@ -151,7 +151,7 @@ impl FusionMapper {
// self.m_indexer.get_ref_mut()
// }

fn make_match(&self, r: &SequenceRead, mapping: &mut [SeqMatch]) -> Option<ReadMatch> {
fn make_match(&self, r: &SequenceRead, mapping: &mut [SeqMatch]) -> Option<ReadMatch<'s>> {
if mapping.len() != 2 {
return None;
}
Expand Down Expand Up @@ -250,7 +250,7 @@ impl FusionMapper {
edit_distance(&ss, ss.len(), &ref_str, ref_str.len()) as i32
}

pub(crate) fn add_match(&self, m: ReadMatch) -> () {
pub(crate) fn add_match(&self, m: ReadMatch<'s>) -> () {
let left_contig = m.m_left_gp.contig;
let right_contig = m.m_right_gp.contig;

Expand Down Expand Up @@ -402,6 +402,8 @@ impl FusionMapper {
"fusion_matches_len={}",
self.fusion_matches.lock().unwrap().len()
);

let multi_csv_mode = MULTI_CSV_MODE.get().unwrap().clone();
for (i, fm) in (0..(self.m_fusion_match_size)).zip(
self.fusion_matches
.lock()
Expand Down Expand Up @@ -469,7 +471,11 @@ impl FusionMapper {
continue;
}
}
fr.print(&self.fusion_list);

if !multi_csv_mode {
fr.print(&self.fusion_list);
}

self.m_fusion_results.push(fr);
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/core/fusion_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use std::{
};

#[derive(Default)]
pub(crate) struct FusionResult {
pub(crate) struct FusionResult<'s> {
pub(crate) m_left_gp: GenePos,
pub(crate) m_right_gp: GenePos,
pub(crate) m_matches: Vec<ReadMatch>,
pub(crate) m_matches: Vec<ReadMatch<'s>>,
pub(crate) m_unique: i32,
pub(crate) m_title: String,
pub(crate) m_left_ref: String,
Expand All @@ -46,7 +46,7 @@ pub(crate) struct FusionResult {
pub(crate) m_right_intron_num: f32,
}

impl FusionResult {
impl<'s> FusionResult<'s> {
pub(crate) fn with_minimum() -> Self {
Self {
m_left_is_exon: false,
Expand Down Expand Up @@ -409,7 +409,7 @@ impl FusionResult {
total_ed as i32
}

pub(crate) fn add_match(&mut self, m: ReadMatch) {
pub(crate) fn add_match(&mut self, m: ReadMatch<'s>) {
self.m_matches.push(m);
}

Expand Down
92 changes: 84 additions & 8 deletions src/core/fusion_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,27 @@ use std::{
mem,
path::{Path, PathBuf},
process::exit,
sync::Arc,
sync::{Arc, OnceLock, RwLock}, time::Duration,
};

use rayon::{prelude::*, ThreadPoolBuilder};

use crate::{
aux::limited_bufreader::LimitedBufReader,
aux::{limited_bufreader::LimitedBufReader, pbar::prepare_pbar_force},
core::{
fasta_reader::FastaReader,
fastq_reader::{FastqReader, FastqReaderPair},
read::{SequenceReadCow, SequenceReadPairCow},
sescanner::{self, SingleEndScanner},
},
};

use super::{fusion::Fusion, pescanner::PairEndScanner};

pub(crate) type Error = Box<dyn error::Error + Send + Sync>;

pub(crate) static MULTI_CSV_MODE:OnceLock<bool> = OnceLock::new();

pub(crate) struct FusionScan {
m_fusion_file: String,
m_read1_file: String,
Expand Down Expand Up @@ -62,6 +67,39 @@ impl FusionScan {
log::debug!("Reading reference, {}", &ref_file);
m_reference.read_all();

// read input seq fastqs
log::info!("Reading input seqeunces...");
let (srp_vec, sr_vec) = if !self.m_read2_file.is_empty() {
let mut fqr = FastqReaderPair::from_paths(&self.m_read1_file, &self.m_read2_file)?;

let mut srp_vec = vec![];
while let Some(srp) = fqr.read() {
srp_vec.push(srp);
}

(Some(srp_vec), None)
} else {
let mut fqr = FastqReader::new(&self.m_read1_file, true)?;

let mut sr_vec = vec![];
while let Some(srp) = fqr.read() {
sr_vec.push(srp);
}

(None, Some(sr_vec))
};

// if let Some(s) = srp_vec {
// let a = s.iter().cloned().map(SequenceReadPairCow::Owned).collect::<Vec<_>>();
// let b = s.iter().map(SequenceReadPairCow::Borrowed).collect::<Vec<_>>();

// let c = b.iter().map(|e| {
// (SequenceReadCow::Owned(e.m_left.clone()), SequenceReadCow::Owned(e.m_right.clone()))
// }).collect::<Vec<_>>();

// exit(0);
// }

let mut scanner_m_ref = ScannerFastaReader::new(m_reference);

let fusion_csv_paths = self.get_fusion_csv_vec_from_input()?;
Expand All @@ -77,13 +115,32 @@ impl FusionScan {
)
};

log::info!(
"given csv count={}, parallel job count={}, inner_thread_num={}",
fusion_csv_paths.len(),
outer_thread_num,
inner_thread_num
);

let tp = ThreadPoolBuilder::new()
.num_threads(outer_thread_num)
.thread_name(|i| format!("MainThreadPool-{i}"))
.build()
.unwrap();

tp.install(|| {
log::info!(
"Multi csv input mode enabled. Suppress all logging messages while doing jobs in parallel.",
);

// lower log max level.
let log_max_level = log::max_level();
log::set_max_level(log::LevelFilter::Off);

let pb = prepare_pbar_force(fusion_csv_paths.len() as u64);
pb.set_message("Scanning fusions given in csv...");
pb.enable_steady_tick(Duration::from_millis(125));

let scan_result = tp.install(|| {
fusion_csv_paths
.into_iter()
.zip(html_file_paths.into_iter().zip(json_file_paths))
Expand All @@ -98,6 +155,7 @@ impl FusionScan {
html_file,
json_file,
inner_thread_num as i32,
srp_vec.as_ref().map(|v| v.as_slice()),
);

scanner_m_ref.scan_per_fusion_csv(pescanner)
Expand All @@ -109,16 +167,24 @@ impl FusionScan {
html_file,
json_file,
inner_thread_num as i32,
sr_vec.as_ref().map(|v| v.as_slice()),
);

scanner_m_ref.scan_per_fusion_csv(sescanner)
};

pb.inc(1);
res
})
.collect::<Result<Vec<bool>, Error>>()
.and_then(|vb| Ok(vb.into_iter().all(|e| e)))
})
});

pb.finish();

log::set_max_level(log_max_level);

scan_result
}

fn get_report_names_from_fusion_csvs(
Expand Down Expand Up @@ -223,6 +289,7 @@ impl FusionScan {
self.m_html_file,
self.m_json_file,
self.m_thread_num as i32,
None,
);

Ok(pescanner.scan()?)
Expand All @@ -234,6 +301,7 @@ impl FusionScan {
self.m_html_file,
self.m_json_file,
self.m_thread_num as i32,
None,
);

Ok(sescanner.scan()?)
Expand All @@ -248,8 +316,16 @@ impl FusionScan {
.to_str()
.unwrap()
{
"csv" => self.scan_single_csv(),
_ => self.scan_per_fusion_csv(),
"csv" => {
MULTI_CSV_MODE.get_or_init(|| false); // set MULTI_CSV_MODE flag.

self.scan_single_csv()
},
_ => {
MULTI_CSV_MODE.get_or_init(|| true);

self.scan_per_fusion_csv()
},
}
}
}
Expand Down Expand Up @@ -286,7 +362,7 @@ pub(crate) trait Scanner {
// fn drop_and_get_back_fasta_reader(self) -> FastaReader;
}

impl Scanner for PairEndScanner {
impl<'s> Scanner for PairEndScanner<'s> {
fn scan(&mut self) -> Result<bool, Error> {
self.scan()
}
Expand All @@ -300,7 +376,7 @@ impl Scanner for PairEndScanner {
// }
}

impl Scanner for SingleEndScanner {
impl<'s> Scanner for SingleEndScanner<'s> {
fn scan(&mut self) -> Result<bool, Error> {
self.scan()
}
Expand Down
Loading

0 comments on commit 0eb719f

Please sign in to comment.