Skip to content

Commit 5b58cb8

Browse files
committed
perf: search, searchset and replace sort chunks, not the raw data
- as chunks are contiguous, it's unnecessary to sort the raw data after multithreaded operations. Reduces the sort from O(n log n) to O(n_chunks log n_chunks) - as the number of chunks is equal to the number of CPUs, there is no need to do a parallel sort; a simple sort is enough
1 parent bcc84e1 commit 5b58cb8

File tree

3 files changed

+87
-85
lines changed

3 files changed

+87
-85
lines changed

src/cmd/replace.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,6 @@ impl Args {
313313
rconfig: &Config,
314314
replacement: &[u8],
315315
) -> CliResult<()> {
316-
use rayon::slice::ParallelSliceMut;
317-
318316
let mut rdr = rconfig.reader()?;
319317
let headers = rdr.byte_headers()?.clone();
320318
let sel = rconfig.selection(&headers)?;
@@ -379,13 +377,13 @@ impl Args {
379377
drop(send);
380378

381379
// Collect all results from all chunks
382-
let mut all_results: Vec<ReplaceResult> = Vec::with_capacity(idx_count);
380+
let mut all_chunks: Vec<Vec<ReplaceResult>> = Vec::with_capacity(nchunks);
383381
for chunk_results in &recv {
384-
all_results.extend(chunk_results);
382+
all_chunks.push(chunk_results);
385383
}
386384

387-
// Sort by row_number to maintain original order
388-
all_results.par_sort_unstable_by_key(|r| r.row_number);
385+
// Sort chunks by first row_number to maintain original order
386+
all_chunks.sort_unstable_by_key(|chunk| chunk.first().map_or(0, |r| r.row_number));
389387

390388
// Setup writer
391389
let mut wtr = Config::new(self.flag_output.as_ref()).writer()?;
@@ -400,13 +398,15 @@ impl Args {
400398
#[cfg(any(feature = "feature_capable", feature = "lite"))]
401399
let mut _rows_with_matches_ctr: u64 = 0;
402400

403-
for result in all_results {
404-
total_match_ctr += result.match_count;
405-
#[cfg(any(feature = "feature_capable", feature = "lite"))]
406-
if result.had_match {
407-
_rows_with_matches_ctr += 1;
401+
for chunk in all_chunks {
402+
for result in chunk {
403+
total_match_ctr += result.match_count;
404+
#[cfg(any(feature = "feature_capable", feature = "lite"))]
405+
if result.had_match {
406+
_rows_with_matches_ctr += 1;
407+
}
408+
wtr.write_byte_record(&result.record)?;
408409
}
409-
wtr.write_byte_record(&result.record)?;
410410
}
411411

412412
wtr.flush()?;

src/cmd/search.rs

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ fn should_collect_preview(
205205
/// Write a single result record to output
206206
/// Returns true if the record was written (for match counting)
207207
#[allow(clippy::too_many_arguments)]
208+
#[allow(clippy::fn_params_excessive_bools)]
209+
#[allow(clippy::inline_always)]
208210
#[inline(always)]
209211
fn write_result_record(
210212
record: &mut csv::ByteRecord,
@@ -528,8 +530,6 @@ impl Args {
528530
pattern: regex::bytes::Regex,
529531
rconfig: &Config,
530532
) -> CliResult<()> {
531-
use rayon::slice::ParallelSliceMut;
532-
533533
let mut rdr = rconfig.reader()?;
534534
let mut headers = rdr.byte_headers()?.clone();
535535
let sel = rconfig.selection(&headers)?;
@@ -607,17 +607,17 @@ impl Args {
607607
drop(send);
608608

609609
// Collect all results from all chunks
610-
let mut all_results: Vec<SearchResult> = Vec::with_capacity(idx_count);
610+
let mut all_chunks: Vec<Vec<SearchResult>> = Vec::with_capacity(nchunks);
611611
for chunk_results in &recv {
612-
all_results.extend(chunk_results);
612+
all_chunks.push(chunk_results);
613613
}
614614

615-
// Sort by row_number to maintain original order
616-
all_results.par_sort_unstable_by_key(|r| r.row_number);
615+
// Sort chunks by first row_number to maintain original order
616+
all_chunks.sort_unstable_by_key(|chunk| chunk.first().map_or(0, |r| r.row_number));
617617

618618
// Handle --quick mode: find earliest match
619619
if self.flag_quick {
620-
if let Some(first_match) = all_results.iter().find(|r| r.matched) {
620+
if let Some(first_match) = all_chunks.iter().flatten().find(|r| r.matched) {
621621
if !self.flag_quiet {
622622
eprintln!("{}", first_match.row_number);
623623
}
@@ -649,29 +649,31 @@ impl Args {
649649
json_wtr.write_all(b"[")?;
650650
}
651651

652-
for result in all_results {
653-
let mut record = result.record;
654-
let matched = result.matched;
652+
for chunk in all_chunks {
653+
for result in chunk {
654+
let mut record = result.record;
655+
let matched = result.matched;
655656

656-
if matched {
657-
match_ctr += 1;
658-
}
657+
if matched {
658+
match_ctr += 1;
659+
}
659660

660-
// Use helper to write record if needed
661-
write_result_record(
662-
&mut record,
663-
result.row_number,
664-
matched,
665-
flag_flag,
666-
flag_json,
667-
flag_no_headers,
668-
matches_only,
669-
&headers,
670-
&mut wtr,
671-
&mut json_wtr,
672-
&mut is_first,
673-
&mut matched_rows,
674-
)?;
661+
// Use helper to write record if needed
662+
write_result_record(
663+
&mut record,
664+
result.row_number,
665+
matched,
666+
flag_flag,
667+
flag_json,
668+
flag_no_headers,
669+
matches_only,
670+
&headers,
671+
&mut wtr,
672+
&mut json_wtr,
673+
&mut is_first,
674+
&mut matched_rows,
675+
)?;
676+
}
675677
}
676678

677679
// Use helper to finalize output

src/cmd/searchset.rs

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -410,8 +410,6 @@ impl Args {
410410
rconfig: &Config,
411411
regex_labels: &[String],
412412
) -> CliResult<()> {
413-
use rayon::slice::ParallelSliceMut;
414-
415413
let mut rdr = rconfig.reader()?;
416414
let mut headers = rdr.byte_headers()?.clone();
417415
let sel = rconfig.selection(&headers)?;
@@ -506,17 +504,17 @@ impl Args {
506504
drop(send);
507505

508506
// Collect all results from all chunks
509-
let mut all_results: Vec<SearchSetResult> = Vec::with_capacity(idx_count);
507+
let mut all_chunks: Vec<Vec<SearchSetResult>> = Vec::with_capacity(nchunks);
510508
for chunk_results in &recv {
511-
all_results.extend(chunk_results);
509+
all_chunks.push(chunk_results);
512510
}
513511

514-
// Sort by row_number to maintain original order
515-
all_results.par_sort_unstable_by_key(|r| r.row_number);
512+
// Sort chunks by first row_number to maintain original order
513+
all_chunks.sort_unstable_by_key(|chunk| chunk.first().map_or(0, |r| r.row_number));
516514

517515
// Handle --quick mode: find earliest match
518516
if self.flag_quick {
519-
if let Some(first_match) = all_results.iter().find(|r| r.matched) {
517+
if let Some(first_match) = all_chunks.iter().flatten().find(|r| r.matched) {
520518
if !self.flag_quiet {
521519
eprintln!("{}", first_match.row_number);
522520
}
@@ -546,51 +544,53 @@ impl Args {
546544
#[allow(unused_assignments)]
547545
let mut match_list_with_row = String::with_capacity(20);
548546

549-
for result in all_results {
550-
let mut record = result.record;
551-
let matched = result.matched;
552-
let match_list = result.match_list;
547+
for chunk in all_chunks {
548+
for result in chunk {
549+
let mut record = result.record;
550+
let matched = result.matched;
551+
let match_list = result.match_list;
553552

554-
if matched {
555-
match_row_ctr += 1;
556-
}
553+
if matched {
554+
match_row_ctr += 1;
555+
}
557556

558-
if do_match_list {
559-
let flag_column = if matched {
560-
itoa::Buffer::new()
561-
.format(result.row_number)
562-
.clone_into(&mut matched_rows);
563-
if self.flag_invert_match {
564-
matched_rows.as_bytes().to_vec()
565-
} else {
566-
total_matches += match_list.len() as u64;
567-
// builds format!("{matched_rows};{match_list}")
568-
// without intermediate Vec allocation
569-
match_list_with_row.clear();
570-
match_list_with_row.push_str(&matched_rows);
571-
match_list_with_row.push(';');
572-
for (idx, i) in match_list.iter().enumerate() {
573-
if idx > 0 {
574-
match_list_with_row.push(',');
557+
if do_match_list {
558+
let flag_column = if matched {
559+
itoa::Buffer::new()
560+
.format(result.row_number)
561+
.clone_into(&mut matched_rows);
562+
if self.flag_invert_match {
563+
matched_rows.as_bytes().to_vec()
564+
} else {
565+
total_matches += match_list.len() as u64;
566+
// builds format!("{matched_rows};{match_list}")
567+
// without intermediate Vec allocation
568+
match_list_with_row.clear();
569+
match_list_with_row.push_str(&matched_rows);
570+
match_list_with_row.push(';');
571+
for (idx, i) in match_list.iter().enumerate() {
572+
if idx > 0 {
573+
match_list_with_row.push(',');
574+
}
575+
match_list_with_row.push_str(&regex_labels[*i - 1]);
575576
}
576-
match_list_with_row.push_str(&regex_labels[*i - 1]);
577+
match_list_with_row.as_bytes().to_vec()
577578
}
578-
match_list_with_row.as_bytes().to_vec()
579-
}
580-
} else {
581-
b"0".to_vec()
582-
};
579+
} else {
580+
b"0".to_vec()
581+
};
583582

584-
if self.flag_flag_matches_only && !matched {
585-
if self.flag_unmatched_output.is_some() {
586-
unmatched_wtr.write_byte_record(&record)?;
583+
if self.flag_flag_matches_only && !matched {
584+
if self.flag_unmatched_output.is_some() {
585+
unmatched_wtr.write_byte_record(&record)?;
586+
}
587+
continue;
587588
}
588-
continue;
589+
record.push_field(&flag_column);
590+
wtr.write_byte_record(&record)?;
591+
} else if matched {
592+
wtr.write_byte_record(&record)?;
589593
}
590-
record.push_field(&flag_column);
591-
wtr.write_byte_record(&record)?;
592-
} else if matched {
593-
wtr.write_byte_record(&record)?;
594594
}
595595
}
596596

0 commit comments

Comments
 (0)