In [None]:
%%writefile telomemore/generatecsvfiles.py
import re
from collections import defaultdict, Counter
import pysam
from pathlib import Path
from typing import Tuple


class GenerateCsvFiles:
    '''Counts telomere-like reads per cell barcode in a BAM file (e.g. from cell ranger).

    Every counting method iterates ``self.bam`` directly.
    NOTE(review): a pysam file handle is presumably exhausted after one full pass,
    so running two counting methods on the same instance may need a rewind - confirm.
    '''

    def __init__(self, bam_file: str):
        '''Opens ``bam_file`` in binary-read mode with pysam.'''
        self.bam = pysam.AlignmentFile(bam_file, 'rb')

    def _number_telomers(self, pattern: str, string: str) -> int:
        '''Returns the number of non-overlapping matches of ``pattern`` in ``string``.'''
        return len(re.findall(pattern, string))

    def _telomere_counts(self, pattern: str, cutoff: int) -> Tuple[dict, dict, int]:
        '''Tallies reads per cell barcode.

        Args:
            pattern: regular expression searched for in every read sequence.
            cutoff: minimum number of matches for a read to count as a telomere read.

        Returns:
            ``(telomeres_cells, total_reads_cells, missed_barcodes)`` where the two
            dicts map the ``CB`` tag value to a read count and ``missed_barcodes``
            is the number of reads without a ``CB`` tag.
        '''
        telomeres_cells = defaultdict(int)
        total_reads_cells = defaultdict(int)
        missed_barcodes = 0

        for read in self.bam:
            # Only the tag lookup is guarded, so that unrelated failures (for
            # example an invalid regular expression) are not silently reported
            # as missing barcodes.
            try:
                barcode = read.get_tag('CB')
            except KeyError:
                missed_barcodes += 1
                continue

            total_reads_cells[barcode] += 1
            # Touching the key registers the barcode with a count of 0, so cells
            # without any telomere read still appear in the output.
            telomeres_cells[barcode]

            sequence = read.seq
            # A read without a stored sequence cannot contain the pattern.
            hits = 0 if sequence is None else self._number_telomers(pattern, sequence)
            if hits >= cutoff:
                telomeres_cells[barcode] += 1

        return telomeres_cells, total_reads_cells, missed_barcodes

    @staticmethod
    def _write_pairs(path: Path, pairs: dict) -> None:
        '''Writes ``pairs`` to ``path`` as ``key,value`` lines, replacing old content.'''
        with open(path, 'w') as handle:
            for key, value in pairs.items():
                print(f'{key},{value}', file=handle)

    def _write_file(self, out_directory: str, pattern: str, cutoff: int = 1) -> None:
        '''Runs the telomere count and writes the three result files.

        Args:
            out_directory: folder receiving ``telomere_counts.csv``,
                ``total_reads.csv`` and ``missed_barcodes.txt``; created if missing.
            pattern: regular expression searched for in every read sequence.
            cutoff: minimum number of matches for a read to count as a telomere
                read. Defaults to 1 (at least one match).
        '''
        out_path = Path(out_directory)
        out_path.mkdir(parents=True, exist_ok=True)

        # The cutoff used to be omitted from this call, which raised TypeError.
        telomeres_dict, total_dict, missed_barcodes = self._telomere_counts(pattern, cutoff)

        # Files are overwritten rather than appended to, so that running the
        # count again does not duplicate every barcode row.
        self._write_pairs(out_path / 'telomere_counts.csv', telomeres_dict)
        self._write_pairs(out_path / 'total_reads.csv', total_dict)

        with open(out_path / 'missed_barcodes.txt', 'w') as missed:
            print(f'Number of missed barcodes = {missed_barcodes}', file=missed)

    def run_telomere_count(self, out_directory: str, telomere_pattern: str, cutoff: int = 1) -> None:
        '''Generates csv files with barcode and number of putative telomeres.

        Args:
            out_directory: folder the result files are written to.
            telomere_pattern: regular expression identifying the telomere repeat.
            cutoff: minimum number of matches for a read to count as a telomere
                read. Defaults to 1.
        '''
        print('Starting to write file.')
        self._write_file(out_directory, telomere_pattern, cutoff)
        print('File written.')

    def _k_mer_per_read(self, pattern: str) -> Tuple[dict, int]:
        '''Counts matches of ``pattern`` in each read.

        Returns:
            ``(counter, exception)`` where ``counter`` maps a number of matches to
            the number of reads with that many matches, and ``exception`` is the
            number of reads that had no sequence to search.
        '''
        counter = Counter()
        exception = 0
        for read in self.bam:
            sequence = read.seq
            if sequence is None:
                exception += 1
                continue
            counter[self._number_telomers(pattern, sequence)] += 1

        return counter, exception

    def number_kmers(self, out_directory: str, pattern: str) -> None:
        '''Counts matches of ``pattern`` in each read and writes the results to file.

        Writes ``<pattern>_k_mer_counts.csv`` (``matches,reads`` lines) and
        ``<pattern>_missed_reads.txt`` into ``out_directory``, which is created if
        it does not exist. Existing files are overwritten.
        '''
        out_path = Path(out_directory)
        out_path.mkdir(parents=True, exist_ok=True)
        count_file = out_path / f'{pattern}_k_mer_counts.csv'
        missed_file = out_path / f'{pattern}_missed_reads.txt'

        print(f'Searching for {pattern}...')
        counter, exception = self._k_mer_per_read(pattern)

        print(f'Writing {pattern} file...')
        self._write_pairs(count_file, counter)

        with open(missed_file, 'w') as missed:
            print(f'Number of sequences that could not be read: {exception}', file=missed)

        print(f'{pattern} files written.')

