Skip to content

Commit

Permalink
Spin off merge threads in bison_herd. Also, free excess memory alloca…
Browse files Browse the repository at this point in the history
…ted for the genome.
  • Loading branch information
dpryan79 committed Jan 27, 2015
1 parent 0e15ec0 commit 8579050
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 8 deletions.
9 changes: 8 additions & 1 deletion README.md
Expand Up @@ -295,12 +295,19 @@ even when limited to the same resources.
* Somehow, the methylation extractor was still defaulting to a minimum phred
score of 10, when the documentation said it was defaulting to 5.

* CRAM files can now be produced and processed. Both bison and bison_herd
* CRAM files can now be produced and processed. Both bison and `bison_herd`
will output in CRAM format if the -C option is given.

* The header @PG line is now rewritten to contain the actual command
executed and the bison/herd version.

* Excess space allocated to hold the genome is now returned.

* Output BAM/CRAM files can now be sorted on the fly. The method for this is
similar to that used by samtools, where temporary files are written and
then merged. This merge step is performed in parallel if multiple output
files are being written by `bison_herd`.

###0.3.3

* Allow mixed and discordant alignments.
Expand Down
2 changes: 2 additions & 0 deletions bison.h
Expand Up @@ -131,6 +131,7 @@ typedef struct {
int argc;
uint64_t maxMem;
int sort; //To sort or not
int multipleFiles; //For bison_herd, if 1, there are multiple input files
} t_config;

