Skip to content

Commit

Permalink
Merge pull request #92 from HumanCellAtlas/partition-fastq
Browse files Browse the repository at this point in the history
added option to output-format as either FASTQ or BAM
  • Loading branch information
khajoue2 committed Jan 31, 2022
2 parents 047be89 + e300b33 commit e6caa88
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 16 deletions.
8 changes: 6 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ LABEL maintainer="Ambrose J. Carr <acarr@broadinstitute.org>" \
COPY requirements.txt .

RUN apt-get update && apt-get install -y patch && apt-get install -y libhdf5-dev && apt-get install -y vim

RUN pip3 install --upgrade pip
RUN pip3 install -r requirements.txt

RUN mkdir /sctools/
Expand All @@ -26,14 +26,18 @@ RUN cd /sctools/fastqpreprocessing &&\
tar -jxvf htslib-${htslib_version}.tar.bz2 &&\
mv libStatGen-${libStatGen_version} libStatGen

RUN cd /sctools/fastqpreprocessing &&\
wget http://www.cs.unc.edu/Research/compgeom/gzstream/gzstream.tgz &&\
tar -zxvf gzstream.tgz

RUN cd /sctools/fastqpreprocessing &&\
patch -f libStatGen/fastq/FastQFile.cpp patches/FastQFile.cpp.patch &&\
patch -f libStatGen/general/BgzfFileType.cpp patches/BgzfFileType.cpp.patch &&\
patch libStatGen/Makefile patches/Makefile.patch &&\
patch libStatGen/general/Makefile patches/general.Makefile.patch &&\
make -C libStatGen

RUN cd /sctools/fastqpreprocessing && make -C htslib-${htslib_version}/
RUN cd /sctools/fastqpreprocessing && make -C htslib-${htslib_version}/ && make -C gzstream

RUN cd /sctools/fastqpreprocessing && mkdir bin src/obj && make -C src/ install

Expand Down
7 changes: 4 additions & 3 deletions fastqpreprocessing/src/Makefile
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
IDIR =../libStatGen/include
IDIR2 =../htslib-1.13
IDIR3 =../gzstream

CC = g++ -std=c++17 -fPIC -DHTSLIB -Wall -O4 -Wwrite-strings

CFLAGS = -I$(IDIR) -L../libStatGen
CFLAGS = -I$(IDIR) -L../libStatGen -L../gzstream

ODIR=obj
LDIR =../libStatGen/

LIBS = -lStatGen -lz -lpthread -lstdc++fs
LIBS = -lStatGen -lz -lpthread -lstdc++fs -lgzstream

_DEPS = fastqprocess.h utilities.h input_options.h

Expand All @@ -32,7 +33,7 @@ _COMMON_OBJ = utilities.o input_options.o
OBJ = $(patsubst %,$(ODIR)/%,$(_COMMON_OBJ))

$(ODIR)/%.o: %.cpp $(_DEPS)
$(CC) -c -o $@ $< -I$(IDIR) -I. -I$(IDIR2)
$(CC) -c -o $@ $< -I$(IDIR) -I. -I$(IDIR2) -I$(IDIR3)