In [1]:
%%writefile telomemore/utils.py
import re
from pathlib import Path
import pysam

class BamFile:
    '''Collects every ``*.bam`` file found recursively below a folder.

    Attributes set by the constructor:
        folder: the search root as a ``Path``.
        files: sorted list of ``Path`` objects for the regular files that matched.
    '''

    def __init__(self, folder: str):
        root = Path(folder)
        self.folder = root
        # rglob can also yield directories whose name ends in ".bam"; keep files only.
        matches = root.rglob('*.bam')
        self.files = sorted(filter(Path.is_file, matches))

class BaseKmerFinder:
    '''Base class holding the shared state for kmer searches in bam files.

    Attributes set by the constructor:
        bam: ``BamFile`` listing the bam files below the input folder.
        pattern: the compiled regular expression to search for.
        out_folder: resolved (absolute) ``Path`` of the output folder.
    '''

    def __init__(self, folder: str, pattern: str, out_folder: str):
        bam_files = BamFile(folder)
        compiled = re.compile(pattern)
        destination = Path(out_folder).resolve()

        self.bam = bam_files
        self.pattern = compiled
        self.out_folder = destination

    def _number_telomers(self, sequence: str) -> int:
        '''Returns how many non-overlapping matches of the pattern ``sequence`` holds.'''
        return sum(1 for _ in re.finditer(self.pattern, sequence))

    

Overwriting telomemore/utils.py


In [62]:
%%writefile telomemore/countkmers.py
from telomemore.utils import BaseKmerFinder
from typing import Tuple
from collections import Counter, defaultdict
import pysam
from pathlib import Path
import pandas as pd