typedef struct {
Expand Down Expand Up @@ -699,6 +700,7 @@ typedef struct {
int offset; //File number offset for the temp files
char *opref; //output file name prefix
bam1_t **buf; //alignment buffer
htsFile *fp; //The output file
} alignmentBuffer;

/******************************************************************************
Expand Down
3 changes: 3 additions & 0 deletions common.c
Expand Up @@ -193,6 +193,9 @@ void read_genome() {
fclose(fp);
free(files[j]);
}
//free excess space
chromosomes.genome = realloc(chromosomes.genome, sizeof(char)*offset);

free(line);
free(fullpath);
free(files);
Expand Down
6 changes: 5 additions & 1 deletion herd/main.c
Expand Up @@ -187,6 +187,7 @@ int main(int argc, char *argv[]) {
config.argv = argv;
config.maxMem = 512<<20;
config.sort = 0;
config.multipleFiles = 0;
global_header = NULL;
unmapped1 = NULL;
unmapped2 = NULL;
Expand Down Expand Up @@ -360,7 +361,10 @@ int main(int argc, char *argv[]) {
}
free(tmp);
wordfree(&p_wordexp);
if(multi_file>1) config.reorder=1; //We need to force --reorder if there are multiple input files
if(multi_file>1) {
config.reorder = 1; //We need to force --reorder if there are multiple input files
config.multipleFiles = 1;
}
#ifdef DEBUG
if(multi_file>1) {
fprintf(stderr, "In DEBUG mode, you can't input multiple file-sets!\n");
Expand Down
44 changes: 40 additions & 4 deletions herd/writer.c
@@ -1,6 +1,21 @@
#include "../bison.h"
#include <sys/time.h>

int nMergeThreads;
pthread_t *tids;

void *mergeTempWrapper(void *pa) {
alignmentBuffer *abuf = (alignmentBuffer *) pa;

//Perform the actual merging
mergeTemp(abuf);

sam_close(abuf->fp);
free(abuf->buf);
free(abuf);
return NULL;
}

/******************************************************************************
*
* Update the CpG/CHG/CHH metrics according to the methylation calls in a read
Expand Down Expand Up @@ -73,6 +88,7 @@ void herd_setup(char *fname1, char *fname2) {
quit(2,-1);
}
if(!config.quiet) fprintf(stderr, "Alignments will be written to %s\n",config.outname);
//If we have multiple files AND we're sorting then the API only allows a single compression thread
if(config.n_compression_threads > 1) hts_set_threads(OUTPUT_BAM, config.n_compression_threads);
global_header = modifyHeader(global_header, config.argc, config.argv);
sam_hdr_write(OUTPUT_BAM, global_header);
Expand Down Expand Up @@ -102,6 +118,9 @@ void * bam_writer(void *a) {
alignmentBuffer *abuf = NULL;
assert(times);

nMergeThreads = 0;
tids = NULL;

//If we write output in the exact same order as the input, we need to know
//how many times to write from each master_processor_thread before going to the next
if(config.directional){
Expand All @@ -121,6 +140,7 @@ void * bam_writer(void *a) {
assert(abuf);
abuf->maxMem = config.maxMem;
abuf->opref = strdup(config.basename);
abuf->fp = OUTPUT_BAM;
}

while(nfinished < config.nmthreads) {
Expand All @@ -147,11 +167,21 @@ void * bam_writer(void *a) {
if(is_finished(to_write_node[i])) goto finished;
if(unmapped1 != NULL) pclose(unmapped1);
if(unmapped2 != NULL) pclose(unmapped2);
if(config.sort) mergeTemp(abuf); //This ends up blocking until the merge is complete
sam_close(OUTPUT_BAM);
if(config.sort) {
tids = realloc(tids, sizeof(pthread_t)*(++nMergeThreads));
assert(tids);
pthread_create(&tids[nMergeThreads-1], NULL, mergeTempWrapper, (void*) abuf);
}
current_file++;
herd_setup(fnames1[current_file], fnames2[current_file]);
if(config.sort) abuf->opref = strdup(config.basename);
if(config.sort) {
//Reinitialize the buffer
abuf = calloc(1, sizeof(alignmentBuffer));
assert(abuf);
abuf->maxMem = config.maxMem;
abuf->opref = strdup(config.basename);
abuf->fp = OUTPUT_BAM;
}
i=0;
j=0;
}
Expand Down Expand Up @@ -206,9 +236,15 @@ void * bam_writer(void *a) {
//This isn't elegant, but...
finished:
if(config.sort) {
mergeTemp(abuf);
if(!config.quiet) {
fprintf(stderr, "Waiting for merge threads to complete...");
fflush(stderr);
}
mergeTemp(abuf); //Can't split off a new thread, since the close function will break!
free(abuf->buf);
free(abuf);
for(i=0; i<nMergeThreads; i++) pthread_join(tids[i], 0);
if(!config.quiet) fprintf(stderr, "\n");
}

if(t_reads != 0) print_metrics(); //There seems to be a race condition for the last sample in a list. This gets around that.
Expand Down
1 change: 1 addition & 0 deletions master.c
Expand Up @@ -1152,6 +1152,7 @@ void * master_processer_thread(void *a) {
assert(abuf);
abuf->maxMem = config.maxMem;
abuf->opref = strdup(config.basename);
abuf->fp = OUTPUT_BAM;
}

//Metrics
Expand Down
4 changes: 2 additions & 2 deletions sort.c
Expand Up @@ -117,7 +117,7 @@ alignmentBuffer *pushAlignmentBuffer(alignmentBuffer *buf, bam1_t *b) {
return buf;
}

//Merge stuff, output is to OUTPUT_BAM!
//Merge stuff
void mergeTemp(alignmentBuffer *buf) {
int i, found, first, nFinished = 0, nFiles = buf->offset;
char *opref = buf->opref;
Expand Down Expand Up @@ -162,7 +162,7 @@ void mergeTemp(alignmentBuffer *buf) {
}
}
assert(first >= 0);
sam_write1(OUTPUT_BAM, global_header, bs[first]);
sam_write1(buf->fp, global_header, bs[first]);
if(sam_read1(fps[first], global_header, bs[first]) <= 1) {
bam_destroy1(bs[first]);
bs[first] = NULL;
Expand Down

0 comments on commit 8579050

Please sign in to comment.