$(TARGET1): $(OBJ) $(TARGET1_OBJ)
$(CC) -o $@ $^ $(CFLAGS) $(LIBS)
Expand Down
8 changes: 5 additions & 3 deletions fastqpreprocessing/src/datatypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ typedef struct _tags_holder {
char *allocated_memory(int size) {
return 0;
}

char *double_memory() {
return 0;
}
Expand Down Expand Up @@ -106,9 +105,13 @@ typedef struct _input_options_fastqprocess {

// I1, R1 and R2 files name
std::vector<std::string> I1s, R1s, R2s;
// Barcode white list file

// Barcode white list file
std::string white_list_file;

// Output format
std::string output_format;

// chemistry dependent (V2/V3) barcode and UMI length
int barcode_length, umi_length;

Expand All @@ -119,7 +122,6 @@ typedef struct _input_options_fastqprocess {
std::string sample_id;
} INPUT_OPTIONS_FASTQPROCESS;


/**
* @brief Reads the options to the fastqprocess program
*
Expand Down
89 changes: 87 additions & 2 deletions fastqpreprocessing/src/fastqprocess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

#include "fastqprocess.h"
#include "utilities.h"

#include <gzstream.h>
#include <iostream>
#include <fstream>

#include <cstdint>

/// maximum file length
Expand Down Expand Up @@ -104,7 +109,15 @@ void process_inputs(const INPUT_OPTIONS_FASTQPROCESS &options,
// execute the bam file writers threads
std::thread *writers = new std::thread[num_files];
for (int i = 0; i < num_files; i++) {
writers[i] = std::thread(bam_writers, i, samrecord_data);
if (options.output_format=="BAM") {
writers[i] = std::thread(bam_writers, i, samrecord_data);
} else if (options.output_format=="FASTQ") {
writers[i] = std::thread(fastq_writers, i, samrecord_data);
} else {
std::cout << "ERROR: Output-format must be either FASTQ or BAM\n";
std::cerr << "ERROR: Output-format must be either FASTQ or BAM\n";
exit(1);
}
}

// execute the fastq readers threads
Expand Down Expand Up @@ -159,6 +172,78 @@ void process_inputs(const INPUT_OPTIONS_FASTQPROCESS &options,
delete [] writers;
}

/** @copydoc bam_writers */
void fastq_writers(int windex, SAM_RECORD_BINS *samrecord_data) {
std::string outputfile;
char buf[MAX_FILE_LENGTH];

// open to write the outputfile
// name of the output R1 fastq file
sprintf(buf, "fastq_R1_%d.fastq.gz", windex);
outputfile = buf;
//ofstream r1_out(outputfile.c_str(), ios::out);
ogzstream r1_out(outputfile.c_str());
//if (!r1_out.is_open()) {
if (!r1_out.good()) {
error_message("ERROR: Failed open R1 fastq file\n");
exit(1);
}

// name of the output R1 fastq file
sprintf(buf, "fastq_R2_%d.fastq.gz", windex);
outputfile = buf;
//ofstream r2_out(outputfile.c_str(), ios::out);
ogzstream r2_out(outputfile.c_str());
//if (!r2_out.is_open()) {
if (!r2_out.good()) {
error_message("ERROR: Failed open R2 fastq file\n");
exit(1);
}

// keep writing forever, until there is a flag to stop
while (true) {
// wait until some data is ready from a reader thread
if (sem_wait(&semaphores[windex]) == -1)
error("sem_wait:semaphores");

// write out the record buffers for the reader thread "active_thread_num"
// that signalled that buffer is ready to be written
SamRecord *samRecord =
samrecord_data->samrecords[samrecord_data->active_thread_num];
// go through the index of the samrecords that are stored for the current
// writer, i.e., "windex" or the corresponding BAM file

for (auto index : samrecord_data->file_index[samrecord_data->active_thread_num][windex]) {
// samOut.WriteRecord(samHeader, samRecord[index]);
r1_out << "@" << samRecord[index].getReadName() << std::endl
<< samRecord[index].getString("CR").c_str() << samRecord[index].getString("UR") << std::endl
<< "+" << std::endl
<< samRecord[index].getString("CY") << samRecord[index].getString("UY") << std::endl;
}

for (auto index : samrecord_data->file_index[samrecord_data->active_thread_num][windex]) {
// samOut.WriteRecord(samHeader, samRecord[index]);
r2_out << "@" << samRecord[index].getReadName() << std::endl
<< samRecord[index].getSequence() << std::endl
<< "+" << std::endl
<< samRecord[index].getQuality() << std::endl;
}

// lets the reads thread know that I am done writing the
// buffer that are destined to be my file
if (sem_post(&semaphores_workers[windex]) == -1)
error("sem_post: semaphores_workers");

// time to stop variable is valid
if (samrecord_data->stop) break;
}

// close the fastq files
r1_out.close();
r2_out.close();
}


/** @copydoc bam_writers */
void bam_writers(int windex, SAM_RECORD_BINS *samrecord_data) {
SamFile samOut;
Expand Down Expand Up @@ -232,7 +317,7 @@ void fillSamRecord(SamRecord *samRecord, FastQFile &fastQFileI1,
FastQFile &fastQFileR1, FastQFile &fastQFileR2,
unsigned int barcode_length, unsigned int umi_length,
bool has_I1_file_list) {
// check the sequence names matching
// check the sequence names matching
std::string a = std::string(fastQFileR1.myRawSequence.c_str());
std::string b = std::string(fastQFileR1.myQualityString.c_str());

Expand Down
13 changes: 13 additions & 0 deletions fastqpreprocessing/src/fastqprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,17 @@ void process_file(int32_t tindex, std::string filename, String filename1, \
* @param samrecord_bins bins for samrecords from the reader threads
*/
void bam_writers(int32_t windex, SAM_RECORD_BINS *samrecord_bins);

/**
* @brief Function for the writer thread
*
* @detail
* Dependeing on the number of output bam files there are as many
* writer thread as there are output bam files. Each writer thread
* writers into only one bam file
*
* @param windex index of the writer thread
* @param samrecord_bins bins for samrecords from the reader threads
*/
void fastq_writers(int32_t windex, SAM_RECORD_BINS *samrecord_bins);
#endif
18 changes: 13 additions & 5 deletions fastqpreprocessing/src/input_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
using namespace std;
namespace fs = std::experimental::filesystem;

void error_message(const char *msg) {
std::cerr << msg;
}

/** @copydoc read_options_tagsort */
void read_options_tagsort(int argc, char **argv, INPUT_OPTIONS_TAGSORT &options) {
int c;
Expand Down Expand Up @@ -241,6 +237,7 @@ void read_options_fastqprocess(int argc, char **argv, INPUT_OPTIONS_FASTQPROCESS
{"R1", required_argument, 0, 'R'},
{"R2", required_argument, 0, 'r'},
{"white-list", required_argument, 0, 'w'},
{"output-format", required_argument, 0, 'F'},
{0, 0, 0, 0}
};

Expand All @@ -255,13 +252,14 @@ void read_options_fastqprocess(int argc, char **argv, INPUT_OPTIONS_FASTQPROCESS
"R1 [required]",
"R2 [required]",
"whitelist (from cellranger) of barcodes [required]",
"output-format : either FASTQ or BAM [required]",
};


/* getopt_long stores the option index here. */
int option_index = 0;
while ((c = getopt_long(argc, argv,
"b:u:B:s:I:R:r:w:v",
"b:u:B:s:I:R:r:w:F:v",
long_options,
&option_index)) !=- 1
)
Expand Down Expand Up @@ -304,6 +302,9 @@ void read_options_fastqprocess(int argc, char **argv, INPUT_OPTIONS_FASTQPROCESS
case 'w':
options.white_list_file = string(optarg);
break;
case 'F':
options.output_format = string(optarg);
break;
case '?':
case 'h':
i = 0;
Expand Down Expand Up @@ -373,6 +374,13 @@ void read_options_fastqprocess(int argc, char **argv, INPUT_OPTIONS_FASTQPROCESS
exit_with_error = true;
}

// output options must be FASTQ or BAM
if (options.output_format!="FASTQ" && options.output_format!="BAM") {
std::cout << "ERROR: Output-format must be either FASTQ or BAM\n";
std::cerr << "ERROR: Output-format must be either FASTQ or BAM\n";
exit_with_error = true;
}

// barcode length must be positive
if (options.barcode_length <= 0) {
std::cout << "ERROR: Barcode length must be a positive integer\n";
Expand Down
7 changes: 7 additions & 0 deletions fastqpreprocessing/src/utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ int64_t get_num_blocks(const INPUT_OPTIONS_FASTQPROCESS &options) {
if (options.I1s.size()) {
tot_size += filesize(options.I1s[i].c_str());
}
printf("file %s : %ld bytes\n", options.R1s[i].c_str(), filesize(options.R1s[i].c_str()));
tot_size += filesize(options.R1s[i].c_str());
tot_size += filesize(options.R2s[i].c_str());
}
Expand Down Expand Up @@ -148,3 +149,9 @@ std::vector<std::string> read_lines(const std::string &file_name) {
return lines;
}

/** @copydoc error_message **/
void error_message(const char *msg) {
std::cerr << msg;
}


12 changes: 11 additions & 1 deletion fastqpreprocessing/src/utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
#include <fstream>
#include <algorithm>
#include <experimental/filesystem>

#include "datatypes.h"
using namespace std;

/**
* @brief Compute the number of bam files
*
Expand Down Expand Up @@ -109,4 +110,13 @@ inline void freeStlContainer(T& p_container)
*/
}

/**
* @brief this function prints the message to the stderr
*
* @param msg: the error message
*/

void error_message(const char *msg);


#endif

0 comments on commit e6caa88

Please sign in to comment.