Permalink
Browse files

Multithreading change from OpenMP to C++11 thread

Also adds support for additional inputfiletypes
Code reused from the brownie project
  • Loading branch information...
Giles Miclotte
Giles Miclotte committed Feb 2, 2016
1 parent 25d2b28 commit b448d66dde25da69fc8b2e49492607bae9242f27
View
@@ -2,3 +2,4 @@
build
bin
Profiling
*.kdev*
View
@@ -1,4 +1,4 @@
cmake_minimum_required (VERSION 2.6)
cmake_minimum_required (VERSION 2.6.3)
project (Jabba)
@@ -10,10 +10,44 @@ add_definitions("-DJABBA_MAJOR_VERSION=${${PROJECT_NAME}_MAJOR_VERSION}")
add_definitions("-DJABBA_MINOR_VERSION=${${PROJECT_NAME}_MINOR_VERSION}")
add_definitions("-DJABBA_PATCH_LEVEL=${${PROJECT_NAME}_PATCH_LEVEL}")
# set compiler
set(CMAKE_CXX_COMPILER /usr/bin/g++)
# set g++ specific flags
#set(CMAKE_CXX_FLAGS "-std=c++11 -ggdb -g3 -pedantic -Wall -Wextra -Wno-sign-compare -fopenmp -lpthread")
set(CMAKE_CXX_FLAGS "-std=c++11 -O3 -pedantic -Wall -Wextra -Wno-sign-compare -fopenmp -lpthread -DNDEBUG -o $@")
# set the default configuration to Release
if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Release" CACHE STRING
"Choose the type of build, options are: Debug Release RelWithDebInfo MinSizeRel." FORCE)
endif(NOT CMAKE_BUILD_TYPE)
# set the module path
set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake")
# set some definitions
add_definitions("-DMAXKMERLENGTH=127")
add_definitions("-DCATEGORIES=2")
add_definitions("-DBROWNIE_MAJOR_VERSION=${${PROJECT_NAME}_MAJOR_VERSION}")
add_definitions("-DBROWNIE_MINOR_VERSION=${${PROJECT_NAME}_MINOR_VERSION}")
add_definitions("-DBROWNIE_PATCH_LEVEL=${${PROJECT_NAME}_PATCH_LEVEL}")
# set windows specific flags
if (MSVC)
add_definitions("-D_SCL_SECURE_NO_WARNINGS")
add_definitions("-D_CRT_SECURE_NO_WARNINGS")
endif (MSVC)
# set g++ specific flags
if (CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_GNUCC)
set(CMAKE_CXX_FLAGS "-Wno-deprecated -std=c++11 -static-libgcc -static-libstdc++")
set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g3 -Wall -pedantic -Wno-long-long -Wno-deprecated -Wall -Wextra -Wno-sign-compare")
set(CMAKE_CXX_FLAGS_RELEASE "-O3")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g3")
set(CMAKE_C_FLAGS_DEBUG "-O0 -g3 -Wall -pedantic -Wno-long-long")
endif (CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_GNUCC)
add_subdirectory(src)
# uncomment the portion below to disable assertions
if (CMAKE_BUILD_TYPE STREQUAL Release)
add_definitions(-DNDEBUG)
else (CMAKE_BUILD_TYPE STREQUAL Release)
add_definitions(-DDEBUG)
endif (CMAKE_BUILD_TYPE STREQUAL Release)
set(CMAKE_VERBOSE_MAKEFILE ON)
add_subdirectory(src)
View
@@ -8,5 +8,5 @@ cd ..
mkdir -p bin
cp -b ./build/src/Jabba ./bin/Jabba
cd src
doxygen ../Profiling/Doxyfile > ../Profiling/doxygen_output
pmccabe -vt *.hpp *.cpp > ../Profiling/pmccabe_output
#doxygen ../Profiling/Doxyfile > ../Profiling/doxygen_output
#pmccabe -vt *.hpp *.cpp > ../Profiling/pmccabe_output
View
@@ -1 +1,3 @@
add_executable(Jabba GraphChain.cpp IntraNodeChain.cpp InterNodeChain.cpp Graph.cpp ReadLibrary.cpp SeedFinder.cpp AlignedRead.cpp Settings.cpp Nucleotide.cpp TString.cpp Alignment.cpp mummer/qsufsort.c mummer/sparseSA.cpp)
add_executable(Jabba GraphChain.cpp IntraNodeChain.cpp InterNodeChain.cpp Graph.cpp SeedFinder.cpp AlignedRead.cpp Settings.cpp Nucleotide.cpp TString.cpp Alignment.cpp mummer/qsufsort.c mummer/sparseSA.cpp ReadCorrection.cpp ReadCorrectionHandler.cpp library.cpp util.cpp)
target_link_libraries(Jabba readfile pthread)
add_subdirectory(readfile)
View
@@ -22,7 +22,6 @@
#include <deque>
#include <map>
#include <omp.h>
void Graph::addNode(std::string const &sequence, std::vector<int> const &in_edges,
std::vector<int> const &out_edges)
View
@@ -26,7 +26,7 @@
#include "Settings.hpp"
class Graph{
class Graph {
private:
int size_; //number of nodes
int k_; //size of k-mers, overlap between nodes is k-1
View
@@ -20,14 +20,13 @@
#include "GraphChain.hpp"
#include <sstream>
#include <omp.h> //openmp
#include "Settings.hpp"
#include "Seed.hpp"
#include "InterNodeChain.hpp"
#include "AlignedRead.hpp"
#include "Read.hpp"
#include "ReadCorrectionHandler.hpp"
void GraphChain::extractNbs(std::string const &arcs, std::vector<int> &lnbs, std::vector<int> &rnbs) {
std::istringstream iss(arcs);
@@ -49,137 +48,37 @@ void GraphChain::extractNbs(std::string const &arcs, std::vector<int> &lnbs, std
}
}
void GraphChain::readGraph(Input const &graph_input) {
void GraphChain::readGraph(ReadLibrary const &graph_input) {
std::cout << "Reading the graph... " << std::endl;
ReadLibrary graph(graph_input);
if (!graph.is_open()) {
std::cerr << std::endl << "Error: file " << graph_input.filename_
<< " is not open." << std::endl;
exit(1);
}
while (graph.has_next()) {
std::vector<Read> nodes;
graph.getReadBatch(nodes, 1);
Read read = nodes[0];
ReadFile *readFile = graph_input.allocateReadFile();
readFile->open(graph_input.getInputFilename());
while (true) {
ReadRecord record;
readFile->getNextRecord(record);
std::vector<int> lnbs;
std::vector<int> rnbs;
extractNbs(read.get_meta(), lnbs, rnbs);
graph_.addNode(read.get_sequence(), lnbs, rnbs);
extractNbs(record.preRead, lnbs, rnbs);
graph_.addNode(record.read, lnbs, rnbs);
if (!readFile->good())
break;
}
std::cout << "Done." << std::endl;
}
void GraphChain::alignReads() {
for (Input library : settings_.get_input()) {
std::cout << library.basename_ << std::endl;
alignReads(library);
}
}
void GraphChain::alignReads(Input const &library) {
//open read input files
ReadLibrary read_library(library);
std::string of_name = settings_.get_directory() + "/Jabba-"
+ library.basename_;
std::string ucf_name = settings_.get_directory() + "/Jabba_uncorrected-"
+ library.basename_;
int lastindex = of_name.find_last_of(".");
if (lastindex != std::string::npos) {
of_name = of_name.substr(0, lastindex) + ".fasta";
} else {
of_name = of_name + ".fasta";
}
std::ofstream read_output_file;
read_output_file.open(of_name.c_str());
std::ofstream read_uncorrected_file;
read_uncorrected_file.open(ucf_name.c_str());
if (!read_library.is_open()) {
std::cerr << std::endl << "Error: file " << library.filename_
<< " is not open." << std::endl;
exit(1);
}
if (!read_output_file.is_open()) {
std::cerr << std::endl << "Error: file " << of_name << " is not open." << std::endl;
exit(1);
}
if (!read_uncorrected_file.is_open()) {
std::cerr << std::endl << "Error: file " << ucf_name << " is not open." << std::endl;
exit(1);
}
//reads some reads from file and then processes them
int batch_nr = 0;
int read_count = 0;
while (read_library.has_next()) {
batch_nr++;
int batch_size = 1024;
std::cout << "Reading batch " << batch_nr << std::endl;
std::vector<Read> reads;
read_library.getReadBatch(reads, batch_size);
std::vector<std::string> corrected_reads = processBatch(reads,
read_count);
batch_size = reads.size();
for (int i = 0; i < batch_size; ++i) {
if (corrected_reads[i].size() > 0) {
read_output_file << ">" << reads[i].get_meta() << std::endl
<< corrected_reads[i] << std::endl;
} else {
read_uncorrected_file << ">" << reads[i].get_meta() << std::endl
<< reads[i].get_sequence() << std::endl;
}
}
read_count += batch_size;
}
read_uncorrected_file.close();
read_output_file.close();
}
std::vector<std::string> GraphChain::processBatch(
std::vector<Read> const &reads, int read_count)
{
std::cout << "Processing batch of size: " << reads.size() << std::endl;
int batch_size = reads.size();
std::vector<std::string> corrected_reads;
corrected_reads.resize(batch_size);
#pragma omp parallel
{
int reads_per_thread = (batch_size / omp_get_num_threads());
if(batch_size % omp_get_num_threads() >= omp_get_thread_num()){
++reads_per_thread;
}
int local_read_count = omp_get_thread_num() * reads_per_thread;
if(batch_size % omp_get_num_threads() == omp_get_thread_num()){
--reads_per_thread;
}
if(batch_size % omp_get_num_threads() < omp_get_thread_num()){
reads_per_thread += batch_size % omp_get_num_threads();
}
for (int i = 0; i < reads_per_thread; ++i) {
Read read = reads[local_read_count + i];
read.set_id(read_count + local_read_count + i);
//std::cout << "Processing read " << read.get_id() << std::endl;
//std::cerr << "Processing read " << read.get_id() << std::endl;
Alignment alignment(250, 30, 1, -4, -2, -3);
InterNodeChain iernc(read, graph_, settings_, alignment);
AlignedRead ar(read, settings_.get_output_mode());
corrected_reads[read.get_id() - read_count] = iernc.chainSeeds(ar);
}
}
return corrected_reads;
}
GraphChain::GraphChain(int argc, char * argv[]) :
settings_(argc, argv), graph_(settings_)
{
//read graph
Input graph = settings_.get_graph();
graph_.set_k(settings_.get_dbg_k());
readGraph(graph);
readGraph(settings_.get_graph());
graph_.init_seed_finder();
omp_set_num_threads(settings_.get_num_threads());
ReadCorrectionHandler rch(graph_, settings_);
rch.doErrorCorrection(settings_.get_libraries());
}
int main(int argc, char * argv[]) {
GraphChain gc(argc, argv);
gc.alignReads();
//gc.alignReads();
return 0;
}
View
@@ -21,9 +21,9 @@
#define GRAPHCHAIN_HPP
#include "Settings.hpp"
#include "ReadLibrary.hpp"
#include "SeedFinder.hpp"
#include "Graph.hpp"
#include "library.h"
class GraphChain {
private:
@@ -36,12 +36,11 @@ class GraphChain {
void extractNbs(std::string const &arcs,
std::vector<int> &lnbs, std::vector<int> &rnbs);
//read the graph file
void readGraph(Input const &graph_input);
void readGraph(ReadLibrary const &graph_input);
//process the given input file
void alignReads(Input const &library);
//void alignReads(Input const &library);
//process a batch of reads
std::vector<std::string> processBatch(
std::vector<Read> const &reads, int read_count);
//std::vector<std::string> processBatch(std::vector<Read> const &reads, int read_count);
public:
/*
* ctors
View
@@ -22,8 +22,8 @@
#include <string>
typedef enum {FASTQ, FASTA, FASTQ_GZ, FASTA_GZ, SAM, SAM_GZ, BAM, RAW, RAW_GZ}
FileType;
// typedef enum {FASTQ, FASTA, FASTQ_GZ, FASTA_GZ, SAM, SAM_GZ, BAM, RAW, RAW_GZ}
// FileType;
std::ostream &operator<<(std::ostream &out, const FileType &fileType);
struct Input {
View
@@ -0,0 +1,22 @@
#include "ReadCorrectionHandler.hpp"
#include "Alignment.hpp"
#include "InterNodeChain.hpp"
#include "Read.hpp"
#include "AlignedRead.hpp"
#include "Seed.hpp"
void ReadCorrection::correctRead(ReadRecord& record)
{
Read read(record.preRead, record.getRead());
read.set_id(0);
Alignment alignment(250, 30, 1, -4, -2, -3);
InterNodeChain iernc(read, graph_, settings_, alignment);
AlignedRead ar(read, settings_.get_output_mode());
record.read = iernc.chainSeeds(ar);
}
void ReadCorrection::correctChunk(vector<ReadRecord>& readChunk)
{
for (auto& it : readChunk)
correctRead(it);
}
@@ -0,0 +1,48 @@
#include "ReadCorrectionHandler.hpp"
void ReadCorrectionHandler::workerThread(size_t myID, LibraryContainer& libraries)
{
ReadCorrection readCorrection(graph_, settings_);
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// local storage of reads
std::vector<ReadRecord> myReadBuf;
while (true) {
size_t blockID, recordID;
bool result = libraries.getRecordChunk(myReadBuf, blockID, recordID);
readCorrection.correctChunk(myReadBuf);
if (result)
libraries.commitRecordChunk(myReadBuf, blockID, recordID);
else
break;
}
}
void ReadCorrectionHandler::doErrorCorrection(LibraryContainer& libraries)
{
const unsigned int& numThreads = settings_.get_num_threads();
std::cout << "Number of threads: " << numThreads << std::endl;
libraries.startIOThreads(settings_.get_thread_work_size(),
10 * settings_.get_thread_work_size() * settings_.get_num_threads(),
true);
// start worker threads
std::vector<std::thread> workerThreads(numThreads);
for (size_t i = 0; i < workerThreads.size(); i++)
workerThreads[i] = std::thread(&ReadCorrectionHandler::workerThread,
this, i, std::ref(libraries));
std::cout << "Worker threads started." << std::endl;
// wait for worker threads to finish
for_each(workerThreads.begin(), workerThreads.end(), std::mem_fn(&std::thread::join));
libraries.joinIOThreads();
}
ReadCorrectionHandler::ReadCorrectionHandler(Graph& g, const Settings& s) :
graph_(g), settings_(s)
{
Util::startChrono();
}
ReadCorrectionHandler::~ReadCorrectionHandler()
{
}
Oops, something went wrong.

0 comments on commit b448d66

Please sign in to comment.