diff --git a/.gitignore b/.gitignore index 147772b..01f4618 100644 --- a/.gitignore +++ b/.gitignore @@ -100,3 +100,7 @@ ENV/ # mypy .mypy_cache/ *.DS_Store + +# do not check in the executable and bam file +fastqpreprocessing/src/fastqprocess +src/sctools/test/data/bam_with_tags_test.bam diff --git a/Dockerfile b/Dockerfile index ec5bab5..65e8baa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,6 +5,9 @@ LABEL maintainer="Ambrose J. Carr " \ description="python 3.7.7 with pysam, sctools, requests, and a basic science stack" COPY requirements.txt . + +RUN apt-get update && apt-get install -y patch && apt-get install -y libhdf5-dev + RUN pip3 install -r requirements.txt RUN mkdir /sctools/ @@ -13,6 +16,24 @@ COPY . /sctools RUN pip3 install /sctools +ARG libStatGen_version="1.0.14" + +RUN wget https://github.com/HumanCellAtlas/sctools/archive/kmk-fastqprocessing.zip + +RUN unzip kmk-fastqprocessing.zip && \ + cd sctools-kmk-fastqprocessing/fastqpreprocessing &&\ + wget https://github.com/statgen/libStatGen/archive/v${libStatGen_version}.tar.gz &&\ + tar -zxvf v${libStatGen_version}.tar.gz &&\ + mv libStatGen-${libStatGen_version} libStatGen &&\ + patch libStatGen/fastq/FastQFile.cpp patches/FastQFile.cpp.patch &&\ + patch libStatGen/Makefile patches/Makefile.patch &&\ + patch libStatGen/general/Makefile patches/general.Makefile.patch &&\ + make -C libStatGen &&\ + mkdir src/obj &&\ + make -C src/ + +RUN cp sctools-kmk-fastqprocessing/fastqpreprocessing/src/fastqprocess /usr/local/bin/ + WORKDIR usr/local/bin/sctools diff --git a/fastqpreprocessing/.gitignore b/fastqpreprocessing/.gitignore new file mode 100644 index 0000000..868ec69 --- /dev/null +++ b/fastqpreprocessing/.gitignore @@ -0,0 +1,8 @@ +*~ +*.o +*.a +*.bak +dox/ +dox_errors.txt +*# +*nohup.txt diff --git a/fastqpreprocessing/patches/FastQFile.cpp.patch b/fastqpreprocessing/patches/FastQFile.cpp.patch new file mode 100644 index 0000000..884fc7d --- /dev/null +++ b/fastqpreprocessing/patches/FastQFile.cpp.patch @@ -0,0 +1,18 @@ +--- libStatGen-1.0.14/fastq/FastQFile.cpp 2015-07-08 20:03:23.000000000 +0000 ++++ ../libStatGen/FastQFile.cpp 2020-09-17 19:35:48.797593411 +0000 +@@ -489,6 +489,7 @@ + // Check to see if the sequenceIdentifier is a repeat by adding + // it to the set and seeing if it already existed. + std::pair::iterator,bool> insertResult; ++ /* + insertResult = + myIdentifierMap.insert(std::make_pair(mySequenceIdentifier.c_str(), + myLineNum)); +@@ -505,6 +506,7 @@ + reportErrorOnLine(); + return(false); + } ++ */ + } + + // Valid, return true. 
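Context for the FastQFile.cpp patch above: the disabled block is libStatGen's duplicate sequence-identifier check, which inserts every read name into the in-memory myIdentifierMap. For FASTQ inputs with hundreds of millions of reads that map grows without bound, and read-name uniqueness is not needed by this pipeline, so the check is commented out rather than reworked. A rough, self-contained sketch of the memory growth being avoided (the per-entry overhead below is an assumed figure for illustration, not a measurement):

// Hedged sketch: approximate memory the disabled duplicate-identifier
// map would retain; bytes_per_entry is an assumption, not a measurement.
#include <cstdint>
#include <cstdio>

int main() {
  const int64_t reads = 500000000LL;   // e.g., 5e8 reads in a large run
  const int64_t bytes_per_entry = 60;  // assumed id string + map node cost
  std::printf("identifier map would need roughly %.1f GB\n",
              static_cast<double>(reads * bytes_per_entry) / 1e9);
  return 0;
}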
diff --git a/fastqpreprocessing/patches/Makefile.patch b/fastqpreprocessing/patches/Makefile.patch new file mode 100644 index 0000000..0e03b43 --- /dev/null +++ b/fastqpreprocessing/patches/Makefile.patch @@ -0,0 +1,22 @@ +--- libStatGen-1.0.14/Makefile 2015-07-08 20:03:23.000000000 +0000 ++++ ../libStatGen/Makefile 2020-09-03 14:15:41.904210140 +0000 +@@ -2,7 +2,8 @@ + + .PHONY: package + +-SUBDIRS=general bam fastq glf samtools vcf ++#SUBDIRS=general bam fastq glf samtools vcf ++SUBDIRS=general fastq samtools bam + + include Makefiles/Makefile.base + +@@ -16,7 +17,8 @@ + general: samtools + + # other subdirectories depend on general +-bam fastq glf vcf: general ++#bam fastq glf vcf: general ++bam fastq : general + + RELEASE_FILE?=libStatGen.$(VERSION).tgz + diff --git a/fastqpreprocessing/patches/general.Makefile.patch b/fastqpreprocessing/patches/general.Makefile.patch new file mode 100644 index 0000000..51d153c --- /dev/null +++ b/fastqpreprocessing/patches/general.Makefile.patch @@ -0,0 +1,11 @@ +--- libStatGen-1.0.14/general/Makefile 2020-09-17 20:29:00.320563968 +0000 ++++ ../libStatGen/Makefile.general 2020-09-17 20:57:47.982915972 +0000 +@@ -8,7 +8,7 @@ + # an error, but allow unused results and variables for the + # time being. + # +- USER_WARNINGS ?= -Werror $(shell if [ X$(CCVERSION) \> X4.2.0 ] ; then echo " -Wno-strict-overflow" ; fi) ++ USER_WARNINGS ?= $(shell if [ X$(CCVERSION) \> X4.2.0 ] ; then echo " -Wno-strict-overflow" ; fi) + #-Wno-strict-overflow + # -Wno-unused-variable $(shell if [ X$(CCVERSION) \> X4.2.0 ] ; then echo " -Wno-unused-result" ; fi) + endif diff --git a/fastqpreprocessing/src/Makefile b/fastqpreprocessing/src/Makefile new file mode 100644 index 0000000..2328064 --- /dev/null +++ b/fastqpreprocessing/src/Makefile @@ -0,0 +1,28 @@ +IDIR =../libStatGen/include + +CC = g++ -std=c++17 -O4 +CFLAGS = -I$(IDIR) -L../libStatGen + +ODIR=obj +LDIR =../libStatGen/ + +TARGET = fastqprocess +LIBS = -lStatGen -lz -lpthread -lstdc++fs + +_DEPS = fastqprocess.h utilities.h +#DEPS = $(patsubst %,$(IDIR)/%,$(_DEPS)) + +_OBJ = fastqprocess.o utilities.o main.o +OBJ = $(patsubst %,$(ODIR)/%,$(_OBJ)) + + +$(ODIR)/%.o: %.cpp $(DEPS) + $(CC) -c -o $@ $< $(CFLAGS) + +$(TARGET): $(OBJ) $(_DEPS) + $(CC) -o $@ $^ $(CFLAGS) $(LIBS) + +.PHONY: clean + +clean: + rm -f $(ODIR)/*.o *~ core $(INCDIR)/*~ diff --git a/fastqpreprocessing/src/example-run.sh b/fastqpreprocessing/src/example-run.sh new file mode 100755 index 0000000..a8fa175 --- /dev/null +++ b/fastqpreprocessing/src/example-run.sh @@ -0,0 +1,13 @@ +#!/bin/bash +./fastqprocess --verbose \ + --bam-size 0.001 \ + --barcode-length 16 \ + --umi-length 10 \ + --sample-id L8TX \ + --white-list ../../../data/L8TX/737K-august-2016.txt \ + --I1 ../../../data/L8TX/A_I1.fastq.gz \ + --R1 ../../../data/L8TX/A_R1.fastq.gz \ + --R2 ../../../data/L8TX/A_R2.fastq.gz \ + --I1 ../../../data/L8TX/B_I1.fastq.gz \ + --R1 ../../../data/L8TX/B_R1.fastq.gz \ + --R2 ../../../data/L8TX/B_R2.fastq.gz \ diff --git a/fastqpreprocessing/src/fastqprocess.cpp b/fastqpreprocessing/src/fastqprocess.cpp new file mode 100644 index 0000000..93d6e3b --- /dev/null +++ b/fastqpreprocessing/src/fastqprocess.cpp @@ -0,0 +1,496 @@ +/** + * @file fastqprocess.cpp + * @brief functions for file processing + * @author Kishori Konwar + * @date 2020-08-27 + ***********************************************/ + +#include "fastqprocess.h" +#include "utilities.h" +#include + +/// maximum file length +#define MAX_FILE_LENGTH 500 + +/// number of samrecords per buffer in each 
reader
+#define SAMRECORD_BUFFER_SIZE 100000
+
+/// mutex
+std::mutex mtx;
+
+/// array of semaphores for readers
+sem_t *semaphores = 0;
+
+/// array of semaphores for writers
+sem_t *semaphores_workers = 0;
+
+/** @copydoc create_samrecord_holders */
+SAM_RECORD_BINS * create_samrecord_holders(int16_t nthreads,
+                                           int32_t block_size,
+                                           const std::string sample_id,
+                                           int16_t num_files) {
+
+  // samrecord data to hold the buffers for the readers
+  SAM_RECORD_BINS *samrecord_data = new SAM_RECORD_BINS;
+  if ((samrecord_data->samrecords = new SamRecord *[nthreads]) == 0) {
+    std::cerr << "Failed to allocate memory for the "
+                 "samRecords pointer arrays" << std::endl;
+    return 0;
+  }
+
+  // one block of samrecords per reader thread, re-used repeatedly
+  // while writing
+  for (int i = 0; i < nthreads; i++) {
+    if ((samrecord_data->samrecords[i] = new SamRecord[block_size]) == 0) {
+      std::cerr << "Failed to allocate memory for the "
+                   "samRecords" << std::endl;
+      return 0;
+    }
+  }
+
+  // for each reader thread keep the number of records to write out
+  if ((samrecord_data->num_records = new int32_t[nthreads]) == 0) {
+    std::cerr << "Failed to allocate memory for the num "
+                 "records array" << std::endl;
+    return 0;
+  }
+
+  // for each thread we allocate an array of indices (to final output files)
+  if ((samrecord_data->file_index = new std::vector<int> *[nthreads]) == 0) {
+    std::cerr << "Failed to allocate memory for the pointer for "
+                 "array of vectors" << std::endl;
+    return 0;
+  }
+
+  // for each reader thread allocate the index vectors, one per output file
+  for (int i = 0; i < nthreads; i++) {
+    if ((samrecord_data->file_index[i] = new std::vector<int>[num_files]) == 0) {
+      std::cerr << "Failed to allocate memory for the vectors for "
+                   "index of file" << std::endl;
+      return 0;
+    }
+  }
+
+  // set the remaining data
+  samrecord_data->block_size = block_size;
+  samrecord_data->sample_id = sample_id;
+  samrecord_data->num_files = num_files;
+  samrecord_data->stop = false;
+  return samrecord_data;
+}
+
+/** @copydoc process_inputs */
+void process_inputs(const INPUT_OPTIONS &options,
+                    const WHITE_LIST_DATA *white_list_data) {
+
+  int block_size = SAMRECORD_BUFFER_SIZE;
+
+  // number of files based on the input size
+  int num_files = get_num_blocks(options);
+  // create the data for the threads
+  SAM_RECORD_BINS *samrecord_data =
+       create_samrecord_holders(options.R1s.size(), block_size,
+                                options.sample_id, num_files);
+  semaphores_workers = new sem_t[num_files];
+  for (int i = 0; i < num_files; i++) {
+     sem_init((semaphores_workers + i), 0, 0);
+  }
+
+  // create the bam file writers' semaphores
+  semaphores = new sem_t[num_files];
+  for (int i = 0; i < num_files; i++) {
+     sem_init((semaphores + i), 0, 0);
+  }
+
+  // start the bam file writer threads
+  std::thread *writers = new std::thread[num_files];
+  for (int i = 0; i < num_files; i++) {
+    writers[i] = std::thread(bam_writers, i, samrecord_data);
+  }
+
+  // start the fastq reader threads
+  std::thread *readers = new std::thread[options.R1s.size()];
+  for (int i = 0; i < options.R1s.size(); i++) {
+    std::string I1;
+    // if there is no I1 file then send an empty file name
+    if (options.I1s.size() > 0) {
+       I1 = std::string(options.I1s[i].c_str());
+    } else {
+       I1 = std::string("");
+    }
+    readers[i] = std::thread(process_file, i, I1,
+                             options.R1s[i].c_str(), options.R2s[i].c_str(),
+                             options.barcode_length, options.umi_length,
+                             white_list_data, samrecord_data);
+  }
+
+  // every reader thread joins.
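+  // Shutdown handshake, summarized: once every reader below has joined,
+  // the stop flag is set and each writer semaphore is posted one final
+  // time, so every writer drains its last buffer, sees stop == true and
+  // exits its loop before being joined.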
+  for (int i = 0; i < options.R1s.size(); i++) {
+    readers[i].join();
+  }
+  // set the stop flag for the writers
+  samrecord_data->stop = true;
+
+  // wake the writers so they make one more pass through their loop
+  for (int j = 0; j < samrecord_data->num_files; j++) {
+    if (sem_post(&semaphores[j]) == -1)
+       error("sem_post: semaphores");
+  }
+
+  // wait for the writers to stop after they have seen the stop flag
+  for (int i = 0; i < samrecord_data->num_files; i++) {
+    writers[i].join();
+  }
+
+  // destroy the writer semaphores
+  for (int i = 0; i < samrecord_data->num_files; i++) {
+    sem_destroy(&semaphores[i]);
+  }
+
+  // destroy the worker semaphores
+  for (int i = 0; i < samrecord_data->num_files; i++) {
+    sem_destroy(&semaphores_workers[i]);
+  }
+
+  // delete the records
+  delete [] samrecord_data->num_records;
+
+  // delete reader and writer threads
+  delete [] readers;
+  delete [] writers;
+}
+
+/** @copydoc bam_writers */
+void bam_writers(int32_t windex, SAM_RECORD_BINS *samrecord_data) {
+  SamFile samOut;
+  std::string outputfile;
+
+  // name of the output file
+  char buf[MAX_FILE_LENGTH];
+  sprintf(buf, "subfile_%d.bam", windex);
+  outputfile = buf;
+
+  // open the output file for writing
+  samOut.OpenForWrite(outputfile.c_str());
+
+  // Write the sam header.
+  SamFileHeader samHeader;
+
+  // add the HD tags for the header
+  samHeader.setHDTag("VN", "1.6");
+  samHeader.setHDTag("SO", "unsorted");
+
+  // add the RG group tags
+  SamHeaderRG *headerRG = new SamHeaderRG;
+  headerRG->setTag("ID", "A");
+  headerRG->setTag("SM", samrecord_data->sample_id.c_str());
+  samHeader.addRG(headerRG);
+
+  // add the header to the output bam
+  samOut.WriteHeader(samHeader);
+
+  // keep writing until the stop flag is set
+  while (true) {
+    // wait until some data is ready from a reader thread
+    if (sem_wait(&semaphores[windex]) == -1)
+       error("sem_wait:semaphores");
+
+    // write out the record buffers for the reader thread "active_thread_num"
+    // that signalled that its buffer is ready to be written
+    SamRecord *samRecord =
+        samrecord_data->samrecords[samrecord_data->active_thread_num];
+    // go through the indices of the samrecords that are stored for the
+    // current writer, i.e., "windex" or the corresponding BAM file
+    for (auto index : samrecord_data->file_index[samrecord_data->active_thread_num][windex]) {
+      samOut.WriteRecord(samHeader, samRecord[index]);
+    }
+
+    // let the reader thread know that this writer is done with the
+    // buffer entries destined for its file
+    if (sem_post(&semaphores_workers[windex]) == -1)
+       error("sem_post: semaphores_workers");
+
+    // stop once the stop flag has been set
+    if (samrecord_data->stop) break;
+  }
+
+  // close the bamfile
+  samOut.Close();
+}
+
+/**
+ * @brief fillSamRecord fills a SamRecord with the sequence and TAG data
+ *
+ * @param samRecord the SamRecord to fill with the data
+ * @param fastQFileI1 the I1 fastq file
+ * @param fastQFileR1 the R1 fastq file
+ * @param fastQFileR2 the R2 fastq file
+ * @param barcode_length length of the cell barcode
+ * @param umi_length length of the UMI
+ * @param has_I1_file_list a boolean indicating if I1 files are available
+*/
+void fillSamRecord(SamRecord *samRecord, FastQFile &fastQFileI1,
+                   FastQFile &fastQFileR1, FastQFile &fastQFileR2,
+                   unsigned int barcode_length, unsigned int umi_length,
+                   bool has_I1_file_list) {
+  // grab the R1 sequence and quality strings, which carry the
+  // barcode and UMI
+  std::string a = std::string(fastQFileR1.myRawSequence.c_str());
+  std::string b = std::string(fastQFileR1.myQualityString.c_str());
+
+  // extract the raw barcode and UMI
+  std::string barcode = a.substr(0, barcode_length);
+  std::string UMI = a.substr(barcode_length, umi_length);
+
+  // extract the raw barcode and UMI quality strings
+  std::string barcodeQString = b.substr(0, barcode_length);
+  std::string UMIQString = b.substr(barcode_length, umi_length);
+
+  // reset the samrecord
+  samRecord->resetRecord();
+
+  // add the read group and the sam flag
+  samRecord->addTag("RG", 'Z', "A");
+  samRecord->setFlag(4);
+
+  // add the identifier, sequence and quality score of the alignments
+  samRecord->setReadName(fastQFileR2.mySequenceIdentifier.c_str());
+  samRecord->setSequence(fastQFileR2.myRawSequence.c_str());
+  samRecord->setQuality(fastQFileR2.myQualityString.c_str());
+
+  // add the barcode and its quality
+  samRecord->addTag("CR", 'Z', barcode.c_str());
+  samRecord->addTag("CY", 'Z', barcodeQString.c_str());
+
+  // add the UMI and its quality
+  samRecord->addTag("UR", 'Z', UMI.c_str());
+  samRecord->addTag("UY", 'Z', UMIQString.c_str());
+
+  // add the raw sequence and quality sequence for the index
+  if (has_I1_file_list) {
+    std::string indexseq = std::string(fastQFileI1.myRawSequence.c_str());
+    std::string indexSeqQual = std::string(fastQFileI1.myQualityString.c_str());
+    samRecord->addTag("SR", 'Z', indexseq.c_str());
+    samRecord->addTag("SY", 'Z', indexSeqQual.c_str());
+  }
+}
+
+/**
+ * @brief getBucketIndex computes the index of the bucket (i.e., the output
+ * bam file) for a barcode and also adds the corrected barcode to the
+ * SamRecord
+ *
+ * @param barcode the barcode for which a bucket is computed
+ * @param samRecord the partially filled samrecord to add the corrected barcode to
+ * @param white_list_data the white list data for barcode correction
+ * @param samrecord_data shared data for the various readers
+ * @param n_barcode_corrected keeps track of the number of barcodes corrected
+ * @param n_barcode_correct keeps track of the number of barcodes that were already correct
+ * @param n_barcode_errors keeps track of the number of barcodes that cannot be corrected
+ *
+ * @return the bucket number where the current SamRecord should go to
+*/
+int32_t getBucketIndex(const std::string &barcode, SamRecord *samRecord,
+    const WHITE_LIST_DATA* white_list_data, SAM_RECORD_BINS *samrecord_data,
+    int *n_barcode_corrected, int *n_barcode_correct, int *n_barcode_errors) {
+
+  std::string correct_barcode;
+  // bucket_barcode is used to pick the target bam file.
+  // This is done because, in the case of incorrigible barcodes,
+  // we need a mechanism to distribute the alignments uniformly,
+  // so that no bam file is oversized by putting all such barcode-less
+  // sequences into one particular file.
+  // Incorrigible barcodes are simply added without the CB tag
+  std::string bucket_barcode;
+  if (white_list_data->mutations.find(barcode) !=
+      white_list_data->mutations.end()) {
+
+    // if -1 then the raw barcode is the correct barcode
+    if (white_list_data->mutations.at(barcode) == -1) {
+      correct_barcode = barcode;
+      *n_barcode_correct += 1;
+    } else {
+      // it is a 1-mutation of some whitelist barcode so get the
+      // barcode by indexing into the vector of whitelist barcodes
+      correct_barcode =
+         white_list_data->barcodes.at(white_list_data->mutations.at(barcode));
+      *n_barcode_corrected += 1;
+    }
+    // bucket_barcode is used for computing the file index
+    bucket_barcode = correct_barcode;
+
+    // the corrected barcode should be added to the samrecord
+    samRecord->addTag("CB", 'Z', correct_barcode.c_str());
+  } else {  // not possible to correct the raw barcode
+    *n_barcode_errors += 1;
+    bucket_barcode = barcode;
+  }
+  // destination bam file index computed based on the bucket_barcode
+  int32_t bucket = std::hash<std::string>{}(bucket_barcode) %
+                   samrecord_data->num_files;
+
+  return bucket;
+}
+
+/**
+ * @brief submit_block_tobe_written lets a reader signal the writers to
+ * drain the current list of SamRecords that are ready to be written out
+ *
+ * @param samrecord_data the samrecord data
+ * @param tindex the index of the reader thread
+*/
+void submit_block_tobe_written(SAM_RECORD_BINS *samrecord_data, int tindex) {
+  mtx.lock();
+
+  // the reader sets itself as the active thread whose buffer the
+  // writers should drain
+  samrecord_data->active_thread_num = tindex;
+
+  // signal every writer thread that this reader's buffer is ready
+  // to be written out to the bam files
+  for (int32_t j = 0; j < samrecord_data->num_files; j++) {
+    if (sem_post(&semaphores[j]) == -1)
+      error("sem_post: semaphores");
+  }
+
+  // this is where the reader waits while the writers are writing
+  for (int32_t j = 0; j < samrecord_data->num_files; j++) {
+    if (sem_wait(&semaphores_workers[j]) == -1)
+      error("sem_wait: semaphores_workers");
+  }
+
+  // the writers are done, so clear the per-file index vectors
+  for (int j = 0; j < samrecord_data->num_files; j++) {
+    samrecord_data->file_index[tindex][j].clear();
+  }
+
+  // no records left to write in the current buffer
+  samrecord_data->num_records[tindex] = 0;
+
+  // release the mutex so that other readers can submit their buffers
+  mtx.unlock();
+}
+
+void process_file(int32_t tindex, std::string filenameI1, String filenameR1,
+                  String filenameR2, unsigned int barcode_length,
+                  unsigned int umi_length,
+                  const WHITE_LIST_DATA* white_list_data,
+                  SAM_RECORD_BINS *samrecord_data) {
+
+  /// setting the shortest sequence allowed to be read
+  FastQFile fastQFileI1(4, 4);
+  FastQFile fastQFileR1(4, 4);
+  FastQFile fastQFileR2(4, 4);
+
+  // open the I1 file
+  bool has_I1_file_list = true;
+  if (!filenameI1.empty()) {
+    if (fastQFileI1.openFile(String(filenameI1.c_str()), BaseAsciiMap::UNKNOWN) !=
+        FastQStatus::FASTQ_SUCCESS) {
+      std::cerr << "Failed to open file: " << filenameI1.c_str();
+      abort();
+    }
+  } else {
+    has_I1_file_list = false;
+  }
+
+  // open the R1 file
+  if (fastQFileR1.openFile(filenameR1, BaseAsciiMap::UNKNOWN) !=
+      FastQStatus::FASTQ_SUCCESS) {
+    std::cerr << "Failed to open file: " << filenameR1.c_str();
+    abort();
+  }
+
+  // open the R2 file
+  if (fastQFileR2.openFile(filenameR2, BaseAsciiMap::UNKNOWN) !=
+      FastQStatus::FASTQ_SUCCESS) {
+    std::cerr << "Failed to open file: " << filenameR2.c_str();
+    abort();
+  }
+
+  // point to the array of records already allocated for this reader
+  SamRecord *samRecord = samrecord_data->samrecords[tindex];
+  int block_size = samrecord_data->block_size;
+
+  // Keep reading the file until there are no more fastq sequences to process.
+  int i = 0;
+  int n_barcode_errors = 0;
+  int n_barcode_corrected = 0;
+  int n_barcode_correct = 0;
+  int r = 0;
+  printf("Opening reader thread %d\n", tindex);
+
+  while (fastQFileR1.keepReadingFile()) {
+    if ((!has_I1_file_list ||
+         (
+           has_I1_file_list &&
+           fastQFileI1.readFastQSequence() == FastQStatus::FASTQ_SUCCESS
+         )
+        ) &&
+        fastQFileR1.readFastQSequence() == FastQStatus::FASTQ_SUCCESS &&
+        fastQFileR2.readFastQSequence() == FastQStatus::FASTQ_SUCCESS) {
+      i = i + 1;
+
+      // prepare the rth samrecord with the sequence, sequence quality,
+      SamRecord *samrec = samRecord + r;
+      // barcode and UMI and their quality sequences
+      fillSamRecord(samrec, fastQFileI1, fastQFileR1, fastQFileR2,
+                    barcode_length, umi_length, has_I1_file_list);
+
+      // extract the raw barcode
+      std::string a = std::string(fastQFileR1.myRawSequence.c_str());
+      std::string barcode = a.substr(0, barcode_length);
+
+      // the bucket barcode is used to pick the target bam file; in the
+      // case of incorrigible barcodes this distributes the alignments
+      // uniformly, so that no bam file is oversized by putting all such
+      // barcode-less sequences into one particular file. Incorrigible
+      // barcodes are simply added without the CB tag
+      int32_t bucket = getBucketIndex(barcode, samrec, white_list_data,
+                                      samrecord_data, &n_barcode_corrected,
+                                      &n_barcode_correct, &n_barcode_errors);
+
+      // store the index of the record in the right vector; file_index
+      // serves as a bucket B holding the indices of the samrecords that
+      // are going to bamfile B
+      samrecord_data->file_index[tindex][bucket].push_back(r);
+
+      // count the record exactly once (the increment was previously
+      // duplicated, double-counting every record)
+      samrecord_data->num_records[tindex]++;
+      r = r + 1;
+
+      // Once block_size samrecords have been read, or there are no more
+      // sequences to be read from the file, it is time to signal the
+      // writer threads to flush the buffer to the bam files; only one
+      // reader at a time succeeds in doing so, since
+      // submit_block_tobe_written holds a mutex.
+
+      // Otherwise, while the block is not full and the end of the FASTQ
+      // file has not been reached, continue reading the FASTQ files and
+      // keep creating sam records in the buffer. This is the same
+      // behavior across all reader threads
+      if (r == block_size || !fastQFileR1.keepReadingFile()) {
+        submit_block_tobe_written(samrecord_data, tindex);
+
+        // start reading a new block of FASTQ sequences
+        r = 0;
+      }
+      if (i % 10000000 == 0) {
+        printf("%d\n", i);
+        printf("%s\n", fastQFileR1.mySequenceIdLine.c_str());
+        printf("%s\n", fastQFileR2.mySequenceIdLine.c_str());
+      }
+    }  // if successful read of a sequence
+  }
+
+  // Finished processing all of the sequences in the file.
+  // Close the input files.
+  if (has_I1_file_list) fastQFileI1.closeFile();
+
+  fastQFileR1.closeFile();
+  fastQFileR2.closeFile();
+  printf("Total barcodes:%d\ncorrect:%d\ncorrected:%d\nuncorrectible"
+         ":%d\npercent uncorrected:%lf\n",
+         i, n_barcode_correct, n_barcode_corrected, n_barcode_errors,
+         n_barcode_errors/static_cast<float>(i) * 100);
+}
diff --git a/fastqpreprocessing/src/fastqprocess.h b/fastqpreprocessing/src/fastqprocess.h
new file mode 100644
index 0000000..ff32800
--- /dev/null
+++ b/fastqpreprocessing/src/fastqprocess.h
@@ -0,0 +1,121 @@
+/**
+ * @file fastqprocess.h
+ * @brief functions for file processing
+ * @author Kishori Konwar
+ * @date 2020-08-27
+ ***********************************************/
+
+#ifndef __FASTQ_PROCESS_H__
+#define __FASTQ_PROCESS_H__
+
+#include <semaphore.h>
+#include "FastQStatus.h"
+#include "BaseAsciiMap.h"
+#include "SamFile.h"
+#include "SamValidation.h"
+
+#include <thread>
+#include <mutex>
+#include <string>
+#include <vector>
+#include <functional>
+#include <iostream>
+
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <unordered_map>
+#include <unistd.h>
+#include "utilities.h"
+
+
+/// Samrecord bins to be accessed by all threads
+typedef struct SamRecordBins {
+  /// array of arrays of samrecords,
+  /// one array samrecords[r] for each reader thread r.
+  /// Note that every samrecord[r][i] for 0 <= i < num_records[r]
+  /// is destined to be written to one specific output bam file;
+  /// the index of that bam file is recorded in the vectors
+  /// "file_index[r][b]", where b is the index of the bam file
+  SamRecord **samrecords;
+
+  /// number of records in individual reader threads that
+  /// can be written, i.e., num_records[r] stores the number of
+  /// such records in samrecords[r]
+  int32_t *num_records;
+
+  /// an array of arrays of vectors, one array "file_index[r]" per reader
+  /// thread r. The vector file_index[r][b] stores the indices of the
+  /// records in samrecords[r] that should be written to output bam
+  /// file b. This information is used by the writer threads.
+  std::vector<int> **file_index;
+
+  /// sample name
+  std::string sample_id;
+  int32_t block_size;
+
+  /// number of output bam files, and one writer thread per bam file
+  int16_t num_files;
+  /// flag to stop the writers
+  bool stop;
+  /// the (reader) thread that currently wants to write
+  int32_t active_thread_num;
+} SAM_RECORD_BINS;
+
+
+/**
+ * @brief Processes the input fastq files
+ *
+ * @detail
+ * This function creates a set of readers (as many as there are file
+ * triplets), a set of writers to write the individual bam files, a set
+ * of semaphores for readers to signal to writers when buffers of records
+ * are ready, and another set of semaphores for writers to signal
+ * readers that the buffer has been emptied out and, therefore, the
+ * reader can go ahead and fill it with more records.
+ *
+ * @param options user input options
+ * @param white_list_data data-structure to store barcode correction
+ *        map and vector of correct barcodes
+*/
+void process_inputs(const INPUT_OPTIONS & options, \
+                    const WHITE_LIST_DATA * white_list_data) ;
+
+/**
+ * @brief Process one triplet of R1/R2 and I1 files in a thread
+ *
+ * @detail
+ * This function will be run by a thread for each set of R1/R2 and I1
+ * files.
+ *
+ * @param tindex reader thread index
+ * @param filename name of the I1 file
+ * @param filename1 name of the R1 file
+ * @param filename2 name of the R2 file
+ * @param barcode_length length of a barcode
+ * @param umi_length length of the UMI
+ * @param white_list_data data structure for barcode correction based on
+ *        the white list
+ * @param samrecord_bins bins for samrecords from the reader threads
+*/
+
+void process_file(int32_t tindex, std::string filename, String filename1, \
+                  String filename2, unsigned int barcode_length, \
+                  unsigned int umi_length, \
+                  const WHITE_LIST_DATA *white_list_data, \
+                  SAM_RECORD_BINS * samrecord_bins) ;
+
+/**
+ * @brief Function for the writer thread
+ *
+ * @detail
+ * Depending on the number of output bam files, there are as many
+ * writer threads as there are output bam files. Each writer thread
+ * writes into only one bam file
+ *
+ * @param windex index of the writer thread
+ * @param samrecord_bins bins for samrecords from the reader threads
+*/
+void bam_writers(int32_t windex, SAM_RECORD_BINS *samrecord_bins) ;
+#endif
diff --git a/fastqpreprocessing/src/main.cpp b/fastqpreprocessing/src/main.cpp
new file mode 100644
index 0000000..551a05b
--- /dev/null
+++ b/fastqpreprocessing/src/main.cpp
@@ -0,0 +1,26 @@
+/**
+ * @file main.cpp
+ * @brief The main function for the fastqprocessing step
+ * @author Kishori Konwar
+ * @date 2020-08-27
+ ***********************************************/
+
+#include "fastqprocess.h"
+#include "utilities.h"
+
+int main (int argc, char **argv)
+{
+  INPUT_OPTIONS options;
+
+  read_options(argc, argv, options);
+
+  std::cout << "reading whitelist file " << options.white_list_file << "...";
+  WHITE_LIST_DATA *white_list_data = read_white_list(options.white_list_file);
+  std::cout << "done" << std::endl;
+
+  process_inputs(options, white_list_data);
+  return 0;
+}
+
diff --git a/fastqpreprocessing/src/utilities.cpp b/fastqpreprocessing/src/utilities.cpp
new file mode 100644
index 0000000..6b1a48e
--- /dev/null
+++ b/fastqpreprocessing/src/utilities.cpp
@@ -0,0 +1,287 @@
+/**
+ * @file utilities.cpp
+ * @brief Utility functions for file processing
+ * @author Kishori Konwar
+ * @date 2020-08-27
+ ***********************************************/
+
+#include "utilities.h"
+#include <cstdio>
+#include <cstdlib>
+#include <fstream>
+#include <experimental/filesystem>
+
+using namespace std;
+namespace fs = std::experimental::filesystem;
+
+/** @copydoc filesize */
+int64_t filesize(const char *filename) {
+  FILE *f = fopen(filename, "rb");  /* open the file in read-only mode */
+  if (f == 0) return 0;             /* a missing file counts as size 0 */
+
+  int64_t size = 0;
+  if (fseek(f, 0, SEEK_END) == 0)   /* seek was successful */
+      size = ftell(f);
+  fclose(f);
+  return size;
+}
+
+/** @copydoc read_white_list */
+WHITE_LIST_DATA *read_white_list(const string &white_list_file) {
+  char ATCG[] = {'A', 'C', 'G', 'T', 'N'};
+
+  fstream newfile;
+  WHITE_LIST_DATA *white_list_data = new WHITE_LIST_DATA;
+
+  // open the file to perform read operations using a file object
+  newfile.open(white_list_file, ios::in);
+  int k = 0;
+  if (newfile.is_open()) {  // checking whether the file is open
+    string tp;
+
+    // read data from the file object and put it into a string.
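+    /* Sketch of the correction map built by the loop below: for every
+       whitelisted barcode, each string at Hamming distance 1 (every
+       position substituted with each of A, C, G, T, N) is mapped to the
+       index of that barcode, so one lookup corrects any single-base
+       error. The exact barcode itself is then mapped to -1. */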
+    while (getline(newfile, tp)) {
+      // insert the barcode into the list
+      white_list_data->barcodes.push_back(tp);
+
+      for (int i=0; i < tp.size(); i++) {
+        for (int j=0; j < 5; j++) {
+          char c = tp[i];
+          tp[i] = ATCG[j];
+          /* Map this mutation to the index k of the current barcode,
+             whether or not the mutation is already present; a later
+             barcode overwrites an earlier one. This is done to have the
+             same values for corrected barcodes as in the python
+             implementation.
+          */
+          if (white_list_data->mutations.find(tp) ==
+              white_list_data->mutations.end()) {
+            white_list_data->mutations.insert({tp, k});
+          } else {
+            white_list_data->mutations[tp] = k;
+          }
+          tp[i] = c;
+        }
+      }
+
+      /* -1 marks a barcode that is already on the white list.
+         The sentinel is used, instead of the actual index, because when
+         a barcode maps to -1 no correction is necessary. Since most
+         barcodes are not erroneous, this avoids a second lookup into
+         the barcode vector in the common case.
+      */
+      white_list_data->mutations.at(tp) = -1;
+      k++;
+    }
+    // close the file object.
+    newfile.close();
+  }
+
+  return white_list_data;
+}
+
+/** @copydoc read_options */
+void read_options(int argc, char **argv, INPUT_OPTIONS &options) {
+  int c;
+  int i;
+
+  // flag set by --verbose; initialized so that the check below is
+  // well defined when --verbose is not passed
+  int verbose_flag = 0;
+
+  static struct option long_options[] = {
+    /* These options set a flag. */
+    {"verbose", no_argument, &verbose_flag, 1},
+    /* These options don't set a flag.
+       We distinguish them by their indices. */
+    {"barcode-length", required_argument, 0, 'b'},
+    {"umi-length", required_argument, 0, 'u'},
+    {"bam-size", required_argument, 0, 'B'},
+    {"sample-id", required_argument, 0, 's'},
+    {"I1", required_argument, 0, 'I'},
+    {"R1", required_argument, 0, 'R'},
+    {"R2", required_argument, 0, 'r'},
+    {"white-list", required_argument, 0, 'w'},
+    {0, 0, 0, 0}
+  };
+
+  // help messages when the user types -h
+  const char *help_messages[] = {
+    "verbose messages ",
+    "barcode length [required]",
+    "UMI length [required]",
+    "output BAM file in GB [optional: default 1 GB]",
+    "sample id [required]",
+    "I1 [optional]",
+    "R1 [required]",
+    "R2 [required]",
+    "whitelist (from cellranger) of barcodes [required]",
+  };
+
+
+  while (true) {
+    /* getopt_long stores the option index here. */
+    int option_index = 0;
+
+    c = getopt_long(argc, argv, "b:u:B:s:I:R:r:w:",
+                    long_options, &option_index);
+
+    /* Detect the end of the options. */
+    if (c == -1)
+      break;
+
+    // process the option or arguments
+    switch (c) {
+      case 0:
+        /* If this option set a flag, do nothing else now. */
+        if (long_options[option_index].flag != 0)
+          break;
+        printf("option %s", long_options[option_index].name);
+        if (optarg)
+          printf(" with arg %s", optarg);
+        printf("\n");
+        break;
+      case 'b':
+        options.barcode_length = atoi(optarg);
+        break;
+      case 'u':
+        options.umi_length = atoi(optarg);
+        break;
+      case 'B':
+        options.bam_size = atof(optarg);
+        break;
+      case 's':
+        options.sample_id = string(optarg);
+        break;
+      case 'I':
+        options.I1s.push_back(string(optarg));
+        break;
+      case 'R':
+        options.R1s.push_back(string(optarg));
+        break;
+      case 'r':
+        options.R2s.push_back(string(optarg));
+        break;
+      case 'w':
+        options.white_list_file = string(optarg);
+        break;
+      case '?':
+      case 'h':
+        i = 0;
+        printf("Usage: %s [options] \n", argv[0]);
+        while (long_options[i].name != 0) {
+          printf("\t--%-20s  %-25s  %-35s\n", long_options[i].name,
+                 long_options[i].has_arg == no_argument?
+                 "no argument" : "required_argument",
+                 help_messages[i]);
+          i = i + 1;
+        }
+        /* getopt_long has already printed an error message for unknown
+           options; print the usage and stop instead of continuing with
+           unvalidated options. */
+        exit(0);
+      default:
+        abort();
+    }
+  }
+
+  // Check the options
+
+  // there must be an equal, non-zero number of R1 and R2 files
+  if ((options.R1s.size() != options.R2s.size()) || (options.R1s.size() == 0)) {
+    std::cout << "ERROR: Unequal number of R1 and R2 fastq files in input\n";
+    std::cerr << "ERROR: Unequal number of R1 and R2 fastq files in input\n";
+    exit(1);
+  }
+
+  if ((options.I1s.size() != options.R1s.size()) && (options.I1s.size() != 0)) {
+    std::cout << "ERROR: Either the number of I1 input files must equal\n"
+                 "       the number of R1 input files, or no I1 input\n"
+                 "       files should be provided at all.\n";
+    std::cerr << "ERROR: Either the number of I1 input files must equal\n"
+                 "       the number of R1 input files, or no I1 input\n"
+                 "       files should be provided at all.\n";
+    exit(1);
+  }
+  // bam file size must be positive
+  if (options.bam_size <= 0) {
+    std::cout << "ERROR: Size of a bam file (in GB) must be positive\n";
+    std::cerr << "ERROR: Size of a bam file (in GB) must be positive\n";
+    exit(1);
+  }
+
+  // must have a sample id
+  if (options.sample_id.size() == 0) {
+    std::cout << "ERROR: Must provide a sample id or name\n";
+    std::cerr << "ERROR: Must provide a sample id or name\n";
+    exit(1);
+  }
+
+  // barcode length must be positive
+  if (options.barcode_length <= 0) {
+    std::cout << "ERROR: Barcode length must be a positive integer\n";
+    std::cerr << "ERROR: Barcode length must be a positive integer\n";
+    exit(1);
+  }
+
+  // UMI length must be positive
+  if (options.umi_length <= 0) {
+    std::cout << "ERROR: UMI length must be a positive integer\n";
+    std::cerr << "ERROR: UMI length must be a positive integer\n";
+    exit(1);
+  }
+
+  // just prints out the files
+  if (verbose_flag) {
+    if (options.I1s.size()) {
+      _print_file_info(options.I1s, std::string("I1"));
+    }
+
+    if (options.R1s.size()) {
+      _print_file_info(options.R1s, std::string("R1"));
+    }
+
+    if (options.R2s.size()) {
+      _print_file_info(options.R2s, std::string("R2"));
+    }
+  }
+}
+
+
+/** @copydoc _print_file_info */
+void _print_file_info(const std::vector<std::string> &fastqs,
+                      const std::string &type) {
+
+  if (fastqs.size()) {
+    std::cout << "INFO " << type << " files:" << std::endl;
+    for (int i= 0; i < fastqs.size(); i++) {
+      if (fs::exists(fastqs[i].c_str())) {
+        std::cout << "\t " << fastqs[i] << " exists, file size "
+                  << filesize(fastqs[i].c_str()) << std::endl;
+      } else {
+        std::cout << "ERROR " << fastqs[i] << " is missing!\n";
+        std::cerr << "ERROR " << fastqs[i] << " is missing!\n";
+        exit(1);
+      }
+    }
+  }
+}
+
+
+/** @copydoc get_num_blocks */
+int32_t get_num_blocks(const INPUT_OPTIONS &options) {
+  double tot_size = 0;
+  for (int i= 0; i < options.R1s.size(); i++) {
+    if (options.I1s.size()) {
+      tot_size += filesize(options.I1s[i].c_str());
+    }
+    tot_size += filesize(options.R1s[i].c_str());
+    tot_size += filesize(options.R2s[i].c_str());
+  }
+
+  return ceil((tot_size/(1024*1024*1024))
+              /static_cast<double>(options.bam_size));
+}
+
+/** @copydoc error */
+void error(const char *msg) {
+  perror(msg);
+  exit(1);
+}
+
diff --git a/fastqpreprocessing/src/utilities.h b/fastqpreprocessing/src/utilities.h
new file mode 100644
index 0000000..c37943d
--- /dev/null
+++ b/fastqpreprocessing/src/utilities.h
@@ -0,0 +1,123 @@
+/**
+ * @file utilities.h
+ * @brief Utility functions for file processing
+ * @author Kishori Konwar
+ * @date 2020-08-26
+ ***********************************************/
+#ifndef __OPTIMUS_UTILITES__
+#define __OPTIMUS_UTILITES__
+#include <getopt.h>
+#include <cmath>
+#include <cstdint>
+#include <iostream>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+typedef std::pair<std::string, bool> STRING_BOOL_PAIR;
+
+typedef std::vector<std::string> STRING_VECTOR;
+
+typedef std::unordered_map<std::string, int32_t> STRING_INT32_MAP;
+
+// structure for correcting the barcodes
+typedef struct _white_list_data {
+  /// an unordered map from whitelist barcodes and 1-mutations
+  /// to the index of the correct barcode
+  STRING_INT32_MAP mutations;
+  /// vector of whitelist barcodes
+  STRING_VECTOR barcodes;
+} WHITE_LIST_DATA;
+
+/// Structure to hold input options
+typedef struct _input_options {
+  /// Initialize some of the values
+  _input_options() {
+    barcode_length = -1;
+    umi_length = -1;
+    sample_id = "";
+    bam_size = 1.0;
+  }
+  /// I1, R1 and R2 file names
+  std::vector<std::string> I1s, R1s, R2s;
+  /// Barcode white list file
+  std::string white_list_file;
+  /// chemistry dependent (V2/V3) barcode and UMI length
+  int barcode_length, umi_length;
+  /// Bam file size to split by (in GB)
+  double bam_size;
+  /// sample name
+  std::string sample_id;
+} INPUT_OPTIONS;
+
+
+/**
+ * @brief Compute the number of bam files
+ *
+ * @details
+ * Adds up the sizes of the input files and divides by the
+ * user-specified bam file size
+ *
+ * @param options Input options structure that contains the file names
+*/
+int32_t get_num_blocks(const INPUT_OPTIONS &options);
+
+/**
+ * @brief Build the barcode correction map from white list barcodes & mutations
+ *
+ * @details
+ * A barcode is corrected by checking whether it is either in the white
+ * list or 1 mutation away from some whitelisted barcode. To support that
+ * check, a map is built whose keys are the whitelisted barcodes and all
+ * of their 1-mutations, and whose values are the index of the correct
+ * barcode
+ *
+ * @param white_list_file white list file from 10x genomics' cellranger
+ * @return a structure containing the map from barcode/1-mutation barcode
+ *         to the index of the correct barcode
+*/
+WHITE_LIST_DATA *read_white_list(const std::string &white_list_file);
+
+/**
+ * @brief Reads the options to the program
+ *
+ * @param argc no of arguments to the main function
+ * @param argv arguments array to the main function
+ * @param options the structure for holding the options for getopt
+*/
+void read_options(int, char **, INPUT_OPTIONS &);
+
+/**
+ * @brief Computes the size of a file in bytes
+ *
+ * @param filename file name whose size is computed
+ * @return size of the file in bytes
+*/
+int64_t filesize(const char *filename);
+
+
+/**
+ * @brief Computes the size of a file in bytes
+ *
+ * @param fileName file name whose size is computed
+ * @return size of the file in bytes
+*/
+int32_t getFileSize(const std::string &fileName);
+
+/**
+ * @brief Print a system error and exit
+ *
+ * @param msg error string to print
+*/
+void error (const char *msg);
+
+/**
+ * @brief examines the existence and size of the input files
+ *
+*/
+void _print_file_info(const std::vector<std::string> &fastqs, \
+                      const std::string &type);
+
+#endif
diff --git a/fastqpreprocessing/utils/big-run.sh b/fastqpreprocessing/utils/big-run.sh
new file mode 100755
index 0000000..82a1e3e
--- /dev/null
+++ b/fastqpreprocessing/utils/big-run.sh
@@ -0,0 +1 @@
+./fastqproc ../../L8TX/L8TX_180221_01_F12_R1.fastq.gz ../../L8TX/L8TX_180221_01_F12_I1.fastq.gz ../../L8TX/L8TX_180221_01_F12_R2.fastq.gz ../../L8TX/L8TX_171026_01_F03_R1.fastq.gz ../../L8TX/L8TX_171026_01_F03_I1.fastq.gz ../../L8TX/L8TX_171026_01_F03_R2.fastq.gz
diff --git a/fastqpreprocessing/utils/check_barcode_partition.py
b/fastqpreprocessing/utils/check_barcode_partition.py new file mode 100644 index 0000000..572205b --- /dev/null +++ b/fastqpreprocessing/utils/check_barcode_partition.py @@ -0,0 +1,39 @@ +import pysam +import argparse + +parser = argparse.ArgumentParser() +parser.add_argument("--bam", nargs="+", dest="bams", help="BAM files") + + +def check_disjoint_cbs(): + global parser + opts = parser.parse_args() + barcodes = {} + tot_alignments = 0 + + for bam in opts.bams: + print("reading " + bam) + barcodes[bam] = {} + with pysam.AlignmentFile(bam, "rb", check_sq=False) as input_alignments: + for alignment in input_alignments: + tot_alignments += 1 + if alignment.has_tag("CB"): + barcodes[bam][alignment.get_tag("CB")] = True + + for bam in opts.bams: + print("checking " + bam) + files = set(opts.bams) + otherbams = files.difference(set([bam])) + for cb in barcodes[bam].keys(): + for obam in otherbams: + if cb in barcodes[obam]: + print("not a partition") + return + + print("total alignments : ", tot_alignments) + print("is a partition") + return + + +if __name__ == "__main__": + check_disjoint_cbs() diff --git a/fastqpreprocessing/utils/create_fastq.sh b/fastqpreprocessing/utils/create_fastq.sh new file mode 100755 index 0000000..92f00cc --- /dev/null +++ b/fastqpreprocessing/utils/create_fastq.sh @@ -0,0 +1,10 @@ +zcat ../../L8TX/L8TX_180221_01_F12_R2.fastq.gz | head -n 4000000 > a_R1.fastq + +gzip a_R1.fastq + +cp a_R1.fastq.gz b_R2.fastq.gz +cp a_R1.fastq.gz b_I1.fastq.gz +cp a_R1.fastq.gz b_R1.fastq.gz + +cp a_R1.fastq.gz a_R2.fastq.gz +cp a_R1.fastq.gz a_I1.fastq.gz diff --git a/fastqpreprocessing/utils/example-run.sh b/fastqpreprocessing/utils/example-run.sh new file mode 100755 index 0000000..7006b13 --- /dev/null +++ b/fastqpreprocessing/utils/example-run.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +./fastqprocess --verbose \ + --bam-size 0.001 \ + --barcode-length 16 \ + --umi-length 10 \ + --sample-id L8TX \ + --white-list ../../../data/L8TX/737K-august-2016.txt \ + --I1 ../../../data/L8TX/A_I1.fastq.gz \ + --R1 ../../../data/L8TX/A_R1.fastq.gz \ + --R2 ../../../data/L8TX/A_R2.fastq.gz \ + --I1 ../../../data/L8TX/B_I1.fastq.gz \ + --R1 ../../../data/L8TX/B_R1.fastq.gz \ + --R2 ../../../data/L8TX/B_R2.fastq.gz \ diff --git a/fastqpreprocessing/utils/run.sh b/fastqpreprocessing/utils/run.sh new file mode 100755 index 0000000..bb61f61 --- /dev/null +++ b/fastqpreprocessing/utils/run.sh @@ -0,0 +1,12 @@ +#!/bin/bash + + +# --tool=memcheck \ +# --leak-check=full \ +# --log-file=valgrind-out.txt \ + +valgrind \ + --tool=massif \ + --time-unit=B \ + ./fastqproc a_R1.fastq.gz a_I1.fastq.gz a_R2.fastq.gz \ + b_R1.fastq.gz b_I1.fastq.gz b_R2.fastq.gz diff --git a/requirements.txt b/requirements.txt index 19390fe..713c499 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,4 @@ numpy==1.17.5 requests==2.20.0 setuptools==40.4.3 setuptools_scm==3.1.0 -tables==3.4.2 \ No newline at end of file +tables==3.4.4 diff --git a/src/sctools/consts.py b/src/sctools/consts.py index 9a57e08..e07980c 100644 --- a/src/sctools/consts.py +++ b/src/sctools/consts.py @@ -10,25 +10,25 @@ # BAM tag constants -RAW_SAMPLE_BARCODE_TAG_KEY = 'SR' -QUALITY_SAMPLE_BARCODE_TAG_KEY = 'SY' +RAW_SAMPLE_BARCODE_TAG_KEY = "SR" +QUALITY_SAMPLE_BARCODE_TAG_KEY = "SY" -MOLECULE_BARCODE_TAG_KEY = 'UB' -RAW_MOLECULE_BARCODE_TAG_KEY = 'UR' -QUALITY_MOLECULE_BARCODE_TAG_KEY = 'UY' +MOLECULE_BARCODE_TAG_KEY = "UB" +RAW_MOLECULE_BARCODE_TAG_KEY = "UR" +QUALITY_MOLECULE_BARCODE_TAG_KEY = "UY" -CELL_BARCODE_TAG_KEY = 'CB' 
-RAW_CELL_BARCODE_TAG_KEY = 'CR' -QUALITY_CELL_BARCODE_TAG_KEY = 'CY' +CELL_BARCODE_TAG_KEY = "CB" +RAW_CELL_BARCODE_TAG_KEY = "CR" +QUALITY_CELL_BARCODE_TAG_KEY = "CY" -GENE_NAME_TAG_KEY = 'GE' -NUMBER_OF_HITS_TAG_KEY = 'NH' +GENE_NAME_TAG_KEY = "GE" +NUMBER_OF_HITS_TAG_KEY = "NH" -ALIGNMENT_LOCATION_TAG_KEY = 'XF' -INTRONIC_ALIGNMENT_LOCATION_TAG_VALUE = 'INTRONIC' -CODING_ALIGNMENT_LOCATION_TAG_VALUE = 'CODING' -UTR_ALIGNMENT_LOCATION_TAG_VALUE = 'UTR' -INTERGENIC_ALIGNMENT_LOCATION_TAG_VALUE = 'INTERGENIC' +ALIGNMENT_LOCATION_TAG_KEY = "XF" +INTRONIC_ALIGNMENT_LOCATION_TAG_VALUE = "INTRONIC" +CODING_ALIGNMENT_LOCATION_TAG_VALUE = "CODING" +UTR_ALIGNMENT_LOCATION_TAG_VALUE = "UTR" +INTERGENIC_ALIGNMENT_LOCATION_TAG_VALUE = "INTERGENIC" # bam.py constants diff --git a/src/sctools/count.py b/src/sctools/count.py index 74753a1..b8d2e74 100644 --- a/src/sctools/count.py +++ b/src/sctools/count.py @@ -263,13 +263,13 @@ def from_sorted_tagged_bam( primary_alignment = alignments[0] if ( primary_alignment.has_tag(gene_name_tag) - and primary_alignment.has_tag('XF') - and primary_alignment.get_tag('XF') != 'INTERGENIC' + and primary_alignment.has_tag("XF") + and primary_alignment.get_tag("XF") != "INTERGENIC" ): gene_name = primary_alignment.get_tag(gene_name_tag) # overlaps multiple genes, drop query, and unfortunately there only one # one alignment for this query - if len(gene_name.split(',')) != 1: + if len(gene_name.split(",")) != 1: continue else: continue # drop query @@ -278,12 +278,12 @@ def from_sorted_tagged_bam( for alignment in alignments: if ( alignment.has_tag(gene_name_tag) - and alignment.has_tag('XF') - and alignment.get_tag('XF') != 'INTERGENIC' + and alignment.has_tag("XF") + and alignment.get_tag("XF") != "INTERGENIC" ): # consider its gene name only if it has only gene name gene_name = alignment.get_tag(gene_name_tag) - if len(gene_name.split(',')) == 1: + if len(gene_name.split(",")) == 1: implicated_gene_names.add(alignment.get_tag(gene_name_tag)) if len(implicated_gene_names) == 1: # only one gene diff --git a/src/sctools/encodings.py b/src/sctools/encodings.py index 64a6cfa..85f1cef 100644 --- a/src/sctools/encodings.py +++ b/src/sctools/encodings.py @@ -152,28 +152,28 @@ class TwoBitEncodingMap: """ map_ = { - ord('A'): 0, - ord('C'): 1, - ord('T'): 2, - ord('G'): 3, - ord('a'): 0, - ord('c'): 1, - ord('t'): 2, - ord('g'): 3, + ord("A"): 0, + ord("C"): 1, + ord("T"): 2, + ord("G"): 3, + ord("a"): 0, + ord("c"): 1, + ord("t"): 2, + ord("g"): 3, } - iupac_ambiguous: Set[int] = {ord(c) for c in 'MRWSYKVHDBNmrwsykvhdbn'} + iupac_ambiguous: Set[int] = {ord(c) for c in "MRWSYKVHDBNmrwsykvhdbn"} def __getitem__(self, byte: int) -> int: try: return self.map_[byte] except KeyError: if byte not in self.iupac_ambiguous: - raise KeyError(f'{chr(byte)} is not a valid IUPAC nucleotide code') + raise KeyError(f"{chr(byte)} is not a valid IUPAC nucleotide code") return random.randint(0, 3) encoding_map: TwoBitEncodingMap = TwoBitEncodingMap() - decoding_map: Mapping[int, bytes] = {0: b'A', 1: b'C', 2: b'T', 3: b'G'} + decoding_map: Mapping[int, bytes] = {0: b"A", 1: b"C", 2: b"T", 3: b"G"} bits_per_base: int = 2 @classmethod @@ -185,7 +185,7 @@ def encode(cls, bytes_encoded: bytes) -> int: return encoded def decode(self, integer_encoded: int) -> bytes: - decoded = b'' + decoded = b"" for _ in range(self.sequence_length): decoded = self.decoding_map[integer_encoded & 3] + decoded integer_encoded >>= 2 @@ -239,16 +239,16 @@ class ThreeBitEncodingMap: # C: 1, A: 2, G: 3, T: 4, N: 6; # 
note, not using 0 map_ = { - ord('C'): 1, - ord('A'): 2, - ord('G'): 3, - ord('T'): 4, - ord('N'): 6, - ord('c'): 1, - ord('a'): 2, - ord('g'): 3, - ord('t'): 4, - ord('n'): 6, + ord("C"): 1, + ord("A"): 2, + ord("G"): 3, + ord("T"): 4, + ord("N"): 6, + ord("c"): 1, + ord("a"): 2, + ord("g"): 3, + ord("t"): 4, + ord("n"): 6, } def __getitem__(self, byte: int) -> int: @@ -258,7 +258,7 @@ def __getitem__(self, byte: int) -> int: return 6 # any non-standard nucleotide gets "N" encoding_map: ThreeBitEncodingMap = ThreeBitEncodingMap() - decoding_map: Mapping[int, bytes] = {1: b'C', 2: b'A', 3: b'G', 4: b'T', 6: b'N'} + decoding_map: Mapping[int, bytes] = {1: b"C", 2: b"A", 3: b"G", 4: b"T", 6: b"N"} bits_per_base: int = 3 @classmethod @@ -271,7 +271,7 @@ def encode(cls, bytes_encoded: bytes) -> int: @classmethod def decode(cls, integer_encoded: int) -> bytes: - decoded = b'' + decoded = b"" while integer_encoded: decoded = cls.decoding_map[integer_encoded & 7] + decoded integer_encoded >>= 3 diff --git a/src/sctools/fastq.py b/src/sctools/fastq.py index 7518cb5..c6749de 100644 --- a/src/sctools/fastq.py +++ b/src/sctools/fastq.py @@ -61,7 +61,7 @@ class Record: """ - __slots__ = ['_name', '_sequence', '_name2', '_quality'] + __slots__ = ["_name", "_sequence", "_name2", "_quality"] def __init__(self, record: Iterable[AnyStr]): # use the setter functions @@ -75,9 +75,9 @@ def name(self) -> AnyStr: def name(self, value): """fastq record name""" if not isinstance(value, (bytes, str)): - raise TypeError('FASTQ name must be bytes') - elif not value.startswith(b'@'): - raise ValueError('FASTQ name must start with @') + raise TypeError("FASTQ name must be bytes") + elif not value.startswith(b"@"): + raise ValueError("FASTQ name must start with @") else: self._name = value @@ -89,7 +89,7 @@ def sequence(self) -> AnyStr: def sequence(self, value): """FASTQ nucleotide sequence""" if not isinstance(value, (bytes, str)): - raise TypeError('FASTQ sequence must be str or bytes') + raise TypeError("FASTQ sequence must be str or bytes") else: self._sequence = value @@ -101,7 +101,7 @@ def name2(self) -> AnyStr: def name2(self, value): """second FASTQ record name field (rarely used)""" if not isinstance(value, (bytes, str)): - raise TypeError('FASTQ name2 must be str or bytes') + raise TypeError("FASTQ name2 must be str or bytes") else: self._name2 = value @@ -113,15 +113,15 @@ def quality(self) -> AnyStr: def quality(self, value): """FASTQ record base call quality scores""" if not isinstance(value, (bytes, str)): - raise TypeError('FASTQ quality must be str or bytes') + raise TypeError("FASTQ quality must be str or bytes") else: self._quality = value def __bytes__(self): - return b''.join((self.name, self.sequence, self.name2, self.quality)) + return b"".join((self.name, self.sequence, self.name2, self.quality)) def __str__(self): - return b''.join((self.name, self.sequence, self.name2, self.quality)).decode() + return b"".join((self.name, self.sequence, self.name2, self.quality)).decode() def __repr__(self): return "Name: %s\nSequence: %s\nName2: %s\nQuality: %s\n" % ( @@ -167,10 +167,10 @@ class StrRecord(Record): """ def __bytes__(self): - return ''.join((self.name, self.sequence, self.name2, self.quality)).encode() + return "".join((self.name, self.sequence, self.name2, self.quality)).encode() def __str__(self): - return ''.join((self.name, self.sequence, self.name2, self.quality)) + return "".join((self.name, self.sequence, self.name2, self.quality)) # todo is this method necessary? 
@property @@ -181,9 +181,9 @@ def name(self) -> str: def name(self, value): """FASTQ record name""" if not isinstance(value, (bytes, str)): - raise TypeError('FASTQ name must be str or bytes') - if not value.startswith('@'): - raise ValueError('FASTQ name must start with @') + raise TypeError("FASTQ name must be str or bytes") + if not value.startswith("@"): + raise ValueError("FASTQ name must start with @") else: self._name = value @@ -239,14 +239,14 @@ def __iter__(self) -> Iterator[Tuple[str]]: tuple of length 4 containing the name, sequence, name2, and quality for a FASTQ record """ - record_type = StrRecord if self._mode == 'r' else Record + record_type = StrRecord if self._mode == "r" else Record for record in self._record_grouper(super().__iter__()): yield record_type(record) # namedtuple that defines the start and end position of a barcode sequence and provides the name # for both a quality and sequence tag -EmbeddedBarcode = namedtuple('Tag', ['start', 'end', 'sequence_tag', 'quality_tag']) +EmbeddedBarcode = namedtuple("Tag", ["start", "end", "sequence_tag", "quality_tag"]) def extract_barcode( @@ -273,8 +273,8 @@ def extract_barcode( seq = record.sequence[embedded_barcode.start : embedded_barcode.end] qual = record.quality[embedded_barcode.start : embedded_barcode.end] return ( - (embedded_barcode.sequence_tag, seq, 'Z'), - (embedded_barcode.quality_tag, qual, 'Z'), + (embedded_barcode.sequence_tag, seq, "Z"), + (embedded_barcode.quality_tag, qual, "Z"), ) @@ -354,7 +354,7 @@ def __init__( self.embedded_barcodes = other_embedded_barcodes else: raise TypeError( - 'if passed, other_embedded_barcodes must be a list or tuple' + "if passed, other_embedded_barcodes must be a list or tuple" ) self._error_mapping = ErrorsToCorrectBarcodesMap.single_hamming_errors_from_whitelist( @@ -399,6 +399,6 @@ def extract_cell_barcode(self, record: Tuple[str], cb: EmbeddedBarcode): seq_tag, qual_tag = extract_barcode(record, cb) try: corrected_cb = self._error_mapping.get_corrected_barcode(seq_tag[1]) - return seq_tag, qual_tag, (consts.CELL_BARCODE_TAG_KEY, corrected_cb, 'Z') + return seq_tag, qual_tag, (consts.CELL_BARCODE_TAG_KEY, corrected_cb, "Z") except KeyError: return seq_tag, qual_tag diff --git a/src/sctools/groups.py b/src/sctools/groups.py index 511bc61..2a3592f 100644 --- a/src/sctools/groups.py +++ b/src/sctools/groups.py @@ -26,13 +26,13 @@ def write_aggregated_picard_metrics_by_row(file_names, output_name): metrics = {} d = pd.DataFrame() for file_name in file_names: - cell_id = os.path.basename(file_name).split('_qc')[0] + cell_id = os.path.basename(file_name).split("_qc")[0] metrics[cell_id] = {} parsed = picard.parse(file_name) - class_name = parsed['metrics']['class'].split('.')[2] + class_name = parsed["metrics"]["class"].split(".")[2] # Alignment metrics return multiple lines, # but only output PAIRED-READS/third line - contents = parsed['metrics']['contents'] + contents = parsed["metrics"]["contents"] if class_name == "AlignmentSummaryMetrics": # parse out PE, R1 and R2. If the reads are unpaired, the contents # will be a single dict rather than a list of dicts. @@ -40,12 +40,12 @@ def write_aggregated_picard_metrics_by_row(file_names, output_name): contents = [contents] rows = {} for m in contents: - cat = m['CATEGORY'] + cat = m["CATEGORY"] rows.update( { - k + '.' + cat: v + k + "." 
+ cat: v for k, v in m.items() - if k not in ['SAMPLE', 'LIBRARY', 'READ_GROUP', 'CATEGORY'] + if k not in ["SAMPLE", "LIBRARY", "READ_GROUP", "CATEGORY"] } ) # sometimes(very rare), insertion metrics also return multiple lines @@ -64,14 +64,14 @@ def write_aggregated_picard_metrics_by_row(file_names, output_name): { k: rows[k] for k in rows - if k not in ['SAMPLE', 'LIBRARY', 'READ_GROUP', 'CATEGORY'] + if k not in ["SAMPLE", "LIBRARY", "READ_GROUP", "CATEGORY"] } ) - df = pd.DataFrame.from_dict(metrics, orient='columns') - df.insert(0, 'Class', class_name) + df = pd.DataFrame.from_dict(metrics, orient="columns") + df.insert(0, "Class", class_name) d = d.append(df) d_T = d.T - d_T.to_csv(output_name + '.csv') + d_T.to_csv(output_name + ".csv") def write_aggregated_picard_metrics_by_table(file_names, output_name): @@ -88,12 +88,12 @@ def write_aggregated_picard_metrics_by_table(file_names, output_name): return if the program completes successfully. """ for file_name in file_names: - cell_id = os.path.basename(file_name).split('_qc')[0] - class_name = os.path.basename(file_name).split('.')[1] + cell_id = os.path.basename(file_name).split("_qc")[0] + class_name = os.path.basename(file_name).split(".")[1] parsed = picard.parse(file_name) - dat = pd.DataFrame.from_dict(parsed['metrics']['contents']) - dat.insert(0, 'Sample', cell_id) - dat.to_csv(output_name + "_" + class_name + '.csv', index=False) + dat = pd.DataFrame.from_dict(parsed["metrics"]["contents"]) + dat.insert(0, "Sample", cell_id) + dat.to_csv(output_name + "_" + class_name + ".csv", index=False) def write_aggregated_qc_metrics(file_names, output_name): @@ -113,8 +113,8 @@ def write_aggregated_qc_metrics(file_names, output_name): dat = pd.read_csv(file_name, index_col=0) print(dat.index) print(df.head()) - df = pd.concat([df, dat], axis=1, join='outer') - df.to_csv(output_name + '.csv', index=True) + df = pd.concat([df, dat], axis=1, join="outer") + df.to_csv(output_name + ".csv", index=True) def parse_hisat2_log(file_names, output_name): @@ -134,22 +134,22 @@ def parse_hisat2_log(file_names, output_name): metrics = {} tag = "NONE" for file_name in file_names: - if '_qc' in file_name: - cell_id = os.path.basename(file_name).split('_qc')[0] + if "_qc" in file_name: + cell_id = os.path.basename(file_name).split("_qc")[0] tag = "HISAT2G" - elif '_rsem' in file_name: - cell_id = os.path.basename(file_name).split('_rsem')[0] + elif "_rsem" in file_name: + cell_id = os.path.basename(file_name).split("_rsem")[0] tag = "HISAT2T" with open(file_name) as f: dat = f.readlines() - d = [x.strip().split(':') for x in dat] + d = [x.strip().split(":") for x in dat] # remove the first row of each section. 
d.pop(0) - metrics[cell_id] = {x[0]: x[1].strip().split(' ')[0] for x in d} - df = pd.DataFrame.from_dict(metrics, orient='columns') + metrics[cell_id] = {x[0]: x[1].strip().split(" ")[0] for x in d} + df = pd.DataFrame.from_dict(metrics, orient="columns") df.insert(0, "Class", tag) df_T = df.T - df_T.to_csv(output_name + '.csv') + df_T.to_csv(output_name + ".csv") def parse_rsem_cnt(file_names, output_name): @@ -167,7 +167,7 @@ def parse_rsem_cnt(file_names, output_name): """ metrics = {} for file_name in file_names: - cell_id = os.path.basename(file_name).split('_rsem')[0] + cell_id = os.path.basename(file_name).split("_rsem")[0] i = 0 with open(file_name) as f: while i < 3: @@ -189,7 +189,7 @@ def parse_rsem_cnt(file_names, output_name): "strand": read_type, "uncertain reads": n_uncertain, } - df = pd.DataFrame.from_dict(metrics, orient='columns') + df = pd.DataFrame.from_dict(metrics, orient="columns") df.insert(0, "Class", "RSEM") df_T = df.T - df_T.to_csv(output_name + '.csv') + df_T.to_csv(output_name + ".csv") diff --git a/src/sctools/gtf.py b/src/sctools/gtf.py index fd075ec..7f574a9 100644 --- a/src/sctools/gtf.py +++ b/src/sctools/gtf.py @@ -294,7 +294,7 @@ def get_mitochondrial_gene_names( f"Malformed GTF file detected. Record is of type gene but does not have a " f'"gene_name" field: {record}' ) - if re.match('^mt-', gene_name, re.IGNORECASE): + if re.match("^mt-", gene_name, re.IGNORECASE): if gene_id not in mitochondrial_gene_ids: mitochondrial_gene_ids.add(gene_id) diff --git a/src/sctools/metrics/gatherer.py b/src/sctools/metrics/gatherer.py index 10d47c5..91f7287 100644 --- a/src/sctools/metrics/gatherer.py +++ b/src/sctools/metrics/gatherer.py @@ -73,7 +73,7 @@ def bam_file(self) -> str: """the bam file that metrics are generated from""" return self._bam_file - def extract_metrics(self, mode='rb') -> None: + def extract_metrics(self, mode="rb") -> None: """extract metrics from the provided bam file and write the results to csv. 
diff --git a/src/sctools/metrics/gatherer.py b/src/sctools/metrics/gatherer.py
index 10d47c5..91f7287 100644
--- a/src/sctools/metrics/gatherer.py
+++ b/src/sctools/metrics/gatherer.py
@@ -73,7 +73,7 @@ def bam_file(self) -> str:
         """the bam file that metrics are generated from"""
         return self._bam_file

-    def extract_metrics(self, mode='rb') -> None:
+    def extract_metrics(self, mode="rb") -> None:
         """extract metrics from the provided bam file and write the results to csv.

         Parameters
@@ -113,7 +113,7 @@ class GatherCellMetrics(MetricGatherer):

     __doc__ += extra_docs

-    def extract_metrics(self, mode: str = 'rb') -> None:
+    def extract_metrics(self, mode: str = "rb") -> None:
         """Extract cell metrics from self.bam_file

         Parameters
@@ -186,7 +186,7 @@ class GatherGeneMetrics(MetricGatherer):

     __doc__ += extra_docs

-    def extract_metrics(self, mode: str = 'rb') -> None:
+    def extract_metrics(self, mode: str = "rb") -> None:
         """Extract gene metrics from self.bam_file

         Parameters
@@ -208,7 +208,7 @@ def extract_metrics(self, mode: str = 'rb') -> None:
             metric_aggregator = GeneMetrics()

             # in case of multi-genes ignore as in the counting stage
-            if gene_tag and len(gene_tag.split(',')) > 1:
+            if gene_tag and len(gene_tag.split(",")) > 1:
                 continue

             # break up gene ids by cell barcodes
diff --git a/src/sctools/metrics/merge.py b/src/sctools/metrics/merge.py
index a789d75..aa4d483 100644
--- a/src/sctools/metrics/merge.py
+++ b/src/sctools/metrics/merge.py
@@ -48,8 +48,8 @@ class MergeMetrics:

     def __init__(self, metric_files: Sequence[str], output_file: str):
         self._metric_files = metric_files
-        if not output_file.endswith('.csv.gz'):
-            output_file += '.csv.gz'
+        if not output_file.endswith(".csv.gz"):
+            output_file += ".csv.gz"
         self._output_file = output_file

     def execute(self) -> None:
@@ -68,7 +68,7 @@ def execute(self) -> None:
             pd.read_csv(f, index_col=0) for f in self._metric_files
         ]
         concatenated_frame: pd.DataFrame = pd.concat(metric_dataframes, axis=0)
-        concatenated_frame.to_csv(self._output_file, compression='gzip')
+        concatenated_frame.to_csv(self._output_file, compression="gzip")


 class MergeGeneMetrics(MergeMetrics):
@@ -84,26 +84,26 @@ def execute(self) -> None:
         """

         count_data_to_sum = [
-            'n_reads',
-            'noise_reads',
-            'perfect_molecule_barcodes',
-            'reads_mapped_exonic',
-            'reads_mapped_intronic',
-            'reads_mapped_utr',
-            'reads_mapped_uniquely',
-            'reads_mapped_multiple',
-            'duplicate_reads',
-            'spliced_reads',
-            'antisense_reads',
-            'n_molecules',
-            'n_fragments',
-            'fragments_with_single_read_evidence',
-            'molecules_with_single_read_evidence',
-            'number_cells_detected_multiple',
-            'number_cells_expressing',
+            "n_reads",
+            "noise_reads",
+            "perfect_molecule_barcodes",
+            "reads_mapped_exonic",
+            "reads_mapped_intronic",
+            "reads_mapped_utr",
+            "reads_mapped_uniquely",
+            "reads_mapped_multiple",
+            "duplicate_reads",
+            "spliced_reads",
+            "antisense_reads",
+            "n_molecules",
+            "n_fragments",
+            "fragments_with_single_read_evidence",
+            "molecules_with_single_read_evidence",
+            "number_cells_detected_multiple",
+            "number_cells_expressing",
         ]
-        sum_operations = {c: 'sum' for c in count_data_to_sum}
+        sum_operations = {c: "sum" for c in count_data_to_sum}

         def weighted_average(data_frame: pd.DataFrame) -> pd.Series:
             """Calculate the average of each metric, weighted by number of reads per chunk
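Note for reviewers: the split between `sum_operations` and `weighted_average` matters because count columns aggregate linearly across chunks while per-read quality means do not. A toy illustration of why the weighting is needed (all numbers invented):

import numpy as np
import pandas as pd

# two chunks of the same gene, with very different read counts
chunks = pd.DataFrame(
    {"n_reads": [100, 300], "genomic_read_quality_mean": [30.0, 34.0]},
    index=["chunk_0", "chunk_1"],
)

total_reads = chunks["n_reads"].sum()  # counts just sum: 400
weighted = np.average(chunks["genomic_read_quality_mean"], weights=chunks["n_reads"])
print(total_reads, weighted)  # 400 33.0 -- the naive unweighted mean would be 32.0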
"genomic_reads_fraction_bases_quality_above_30_mean", + "genomic_reads_fraction_bases_quality_above_30_variance", + "genomic_read_quality_mean", + "genomic_read_quality_variance", ] return pd.Series( @@ -155,12 +155,12 @@ def recalculate_operation(data_frame) -> pd.DataFrame: """ return pd.DataFrame( data={ - 'reads_per_molecule': data_frame['n_reads'] - / data_frame['n_molecules'], - 'fragments_per_molecule': data_frame['n_fragments'] - / data_frame['n_molecules'], - 'reads_per_fragment': data_frame['n_reads'] - / data_frame['n_fragments'], + "reads_per_molecule": data_frame["n_reads"] + / data_frame["n_molecules"], + "fragments_per_molecule": data_frame["n_fragments"] + / data_frame["n_molecules"], + "reads_per_fragment": data_frame["n_reads"] + / data_frame["n_fragments"], } ) @@ -188,4 +188,4 @@ def recalculate_operation(data_frame) -> pd.DataFrame: nucleus = merged # write the data - nucleus.to_csv(self._output_file, compression='gzip') + nucleus.to_csv(self._output_file, compression="gzip") diff --git a/src/sctools/metrics/writer.py b/src/sctools/metrics/writer.py index 3ce8a84..2379418 100644 --- a/src/sctools/metrics/writer.py +++ b/src/sctools/metrics/writer.py @@ -49,18 +49,18 @@ def __init__(self, output_stem: str, compress=True): # check and fix extension: if compress: - if not output_stem.endswith('.csv.gz'): - output_stem += '.csv.gz' + if not output_stem.endswith(".csv.gz"): + output_stem += ".csv.gz" else: - if not output_stem.endswith('.csv'): - output_stem += '.csv' + if not output_stem.endswith(".csv"): + output_stem += ".csv" self._filename: str = output_stem # open the file if compress: - self._open_fid: TextIO = gzip.open(self._filename, 'wt') + self._open_fid: TextIO = gzip.open(self._filename, "wt") else: - self._open_fid: TextIO = open(self._filename, 'w') + self._open_fid: TextIO = open(self._filename, "w") self._header: List[str] = None @property @@ -78,8 +78,8 @@ def write_header(self, record: Mapping[str, Any]) -> None: producing a dictionary of keys to metric values. """ - self._header = list(key for key in record.keys() if not key.startswith('_')) - self._open_fid.write(',' + ','.join(self._header) + '\n') + self._header = list(key for key in record.keys() if not key.startswith("_")) + self._open_fid.write("," + ",".join(self._header) + "\n") def write(self, index: str, record: Mapping[str, Number]) -> None: """Write the array of metric values for a cell or gene to file. 
@@ -97,10 +97,10 @@ def write(self, index: str, record: Mapping[str, Number]) -> None:

         # genes and cells can be None, call repr to convert to string when this induces a TypeError
         try:
-            self._open_fid.write(index + ',' + ','.join(ordered_fields) + '\n')
+            self._open_fid.write(index + "," + ",".join(ordered_fields) + "\n")
         except TypeError:
             index = repr(index)
-            self._open_fid.write(index + ',' + ','.join(ordered_fields) + '\n')
+            self._open_fid.write(index + "," + ",".join(ordered_fields) + "\n")

     def close(self) -> None:
         """Close the metrics file."""
diff --git a/src/sctools/reader.py b/src/sctools/reader.py
index 0f18352..bc26f1c 100644
--- a/src/sctools/reader.py
+++ b/src/sctools/reader.py
@@ -53,17 +53,17 @@ def infer_open(file_: str, mode: str) -> Callable:
     partial

     """
-    with open(file_, 'rb') as f:
+    with open(file_, "rb") as f:
         data: bytes = f.read(3)

     # gz and bzip treat 'r' = bytes, 'rt' = string
-    if data[:2] == b'\x1f\x8b':  # gzip magic number
+    if data[:2] == b"\x1f\x8b":  # gzip magic number
         inferred_openhook: Callable = gzip.open
-        inferred_mode: str = 'rt' if mode == 'r' else mode
+        inferred_mode: str = "rt" if mode == "r" else mode

-    elif data == b'BZh':  # bz2 magic number
+    elif data == b"BZh":  # bz2 magic number
         inferred_openhook: Callable = bz2.open
-        inferred_mode: str = 'rt' if mode == 'r' else mode
+        inferred_mode: str = "rt" if mode == "r" else mode

     else:
         inferred_openhook: Callable = open
@@ -89,7 +89,7 @@ class Reader:

     """

-    def __init__(self, files='-', mode='r', header_comment_char=None):
+    def __init__(self, files="-", mode="r", header_comment_char=None):
         if isinstance(files, str):
             self._files = [files]
         elif isinstance(files, Iterable):  # test items of iterable
@@ -97,16 +97,16 @@ def __init__(self, files='-', mode='r', header_comment_char=None):
             if all(isinstance(f, str) for f in files):
                 self._files = files
             else:
-                raise TypeError('All passed files must be type str')
+                raise TypeError("All passed files must be type str")
         else:
-            raise TypeError('Files must be a string filename or a list of such names.')
+            raise TypeError("Files must be a string filename or a list of such names.")

         # set open mode:
-        if mode not in {'r', 'rb'}:
+        if mode not in {"r", "rb"}:
             raise ValueError("Mode must be one of 'r', 'rb'")
         self._mode = mode

-        if isinstance(header_comment_char, str) and mode == 'rb':
+        if isinstance(header_comment_char, str) and mode == "rb":
             self._header_comment_char = header_comment_char.encode()
         else:
             self._header_comment_char = header_comment_char
diff --git a/src/sctools/stats.py b/src/sctools/stats.py
index ebe6dc4..a303f5f 100644
--- a/src/sctools/stats.py
+++ b/src/sctools/stats.py
@@ -46,7 +46,7 @@ def base4_entropy(x, axis=1):
     else:
         x = np.divide(x, np.sum(x, axis=axis))

-    with np.errstate(divide='ignore'):
+    with np.errstate(divide="ignore"):
         r = np.log(x) / np.log(4)

     # convention: 0 * log(0) = 0, != -INF.
@@ -72,7 +72,7 @@ class OnlineGaussianSufficientStatistic:

     """

-    __slots__ = ['_count', '_mean', '_mean_squared_error']
+    __slots__ = ["_count", "_mean", "_mean_squared_error"]

     def __init__(self):
         self._mean_squared_error: float = 0.0
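Note for reviewers: the `_count`/`_mean`/`_mean_squared_error` slots suggest a Welford-style single-pass update. The sketch below shows that standard recurrence; this is an assumption about the class internals for orientation, not a copy of its methods:

# one-pass mean/variance over a stream, never holding all values at once
values = [2.0, 4.0, 6.0]
count, mean, m2 = 0, 0.0, 0.0  # m2 accumulates squared deviations from the running mean
for v in values:
    count += 1
    delta = v - mean
    mean += delta / count
    m2 += delta * (v - mean)
print(mean, m2 / (count - 1))  # 4.0 4.0 (mean, sample variance)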