From 632d4b64516dec611b333f71005c7de299357908 Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Wed, 24 Sep 2025 14:03:34 +0530 Subject: [PATCH 01/13] Adding chromosome base file splitting Signed-off-by: AdityaPandeyCN clang format Signed-off-by: AdityaPandeyCN code organization Signed-off-by: AdityaPandeyCN clang changes Signed-off-by: AdityaPandeyCN Add region query benchmark (#8) * query performance Signed-off-by: AdityaPandeyCN * clang format Signed-off-by: AdityaPandeyCN * code organization Signed-off-by: AdityaPandeyCN * clang changes Signed-off-by: AdityaPandeyCN --------- Signed-off-by: AdityaPandeyCN add chromosome based file splitting Signed-off-by: AdityaPandeyCN delete example sam file Signed-off-by: AdityaPandeyCN clang changes Signed-off-by: AdityaPandeyCN test file changes Signed-off-by: AdityaPandeyCN clang changes Signed-off-by: AdityaPandeyCN --- benchmark/CMakeLists.txt | 35 ++++++++-- benchmark/benchmark_utils.h | 7 ++ benchmark/region_query_benchmark.cxx | 86 +++++++++++++++++++++++ benchmark/sam_to_ram_benchmark.cxx | 7 +- inc/ramcore/SamToNTuple.h | 7 +- src/ramcore/SamToNTuple.cxx | 69 ++++++++++++++++++ test/CMakeLists.txt | 3 +- test/chromosome_split_test.cxx | 100 +++++++++++++++++++++++++++ tools/samtoramntuple.cxx | 65 +++++++++-------- 9 files changed, 338 insertions(+), 41 deletions(-) create mode 100644 benchmark/benchmark_utils.h create mode 100644 benchmark/region_query_benchmark.cxx create mode 100644 test/chromosome_split_test.cxx diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index c248384..84f1946 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -9,8 +9,19 @@ target_include_directories(sam_generator PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ) -target_link_libraries(sam_generator PRIVATE benchmark::benchmark) -add_dependencies(sam_generator benchmark::benchmark) +target_link_libraries(sam_generator PRIVATE + benchmark::benchmark +) + +add_library(ramtools_views STATIC + ${CMAKE_SOURCE_DIR}/tools/ramview.cxx + ${CMAKE_SOURCE_DIR}/tools/ramntupleview.cxx +) + +target_link_libraries(ramtools_views PRIVATE + ramcore + ROOT::TreePlayer +) ROOT_EXECUTABLE(sam_to_ram_benchmark sam_to_ram_benchmark.cxx @@ -28,14 +39,30 @@ ROOT_EXECUTABLE(conversion_time_benchmark sam_generator ) -install(TARGETS sam_to_ram_benchmark conversion_time_benchmark +ROOT_EXECUTABLE(region_query_benchmark + region_query_benchmark.cxx + LIBRARIES + benchmark::benchmark + ramcore + sam_generator + ramtools_views +) + +install(TARGETS sam_to_ram_benchmark conversion_time_benchmark region_query_benchmark RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) add_custom_target(benchmark + COMMAND ${CMAKE_COMMAND} -E echo "=== SAM to RAM Benchmark ===" COMMAND sam_to_ram_benchmark + COMMAND ${CMAKE_COMMAND} -E echo "" + COMMAND ${CMAKE_COMMAND} -E echo "=== Conversion Time Benchmark ===" COMMAND conversion_time_benchmark - DEPENDS sam_to_ram_benchmark conversion_time_benchmark + COMMAND ${CMAKE_COMMAND} -E echo "" + COMMAND ${CMAKE_COMMAND} -E echo "=== Region Query Benchmark ===" + COMMAND region_query_benchmark + DEPENDS sam_to_ram_benchmark conversion_time_benchmark region_query_benchmark WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR} + COMMENT "Running all RAM tools benchmarks" ) diff --git a/benchmark/benchmark_utils.h b/benchmark/benchmark_utils.h new file mode 100644 index 0000000..1a712c7 --- /dev/null +++ b/benchmark/benchmark_utils.h @@ -0,0 +1,7 @@ +#pragma once + +#ifdef _WIN32 +#define NULL_DEVICE "NUL" +#else +#define NULL_DEVICE "/dev/null" +#endif diff --git a/benchmark/region_query_benchmark.cxx b/benchmark/region_query_benchmark.cxx new file mode 100644 index 0000000..3a2d6d5 --- /dev/null +++ b/benchmark/region_query_benchmark.cxx @@ -0,0 +1,86 @@ +#include +#include "generate_sam_benchmark.h" +#include "benchmark_utils.h" +#include "ramcore/SamToTTree.h" +#include "ramcore/SamToNTuple.h" +#include +#include + +void ramview(const char *file, const char *query, bool cache = true, bool perfstats = false, + const char *perfstatsfilename = "perf.root"); +void ramntupleview(const char *file, const char *query, bool cache = true, bool perfstats = false, + const char *perfstatsfilename = "perf.root"); + +class RegionQueryFixture : public benchmark::Fixture { +public: + void SetUp(const benchmark::State &state) override + { + num_reads_ = static_cast(state.range(0)); + sam_file_ = "region_query_test_" + std::to_string(num_reads_) + ".sam"; + + GenerateSAMFile(sam_file_, num_reads_); + } + + void TearDown(const benchmark::State &) override { std::remove(sam_file_.c_str()); } + +protected: + int num_reads_; + std::string sam_file_; + static constexpr const char *region_ = "chr1:1-100000000"; + + void suppress_output() { freopen(NULL_DEVICE, "w", stdout); } + + void restore_output() { freopen("/dev/tty", "w", stdout); } +}; + +BENCHMARK_DEFINE_F(RegionQueryFixture, TTree)(benchmark::State &state) +{ + std::string root_file = "ttree_" + std::to_string(num_reads_) + ".root"; + + suppress_output(); + samtoram(sam_file_.c_str(), root_file.c_str(), true, true, true, 1, 0); + restore_output(); + + for (auto _ : state) { + suppress_output(); + ramview(root_file.c_str(), region_, true, false, "perf.root"); + restore_output(); + } + + std::remove(root_file.c_str()); + + state.counters["reads_per_sec"] = benchmark::Counter(num_reads_, benchmark::Counter::kIsRate); +} + +BENCHMARK_DEFINE_F(RegionQueryFixture, RNTuple)(benchmark::State &state) +{ + std::string root_file = "rntuple_" + std::to_string(num_reads_) + ".root"; + + suppress_output(); + samtoramntuple(sam_file_.c_str(), root_file.c_str(), true, true, true, 505, 0); + restore_output(); + + for (auto _ : state) { + suppress_output(); + ramntupleview(root_file.c_str(), region_, true, false, "perf.root"); + restore_output(); + } + + std::remove(root_file.c_str()); + + state.counters["reads_per_sec"] = benchmark::Counter(num_reads_, benchmark::Counter::kIsRate); +} + +BENCHMARK_REGISTER_F(RegionQueryFixture, TTree) + ->Args({1000}) + ->Args({10000}) + ->Args({100000}) + ->Unit(benchmark::kMillisecond); + +BENCHMARK_REGISTER_F(RegionQueryFixture, RNTuple) + ->Args({1000}) + ->Args({10000}) + ->Args({100000}) + ->Unit(benchmark::kMillisecond); + +BENCHMARK_MAIN(); diff --git a/benchmark/sam_to_ram_benchmark.cxx b/benchmark/sam_to_ram_benchmark.cxx index 7baa1e2..7413c98 100644 --- a/benchmark/sam_to_ram_benchmark.cxx +++ b/benchmark/sam_to_ram_benchmark.cxx @@ -1,5 +1,6 @@ #include #include "generate_sam_benchmark.h" +#include "benchmark_utils.h" #include "ramcore/SamToTTree.h" #include "ramcore/SamToNTuple.h" #include @@ -7,12 +8,6 @@ #include #include -#ifdef _WIN32 -#define NULL_DEVICE "NUL" -#else -#define NULL_DEVICE "/dev/null" -#endif - static void BM_SamToRamComparison(benchmark::State &state) { int num_reads = state.range(0); diff --git a/inc/ramcore/SamToNTuple.h b/inc/ramcore/SamToNTuple.h index 429c2fa..2149255 100644 --- a/inc/ramcore/SamToNTuple.h +++ b/inc/ramcore/SamToNTuple.h @@ -1,7 +1,8 @@ #ifndef RAMCORE_SAMTONTUPLE_H #define RAMCORE_SAMTONTUPLE_H -#include +#include +#include void samtoramntuple(const char *datafile, const char *treefile, @@ -9,5 +10,7 @@ void samtoramntuple(const char *datafile, int compression_algorithm, uint32_t quality_policy); -#endif // RAMCORE_SAMTONTUPLE_H +void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm, + uint32_t quality_policy); +#endif diff --git a/src/ramcore/SamToNTuple.cxx b/src/ramcore/SamToNTuple.cxx index 067af1d..e979f3f 100644 --- a/src/ramcore/SamToNTuple.cxx +++ b/src/ramcore/SamToNTuple.cxx @@ -10,6 +10,12 @@ #include #include +#include +#include +#include +#include +#include + void samtoramntuple(const char *datafile, const char *treefile, bool index, bool split, bool cache, @@ -120,3 +126,66 @@ void samtoramntuple(const char *datafile, stopwatch.Print(); } +void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm, + uint32_t quality_policy) +{ + std::ifstream input(datafile); + if (!input) { + std::cerr << "Error: Cannot open " << datafile << std::endl; + return; + } + + std::vector headers; + std::map> chr_files; + std::map chr_temp_filenames; + std::string line; + + while (std::getline(input, line)) { + if (line.empty()) + continue; + + if (line[0] == '@') { + headers.push_back(line); + continue; + } + + size_t pos = line.find('\t'); + if (pos == std::string::npos) + continue; + pos = line.find('\t', pos + 1); + if (pos == std::string::npos) + continue; + + size_t end_pos = line.find('\t', pos + 1); + if (end_pos == std::string::npos) + continue; + + std::string rname = line.substr(pos + 1, end_pos - pos - 1); + if (rname == "*") + continue; + + if (chr_files.find(rname) == chr_files.end()) { + std::string temp_filename = std::string(output_prefix) + "_" + rname + ".tmp.sam"; + chr_temp_filenames[rname] = temp_filename; + chr_files[rname] = std::make_unique(temp_filename); + + for (const auto &header : headers) { + *(chr_files[rname]) << header << "\n"; + } + } + + *(chr_files[rname]) << line << "\n"; + } + + input.close(); + for (auto &[chr, file] : chr_files) { + file->close(); + } + + for (const auto &[chr, temp_filename] : chr_temp_filenames) { + std::string output_filename = std::string(output_prefix) + "_" + chr + ".root"; + samtoramntuple(temp_filename.c_str(), output_filename.c_str(), false, false, false, compression_algorithm, + quality_policy); + std::remove(temp_filename.c_str()); + } +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 1cddd96..0920747 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -24,8 +24,9 @@ function(add_ramcore_test test_name) endfunction() add_ramcore_test(ramcoretests ramcoretests.cxx) +add_ramcore_test(chromosome_split_test chromosome_split_test.cxx) -install(TARGETS ramcoretests +install(TARGETS ramcoretests chromosome_split_test RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) diff --git a/test/chromosome_split_test.cxx b/test/chromosome_split_test.cxx new file mode 100644 index 0000000..3d3df46 --- /dev/null +++ b/test/chromosome_split_test.cxx @@ -0,0 +1,100 @@ +#include +#include "ramcore/SamToNTuple.h" +#include "rntuple/RAMNTupleRecord.h" +#include "generate_sam_benchmark.h" +#include +#include +#include + +class ChromosomeSplitTest : public ::testing::Test { +protected: + void SetUp() override + { + if (!std::filesystem::exists("test.sam")) { + GenerateSAMFile("test.sam", 100); + } + CleanupTestFiles(); + } + + void TearDown() override { CleanupTestFiles(); } + + void CleanupTestFiles() + { + std::remove("test_regular.root"); + for (const auto &entry : std::filesystem::directory_iterator(".")) { + std::string filename = entry.path().filename().string(); + if (filename.find("test_split_") == 0 && filename.find(".root") != std::string::npos) { + std::remove(filename.c_str()); + } + } + } +}; + +TEST_F(ChromosomeSplitTest, NoDataLoss) +{ + samtoramntuple("test.sam", "test_regular.root", false, true, true, 505, 1); + auto regularReader = ROOT::Experimental::RNTupleReader::Open("RAM", "test_regular.root"); + Long64_t totalEntries = regularReader->GetNEntries(); + + samtoramntuple_split_by_chromosome("test.sam", "test_split", 505, 1); + + Long64_t splitEntriesSum = 0; + for (const auto &entry : std::filesystem::directory_iterator(".")) { + std::string filename = entry.path().filename().string(); + if (filename.find("test_split_") == 0 && filename.find(".root") != std::string::npos) { + auto reader = ROOT::Experimental::RNTupleReader::Open("RAM", filename); + if (reader) { + splitEntriesSum += reader->GetNEntries(); + } + } + } + + EXPECT_EQ(totalEntries, splitEntriesSum); +} + +TEST_F(ChromosomeSplitTest, CorrectChromosomeAssignment) +{ + samtoramntuple_split_by_chromosome("test.sam", "test_split", 505, 1); + + for (const auto &entry : std::filesystem::directory_iterator(".")) { + std::string filename = entry.path().filename().string(); + if (filename.find("test_split_") == 0 && filename.find(".root") != std::string::npos) { + size_t pos = filename.find("test_split_"); + size_t end = filename.find(".root"); + std::string expectedChr = filename.substr(pos + 11, end - pos - 11); + + auto reader = ROOT::Experimental::RNTupleReader::Open("RAM", filename); + ASSERT_NE(reader, nullptr); + + auto viewRecord = reader->GetView("record"); + + for (auto i : reader->GetEntryRange()) { + const auto &record = viewRecord(i); + std::string actualChr = record.GetRNAME(); + EXPECT_EQ(expectedChr, actualChr) << "Wrong chromosome in " << filename << " at entry " << i; + } + } + } +} + +TEST_F(ChromosomeSplitTest, MetadataPresent) +{ + samtoramntuple_split_by_chromosome("test.sam", "test_split", 505, 1); + + int filesChecked = 0; + for (const auto &entry : std::filesystem::directory_iterator(".")) { + std::string filename = entry.path().filename().string(); + if (filename.find("test_split_") == 0 && filename.find(".root") != std::string::npos) { + auto metaReader = ROOT::Experimental::RNTupleReader::Open("METADATA", filename); + EXPECT_NE(metaReader, nullptr) << "Missing METADATA in " << filename; + + if (metaReader) { + EXPECT_GT(metaReader->GetNEntries(), 0); + } + + filesChecked++; + } + } + + EXPECT_GT(filesChecked, 0); +} diff --git a/tools/samtoramntuple.cxx b/tools/samtoramntuple.cxx index 242f7cf..f7e6351 100644 --- a/tools/samtoramntuple.cxx +++ b/tools/samtoramntuple.cxx @@ -6,48 +6,57 @@ int main(int argc, char* argv[]) { if (argc < 2) { - std::cout << "Usage: " << argv[0] << " [output.ram]\n"; - std::cout << "Options:\n"; - std::cout << " -noindex Disable indexing\n"; - std::cout << " -illumina Use Illumina quality binning\n"; - std::cout << " -dropqual Drop quality scores\n"; - return 1; + std::cout << "Usage: " << argv[0] << " [output]\n"; + std::cout << "Options:\n"; + std::cout << " -split Split by chromosome\n"; + std::cout << " -noindex Disable indexing\n"; + std::cout << " -illumina Use Illumina quality binning\n"; + std::cout << " -dropqual Drop quality scores\n"; + return 1; } const char* input = argv[1]; const char* output = nullptr; - - std::string outfile; - if (argc > 2 && argv[2][0] != '-') { - output = argv[2]; - } else { - outfile = std::string(input); - size_t pos = outfile.rfind(".sam"); - if (pos != std::string::npos) { - outfile.replace(pos, 4, ".ram"); - } else { - outfile += ".ram"; - } - output = outfile.c_str(); - } - - + + bool do_split = false; bool do_index = true; uint32_t quality_mode = RAMNTupleRecord::kPhred33; for (int i = 2; i < argc; i++) { std::string arg = argv[i]; - if (arg == "-noindex") { - do_index = false; + if (arg == "-split") { + do_split = true; + } else if (arg == "-noindex") { + do_index = false; } else if (arg == "-illumina") { - quality_mode = RAMNTupleRecord::kIlluminaBinning; + quality_mode = RAMNTupleRecord::kIlluminaBinning; } else if (arg == "-dropqual") { - quality_mode = RAMNTupleRecord::kDrop; + quality_mode = RAMNTupleRecord::kDrop; + } else if (arg[0] != '-') { + output = argv[i]; } } - + + std::string outfile; + if (!output) { + outfile = std::string(input); + size_t pos = outfile.rfind(".sam"); + if (pos != std::string::npos) { + outfile.erase(pos); + } + output = outfile.c_str(); + } + try { - samtoramntuple(input, output, do_index, true, true, 505, quality_mode); + if (do_split) { + samtoramntuple_split_by_chromosome(input, output, 505, quality_mode); + } else { + std::string ramfile = std::string(output); + if (ramfile.find(".root") == std::string::npos && ramfile.find(".ram") == std::string::npos) { + ramfile += ".ram"; + } + samtoramntuple(input, ramfile.c_str(), do_index, true, true, 505, quality_mode); + } } catch (const std::exception& e) { std::cerr << "Error: " << e.what() << std::endl; return 1; From 8927555723750498fe7e035ae286dcda4975b47a Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Sun, 5 Oct 2025 15:05:59 +0530 Subject: [PATCH 02/13] added bam vs ram file benchmark for chromosome based file splitting Signed-off-by: AdityaPandeyCN --- benchmark/CMakeLists.txt | 15 ++- benchmark/chromosome_split_benchmark.cxx | 113 +++++++++++++++++++++++ 2 files changed, 126 insertions(+), 2 deletions(-) create mode 100644 benchmark/chromosome_split_benchmark.cxx diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 84f1946..377ba4f 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -48,7 +48,15 @@ ROOT_EXECUTABLE(region_query_benchmark ramtools_views ) -install(TARGETS sam_to_ram_benchmark conversion_time_benchmark region_query_benchmark +ROOT_EXECUTABLE(chromosome_split_benchmark + chromosome_split_benchmark.cxx + LIBRARIES + benchmark::benchmark + ramcore + sam_generator +) + +install(TARGETS sam_to_ram_benchmark conversion_time_benchmark region_query_benchmark chromosome_split_benchmark RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) @@ -61,7 +69,10 @@ add_custom_target(benchmark COMMAND ${CMAKE_COMMAND} -E echo "" COMMAND ${CMAKE_COMMAND} -E echo "=== Region Query Benchmark ===" COMMAND region_query_benchmark - DEPENDS sam_to_ram_benchmark conversion_time_benchmark region_query_benchmark + COMMAND ${CMAKE_COMMAND} -E echo "" + COMMAND ${CMAKE_COMMAND} -E echo "=== Chromosome Split Benchmark ===" + COMMAND chromosome_split_benchmark + DEPENDS sam_to_ram_benchmark conversion_time_benchmark region_query_benchmark chromosome_split_benchmark WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR} COMMENT "Running all RAM tools benchmarks" ) diff --git a/benchmark/chromosome_split_benchmark.cxx b/benchmark/chromosome_split_benchmark.cxx new file mode 100644 index 0000000..98c1f85 --- /dev/null +++ b/benchmark/chromosome_split_benchmark.cxx @@ -0,0 +1,113 @@ +#include +#include "ramcore/SamToNTuple.h" +#include "generate_sam_benchmark.h" +#include +#include +#include +#include +#include +#include + +static void CleanupFiles(const std::string& pattern) { + for (const auto& entry : std::filesystem::directory_iterator(".")) { + if (entry.path().filename().string().find(pattern) != std::string::npos) { + std::remove(entry.path().c_str()); + } + } +} + +static size_t GetTotalFileSize(const std::string& pattern) { + size_t total = 0; + for (const auto& entry : std::filesystem::directory_iterator(".")) { + if (entry.path().filename().string().find(pattern) != std::string::npos) { + total += std::filesystem::file_size(entry.path()); + } + } + return total; +} + +static std::vector GetChromosomes(const std::string& sam_file) { + std::vector chroms; + std::ifstream f(sam_file); + std::string line; + + while (std::getline(f, line) && line[0] == '@') { + if (line.find("@SQ\tSN:") == 0) { + size_t start = 7; + size_t end = line.find('\t', start); + chroms.push_back(line.substr(start, end - start)); + } + } + return chroms; +} + +static void BM_SamtoolsSplit(benchmark::State& state) { + int num_reads = state.range(0); + std::string sam_file = "bench_st_" + std::to_string(num_reads) + ".sam"; + std::string bam_file = "bench_st_" + std::to_string(num_reads) + ".bam"; + std::string sorted_bam = "bench_st_" + std::to_string(num_reads) + "_sorted.bam"; + + GenerateSAMFile(sam_file, num_reads); + + // Convert SAM to BAM + system(("samtools view -bS " + sam_file + " -o " + bam_file + " 2>/dev/null").c_str()); + + // Sort BAM + system(("samtools sort " + bam_file + " -o " + sorted_bam + " 2>/dev/null").c_str()); + + // Index BAM + system(("samtools index " + sorted_bam + " 2>/dev/null").c_str()); + + auto chromosomes = GetChromosomes(sam_file); + + for (auto _ : state) { + // Extract each chromosome + for (const auto& chr : chromosomes) { + std::string cmd = "samtools view -b " + sorted_bam + " " + chr + " > bench_st_" + chr + ".bam 2>/dev/null"; + system(cmd.c_str()); + } + + state.counters["size_MB"] = GetTotalFileSize("bench_st_chr") / (1024.0 * 1024.0); + CleanupFiles("bench_st_chr"); + } + + std::remove(sam_file.c_str()); + std::remove(bam_file.c_str()); + std::remove(sorted_bam.c_str()); + std::remove((sorted_bam + ".bai").c_str()); + state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); +} + +static void BM_ChromosomeSplit(benchmark::State& state) { + int num_reads = state.range(0); + std::string sam_file = "bench_split_" + std::to_string(num_reads) + ".sam"; + + GenerateSAMFile(sam_file, num_reads); + + FILE* old_stdout = stdout; + FILE* old_stderr = stderr; + + for (auto _ : state) { + stdout = fopen("/dev/null", "w"); + stderr = fopen("/dev/null", "w"); + + samtoramntuple_split_by_chromosome(sam_file.c_str(), "bench_split_out", 505, 1); + + fclose(stdout); + fclose(stderr); + stdout = old_stdout; + stderr = old_stderr; + + state.counters["size_MB"] = GetTotalFileSize("bench_split_out_") / (1024.0 * 1024.0); + CleanupFiles("bench_split_out_"); + } + + std::remove(sam_file.c_str()); + state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); +} + +BENCHMARK(BM_SamtoolsSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); +BENCHMARK(BM_ChromosomeSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); + +BENCHMARK_MAIN(); + From 0ad3b3b5f4b4b41766291895106c6487a709624b Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Sun, 5 Oct 2025 15:06:47 +0530 Subject: [PATCH 03/13] clang format changes Signed-off-by: AdityaPandeyCN --- benchmark/chromosome_split_benchmark.cxx | 182 ++++++++++++----------- 1 file changed, 93 insertions(+), 89 deletions(-) diff --git a/benchmark/chromosome_split_benchmark.cxx b/benchmark/chromosome_split_benchmark.cxx index 98c1f85..e6274cc 100644 --- a/benchmark/chromosome_split_benchmark.cxx +++ b/benchmark/chromosome_split_benchmark.cxx @@ -8,106 +8,110 @@ #include #include -static void CleanupFiles(const std::string& pattern) { - for (const auto& entry : std::filesystem::directory_iterator(".")) { - if (entry.path().filename().string().find(pattern) != std::string::npos) { - std::remove(entry.path().c_str()); - } - } +static void CleanupFiles(const std::string &pattern) +{ + for (const auto &entry : std::filesystem::directory_iterator(".")) { + if (entry.path().filename().string().find(pattern) != std::string::npos) { + std::remove(entry.path().c_str()); + } + } } -static size_t GetTotalFileSize(const std::string& pattern) { - size_t total = 0; - for (const auto& entry : std::filesystem::directory_iterator(".")) { - if (entry.path().filename().string().find(pattern) != std::string::npos) { - total += std::filesystem::file_size(entry.path()); - } - } - return total; +static size_t GetTotalFileSize(const std::string &pattern) +{ + size_t total = 0; + for (const auto &entry : std::filesystem::directory_iterator(".")) { + if (entry.path().filename().string().find(pattern) != std::string::npos) { + total += std::filesystem::file_size(entry.path()); + } + } + return total; } -static std::vector GetChromosomes(const std::string& sam_file) { - std::vector chroms; - std::ifstream f(sam_file); - std::string line; - - while (std::getline(f, line) && line[0] == '@') { - if (line.find("@SQ\tSN:") == 0) { - size_t start = 7; - size_t end = line.find('\t', start); - chroms.push_back(line.substr(start, end - start)); - } - } - return chroms; +static std::vector GetChromosomes(const std::string &sam_file) +{ + std::vector chroms; + std::ifstream f(sam_file); + std::string line; + + while (std::getline(f, line) && line[0] == '@') { + if (line.find("@SQ\tSN:") == 0) { + size_t start = 7; + size_t end = line.find('\t', start); + chroms.push_back(line.substr(start, end - start)); + } + } + return chroms; } -static void BM_SamtoolsSplit(benchmark::State& state) { - int num_reads = state.range(0); - std::string sam_file = "bench_st_" + std::to_string(num_reads) + ".sam"; - std::string bam_file = "bench_st_" + std::to_string(num_reads) + ".bam"; - std::string sorted_bam = "bench_st_" + std::to_string(num_reads) + "_sorted.bam"; - - GenerateSAMFile(sam_file, num_reads); - - // Convert SAM to BAM - system(("samtools view -bS " + sam_file + " -o " + bam_file + " 2>/dev/null").c_str()); - - // Sort BAM - system(("samtools sort " + bam_file + " -o " + sorted_bam + " 2>/dev/null").c_str()); - - // Index BAM - system(("samtools index " + sorted_bam + " 2>/dev/null").c_str()); - - auto chromosomes = GetChromosomes(sam_file); - - for (auto _ : state) { - // Extract each chromosome - for (const auto& chr : chromosomes) { - std::string cmd = "samtools view -b " + sorted_bam + " " + chr + " > bench_st_" + chr + ".bam 2>/dev/null"; - system(cmd.c_str()); - } - - state.counters["size_MB"] = GetTotalFileSize("bench_st_chr") / (1024.0 * 1024.0); - CleanupFiles("bench_st_chr"); - } - - std::remove(sam_file.c_str()); - std::remove(bam_file.c_str()); - std::remove(sorted_bam.c_str()); - std::remove((sorted_bam + ".bai").c_str()); - state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); +static void BM_SamtoolsSplit(benchmark::State &state) +{ + int num_reads = state.range(0); + std::string sam_file = "bench_st_" + std::to_string(num_reads) + ".sam"; + std::string bam_file = "bench_st_" + std::to_string(num_reads) + ".bam"; + std::string sorted_bam = "bench_st_" + std::to_string(num_reads) + "_sorted.bam"; + + GenerateSAMFile(sam_file, num_reads); + + // Convert SAM to BAM + system(("samtools view -bS " + sam_file + " -o " + bam_file + " 2>/dev/null").c_str()); + + // Sort BAM + system(("samtools sort " + bam_file + " -o " + sorted_bam + " 2>/dev/null").c_str()); + + // Index BAM + system(("samtools index " + sorted_bam + " 2>/dev/null").c_str()); + + auto chromosomes = GetChromosomes(sam_file); + + for (auto _ : state) { + // Extract each chromosome + for (const auto &chr : chromosomes) { + std::string cmd = "samtools view -b " + sorted_bam + " " + chr + " > bench_st_" + chr + ".bam 2>/dev/null"; + system(cmd.c_str()); + } + + state.counters["size_MB"] = GetTotalFileSize("bench_st_chr") / (1024.0 * 1024.0); + CleanupFiles("bench_st_chr"); + } + + std::remove(sam_file.c_str()); + std::remove(bam_file.c_str()); + std::remove(sorted_bam.c_str()); + std::remove((sorted_bam + ".bai").c_str()); + state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); } -static void BM_ChromosomeSplit(benchmark::State& state) { - int num_reads = state.range(0); - std::string sam_file = "bench_split_" + std::to_string(num_reads) + ".sam"; - - GenerateSAMFile(sam_file, num_reads); - - FILE* old_stdout = stdout; - FILE* old_stderr = stderr; - - for (auto _ : state) { - stdout = fopen("/dev/null", "w"); - stderr = fopen("/dev/null", "w"); - - samtoramntuple_split_by_chromosome(sam_file.c_str(), "bench_split_out", 505, 1); - - fclose(stdout); - fclose(stderr); - stdout = old_stdout; - stderr = old_stderr; - - state.counters["size_MB"] = GetTotalFileSize("bench_split_out_") / (1024.0 * 1024.0); - CleanupFiles("bench_split_out_"); - } - - std::remove(sam_file.c_str()); - state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); +static void BM_ChromosomeSplit(benchmark::State &state) +{ + int num_reads = state.range(0); + std::string sam_file = "bench_split_" + std::to_string(num_reads) + ".sam"; + + GenerateSAMFile(sam_file, num_reads); + + FILE *old_stdout = stdout; + FILE *old_stderr = stderr; + + for (auto _ : state) { + stdout = fopen("/dev/null", "w"); + stderr = fopen("/dev/null", "w"); + + samtoramntuple_split_by_chromosome(sam_file.c_str(), "bench_split_out", 505, 1); + + fclose(stdout); + fclose(stderr); + stdout = old_stdout; + stderr = old_stderr; + + state.counters["size_MB"] = GetTotalFileSize("bench_split_out_") / (1024.0 * 1024.0); + CleanupFiles("bench_split_out_"); + } + + std::remove(sam_file.c_str()); + state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); } BENCHMARK(BM_SamtoolsSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); BENCHMARK(BM_ChromosomeSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); BENCHMARK_MAIN(); - From 576e4baad9e0352e20bed412b8344b200cce57ce Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Tue, 7 Oct 2025 11:38:57 +0530 Subject: [PATCH 04/13] made benchmarks fair Signed-off-by: AdityaPandeyCN --- benchmark/chromosome_split_benchmark.cxx | 28 +++++++++++++----------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/benchmark/chromosome_split_benchmark.cxx b/benchmark/chromosome_split_benchmark.cxx index e6274cc..8dfb63b 100644 --- a/benchmark/chromosome_split_benchmark.cxx +++ b/benchmark/chromosome_split_benchmark.cxx @@ -48,23 +48,23 @@ static void BM_SamtoolsSplit(benchmark::State &state) { int num_reads = state.range(0); std::string sam_file = "bench_st_" + std::to_string(num_reads) + ".sam"; - std::string bam_file = "bench_st_" + std::to_string(num_reads) + ".bam"; - std::string sorted_bam = "bench_st_" + std::to_string(num_reads) + "_sorted.bam"; GenerateSAMFile(sam_file, num_reads); + auto chromosomes = GetChromosomes(sam_file); - // Convert SAM to BAM - system(("samtools view -bS " + sam_file + " -o " + bam_file + " 2>/dev/null").c_str()); + for (auto _ : state) { + std::string bam_file = "bench_st_tmp.bam"; + std::string sorted_bam = "bench_st_sorted.bam"; - // Sort BAM - system(("samtools sort " + bam_file + " -o " + sorted_bam + " 2>/dev/null").c_str()); + // Convert SAM to BAM + system(("samtools view -bS " + sam_file + " -o " + bam_file + " 2>/dev/null").c_str()); - // Index BAM - system(("samtools index " + sorted_bam + " 2>/dev/null").c_str()); + // Sort BAM + system(("samtools sort " + bam_file + " -o " + sorted_bam + " 2>/dev/null").c_str()); - auto chromosomes = GetChromosomes(sam_file); + // Index BAM + system(("samtools index " + sorted_bam + " 2>/dev/null").c_str()); - for (auto _ : state) { // Extract each chromosome for (const auto &chr : chromosomes) { std::string cmd = "samtools view -b " + sorted_bam + " " + chr + " > bench_st_" + chr + ".bam 2>/dev/null"; @@ -72,13 +72,14 @@ static void BM_SamtoolsSplit(benchmark::State &state) } state.counters["size_MB"] = GetTotalFileSize("bench_st_chr") / (1024.0 * 1024.0); + CleanupFiles("bench_st_chr"); + std::remove(bam_file.c_str()); + std::remove(sorted_bam.c_str()); + std::remove((sorted_bam + ".bai").c_str()); } std::remove(sam_file.c_str()); - std::remove(bam_file.c_str()); - std::remove(sorted_bam.c_str()); - std::remove((sorted_bam + ".bai").c_str()); state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); } @@ -115,3 +116,4 @@ BENCHMARK(BM_SamtoolsSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchm BENCHMARK(BM_ChromosomeSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); BENCHMARK_MAIN(); + From 3948c89d5a795b1ad0b554b9f2f0130131ba24e0 Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Tue, 7 Oct 2025 11:39:18 +0530 Subject: [PATCH 05/13] clang changes Signed-off-by: AdityaPandeyCN --- benchmark/chromosome_split_benchmark.cxx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/benchmark/chromosome_split_benchmark.cxx b/benchmark/chromosome_split_benchmark.cxx index 8dfb63b..25524dd 100644 --- a/benchmark/chromosome_split_benchmark.cxx +++ b/benchmark/chromosome_split_benchmark.cxx @@ -72,7 +72,7 @@ static void BM_SamtoolsSplit(benchmark::State &state) } state.counters["size_MB"] = GetTotalFileSize("bench_st_chr") / (1024.0 * 1024.0); - + CleanupFiles("bench_st_chr"); std::remove(bam_file.c_str()); std::remove(sorted_bam.c_str()); @@ -116,4 +116,3 @@ BENCHMARK(BM_SamtoolsSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchm BENCHMARK(BM_ChromosomeSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); BENCHMARK_MAIN(); - From 81a68c8edb79e9b5b9ccfbe6845ddcb696752b65 Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Fri, 10 Oct 2025 18:17:38 +0530 Subject: [PATCH 06/13] parallel write code Signed-off-by: AdityaPandeyCN --- benchmark/chromosome_split_benchmark.cxx | 45 ++++ inc/ramcore/SamToNTuple.h | 7 + src/ramcore/SamToNTuple.cxx | 274 ++++++++++++++++++----- 3 files changed, 267 insertions(+), 59 deletions(-) diff --git a/benchmark/chromosome_split_benchmark.cxx b/benchmark/chromosome_split_benchmark.cxx index 25524dd..8b9ac62 100644 --- a/benchmark/chromosome_split_benchmark.cxx +++ b/benchmark/chromosome_split_benchmark.cxx @@ -112,7 +112,52 @@ static void BM_ChromosomeSplit(benchmark::State &state) state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); } +static void BM_ChromosomeSplitParallel(benchmark::State &state) +{ + int num_reads = state.range(0); + int num_threads = state.range(1); + std::string sam_file = "bench_split_par_" + std::to_string(num_reads) + ".sam"; + + GenerateSAMFile(sam_file, num_reads); + + FILE *old_stdout = stdout; + FILE *old_stderr = stderr; + + for (auto _ : state) { + stdout = fopen("/dev/null", "w"); + stderr = fopen("/dev/null", "w"); + + samtoramntuple_split_by_chromosome_parallel(sam_file.c_str(), "bench_split_par_out", 505, 1, num_threads); + + fclose(stdout); + fclose(stderr); + stdout = old_stdout; + stderr = old_stderr; + + state.counters["size_MB"] = GetTotalFileSize("bench_split_par_out_") / (1024.0 * 1024.0); + state.counters["threads"] = num_threads; + CleanupFiles("bench_split_par_out_"); + } + + std::remove(sam_file.c_str()); + state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); +} + BENCHMARK(BM_SamtoolsSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); BENCHMARK(BM_ChromosomeSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); +// Parallel benchmarks with different thread counts +BENCHMARK(BM_ChromosomeSplitParallel) + ->Args({100000, 2}) + ->Args({100000, 4}) + ->Args({100000, 8}) + ->Args({500000, 2}) + ->Args({500000, 4}) + ->Args({500000, 8}) + ->Args({1000000, 2}) + ->Args({1000000, 4}) + ->Args({1000000, 8}) + ->Unit(benchmark::kMillisecond); + BENCHMARK_MAIN(); + diff --git a/inc/ramcore/SamToNTuple.h b/inc/ramcore/SamToNTuple.h index 2149255..d5b92ba 100644 --- a/inc/ramcore/SamToNTuple.h +++ b/inc/ramcore/SamToNTuple.h @@ -13,4 +13,11 @@ void samtoramntuple(const char *datafile, void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm, uint32_t quality_policy); +void samtoramntuple_split_by_chromosome_parallel(const char *datafile, + const char *output_prefix, + int compression_algorithm, + uint32_t quality_policy, + int num_threads = 4); + #endif + diff --git a/src/ramcore/SamToNTuple.cxx b/src/ramcore/SamToNTuple.cxx index e979f3f..965336d 100644 --- a/src/ramcore/SamToNTuple.cxx +++ b/src/ramcore/SamToNTuple.cxx @@ -9,12 +9,16 @@ #include #include #include +#include #include #include #include #include #include +#include +#include +#include void samtoramntuple(const char *datafile, const char *treefile, @@ -126,66 +130,218 @@ void samtoramntuple(const char *datafile, stopwatch.Print(); } -void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm, +void samtoramntuple_split_by_chromosome(const char *datafile, + const char *output_prefix, + int compression_algorithm, uint32_t quality_policy) { - std::ifstream input(datafile); - if (!input) { - std::cerr << "Error: Cannot open " << datafile << std::endl; - return; - } - - std::vector headers; - std::map> chr_files; - std::map chr_temp_filenames; - std::string line; - - while (std::getline(input, line)) { - if (line.empty()) - continue; - - if (line[0] == '@') { - headers.push_back(line); - continue; - } - - size_t pos = line.find('\t'); - if (pos == std::string::npos) - continue; - pos = line.find('\t', pos + 1); - if (pos == std::string::npos) - continue; - - size_t end_pos = line.find('\t', pos + 1); - if (end_pos == std::string::npos) - continue; - - std::string rname = line.substr(pos + 1, end_pos - pos - 1); - if (rname == "*") - continue; - - if (chr_files.find(rname) == chr_files.end()) { - std::string temp_filename = std::string(output_prefix) + "_" + rname + ".tmp.sam"; - chr_temp_filenames[rname] = temp_filename; - chr_files[rname] = std::make_unique(temp_filename); - - for (const auto &header : headers) { - *(chr_files[rname]) << header << "\n"; - } - } - - *(chr_files[rname]) << line << "\n"; - } - - input.close(); - for (auto &[chr, file] : chr_files) { - file->close(); - } + RAMNTupleRecord::InitializeRefs(); + + std::map> chromosome_records; + std::vector> headers; + + ramcore::SamParser parser; + + auto header_callback = [&](const std::string& tag, const std::string& content) { + headers.push_back({tag, content}); + + if (tag == "@SQ") { + size_t sn_pos = content.find("SN:"); + if (sn_pos != std::string::npos) { + sn_pos += 3; + size_t tab_pos = content.find('\t', sn_pos); + std::string ref_name = content.substr(sn_pos, + tab_pos != std::string::npos ? tab_pos - sn_pos : std::string::npos); + RAMNTupleRecord::GetRnameRefs()->GetRefId(ref_name.c_str()); + } + } + }; + + auto record_callback = [&](const ramcore::SamRecord& sam_record, size_t record_num) { + if (sam_record.rname != "*") { + chromosome_records[sam_record.rname].push_back(sam_record); + } + }; + + parser.ParseFile(datafile, header_callback, record_callback); + + for (auto& [chr, records] : chromosome_records) { + std::sort(records.begin(), records.end(), + [](const ramcore::SamRecord& a, const ramcore::SamRecord& b) { + return a.pos < b.pos; + }); + } + + for (const auto& [chr, records] : chromosome_records) { + std::string filename = std::string(output_prefix) + "_" + chr + ".root"; + auto file = TFile::Open(filename.c_str(), "RECREATE"); + + auto model = RAMNTupleRecord::MakeModel(); + ROOT::Experimental::RNTupleWriteOptions writeOptions; + writeOptions.SetCompression(compression_algorithm); + writeOptions.SetMaxUnzippedPageSize(128000); + + auto writer = ROOT::Experimental::RNTupleWriter::Append( + std::move(model), "RAM", *file, writeOptions); + auto entry = writer->CreateEntry(); + auto recordPtr = entry->GetPtr("record").get(); + + for (const auto& sam_record : records) { + recordPtr->SetBit(quality_policy); + recordPtr->SetQNAME(sam_record.qname.c_str()); + recordPtr->SetFLAG(sam_record.flag); + recordPtr->SetREFID(sam_record.rname.c_str()); + recordPtr->SetPOS(sam_record.pos); + recordPtr->SetMAPQ(sam_record.mapq); + recordPtr->SetCIGAR(sam_record.cigar.c_str()); + recordPtr->SetREFNEXT(sam_record.rnext.c_str()); + recordPtr->SetPNEXT(sam_record.pnext); + recordPtr->SetTLEN(sam_record.tlen); + recordPtr->SetSEQ(sam_record.seq.c_str()); + recordPtr->SetQUAL(sam_record.qual.c_str()); + + recordPtr->ResetNOPT(); + for (const auto& opt : sam_record.optional_fields) { + recordPtr->SetOPT(opt.c_str()); + } + + writer->Fill(*entry); + } + + writer.reset(); + + TList h; + h.SetName("headers"); + for (const auto& [tag, content] : headers) { + h.Add(new TNamed(tag.c_str(), content.c_str())); + } + + RAMNTupleRecord::WriteAllRefs(*file); + h.Write(); + file->Close(); + delete file; + } +} - for (const auto &[chr, temp_filename] : chr_temp_filenames) { - std::string output_filename = std::string(output_prefix) + "_" + chr + ".root"; - samtoramntuple(temp_filename.c_str(), output_filename.c_str(), false, false, false, compression_algorithm, - quality_policy); - std::remove(temp_filename.c_str()); - } +void samtoramntuple_split_by_chromosome_parallel(const char *datafile, + const char *output_prefix, + int compression_algorithm, + uint32_t quality_policy, + int num_threads) +{ + ROOT::EnableThreadSafety(); + RAMNTupleRecord::InitializeRefs(); + + std::map> chromosome_records; + std::vector> headers; + + ramcore::SamParser parser; + + auto header_callback = [&](const std::string& tag, const std::string& content) { + headers.push_back({tag, content}); + + if (tag == "@SQ") { + size_t sn_pos = content.find("SN:"); + if (sn_pos != std::string::npos) { + sn_pos += 3; + size_t tab_pos = content.find('\t', sn_pos); + std::string ref_name = content.substr(sn_pos, + tab_pos != std::string::npos ? tab_pos - sn_pos : std::string::npos); + RAMNTupleRecord::GetRnameRefs()->GetRefId(ref_name.c_str()); + } + } + }; + + auto record_callback = [&](const ramcore::SamRecord& sam_record, size_t record_num) { + if (sam_record.rname != "*") { + chromosome_records[sam_record.rname].push_back(sam_record); + } + }; + + parser.ParseFile(datafile, header_callback, record_callback); + + for (auto& [chr, records] : chromosome_records) { + std::sort(records.begin(), records.end(), + [](const ramcore::SamRecord& a, const ramcore::SamRecord& b) { + return a.pos < b.pos; + }); + } + + std::vector chr_names; + for (const auto& [chr, records] : chromosome_records) { + chr_names.push_back(chr); + } + + static std::mutex file_destruction_mutex; + + auto write_chromosome = [&](const std::string& chr) { + const auto& records = chromosome_records[chr]; + + std::string filename = std::string(output_prefix) + "_" + chr + ".root"; + + std::unique_ptr file(TFile::Open(filename.c_str(), "RECREATE")); + + auto model = RAMNTupleRecord::MakeModel(); + ROOT::Experimental::RNTupleWriteOptions writeOptions; + writeOptions.SetCompression(compression_algorithm); + writeOptions.SetMaxUnzippedPageSize(128000); + + auto writer = ROOT::Experimental::RNTupleWriter::Append( + std::move(model), "RAM", *file, writeOptions); + auto entry = writer->CreateEntry(); + auto recordPtr = entry->GetPtr("record").get(); + + for (const auto& sam_record : records) { + recordPtr->SetBit(quality_policy); + recordPtr->SetQNAME(sam_record.qname.c_str()); + recordPtr->SetFLAG(sam_record.flag); + recordPtr->SetREFID(sam_record.rname.c_str()); + recordPtr->SetPOS(sam_record.pos); + recordPtr->SetMAPQ(sam_record.mapq); + recordPtr->SetCIGAR(sam_record.cigar.c_str()); + recordPtr->SetREFNEXT(sam_record.rnext.c_str()); + recordPtr->SetPNEXT(sam_record.pnext); + recordPtr->SetTLEN(sam_record.tlen); + recordPtr->SetSEQ(sam_record.seq.c_str()); + recordPtr->SetQUAL(sam_record.qual.c_str()); + + recordPtr->ResetNOPT(); + for (const auto& opt : sam_record.optional_fields) { + recordPtr->SetOPT(opt.c_str()); + } + + writer->Fill(*entry); + } + + writer.reset(); + + TList h; + h.SetName("headers"); + for (const auto& [tag, content] : headers) { + h.Add(new TNamed(tag.c_str(), content.c_str())); + } + + RAMNTupleRecord::WriteAllRefs(*file); + h.Write(); + + { + std::lock_guard lock(file_destruction_mutex); + file->Close(); + file.reset(); + } + }; + + size_t chr_idx = 0; + while (chr_idx < chr_names.size()) { + std::vector threads; + + for (int i = 0; i < num_threads && chr_idx < chr_names.size(); ++i, ++chr_idx) { + threads.emplace_back(write_chromosome, chr_names[chr_idx]); + } + + for (auto& t : threads) { + t.join(); + } + } } + From 473f39b26c42813be610dc32e043f9e07cf67f4d Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Wed, 15 Oct 2025 11:10:31 +0530 Subject: [PATCH 07/13] version update Signed-off-by: AdityaPandeyCN --- benchmark/chromosome_split_benchmark.cxx | 39 +----- inc/ramcore/SamToNTuple.h | 13 +- inc/rntuple/RAMNTupleRecord.h | 9 +- src/ramcore/SamToNTuple.cxx | 153 ++++------------------- src/rntuple/RAMNTupleRecord.cxx | 8 +- test/chromosome_split_test.cxx | 15 +-- test/ramcoretests.cxx | 2 +- tools/samtoramntuple.cxx | 2 +- 8 files changed, 54 insertions(+), 187 deletions(-) diff --git a/benchmark/chromosome_split_benchmark.cxx b/benchmark/chromosome_split_benchmark.cxx index 8b9ac62..27bb80a 100644 --- a/benchmark/chromosome_split_benchmark.cxx +++ b/benchmark/chromosome_split_benchmark.cxx @@ -83,36 +83,7 @@ static void BM_SamtoolsSplit(benchmark::State &state) state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); } -static void BM_ChromosomeSplit(benchmark::State &state) -{ - int num_reads = state.range(0); - std::string sam_file = "bench_split_" + std::to_string(num_reads) + ".sam"; - - GenerateSAMFile(sam_file, num_reads); - - FILE *old_stdout = stdout; - FILE *old_stderr = stderr; - - for (auto _ : state) { - stdout = fopen("/dev/null", "w"); - stderr = fopen("/dev/null", "w"); - - samtoramntuple_split_by_chromosome(sam_file.c_str(), "bench_split_out", 505, 1); - - fclose(stdout); - fclose(stderr); - stdout = old_stdout; - stderr = old_stderr; - - state.counters["size_MB"] = GetTotalFileSize("bench_split_out_") / (1024.0 * 1024.0); - CleanupFiles("bench_split_out_"); - } - - std::remove(sam_file.c_str()); - state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); -} - -static void BM_ChromosomeSplitParallel(benchmark::State &state) +static void BM_ChromosomeSplitThreads(benchmark::State &state) { int num_reads = state.range(0); int num_threads = state.range(1); @@ -127,7 +98,7 @@ static void BM_ChromosomeSplitParallel(benchmark::State &state) stdout = fopen("/dev/null", "w"); stderr = fopen("/dev/null", "w"); - samtoramntuple_split_by_chromosome_parallel(sam_file.c_str(), "bench_split_par_out", 505, 1, num_threads); + samtoramntuple_split_by_chromosome(sam_file.c_str(), "bench_split_par_out", 505, 1, num_threads); fclose(stdout); fclose(stderr); @@ -144,10 +115,8 @@ static void BM_ChromosomeSplitParallel(benchmark::State &state) } BENCHMARK(BM_SamtoolsSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); -BENCHMARK(BM_ChromosomeSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); - -// Parallel benchmarks with different thread counts -BENCHMARK(BM_ChromosomeSplitParallel) +// Benchmarks with different thread counts +BENCHMARK(BM_ChromosomeSplitThreads) ->Args({100000, 2}) ->Args({100000, 4}) ->Args({100000, 8}) diff --git a/inc/ramcore/SamToNTuple.h b/inc/ramcore/SamToNTuple.h index d5b92ba..1ea696b 100644 --- a/inc/ramcore/SamToNTuple.h +++ b/inc/ramcore/SamToNTuple.h @@ -10,14 +10,11 @@ void samtoramntuple(const char *datafile, int compression_algorithm, uint32_t quality_policy); -void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm, - uint32_t quality_policy); - -void samtoramntuple_split_by_chromosome_parallel(const char *datafile, - const char *output_prefix, - int compression_algorithm, - uint32_t quality_policy, - int num_threads = 4); +void samtoramntuple_split_by_chromosome(const char *datafile, + const char *output_prefix, + int compression_algorithm, + uint32_t quality_policy, + int num_threads = 4); #endif diff --git a/inc/rntuple/RAMNTupleRecord.h b/inc/rntuple/RAMNTupleRecord.h index eeb66a4..8d608f3 100644 --- a/inc/rntuple/RAMNTupleRecord.h +++ b/inc/rntuple/RAMNTupleRecord.h @@ -18,11 +18,11 @@ #include #include -namespace ROOT::Experimental { +namespace ROOT { class RNTupleModel; class RNTupleWriter; class RNTupleReader; -} // namespace ROOT::Experimental +} // namespace ROOT class RAMNTupleRefs; class RAMNTupleIndex; @@ -222,7 +222,7 @@ class RAMNTupleRecord { static RAMNTupleIndex *GetIndex() { return fgIndex.get(); } // File I/O - static std::unique_ptr + static std::unique_ptr OpenRAMFile(const std::string &filename, const std::string &ntupleName = "RAM"); static void WriteAllRefs(TFile &file); static void ReadAllRefs(const std::string &filename = ""); @@ -230,7 +230,7 @@ class RAMNTupleRecord { static void ReadIndex(const std::string &filename = ""); // RNTuple model creation - static std::unique_ptr MakeModel(); + static std::unique_ptr MakeModel(); void SetCompressionMode(uint32_t flags) { compression_flags = flags; } @@ -280,3 +280,4 @@ class RAMNTupleConverter { }; #endif + diff --git a/src/ramcore/SamToNTuple.cxx b/src/ramcore/SamToNTuple.cxx index 965336d..abdd757 100644 --- a/src/ramcore/SamToNTuple.cxx +++ b/src/ramcore/SamToNTuple.cxx @@ -18,7 +18,6 @@ #include #include #include -#include void samtoramntuple(const char *datafile, const char *treefile, @@ -39,13 +38,13 @@ void samtoramntuple(const char *datafile, auto model = RAMNTupleRecord::MakeModel(); - ROOT::Experimental::RNTupleWriteOptions writeOptions; + ROOT::RNTupleWriteOptions writeOptions; writeOptions.SetCompression(compression_algorithm); writeOptions.SetMaxUnzippedPageSize(64000); - auto writer = ROOT::Experimental::RNTupleWriter::Append( + auto writer = ROOT::RNTupleWriter::Append( std::move(model), "RAM", *rootFile, writeOptions); - auto defaultEntry = writer->CreateEntry(); + auto defaultEntry = writer->GetModel().CreateEntry(); auto recordPtr = defaultEntry->GetPtr("record"); TList headers; @@ -69,27 +68,25 @@ void samtoramntuple(const char *datafile, }; auto record_callback = [&](const ramcore::SamRecord& sam_record, size_t record_num) { - RAMNTupleRecord r; - r.SetBit(quality_policy); - - r.SetQNAME(sam_record.qname.c_str()); - r.SetFLAG(sam_record.flag); - r.SetREFID(sam_record.rname.c_str()); - r.SetPOS(sam_record.pos); - r.SetMAPQ(sam_record.mapq); - r.SetCIGAR(sam_record.cigar.c_str()); - r.SetREFNEXT(sam_record.rnext.c_str()); - r.SetPNEXT(sam_record.pnext); - r.SetTLEN(sam_record.tlen); - r.SetSEQ(sam_record.seq.c_str()); - r.SetQUAL(sam_record.qual.c_str()); - - r.ResetNOPT(); + recordPtr->SetBit(quality_policy); + + recordPtr->SetQNAME(sam_record.qname.c_str()); + recordPtr->SetFLAG(sam_record.flag); + recordPtr->SetREFID(sam_record.rname.c_str()); + recordPtr->SetPOS(sam_record.pos); + recordPtr->SetMAPQ(sam_record.mapq); + recordPtr->SetCIGAR(sam_record.cigar.c_str()); + recordPtr->SetREFNEXT(sam_record.rnext.c_str()); + recordPtr->SetPNEXT(sam_record.pnext); + recordPtr->SetTLEN(sam_record.tlen); + recordPtr->SetSEQ(sam_record.seq.c_str()); + recordPtr->SetQUAL(sam_record.qual.c_str()); + + recordPtr->ResetNOPT(); for (const auto& opt : sam_record.optional_fields) { - r.SetOPT(opt.c_str()); + recordPtr->SetOPT(opt.c_str()); } - *recordPtr = std::move(r); writer->Fill(*defaultEntry); if (index && record_num % 1000 == 0) { @@ -133,101 +130,8 @@ void samtoramntuple(const char *datafile, void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm, - uint32_t quality_policy) -{ - RAMNTupleRecord::InitializeRefs(); - - std::map> chromosome_records; - std::vector> headers; - - ramcore::SamParser parser; - - auto header_callback = [&](const std::string& tag, const std::string& content) { - headers.push_back({tag, content}); - - if (tag == "@SQ") { - size_t sn_pos = content.find("SN:"); - if (sn_pos != std::string::npos) { - sn_pos += 3; - size_t tab_pos = content.find('\t', sn_pos); - std::string ref_name = content.substr(sn_pos, - tab_pos != std::string::npos ? tab_pos - sn_pos : std::string::npos); - RAMNTupleRecord::GetRnameRefs()->GetRefId(ref_name.c_str()); - } - } - }; - - auto record_callback = [&](const ramcore::SamRecord& sam_record, size_t record_num) { - if (sam_record.rname != "*") { - chromosome_records[sam_record.rname].push_back(sam_record); - } - }; - - parser.ParseFile(datafile, header_callback, record_callback); - - for (auto& [chr, records] : chromosome_records) { - std::sort(records.begin(), records.end(), - [](const ramcore::SamRecord& a, const ramcore::SamRecord& b) { - return a.pos < b.pos; - }); - } - - for (const auto& [chr, records] : chromosome_records) { - std::string filename = std::string(output_prefix) + "_" + chr + ".root"; - auto file = TFile::Open(filename.c_str(), "RECREATE"); - - auto model = RAMNTupleRecord::MakeModel(); - ROOT::Experimental::RNTupleWriteOptions writeOptions; - writeOptions.SetCompression(compression_algorithm); - writeOptions.SetMaxUnzippedPageSize(128000); - - auto writer = ROOT::Experimental::RNTupleWriter::Append( - std::move(model), "RAM", *file, writeOptions); - auto entry = writer->CreateEntry(); - auto recordPtr = entry->GetPtr("record").get(); - - for (const auto& sam_record : records) { - recordPtr->SetBit(quality_policy); - recordPtr->SetQNAME(sam_record.qname.c_str()); - recordPtr->SetFLAG(sam_record.flag); - recordPtr->SetREFID(sam_record.rname.c_str()); - recordPtr->SetPOS(sam_record.pos); - recordPtr->SetMAPQ(sam_record.mapq); - recordPtr->SetCIGAR(sam_record.cigar.c_str()); - recordPtr->SetREFNEXT(sam_record.rnext.c_str()); - recordPtr->SetPNEXT(sam_record.pnext); - recordPtr->SetTLEN(sam_record.tlen); - recordPtr->SetSEQ(sam_record.seq.c_str()); - recordPtr->SetQUAL(sam_record.qual.c_str()); - - recordPtr->ResetNOPT(); - for (const auto& opt : sam_record.optional_fields) { - recordPtr->SetOPT(opt.c_str()); - } - - writer->Fill(*entry); - } - - writer.reset(); - - TList h; - h.SetName("headers"); - for (const auto& [tag, content] : headers) { - h.Add(new TNamed(tag.c_str(), content.c_str())); - } - - RAMNTupleRecord::WriteAllRefs(*file); - h.Write(); - file->Close(); - delete file; - } -} - -void samtoramntuple_split_by_chromosome_parallel(const char *datafile, - const char *output_prefix, - int compression_algorithm, - uint32_t quality_policy, - int num_threads) + uint32_t quality_policy, + int num_threads) { ROOT::EnableThreadSafety(); RAMNTupleRecord::InitializeRefs(); @@ -272,8 +176,6 @@ void samtoramntuple_split_by_chromosome_parallel(const char *datafile, chr_names.push_back(chr); } - static std::mutex file_destruction_mutex; - auto write_chromosome = [&](const std::string& chr) { const auto& records = chromosome_records[chr]; @@ -282,13 +184,13 @@ void samtoramntuple_split_by_chromosome_parallel(const char *datafile, std::unique_ptr file(TFile::Open(filename.c_str(), "RECREATE")); auto model = RAMNTupleRecord::MakeModel(); - ROOT::Experimental::RNTupleWriteOptions writeOptions; + ROOT::RNTupleWriteOptions writeOptions; writeOptions.SetCompression(compression_algorithm); writeOptions.SetMaxUnzippedPageSize(128000); - auto writer = ROOT::Experimental::RNTupleWriter::Append( + auto writer = ROOT::RNTupleWriter::Append( std::move(model), "RAM", *file, writeOptions); - auto entry = writer->CreateEntry(); + auto entry = writer->GetModel().CreateEntry(); auto recordPtr = entry->GetPtr("record").get(); for (const auto& sam_record : records) { @@ -324,11 +226,8 @@ void samtoramntuple_split_by_chromosome_parallel(const char *datafile, RAMNTupleRecord::WriteAllRefs(*file); h.Write(); - { - std::lock_guard lock(file_destruction_mutex); - file->Close(); - file.reset(); - } + file->Close(); + file.reset(); }; size_t chr_idx = 0; diff --git a/src/rntuple/RAMNTupleRecord.cxx b/src/rntuple/RAMNTupleRecord.cxx index efba067..fe60c5b 100644 --- a/src/rntuple/RAMNTupleRecord.cxx +++ b/src/rntuple/RAMNTupleRecord.cxx @@ -13,7 +13,7 @@ #include #include -using namespace ROOT::Experimental; +using namespace ROOT; std::unique_ptr RAMNTupleRecord::fgRnameRefs = nullptr; std::unique_ptr RAMNTupleRecord::fgRnextRefs = nullptr; @@ -203,7 +203,7 @@ void RAMNTupleRecord::WriteAllRefs(TFile &file) writeOptions.SetCompression(505); auto metaWriter = RNTupleWriter::Append(std::move(metaModel), "METADATA", file, writeOptions); - auto metaEntry = metaWriter->CreateEntry(); + auto metaEntry = metaWriter->GetModel().CreateEntry(); auto rnamePtr = metaEntry->GetPtr>("rname_refs"); auto rnextPtr = metaEntry->GetPtr>("rnext_refs"); @@ -260,7 +260,7 @@ void RAMNTupleRecord::WriteIndex(TFile &file) writeOptions.SetCompression(505); auto indexWriter = RNTupleWriter::Append(std::move(indexModel), "INDEX", file, writeOptions); - auto indexEntry = indexWriter->CreateEntry(); + auto indexEntry = indexWriter->GetModel().CreateEntry(); auto indexPtr = indexEntry->GetPtr>("index_entries"); *indexPtr = fgIndex->GetEntries(); @@ -534,7 +534,7 @@ void RAMNTupleConverter::ConvertSAMToRAMNTuple(const std::string &sam_file, cons writeOptions.SetCompression(505); // ZSTD level 5 auto writer = RNTupleWriter::Append(std::move(model), "RAM", *file, writeOptions); - auto defaultEntry = writer->CreateEntry(); + auto defaultEntry = writer->GetModel().CreateEntry(); auto recordPtr = defaultEntry->GetPtr("record"); std::ifstream sam(sam_file); diff --git a/test/chromosome_split_test.cxx b/test/chromosome_split_test.cxx index 3d3df46..5d7894b 100644 --- a/test/chromosome_split_test.cxx +++ b/test/chromosome_split_test.cxx @@ -33,16 +33,16 @@ class ChromosomeSplitTest : public ::testing::Test { TEST_F(ChromosomeSplitTest, NoDataLoss) { samtoramntuple("test.sam", "test_regular.root", false, true, true, 505, 1); - auto regularReader = ROOT::Experimental::RNTupleReader::Open("RAM", "test_regular.root"); + auto regularReader = ROOT::RNTupleReader::Open("RAM", "test_regular.root"); Long64_t totalEntries = regularReader->GetNEntries(); - samtoramntuple_split_by_chromosome("test.sam", "test_split", 505, 1); + samtoramntuple_split_by_chromosome("test.sam", "test_split", 505, 1, 4); Long64_t splitEntriesSum = 0; for (const auto &entry : std::filesystem::directory_iterator(".")) { std::string filename = entry.path().filename().string(); if (filename.find("test_split_") == 0 && filename.find(".root") != std::string::npos) { - auto reader = ROOT::Experimental::RNTupleReader::Open("RAM", filename); + auto reader = ROOT::RNTupleReader::Open("RAM", filename); if (reader) { splitEntriesSum += reader->GetNEntries(); } @@ -54,7 +54,7 @@ TEST_F(ChromosomeSplitTest, NoDataLoss) TEST_F(ChromosomeSplitTest, CorrectChromosomeAssignment) { - samtoramntuple_split_by_chromosome("test.sam", "test_split", 505, 1); + samtoramntuple_split_by_chromosome("test.sam", "test_split", 505, 1, 4); for (const auto &entry : std::filesystem::directory_iterator(".")) { std::string filename = entry.path().filename().string(); @@ -63,7 +63,7 @@ TEST_F(ChromosomeSplitTest, CorrectChromosomeAssignment) size_t end = filename.find(".root"); std::string expectedChr = filename.substr(pos + 11, end - pos - 11); - auto reader = ROOT::Experimental::RNTupleReader::Open("RAM", filename); + auto reader = ROOT::RNTupleReader::Open("RAM", filename); ASSERT_NE(reader, nullptr); auto viewRecord = reader->GetView("record"); @@ -79,13 +79,13 @@ TEST_F(ChromosomeSplitTest, CorrectChromosomeAssignment) TEST_F(ChromosomeSplitTest, MetadataPresent) { - samtoramntuple_split_by_chromosome("test.sam", "test_split", 505, 1); + samtoramntuple_split_by_chromosome("test.sam", "test_split", 505, 1, 4); int filesChecked = 0; for (const auto &entry : std::filesystem::directory_iterator(".")) { std::string filename = entry.path().filename().string(); if (filename.find("test_split_") == 0 && filename.find(".root") != std::string::npos) { - auto metaReader = ROOT::Experimental::RNTupleReader::Open("METADATA", filename); + auto metaReader = ROOT::RNTupleReader::Open("METADATA", filename); EXPECT_NE(metaReader, nullptr) << "Missing METADATA in " << filename; if (metaReader) { @@ -98,3 +98,4 @@ TEST_F(ChromosomeSplitTest, MetadataPresent) EXPECT_GT(filesChecked, 0); } + diff --git a/test/ramcoretests.cxx b/test/ramcoretests.cxx index 2974400..458ba50 100644 --- a/test/ramcoretests.cxx +++ b/test/ramcoretests.cxx @@ -43,7 +43,7 @@ TEST_F(ramcoreTest, ConversionProducesEqualEntries) { ASSERT_NE(ttree, nullptr) << "Failed to get TTree"; Long64_t ttreeEntries = ttree->GetEntries(); - auto reader = ROOT::Experimental::RNTupleReader::Open("RAM", rntupleFile); + auto reader = ROOT::RNTupleReader::Open("RAM", rntupleFile); ASSERT_NE(reader, nullptr) << "Failed to open RNTuple"; Long64_t rntupleEntries = reader->GetNEntries(); diff --git a/tools/samtoramntuple.cxx b/tools/samtoramntuple.cxx index f7e6351..81f232d 100644 --- a/tools/samtoramntuple.cxx +++ b/tools/samtoramntuple.cxx @@ -49,7 +49,7 @@ int main(int argc, char* argv[]) { try { if (do_split) { - samtoramntuple_split_by_chromosome(input, output, 505, quality_mode); + samtoramntuple_split_by_chromosome(input, output, 505, quality_mode, 4); } else { std::string ramfile = std::string(output); if (ramfile.find(".root") == std::string::npos && ramfile.find(".ram") == std::string::npos) { From 6d3d6bd91790795126c293128924ab0aea70890b Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Wed, 15 Oct 2025 11:28:42 +0530 Subject: [PATCH 08/13] version update Signed-off-by: AdityaPandeyCN --- .github/workflows/ci.yml | 2 +- benchmark/chromosome_split_benchmark.cxx | 20 +++--- inc/ramcore/SamToNTuple.h | 7 +-- inc/rntuple/RAMNTupleRecord.h | 1 - src/ramcore/SamToNTuple.cxx | 78 +++++++++++------------- test/chromosome_split_test.cxx | 1 - tools/samtoramntuple.cxx | 2 +- 7 files changed, 50 insertions(+), 61 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 952ca2b..bb65602 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ jobs: - name: Install ROOT run: | - ROOT_URL="https://root.cern/download/root_v6.34.06.Linux-ubuntu24.04-x86_64-gcc13.3.tar.gz" + ROOT_URL="https://root.cern/download/root_v6.36.00.Linux-ubuntu24.04-x86_64-gcc13.3.tar.gz" wget -O root.tar.gz $ROOT_URL tar -xzf root.tar.gz -C /opt/ echo "/opt/root/bin" >> $GITHUB_PATH diff --git a/benchmark/chromosome_split_benchmark.cxx b/benchmark/chromosome_split_benchmark.cxx index 27bb80a..e3c3e8c 100644 --- a/benchmark/chromosome_split_benchmark.cxx +++ b/benchmark/chromosome_split_benchmark.cxx @@ -117,16 +117,16 @@ static void BM_ChromosomeSplitThreads(benchmark::State &state) BENCHMARK(BM_SamtoolsSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); // Benchmarks with different thread counts BENCHMARK(BM_ChromosomeSplitThreads) - ->Args({100000, 2}) - ->Args({100000, 4}) - ->Args({100000, 8}) - ->Args({500000, 2}) - ->Args({500000, 4}) - ->Args({500000, 8}) - ->Args({1000000, 2}) - ->Args({1000000, 4}) - ->Args({1000000, 8}) - ->Unit(benchmark::kMillisecond); + ->Args({100000, 2}) + ->Args({100000, 4}) + ->Args({100000, 8}) + ->Args({500000, 2}) + ->Args({500000, 4}) + ->Args({500000, 8}) + ->Args({1000000, 2}) + ->Args({1000000, 4}) + ->Args({1000000, 8}) + ->Unit(benchmark::kMillisecond); BENCHMARK_MAIN(); diff --git a/inc/ramcore/SamToNTuple.h b/inc/ramcore/SamToNTuple.h index 1ea696b..98b6e85 100644 --- a/inc/ramcore/SamToNTuple.h +++ b/inc/ramcore/SamToNTuple.h @@ -10,11 +10,8 @@ void samtoramntuple(const char *datafile, int compression_algorithm, uint32_t quality_policy); -void samtoramntuple_split_by_chromosome(const char *datafile, - const char *output_prefix, - int compression_algorithm, - uint32_t quality_policy, - int num_threads = 4); +void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm, + uint32_t quality_policy, int num_threads = 4); #endif diff --git a/inc/rntuple/RAMNTupleRecord.h b/inc/rntuple/RAMNTupleRecord.h index 8d608f3..55142c1 100644 --- a/inc/rntuple/RAMNTupleRecord.h +++ b/inc/rntuple/RAMNTupleRecord.h @@ -280,4 +280,3 @@ class RAMNTupleConverter { }; #endif - diff --git a/src/ramcore/SamToNTuple.cxx b/src/ramcore/SamToNTuple.cxx index abdd757..5faa974 100644 --- a/src/ramcore/SamToNTuple.cxx +++ b/src/ramcore/SamToNTuple.cxx @@ -37,13 +37,12 @@ void samtoramntuple(const char *datafile, RAMNTupleRecord::InitializeRefs(); auto model = RAMNTupleRecord::MakeModel(); - + ROOT::RNTupleWriteOptions writeOptions; writeOptions.SetCompression(compression_algorithm); writeOptions.SetMaxUnzippedPageSize(64000); - - auto writer = ROOT::RNTupleWriter::Append( - std::move(model), "RAM", *rootFile, writeOptions); + + auto writer = ROOT::RNTupleWriter::Append(std::move(model), "RAM", *rootFile, writeOptions); auto defaultEntry = writer->GetModel().CreateEntry(); auto recordPtr = defaultEntry->GetPtr("record"); @@ -66,35 +65,34 @@ void samtoramntuple(const char *datafile, } } }; - - auto record_callback = [&](const ramcore::SamRecord& sam_record, size_t record_num) { - recordPtr->SetBit(quality_policy); - - recordPtr->SetQNAME(sam_record.qname.c_str()); - recordPtr->SetFLAG(sam_record.flag); - recordPtr->SetREFID(sam_record.rname.c_str()); - recordPtr->SetPOS(sam_record.pos); - recordPtr->SetMAPQ(sam_record.mapq); - recordPtr->SetCIGAR(sam_record.cigar.c_str()); - recordPtr->SetREFNEXT(sam_record.rnext.c_str()); - recordPtr->SetPNEXT(sam_record.pnext); - recordPtr->SetTLEN(sam_record.tlen); - recordPtr->SetSEQ(sam_record.seq.c_str()); - recordPtr->SetQUAL(sam_record.qual.c_str()); - - recordPtr->ResetNOPT(); - for (const auto& opt : sam_record.optional_fields) { - recordPtr->SetOPT(opt.c_str()); - } - - writer->Fill(*defaultEntry); - - if (index && record_num % 1000 == 0) { - RAMNTupleRecord::GetIndex()->AddItem(recordPtr->GetREFID(), - recordPtr->GetPOS() - 1, record_num); - } + + auto record_callback = [&](const ramcore::SamRecord &sam_record, size_t record_num) { + recordPtr->SetBit(quality_policy); + + recordPtr->SetQNAME(sam_record.qname.c_str()); + recordPtr->SetFLAG(sam_record.flag); + recordPtr->SetREFID(sam_record.rname.c_str()); + recordPtr->SetPOS(sam_record.pos); + recordPtr->SetMAPQ(sam_record.mapq); + recordPtr->SetCIGAR(sam_record.cigar.c_str()); + recordPtr->SetREFNEXT(sam_record.rnext.c_str()); + recordPtr->SetPNEXT(sam_record.pnext); + recordPtr->SetTLEN(sam_record.tlen); + recordPtr->SetSEQ(sam_record.seq.c_str()); + recordPtr->SetQUAL(sam_record.qual.c_str()); + + recordPtr->ResetNOPT(); + for (const auto &opt : sam_record.optional_fields) { + recordPtr->SetOPT(opt.c_str()); + } + + writer->Fill(*defaultEntry); + + if (index && record_num % 1000 == 0) { + RAMNTupleRecord::GetIndex()->AddItem(recordPtr->GetREFID(), recordPtr->GetPOS() - 1, record_num); + } }; - + if (!parser.ParseFile(datafile, header_callback, record_callback)) { printf("Failed to parse SAM file %s\n", datafile); return; @@ -127,11 +125,8 @@ void samtoramntuple(const char *datafile, stopwatch.Print(); } -void samtoramntuple_split_by_chromosome(const char *datafile, - const char *output_prefix, - int compression_algorithm, - uint32_t quality_policy, - int num_threads) +void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm, + uint32_t quality_policy, int num_threads) { ROOT::EnableThreadSafety(); RAMNTupleRecord::InitializeRefs(); @@ -175,7 +170,7 @@ void samtoramntuple_split_by_chromosome(const char *datafile, for (const auto& [chr, records] : chromosome_records) { chr_names.push_back(chr); } - + auto write_chromosome = [&](const std::string& chr) { const auto& records = chromosome_records[chr]; @@ -187,9 +182,8 @@ void samtoramntuple_split_by_chromosome(const char *datafile, ROOT::RNTupleWriteOptions writeOptions; writeOptions.SetCompression(compression_algorithm); writeOptions.SetMaxUnzippedPageSize(128000); - - auto writer = ROOT::RNTupleWriter::Append( - std::move(model), "RAM", *file, writeOptions); + + auto writer = ROOT::RNTupleWriter::Append(std::move(model), "RAM", *file, writeOptions); auto entry = writer->GetModel().CreateEntry(); auto recordPtr = entry->GetPtr("record").get(); @@ -225,7 +219,7 @@ void samtoramntuple_split_by_chromosome(const char *datafile, RAMNTupleRecord::WriteAllRefs(*file); h.Write(); - + file->Close(); file.reset(); }; diff --git a/test/chromosome_split_test.cxx b/test/chromosome_split_test.cxx index 5d7894b..91274cb 100644 --- a/test/chromosome_split_test.cxx +++ b/test/chromosome_split_test.cxx @@ -98,4 +98,3 @@ TEST_F(ChromosomeSplitTest, MetadataPresent) EXPECT_GT(filesChecked, 0); } - diff --git a/tools/samtoramntuple.cxx b/tools/samtoramntuple.cxx index 81f232d..55d0be1 100644 --- a/tools/samtoramntuple.cxx +++ b/tools/samtoramntuple.cxx @@ -49,7 +49,7 @@ int main(int argc, char* argv[]) { try { if (do_split) { - samtoramntuple_split_by_chromosome(input, output, 505, quality_mode, 4); + samtoramntuple_split_by_chromosome(input, output, 505, quality_mode, 4); } else { std::string ramfile = std::string(output); if (ramfile.find(".root") == std::string::npos && ramfile.find(".ram") == std::string::npos) { From dd32c7d9391af02e340f80f1f841941a5275a875 Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Thu, 16 Oct 2025 09:15:21 +0530 Subject: [PATCH 09/13] use parallel writer Signed-off-by: AdityaPandeyCN --- src/ramcore/SamToNTuple.cxx | 118 +++++++++++++++++++++++------------- 1 file changed, 77 insertions(+), 41 deletions(-) diff --git a/src/ramcore/SamToNTuple.cxx b/src/ramcore/SamToNTuple.cxx index 5faa974..84d00e5 100644 --- a/src/ramcore/SamToNTuple.cxx +++ b/src/ramcore/SamToNTuple.cxx @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include #include @@ -18,6 +20,8 @@ #include #include #include +#include +#include void samtoramntuple(const char *datafile, const char *treefile, @@ -125,7 +129,8 @@ void samtoramntuple(const char *datafile, stopwatch.Print(); } -void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm, +void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, + int compression_algorithm, uint32_t quality_policy, int num_threads) { ROOT::EnableThreadSafety(); @@ -159,57 +164,89 @@ void samtoramntuple_split_by_chromosome(const char *datafile, const char *output parser.ParseFile(datafile, header_callback, record_callback); - for (auto& [chr, records] : chromosome_records) { - std::sort(records.begin(), records.end(), - [](const ramcore::SamRecord& a, const ramcore::SamRecord& b) { - return a.pos < b.pos; - }); - } - std::vector chr_names; for (const auto& [chr, records] : chromosome_records) { chr_names.push_back(chr); } + + std::vector sort_threads; + for (const auto& chr : chr_names) { + sort_threads.emplace_back([&chromosome_records, chr]() { + auto& records = chromosome_records[chr]; + std::sort(records.begin(), records.end(), + [](const ramcore::SamRecord& a, const ramcore::SamRecord& b) { + return a.pos < b.pos; + }); + }); + } + for (auto& t : sort_threads) { + t.join(); + } - auto write_chromosome = [&](const std::string& chr) { + auto write_chromosome_parallel = [&](const std::string& chr) { const auto& records = chromosome_records[chr]; std::string filename = std::string(output_prefix) + "_" + chr + ".root"; - std::unique_ptr file(TFile::Open(filename.c_str(), "RECREATE")); - auto model = RAMNTupleRecord::MakeModel(); ROOT::RNTupleWriteOptions writeOptions; - writeOptions.SetCompression(compression_algorithm); - writeOptions.SetMaxUnzippedPageSize(128000); + + writeOptions.SetCompression(ROOT::RCompressionSetting::EAlgorithm::kZSTD, 1); + writeOptions.SetApproxZippedClusterSize(200 * 1024 * 1024); + writeOptions.SetMaxUnzippedClusterSize(1024 * 1024 * 1024); + writeOptions.SetMaxUnzippedPageSize(1024 * 1024); + writeOptions.SetUseBufferedWrite(true); - auto writer = ROOT::RNTupleWriter::Append(std::move(model), "RAM", *file, writeOptions); - auto entry = writer->GetModel().CreateEntry(); - auto recordPtr = entry->GetPtr("record").get(); - - for (const auto& sam_record : records) { - recordPtr->SetBit(quality_policy); - recordPtr->SetQNAME(sam_record.qname.c_str()); - recordPtr->SetFLAG(sam_record.flag); - recordPtr->SetREFID(sam_record.rname.c_str()); - recordPtr->SetPOS(sam_record.pos); - recordPtr->SetMAPQ(sam_record.mapq); - recordPtr->SetCIGAR(sam_record.cigar.c_str()); - recordPtr->SetREFNEXT(sam_record.rnext.c_str()); - recordPtr->SetPNEXT(sam_record.pnext); - recordPtr->SetTLEN(sam_record.tlen); - recordPtr->SetSEQ(sam_record.seq.c_str()); - recordPtr->SetQUAL(sam_record.qual.c_str()); - - recordPtr->ResetNOPT(); - for (const auto& opt : sam_record.optional_fields) { - recordPtr->SetOPT(opt.c_str()); - } - - writer->Fill(*entry); + auto parallel_writer = ROOT::Experimental::RNTupleParallelWriter::Recreate( + std::move(model), "RAM", filename, writeOptions); + + const int contexts_per_file = std::min(4, num_threads); + const size_t records_per_context = (records.size() + contexts_per_file - 1) / contexts_per_file; + + std::vector write_threads; + + for (int ctx = 0; ctx < contexts_per_file && ctx * records_per_context < records.size(); ++ctx) { + write_threads.emplace_back([&, ctx]() { + auto fill_context = parallel_writer->CreateFillContext(); + auto entry = fill_context->GetModel().CreateEntry(); + auto recordPtr = entry->GetPtr("record").get(); + + size_t start = ctx * records_per_context; + size_t end = std::min(start + records_per_context, records.size()); + + for (size_t i = start; i < end; ++i) { + const auto& sam_record = records[i]; + + recordPtr->SetBit(quality_policy); + recordPtr->SetQNAME(sam_record.qname.c_str()); + recordPtr->SetFLAG(sam_record.flag); + recordPtr->SetREFID(sam_record.rname.c_str()); + recordPtr->SetPOS(sam_record.pos); + recordPtr->SetMAPQ(sam_record.mapq); + recordPtr->SetCIGAR(sam_record.cigar.c_str()); + recordPtr->SetREFNEXT(sam_record.rnext.c_str()); + recordPtr->SetPNEXT(sam_record.pnext); + recordPtr->SetTLEN(sam_record.tlen); + recordPtr->SetSEQ(sam_record.seq.c_str()); + recordPtr->SetQUAL(sam_record.qual.c_str()); + + recordPtr->ResetNOPT(); + for (const auto& opt : sam_record.optional_fields) { + recordPtr->SetOPT(opt.c_str()); + } + + fill_context->Fill(*entry); + } + }); + } + + for (auto& t : write_threads) { + t.join(); } - writer.reset(); + parallel_writer.reset(); + + std::unique_ptr file(TFile::Open(filename.c_str(), "UPDATE")); TList h; h.SetName("headers"); @@ -219,9 +256,8 @@ void samtoramntuple_split_by_chromosome(const char *datafile, const char *output RAMNTupleRecord::WriteAllRefs(*file); h.Write(); - + file->Close(); - file.reset(); }; size_t chr_idx = 0; @@ -229,7 +265,7 @@ void samtoramntuple_split_by_chromosome(const char *datafile, const char *output std::vector threads; for (int i = 0; i < num_threads && chr_idx < chr_names.size(); ++i, ++chr_idx) { - threads.emplace_back(write_chromosome, chr_names[chr_idx]); + threads.emplace_back(write_chromosome_parallel, chr_names[chr_idx]); } for (auto& t : threads) { From a07a0e7f5bf96d314dd63580d89afbaa37f045e2 Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Thu, 16 Oct 2025 09:16:53 +0530 Subject: [PATCH 10/13] clang changes Signed-off-by: AdityaPandeyCN --- src/ramcore/SamToNTuple.cxx | 177 ++++++++++++++++++------------------ 1 file changed, 87 insertions(+), 90 deletions(-) diff --git a/src/ramcore/SamToNTuple.cxx b/src/ramcore/SamToNTuple.cxx index 84d00e5..91d6ee8 100644 --- a/src/ramcore/SamToNTuple.cxx +++ b/src/ramcore/SamToNTuple.cxx @@ -129,8 +129,7 @@ void samtoramntuple(const char *datafile, stopwatch.Print(); } -void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, - int compression_algorithm, +void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm, uint32_t quality_policy, int num_threads) { ROOT::EnableThreadSafety(); @@ -163,109 +162,107 @@ void samtoramntuple_split_by_chromosome(const char *datafile, const char *output }; parser.ParseFile(datafile, header_callback, record_callback); - + std::vector chr_names; for (const auto& [chr, records] : chromosome_records) { chr_names.push_back(chr); } - + std::vector sort_threads; - for (const auto& chr : chr_names) { - sort_threads.emplace_back([&chromosome_records, chr]() { - auto& records = chromosome_records[chr]; - std::sort(records.begin(), records.end(), - [](const ramcore::SamRecord& a, const ramcore::SamRecord& b) { - return a.pos < b.pos; - }); - }); + for (const auto &chr : chr_names) { + sort_threads.emplace_back([&chromosome_records, chr]() { + auto &records = chromosome_records[chr]; + std::sort(records.begin(), records.end(), + [](const ramcore::SamRecord &a, const ramcore::SamRecord &b) { return a.pos < b.pos; }); + }); } - for (auto& t : sort_threads) { - t.join(); + for (auto &t : sort_threads) { + t.join(); } - auto write_chromosome_parallel = [&](const std::string& chr) { - const auto& records = chromosome_records[chr]; - - std::string filename = std::string(output_prefix) + "_" + chr + ".root"; - - auto model = RAMNTupleRecord::MakeModel(); - ROOT::RNTupleWriteOptions writeOptions; - - writeOptions.SetCompression(ROOT::RCompressionSetting::EAlgorithm::kZSTD, 1); - writeOptions.SetApproxZippedClusterSize(200 * 1024 * 1024); - writeOptions.SetMaxUnzippedClusterSize(1024 * 1024 * 1024); - writeOptions.SetMaxUnzippedPageSize(1024 * 1024); - writeOptions.SetUseBufferedWrite(true); - - auto parallel_writer = ROOT::Experimental::RNTupleParallelWriter::Recreate( - std::move(model), "RAM", filename, writeOptions); - - const int contexts_per_file = std::min(4, num_threads); - const size_t records_per_context = (records.size() + contexts_per_file - 1) / contexts_per_file; - - std::vector write_threads; - - for (int ctx = 0; ctx < contexts_per_file && ctx * records_per_context < records.size(); ++ctx) { - write_threads.emplace_back([&, ctx]() { - auto fill_context = parallel_writer->CreateFillContext(); - auto entry = fill_context->GetModel().CreateEntry(); - auto recordPtr = entry->GetPtr("record").get(); - - size_t start = ctx * records_per_context; - size_t end = std::min(start + records_per_context, records.size()); - - for (size_t i = start; i < end; ++i) { - const auto& sam_record = records[i]; - - recordPtr->SetBit(quality_policy); - recordPtr->SetQNAME(sam_record.qname.c_str()); - recordPtr->SetFLAG(sam_record.flag); - recordPtr->SetREFID(sam_record.rname.c_str()); - recordPtr->SetPOS(sam_record.pos); - recordPtr->SetMAPQ(sam_record.mapq); - recordPtr->SetCIGAR(sam_record.cigar.c_str()); - recordPtr->SetREFNEXT(sam_record.rnext.c_str()); - recordPtr->SetPNEXT(sam_record.pnext); - recordPtr->SetTLEN(sam_record.tlen); - recordPtr->SetSEQ(sam_record.seq.c_str()); - recordPtr->SetQUAL(sam_record.qual.c_str()); - - recordPtr->ResetNOPT(); - for (const auto& opt : sam_record.optional_fields) { - recordPtr->SetOPT(opt.c_str()); - } - - fill_context->Fill(*entry); + auto write_chromosome_parallel = [&](const std::string &chr) { + const auto &records = chromosome_records[chr]; + + std::string filename = std::string(output_prefix) + "_" + chr + ".root"; + + auto model = RAMNTupleRecord::MakeModel(); + ROOT::RNTupleWriteOptions writeOptions; + + writeOptions.SetCompression(ROOT::RCompressionSetting::EAlgorithm::kZSTD, 1); + writeOptions.SetApproxZippedClusterSize(200 * 1024 * 1024); + writeOptions.SetMaxUnzippedClusterSize(1024 * 1024 * 1024); + writeOptions.SetMaxUnzippedPageSize(1024 * 1024); + writeOptions.SetUseBufferedWrite(true); + + auto parallel_writer = + ROOT::Experimental::RNTupleParallelWriter::Recreate(std::move(model), "RAM", filename, writeOptions); + + const int contexts_per_file = std::min(4, num_threads); + const size_t records_per_context = (records.size() + contexts_per_file - 1) / contexts_per_file; + + std::vector write_threads; + + for (int ctx = 0; ctx < contexts_per_file && ctx * records_per_context < records.size(); ++ctx) { + write_threads.emplace_back([&, ctx]() { + auto fill_context = parallel_writer->CreateFillContext(); + auto entry = fill_context->GetModel().CreateEntry(); + auto recordPtr = entry->GetPtr("record").get(); + + size_t start = ctx * records_per_context; + size_t end = std::min(start + records_per_context, records.size()); + + for (size_t i = start; i < end; ++i) { + const auto &sam_record = records[i]; + + recordPtr->SetBit(quality_policy); + recordPtr->SetQNAME(sam_record.qname.c_str()); + recordPtr->SetFLAG(sam_record.flag); + recordPtr->SetREFID(sam_record.rname.c_str()); + recordPtr->SetPOS(sam_record.pos); + recordPtr->SetMAPQ(sam_record.mapq); + recordPtr->SetCIGAR(sam_record.cigar.c_str()); + recordPtr->SetREFNEXT(sam_record.rnext.c_str()); + recordPtr->SetPNEXT(sam_record.pnext); + recordPtr->SetTLEN(sam_record.tlen); + recordPtr->SetSEQ(sam_record.seq.c_str()); + recordPtr->SetQUAL(sam_record.qual.c_str()); + + recordPtr->ResetNOPT(); + for (const auto &opt : sam_record.optional_fields) { + recordPtr->SetOPT(opt.c_str()); } - }); - } - - for (auto& t : write_threads) { - t.join(); - } - - parallel_writer.reset(); - - std::unique_ptr file(TFile::Open(filename.c_str(), "UPDATE")); - - TList h; - h.SetName("headers"); - for (const auto& [tag, content] : headers) { - h.Add(new TNamed(tag.c_str(), content.c_str())); - } - - RAMNTupleRecord::WriteAllRefs(*file); - h.Write(); - - file->Close(); + + fill_context->Fill(*entry); + } + }); + } + + for (auto &t : write_threads) { + t.join(); + } + + parallel_writer.reset(); + + std::unique_ptr file(TFile::Open(filename.c_str(), "UPDATE")); + + TList h; + h.SetName("headers"); + for (const auto &[tag, content] : headers) { + h.Add(new TNamed(tag.c_str(), content.c_str())); + } + + RAMNTupleRecord::WriteAllRefs(*file); + h.Write(); + + file->Close(); }; - + size_t chr_idx = 0; while (chr_idx < chr_names.size()) { std::vector threads; for (int i = 0; i < num_threads && chr_idx < chr_names.size(); ++i, ++chr_idx) { - threads.emplace_back(write_chromosome_parallel, chr_names[chr_idx]); + threads.emplace_back(write_chromosome_parallel, chr_names[chr_idx]); } for (auto& t : threads) { From 055edc7e68fe9497083feb4999b51d601b44d4a0 Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Thu, 16 Oct 2025 11:52:10 +0530 Subject: [PATCH 11/13] clang changes Signed-off-by: AdityaPandeyCN --- benchmark/chromosome_split_benchmark.cxx | 1 - inc/ramcore/SamToNTuple.h | 1 - src/ramcore/SamToNTuple.cxx | 255 +++++++++++------------ 3 files changed, 127 insertions(+), 130 deletions(-) diff --git a/benchmark/chromosome_split_benchmark.cxx b/benchmark/chromosome_split_benchmark.cxx index e3c3e8c..cae96cd 100644 --- a/benchmark/chromosome_split_benchmark.cxx +++ b/benchmark/chromosome_split_benchmark.cxx @@ -129,4 +129,3 @@ BENCHMARK(BM_ChromosomeSplitThreads) ->Unit(benchmark::kMillisecond); BENCHMARK_MAIN(); - diff --git a/inc/ramcore/SamToNTuple.h b/inc/ramcore/SamToNTuple.h index 98b6e85..1b986c0 100644 --- a/inc/ramcore/SamToNTuple.h +++ b/inc/ramcore/SamToNTuple.h @@ -14,4 +14,3 @@ void samtoramntuple_split_by_chromosome(const char *datafile, const char *output uint32_t quality_policy, int num_threads = 4); #endif - diff --git a/src/ramcore/SamToNTuple.cxx b/src/ramcore/SamToNTuple.cxx index 91d6ee8..5eaf9c6 100644 --- a/src/ramcore/SamToNTuple.cxx +++ b/src/ramcore/SamToNTuple.cxx @@ -132,142 +132,141 @@ void samtoramntuple(const char *datafile, void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm, uint32_t quality_policy, int num_threads) { - ROOT::EnableThreadSafety(); - RAMNTupleRecord::InitializeRefs(); - - std::map> chromosome_records; - std::vector> headers; - - ramcore::SamParser parser; - - auto header_callback = [&](const std::string& tag, const std::string& content) { - headers.push_back({tag, content}); - - if (tag == "@SQ") { - size_t sn_pos = content.find("SN:"); - if (sn_pos != std::string::npos) { - sn_pos += 3; - size_t tab_pos = content.find('\t', sn_pos); - std::string ref_name = content.substr(sn_pos, - tab_pos != std::string::npos ? tab_pos - sn_pos : std::string::npos); - RAMNTupleRecord::GetRnameRefs()->GetRefId(ref_name.c_str()); + ROOT::EnableThreadSafety(); + RAMNTupleRecord::InitializeRefs(); + + std::map> chromosome_records; + std::vector> headers; + + ramcore::SamParser parser; + + auto header_callback = [&](const std::string &tag, const std::string &content) { + headers.push_back({tag, content}); + + if (tag == "@SQ") { + size_t sn_pos = content.find("SN:"); + if (sn_pos != std::string::npos) { + sn_pos += 3; + size_t tab_pos = content.find('\t', sn_pos); + std::string ref_name = + content.substr(sn_pos, tab_pos != std::string::npos ? tab_pos - sn_pos : std::string::npos); + RAMNTupleRecord::GetRnameRefs()->GetRefId(ref_name.c_str()); + } + } + }; + + auto record_callback = [&](const ramcore::SamRecord &sam_record, size_t record_num) { + if (sam_record.rname != "*") { + chromosome_records[sam_record.rname].push_back(sam_record); + } + }; + + parser.ParseFile(datafile, header_callback, record_callback); + + std::vector chr_names; + for (const auto &[chr, records] : chromosome_records) { + chr_names.push_back(chr); + } + + std::vector sort_threads; + for (const auto &chr : chr_names) { + sort_threads.emplace_back([&chromosome_records, chr]() { + auto &records = chromosome_records[chr]; + std::sort(records.begin(), records.end(), + [](const ramcore::SamRecord &a, const ramcore::SamRecord &b) { return a.pos < b.pos; }); + }); + } + for (auto &t : sort_threads) { + t.join(); + } + + auto write_chromosome_parallel = [&](const std::string &chr) { + const auto &records = chromosome_records[chr]; + + std::string filename = std::string(output_prefix) + "_" + chr + ".root"; + + auto model = RAMNTupleRecord::MakeModel(); + ROOT::RNTupleWriteOptions writeOptions; + + writeOptions.SetCompression(ROOT::RCompressionSetting::EAlgorithm::kZSTD, 1); + writeOptions.SetApproxZippedClusterSize(200 * 1024 * 1024); + writeOptions.SetMaxUnzippedClusterSize(1024 * 1024 * 1024); + writeOptions.SetMaxUnzippedPageSize(1024 * 1024); + writeOptions.SetUseBufferedWrite(true); + + auto parallel_writer = + ROOT::Experimental::RNTupleParallelWriter::Recreate(std::move(model), "RAM", filename, writeOptions); + + const int contexts_per_file = std::min(4, num_threads); + const size_t records_per_context = (records.size() + contexts_per_file - 1) / contexts_per_file; + + std::vector write_threads; + + for (int ctx = 0; ctx < contexts_per_file && ctx * records_per_context < records.size(); ++ctx) { + write_threads.emplace_back([&, ctx]() { + auto fill_context = parallel_writer->CreateFillContext(); + auto entry = fill_context->GetModel().CreateEntry(); + auto recordPtr = entry->GetPtr("record").get(); + + size_t start = ctx * records_per_context; + size_t end = std::min(start + records_per_context, records.size()); + + for (size_t i = start; i < end; ++i) { + const auto &sam_record = records[i]; + + recordPtr->SetBit(quality_policy); + recordPtr->SetQNAME(sam_record.qname.c_str()); + recordPtr->SetFLAG(sam_record.flag); + recordPtr->SetREFID(sam_record.rname.c_str()); + recordPtr->SetPOS(sam_record.pos); + recordPtr->SetMAPQ(sam_record.mapq); + recordPtr->SetCIGAR(sam_record.cigar.c_str()); + recordPtr->SetREFNEXT(sam_record.rnext.c_str()); + recordPtr->SetPNEXT(sam_record.pnext); + recordPtr->SetTLEN(sam_record.tlen); + recordPtr->SetSEQ(sam_record.seq.c_str()); + recordPtr->SetQUAL(sam_record.qual.c_str()); + + recordPtr->ResetNOPT(); + for (const auto &opt : sam_record.optional_fields) { + recordPtr->SetOPT(opt.c_str()); + } + + fill_context->Fill(*entry); } - } - }; - - auto record_callback = [&](const ramcore::SamRecord& sam_record, size_t record_num) { - if (sam_record.rname != "*") { - chromosome_records[sam_record.rname].push_back(sam_record); - } - }; - - parser.ParseFile(datafile, header_callback, record_callback); + }); + } - std::vector chr_names; - for (const auto& [chr, records] : chromosome_records) { - chr_names.push_back(chr); - } + for (auto &t : write_threads) { + t.join(); + } - std::vector sort_threads; - for (const auto &chr : chr_names) { - sort_threads.emplace_back([&chromosome_records, chr]() { - auto &records = chromosome_records[chr]; - std::sort(records.begin(), records.end(), - [](const ramcore::SamRecord &a, const ramcore::SamRecord &b) { return a.pos < b.pos; }); - }); - } - for (auto &t : sort_threads) { - t.join(); - } + parallel_writer.reset(); - auto write_chromosome_parallel = [&](const std::string &chr) { - const auto &records = chromosome_records[chr]; - - std::string filename = std::string(output_prefix) + "_" + chr + ".root"; - - auto model = RAMNTupleRecord::MakeModel(); - ROOT::RNTupleWriteOptions writeOptions; - - writeOptions.SetCompression(ROOT::RCompressionSetting::EAlgorithm::kZSTD, 1); - writeOptions.SetApproxZippedClusterSize(200 * 1024 * 1024); - writeOptions.SetMaxUnzippedClusterSize(1024 * 1024 * 1024); - writeOptions.SetMaxUnzippedPageSize(1024 * 1024); - writeOptions.SetUseBufferedWrite(true); - - auto parallel_writer = - ROOT::Experimental::RNTupleParallelWriter::Recreate(std::move(model), "RAM", filename, writeOptions); - - const int contexts_per_file = std::min(4, num_threads); - const size_t records_per_context = (records.size() + contexts_per_file - 1) / contexts_per_file; - - std::vector write_threads; - - for (int ctx = 0; ctx < contexts_per_file && ctx * records_per_context < records.size(); ++ctx) { - write_threads.emplace_back([&, ctx]() { - auto fill_context = parallel_writer->CreateFillContext(); - auto entry = fill_context->GetModel().CreateEntry(); - auto recordPtr = entry->GetPtr("record").get(); - - size_t start = ctx * records_per_context; - size_t end = std::min(start + records_per_context, records.size()); - - for (size_t i = start; i < end; ++i) { - const auto &sam_record = records[i]; - - recordPtr->SetBit(quality_policy); - recordPtr->SetQNAME(sam_record.qname.c_str()); - recordPtr->SetFLAG(sam_record.flag); - recordPtr->SetREFID(sam_record.rname.c_str()); - recordPtr->SetPOS(sam_record.pos); - recordPtr->SetMAPQ(sam_record.mapq); - recordPtr->SetCIGAR(sam_record.cigar.c_str()); - recordPtr->SetREFNEXT(sam_record.rnext.c_str()); - recordPtr->SetPNEXT(sam_record.pnext); - recordPtr->SetTLEN(sam_record.tlen); - recordPtr->SetSEQ(sam_record.seq.c_str()); - recordPtr->SetQUAL(sam_record.qual.c_str()); - - recordPtr->ResetNOPT(); - for (const auto &opt : sam_record.optional_fields) { - recordPtr->SetOPT(opt.c_str()); - } - - fill_context->Fill(*entry); - } - }); - } + std::unique_ptr file(TFile::Open(filename.c_str(), "UPDATE")); - for (auto &t : write_threads) { - t.join(); - } + TList h; + h.SetName("headers"); + for (const auto &[tag, content] : headers) { + h.Add(new TNamed(tag.c_str(), content.c_str())); + } - parallel_writer.reset(); + RAMNTupleRecord::WriteAllRefs(*file); + h.Write(); - std::unique_ptr file(TFile::Open(filename.c_str(), "UPDATE")); + file->Close(); + }; - TList h; - h.SetName("headers"); - for (const auto &[tag, content] : headers) { - h.Add(new TNamed(tag.c_str(), content.c_str())); - } + size_t chr_idx = 0; + while (chr_idx < chr_names.size()) { + std::vector threads; - RAMNTupleRecord::WriteAllRefs(*file); - h.Write(); + for (int i = 0; i < num_threads && chr_idx < chr_names.size(); ++i, ++chr_idx) { + threads.emplace_back(write_chromosome_parallel, chr_names[chr_idx]); + } - file->Close(); - }; - - size_t chr_idx = 0; - while (chr_idx < chr_names.size()) { - std::vector threads; - - for (int i = 0; i < num_threads && chr_idx < chr_names.size(); ++i, ++chr_idx) { - threads.emplace_back(write_chromosome_parallel, chr_names[chr_idx]); - } - - for (auto& t : threads) { - t.join(); - } - } + for (auto &t : threads) { + t.join(); + } + } } - From 9f540bb6d5d7545ebb11785764f6ca4f6c05725b Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Thu, 16 Oct 2025 16:27:58 +0530 Subject: [PATCH 12/13] samtools threading Signed-off-by: AdityaPandeyCN --- benchmark/chromosome_split_benchmark.cxx | 85 +++++++++++++++++++++--- 1 file changed, 76 insertions(+), 9 deletions(-) diff --git a/benchmark/chromosome_split_benchmark.cxx b/benchmark/chromosome_split_benchmark.cxx index cae96cd..16397ee 100644 --- a/benchmark/chromosome_split_benchmark.cxx +++ b/benchmark/chromosome_split_benchmark.cxx @@ -7,6 +7,7 @@ #include #include #include +#include static void CleanupFiles(const std::string &pattern) { @@ -56,16 +57,12 @@ static void BM_SamtoolsSplit(benchmark::State &state) std::string bam_file = "bench_st_tmp.bam"; std::string sorted_bam = "bench_st_sorted.bam"; - // Convert SAM to BAM system(("samtools view -bS " + sam_file + " -o " + bam_file + " 2>/dev/null").c_str()); - // Sort BAM system(("samtools sort " + bam_file + " -o " + sorted_bam + " 2>/dev/null").c_str()); - // Index BAM system(("samtools index " + sorted_bam + " 2>/dev/null").c_str()); - // Extract each chromosome for (const auto &chr : chromosomes) { std::string cmd = "samtools view -b " + sorted_bam + " " + chr + " > bench_st_" + chr + ".bam 2>/dev/null"; system(cmd.c_str()); @@ -83,6 +80,65 @@ static void BM_SamtoolsSplit(benchmark::State &state) state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); } +static void BM_SamtoolsSplitThreaded(benchmark::State &state) +{ + int num_reads = state.range(0); + int num_threads = state.range(1); + std::string sam_file = "bench_st_mt_" + std::to_string(num_reads) + ".sam"; + + GenerateSAMFile(sam_file, num_reads); + auto chromosomes = GetChromosomes(sam_file); + + for (auto _ : state) { + std::string bam_file = "bench_st_mt_tmp.bam"; + std::string sorted_bam = "bench_st_mt_sorted.bam"; + + std::string cmd = "samtools view -@ " + std::to_string(num_threads) + + " -bS " + sam_file + " -o " + bam_file + " 2>/dev/null"; + system(cmd.c_str()); + + cmd = "samtools sort -@ " + std::to_string(num_threads) + + " -m 1G " + bam_file + " -o " + sorted_bam + " 2>/dev/null"; + system(cmd.c_str()); + + cmd = "samtools index -@ " + std::to_string(num_threads) + + " " + sorted_bam + " 2>/dev/null"; + system(cmd.c_str()); + + std::vector threads; + for (const auto &chr : chromosomes) { + threads.emplace_back([&sorted_bam, &chr]() { + + std::string cmd = "samtools view -@ 2 -b " + sorted_bam + " " + chr + + " > bench_st_mt_" + chr + ".bam 2>/dev/null"; + system(cmd.c_str()); + }); + + if (threads.size() >= num_threads) { + for (auto &t : threads) { + t.join(); + } + threads.clear(); + } + } + + for (auto &t : threads) { + t.join(); + } + + state.counters["size_MB"] = GetTotalFileSize("bench_st_mt_chr") / (1024.0 * 1024.0); + state.counters["threads"] = num_threads; + + CleanupFiles("bench_st_mt_chr"); + std::remove(bam_file.c_str()); + std::remove(sorted_bam.c_str()); + std::remove((sorted_bam + ".bai").c_str()); + } + + std::remove(sam_file.c_str()); + state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); +} + static void BM_ChromosomeSplitThreads(benchmark::State &state) { int num_reads = state.range(0); @@ -114,18 +170,29 @@ static void BM_ChromosomeSplitThreads(benchmark::State &state) state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); } -BENCHMARK(BM_SamtoolsSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); -// Benchmarks with different thread counts +BENCHMARK(BM_SamtoolsSplit) + ->Arg(100000) + ->Arg(500000) + ->Arg(1000000) + ->Unit(benchmark::kMillisecond); + +BENCHMARK(BM_SamtoolsSplitThreaded) + ->Args({100000, 2}) + ->Args({100000, 4}) + ->Args({500000, 2}) + ->Args({500000, 4}) + ->Args({1000000, 2}) + ->Args({1000000, 4}) + ->Unit(benchmark::kMillisecond); + BENCHMARK(BM_ChromosomeSplitThreads) ->Args({100000, 2}) ->Args({100000, 4}) - ->Args({100000, 8}) ->Args({500000, 2}) ->Args({500000, 4}) - ->Args({500000, 8}) ->Args({1000000, 2}) ->Args({1000000, 4}) - ->Args({1000000, 8}) ->Unit(benchmark::kMillisecond); BENCHMARK_MAIN(); + From 56ec767317352d4e3188160ea2c6aa56cf327b31 Mon Sep 17 00:00:00 2001 From: AdityaPandeyCN Date: Thu, 16 Oct 2025 16:28:49 +0530 Subject: [PATCH 13/13] clang format changes Signed-off-by: AdityaPandeyCN --- benchmark/chromosome_split_benchmark.cxx | 27 +++++++++--------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/benchmark/chromosome_split_benchmark.cxx b/benchmark/chromosome_split_benchmark.cxx index 16397ee..e5f1622 100644 --- a/benchmark/chromosome_split_benchmark.cxx +++ b/benchmark/chromosome_split_benchmark.cxx @@ -93,27 +93,25 @@ static void BM_SamtoolsSplitThreaded(benchmark::State &state) std::string bam_file = "bench_st_mt_tmp.bam"; std::string sorted_bam = "bench_st_mt_sorted.bam"; - std::string cmd = "samtools view -@ " + std::to_string(num_threads) + - " -bS " + sam_file + " -o " + bam_file + " 2>/dev/null"; + std::string cmd = + "samtools view -@ " + std::to_string(num_threads) + " -bS " + sam_file + " -o " + bam_file + " 2>/dev/null"; system(cmd.c_str()); - cmd = "samtools sort -@ " + std::to_string(num_threads) + - " -m 1G " + bam_file + " -o " + sorted_bam + " 2>/dev/null"; + cmd = "samtools sort -@ " + std::to_string(num_threads) + " -m 1G " + bam_file + " -o " + sorted_bam + + " 2>/dev/null"; system(cmd.c_str()); - cmd = "samtools index -@ " + std::to_string(num_threads) + - " " + sorted_bam + " 2>/dev/null"; + cmd = "samtools index -@ " + std::to_string(num_threads) + " " + sorted_bam + " 2>/dev/null"; system(cmd.c_str()); std::vector threads; for (const auto &chr : chromosomes) { threads.emplace_back([&sorted_bam, &chr]() { - - std::string cmd = "samtools view -@ 2 -b " + sorted_bam + " " + chr + - " > bench_st_mt_" + chr + ".bam 2>/dev/null"; + std::string cmd = + "samtools view -@ 2 -b " + sorted_bam + " " + chr + " > bench_st_mt_" + chr + ".bam 2>/dev/null"; system(cmd.c_str()); }); - + if (threads.size() >= num_threads) { for (auto &t : threads) { t.join(); @@ -121,7 +119,7 @@ static void BM_SamtoolsSplitThreaded(benchmark::State &state) threads.clear(); } } - + for (auto &t : threads) { t.join(); } @@ -170,11 +168,7 @@ static void BM_ChromosomeSplitThreads(benchmark::State &state) state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate); } -BENCHMARK(BM_SamtoolsSplit) - ->Arg(100000) - ->Arg(500000) - ->Arg(1000000) - ->Unit(benchmark::kMillisecond); +BENCHMARK(BM_SamtoolsSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond); BENCHMARK(BM_SamtoolsSplitThreaded) ->Args({100000, 2}) @@ -195,4 +189,3 @@ BENCHMARK(BM_ChromosomeSplitThreads) ->Unit(benchmark::kMillisecond); BENCHMARK_MAIN(); -