Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel computation test + optimization #3

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions Cargo.toml
Expand Up @@ -11,9 +11,12 @@ edition = "2021"
GSL = "6.0.0"

assert_approx_eq = "1.1.0"
lmdb = "0.8.0"
rayon = "1.7.0"


serde = { version = "1.0.104", features = ["derive"] }
serde-pickle = "1.1.1"
serde_json = "1.0.48"


Expand Down
104 changes: 62 additions & 42 deletions src/correlations.rs
@@ -1,5 +1,5 @@
use std::cmp::Ordering;

use rayon::prelude::*;
use crate::parser::parse_rows_with_names;
use crate::reader::BufferReader;
use crate::sorter::sort_write_to_file;
Expand All @@ -21,7 +21,7 @@ pub struct Pearson {
}

// NOTE(review): both `struct Spearman {` and `pub struct Spearman {` appear below; this looks
// like a rendered diff that changes the struct's visibility to `pub` — confirm against the
// real file, since only one of the two header lines can exist in compilable code.
#[derive(PartialEq, Debug)]
struct Spearman {
pub struct Spearman {
// Presumably the number of paired observations (callers in this view construct the value
// with `Spearman::new(parsed_x_val.len())`) — TODO confirm against the constructor.
n: usize,
// NOTE(review): how this is derived from `n` is not visible in this view.
degrees_of_freedom: f64,
}
Expand Down Expand Up @@ -129,60 +129,80 @@ impl<'a> Compute<'a> {

/// Sorts `results` in place by the second tuple element, largest first.
///
/// The sort is stable, so entries whose second elements compare equal keep their
/// relative order. Entries whose second element is NaN are placed after every
/// non-NaN entry instead of causing a panic: `partial_cmp` returns `None` when
/// either side is NaN, which a bare `unwrap()` would turn into a crash.
pub fn sorter(results: &mut [(f64, f64)]) {
    results.sort_by(|a, b| {
        b.1.partial_cmp(&a.1)
            // Only reached when at least one side is NaN. The NaN side compares as
            // "greater" (sorts later); two NaNs compare equal, which keeps the
            // comparator a consistent total order for `sort_by`.
            .unwrap_or_else(|| a.1.is_nan().cmp(&b.1.is_nan()))
    });
}

/// Computes one correlation per dataset row and hands the collected results to
/// `sort_write_to_file`, whose `std::io::Result<String>` is returned as-is.
///
/// Each result tuple is `(row name, rho, p-value, number of x values)`; the final
/// ordering is by absolute `rho`, largest first.
///
/// NOTE(review): this span is a rendered diff without +/- markers. The earlier
/// `BufferReader`-based loop and the newer rayon-based chunked implementation are
/// interleaved line by line, so the text below is not compilable as shown. Comments
/// describe what each visible fragment does; confirm against the real file.
pub fn compute(&self) -> std::io::Result<String> {
let mut corr_results: Vec<(String, f64, f64, i32)> = Vec::new();

let reader = BufferReader::new(self.dataset_path);

match reader {
Ok(mut buffer_read) => {
let mut n_string = String::new();

while let Some(val) = buffer_read.read_line(&mut n_string) {
if let Ok(array_new_val) = val {
let ty = parse_rows_with_names(
self.x_vals,
&array_new_val
.split(self.file_delimiter)
.collect::<Vec<&str>>(),
);
if ty.x_vals.len() < 4 || ty.y_vals.len() < 4 {
// minimum number of acceptable trait values for
// computing the correlations
continue;
}
use std::io::{self, BufRead, BufReader};

let chunk_size = 1000; // Set the desired chunk size here
let file = std::fs::File::open(&self.dataset_path)?;
// NOTE(review): the whole file is collected into one Vec<String> and every chunk is
// then copied again with `to_vec()`, so all lines are held in memory (twice, briefly).
let chunks: Vec<Vec<String>> = BufReader::new(file)
.lines()
// Lines that fail to read are dropped silently here (`line.ok()`).
.filter_map(|line| line.ok())
.collect::<Vec<String>>()
.chunks(chunk_size)
.map(|chunk| chunk.to_vec())

.collect();


// Each chunk is processed through rayon's `par_iter`, producing one result Vec per chunk.
let results:Vec<Vec<(String, f64, f64, i32)>> = chunks
.par_iter().map(|chunk|{

let mut chunk_results = Vec::new();
for line in chunk {

let ty = parse_rows_with_names(
self.x_vals,
&line
.split(self.file_delimiter)
.collect::<Vec<&str>>(),

);
if ty.x_vals.len() < 4 || ty.y_vals.len() < 4 {
// minimum number of acceptable trait values for
// computing the correlations
continue;
}

let (key_name, parsed_x_val, parsed_y_val) =
(ty.row_name, ty.x_vals, ty.y_vals);
(ty.row_name, ty.x_vals, ty.y_vals);

if self.method == CorrelationMethod::Pearson {
let (rho, p_val) = Pearson::new(parsed_x_val.len())
.correlate(&parsed_x_val, &parsed_y_val);
// Every method other than `Pearson` falls through to `Spearman` via the `_` arm.
let (rho, p_val) = match self.method {
CorrelationMethod::Pearson => Pearson::new(parsed_x_val.len()).correlate(&parsed_x_val, &parsed_y_val),
_ => Spearman::new(parsed_x_val.len()).correlate(&parsed_x_val, &parsed_y_val),
};
chunk_results.push((key_name, rho, p_val, parsed_x_val.len() as i32));


corr_results.push((key_name, rho, p_val, parsed_x_val.len() as i32));
} else {
let (rho, p_val) = Spearman::new(parsed_x_val.len())
.correlate(&parsed_x_val, &parsed_y_val);

corr_results.push((key_name, rho, p_val, parsed_x_val.len() as i32));
}
}
}
}

// Per-chunk ordering: absolute rho (tuple field 1), largest first. When `partial_cmp`
// yields `None` (a NaN is involved) the comparison falls back to `Ordering::Less`.
chunk_results.sort_by(|a, b| {
b.1.abs().partial_cmp(&a.1.abs()).unwrap_or_else(|| {
Ordering::Less})});
// At most 500 entries are kept from each chunk.
if chunk_results.len() > 500 {
chunk_results.truncate(500);
};

Err(err) => panic!("an error ocurrexxxxxxxxxxxxxxd {:?}", err),
}
chunk_results

}).collect();

// Naive implementation; the author notes that trying an external sort could save ~3 seconds.
let mut corr_results: Vec<(String, f64, f64, i32)> = results
.into_par_iter()
.flatten()
.collect();

corr_results.sort_by(|a, b| {
// NOTE(review): as rendered, this truncation to 500 sits before the `sort_by` on the
// following lines. If that is the real order, it keeps the first 500 flattened entries
// rather than the 500 with the largest |rho| overall — confirm, and sort before truncating.
if corr_results.len() > 500 {
corr_results.truncate(500);
}
corr_results.sort_by(|a, b| {
b.1.abs().partial_cmp(&a.1.abs()).unwrap_or_else(|| {
Ordering::Less})});

sort_write_to_file(String::from(self.output_file), corr_results)
sort_write_to_file(String::from(self.output_file),
corr_results)
}
}
#[derive(PartialEq, Debug)]
Expand Down