class CountKmers(BaseKmerFinder):
    '''Finds occurrences of a kmer in every bam file of a folder and stores the counts as csv.'''

    def __init__(self, input_folder: str, pattern: str, output_folder: str):
        super().__init__(input_folder, pattern, output_folder)

    def _sample_label(self, file: Path) -> str:
        '''Builds a flat, unique output-name prefix for ``file``.

        The label is the path of ``file`` relative to the input folder, without
        the ``.bam`` suffix and with path separators replaced by underscores, so
        equally named bam files in different subfolders do not collide.
        '''
        relative = file.relative_to(self.bam.folder).with_suffix('')
        return '_'.join(relative.parts)

    def _k_mer_per_read(self, bamfile: pysam.AlignmentFile) -> Tuple[dict, int]:
        '''Counts matches of the pattern in each read of an open alignment file.

        Returns:
            ``(counter, missed_reads)`` where ``counter`` maps a number of matches
            to the number of reads with that many matches, and ``missed_reads`` is
            the number of reads that had no sequence to search.
        '''
        counter = Counter()
        missed_reads = 0
        for read in bamfile:
            sequence = read.seq
            if sequence is None:
                missed_reads += 1
                continue
            counter[self._number_telomers(sequence)] += 1

        return counter, missed_reads

    def run_kmer_counter(self):
        '''Outputs a csv file with information about kmer occurrence for each bam file in the folder.

        For every bam file two files are written into the output folder (created
        if missing): ``<label>_<pattern>_kmer.csv`` with the columns ``number`` and
        ``count``, and ``<label>_missed_reads.txt``.
        '''
        self.out_folder.mkdir(parents=True, exist_ok=True)

        for file in self.bam.files:
            print(f'Looking for {self.pattern.pattern} in {file}')

            # The context manager closes the file handle once the pass is done.
            with pysam.AlignmentFile(file, 'rb') as sam:
                counter, missed_reads = self._k_mer_per_read(sam)

            # The full path of the bam file must not be used as the file name:
            # joining an absolute path onto out_folder discards out_folder, and a
            # relative one points into subfolders that do not exist.
            label = self._sample_label(file)
            kmer_file = self.out_folder / f'{label}_{self.pattern.pattern}_kmer.csv'
            exception_file = self.out_folder / f'{label}_missed_reads.txt'

            table = pd.DataFrame({'number': list(counter.keys()), 'count': list(counter.values())})
            table.to_csv(kmer_file, index=False)

            with open(exception_file, 'w') as missed:
                print(f'Reads that could not be read in {str(file)}: {missed_reads}', file=missed)

            print(f'Done with {file.stem}')

Overwriting telomemore/countkmers.py


In [63]:
#%%writefile telomemore/count_telomeres.py
from telomemore.utils import BaseKmerFinder
from typing import Tuple
from collections import Counter, defaultdict
import pysam
from pathlib import Path
import pandas as pd

class CountTelomeres(BaseKmerFinder):
    '''Counts number of telomeres in all bam files in the input folder'''

    def __init__(self, input_folder: str, pattern: str, output_folder: str, cutoff: int):
        '''Stores the search settings.

        Args:
            input_folder: folder searched recursively for bam files.
            pattern: regular expression identifying the telomere repeat.
            output_folder: folder the result files are written to.
            cutoff: minimum number of matches for a read to count as a telomere read.
        '''
        super().__init__(input_folder, pattern, output_folder)
        self.cutoff = cutoff

    def _sample_label(self, file: Path) -> str:
        '''Builds a flat, unique output-name prefix for ``file``.

        The label is the path of ``file`` relative to the input folder, without
        the ``.bam`` suffix and with path separators replaced by underscores, so
        equally named bam files in different subfolders do not collide.
        '''
        relative = file.relative_to(self.bam.folder).with_suffix('')
        return '_'.join(relative.parts)

    def _telomere_counts(self, sam: pysam.AlignmentFile) -> Tuple[dict, dict, int]:
        '''Counts telomere reads and total reads per cell barcode.

        Returns:
            ``(telomeres_cells, total_reads_cells, missed_barcodes)`` where the two
            dicts map the ``CB`` tag value to a read count and ``missed_barcodes``
            is the number of reads without a ``CB`` tag.
        '''
        telomeres_cells = defaultdict(int)
        total_reads_cells = defaultdict(int)
        missed_barcodes = 0

        for read in sam:
            # Only the tag lookup is guarded, so that unrelated failures are not
            # silently reported as missing barcodes.
            try:
                barcode = read.get_tag('CB')
            except KeyError:
                missed_barcodes += 1
                continue

            total_reads_cells[barcode] += 1
            # Touching the key registers the barcode with a count of 0, so cells
            # without any telomere read still appear in the output.
            telomeres_cells[barcode]

            sequence = read.seq
            # A read without a stored sequence cannot contain the pattern.
            hits = 0 if sequence is None else self._number_telomers(sequence)
            if hits >= self.cutoff:
                telomeres_cells[barcode] += 1

        return telomeres_cells, total_reads_cells, missed_barcodes

    @staticmethod
    def _write_pairs(path: Path, pairs: dict) -> None:
        '''Writes ``pairs`` to ``path`` as ``key,value`` lines, replacing old content.'''
        with open(path, 'w') as handle:
            for key, value in pairs.items():
                print(f'{key},{value}', file=handle)

    def run_telomer_count(self) -> None:
        '''Generates telomere counts for each bam file in the input folder. Writes the results as csv files.

        For every bam file three files are written into the output folder (created
        if missing): ``<label>_telomeres_<pattern>.csv``, ``<label>_total_reads.csv``
        and ``<label>_missed_barcodes.txt``. Existing files are overwritten.
        '''
        self.out_folder.mkdir(parents=True, exist_ok=True)

        for file in self.bam.files:
            # The full path of the bam file must not be used as the file name:
            # joining an absolute path onto out_folder discards out_folder, and a
            # relative one points into subfolders that do not exist.
            label = self._sample_label(file)
            telomere_file = self.out_folder / f'{label}_telomeres_{self.pattern.pattern}.csv'
            totalreads_file = self.out_folder / f'{label}_total_reads.csv'
            missed_barcods_file = self.out_folder / f'{label}_missed_barcodes.txt'

            print(f'Looking for {self.pattern.pattern} in {file}')

            # The context manager closes the file handle once the pass is done.
            with pysam.AlignmentFile(file, 'rb') as sam:
                telomeres_dict, total_dict, missed_barcodes = self._telomere_counts(sam)

            # Overwrite instead of append so a re-run does not duplicate rows.
            self._write_pairs(telomere_file, telomeres_dict)
            self._write_pairs(totalreads_file, total_dict)

            with open(missed_barcods_file, 'w') as missed:
                print(f'Number of missed barcodes = {missed_barcodes}', file=missed)

            print(f'Done with {file}')






# Manual smoke run: count reads with at least one 'AATT' match in every bam file
# below the input folder and write the results for the 'TESTAR/' output folder.
# NOTE(review): the input folder is an absolute path on one developer's machine,
# so this cell will not run elsewhere - replace it with a configurable path.
test = CountTelomeres('/Users/williamrosenbaum/Bioinformatics/smallTseq/', 'AATT', 'TESTAR/', cutoff=1)

# Processes the bam files one by one, printing a progress line for each.
test.run_telomer_count()

Looking for AATT in /Users/williamrosenbaum/Bioinformatics/smallTseq/testar_lite/bowtie2/processed/aligned/sample1_umi_tools_extract_cutadapt_bowtie2.bam
Done with /Users/williamrosenbaum/Bioinformatics/smallTseq/testar_lite/bowtie2/processed/aligned/sample1_umi_tools_extract_cutadapt_bowtie2.bam
Looking for AATT in /Users/williamrosenbaum/Bioinformatics/smallTseq/testar_lite/bowtie2/processed/aligned/sample2_umi_tools_extract_cutadapt_bowtie2.bam
Done with /Users/williamrosenbaum/Bioinformatics/smallTseq/testar_lite/bowtie2/processed/aligned/sample2_umi_tools_extract_cutadapt_bowtie2.bam
Looking for AATT in /Users/williamrosenbaum/Bioinformatics/smallTseq/testar_lite/bowtie2/processed/aligned/sample3_umi_tools_extract_cutadapt_bowtie2.bam
Done with /Users/williamrosenbaum/Bioinformatics/smallTseq/testar_lite/bowtie2/processed/aligned/sample3_umi_tools_extract_cutadapt_bowtie2.bam
Looking for AATT in /Users/williamrosenbaum/Bioinformatics/smallTseq/testar_lite/umi_tools/processed/dedup

In [None]:
class WriteCSV:
    '''Empty placeholder class; it defines no attributes or methods yet.'''
    # NOTE(review): presumably intended to hold the csv-writing logic - confirm.
    pass

class Plot:
    '''Empty placeholder class; it defines no attributes or methods yet.'''
    # NOTE(review): presumably intended to hold plotting logic - confirm.
    pass



In [61]:
import re

# Scratch check that a compiled regular expression exposes its source text via
# the ``pattern`` attribute.
# Note: this rebinds the global name ``test``, which the earlier cell used for
# the CountTelomeres instance.
test = re.compile('AAA')

# Bare last expression so the notebook displays the pattern string.
test.pattern


'AAA'