In [8]:
import pysam
import pandas as pd

In [114]:
fa = pysam.FastaFile('hg38.fa') 

In [130]:
fa.fetch(reference='chr1', start=10000, end= 10000+ 33)

'taaccctaaccctaaccctaaccctaaccctaa'

In [24]:
ref = pd.DataFrame({'chr': fa.references, 'length': fa.lengths})

In [30]:
ref

Unnamed: 0,chr,length
0,chr1,248956422
1,chr10,133797422
2,chr11,135086622
3,chr11_KI270721v1_random,100316
4,chr12,133275309
...,...,...
450,chrUn_GL000216v2,176608
451,chrUn_GL000218v1,161147
452,chrX,156040895
453,chrY,57227415


In [4]:
fa.close()

In [134]:

import pysam
import math
import numpy as np
from typing import NamedTuple
import array
import gzip


CG_CONTEXT_FORWARD_HASH = {'CGA':'CG', 'CGT':'CG', 'CGC':'CG', 'CGG':'CG', 'CGN':'CG', # 5 CG
                           'CAG':'CHG', 'CTG':'CHG', 'CCG':'CHG',          # 3 CHG
                           'CAA':'CHH', 'CAT':'CHH', 'CAC':'CHH',          # 9 CHH
                           'CTA':'CHH', 'CTT':'CHH', 'CTC':'CHH',
                           'CCA':'CHH', 'CCT':'CHH', 'CCC':'CHH'
                           }

CG_CONTEXT_REVERSE_HASH = {'ACG':'CG', 'TCG':'CG', 'CCG':'CG', 'GCG':'CG', 'NCG':'CG', # 5 CG
                           'CAG':'CHG', 'CTG':'CHG', 'CGG':'CHG',          # 3 CHG
                           'AAG':'CHH', 'ATG':'CHH', 'AGG':'CHH',          # 9 CHH
                           'TAG':'CHH', 'TTG':'CHH', 'TGG':'CHH',
                           'GAG':'CHH', 'GTG':'CHH', 'GGG':'CHH'
                           }

DI_CONTEXT_REVERSE_HASH = {'AG':'CT', 'TG':'CA', 'CG':'CG', 'GG':'CC'}

# parse 3-letter context, CG/CHG/CHH etc.

class CGContext:
    
    def __init__(self, chr, pos, base, threeletter):
        pass

# interval must be involved in single chr
class GenomicInterval(NamedTuple):
    chr: str
    chr_length: int
    start: int
    end: int

# `ref` includes ref bases from `start`-2 to `end`-2
class FaGenomicInterval(NamedTuple):
    chr: str
    start: int
    end: int
    bases: str


class GenomicIntervalGenerator:
    
    def __init__(self, fa: pysam.FastaFile, 
                 chrs = "all",
                 start: int = 0,
                 end: int = math.inf,
                 step: int = 10_000,
                 ) -> None:
        
        self.chrs = fa.references
        self.lens = fa.lengths
        if chrs == "all":
            self.chrs_selected = self.chrs
        else:
            self.chrs_selected = chrs
        
        # print(self.chrs_selected)
        self.start = start
        self.end = end
        self.step = step

        assert(step > 0 and start < end)
        assert(len(self.chrs) > 0 and len(self.chrs) == len(self.lens))


    def __repr__(self):
        return f'GenomicIntervalGenerator({len(self.chrs)} contig(s) with step {self.step})'
    
    def __iter__(self) -> GenomicInterval:
        for (chr, len) in zip(self.chrs, self.lens):
            if chr not in self.chrs_selected:
                continue

            end2 = min(self.end, len)
            start = self.start
            end = start + self.step
            while (True):
                # end += self.step
                if start < end2:
                    end = min(end, end2)
                    yield GenomicInterval(chr=chr, chr_length=len, start=start, end=end)
                    start = end
                    end += self.step
                else:
                    break

class MyFastaFile(pysam.FastaFile):

    def rich_fetch(self, intrv: GenomicInterval, padding:int=2) -> FaGenomicInterval:

        bases = self.fetch(reference=intrv.chr, 
                           start=max(0, intrv.start-2), 
                           end=max(intrv.chr_length, intrv.end+2)
                           ).upper()
        
        # padding N
        if intrv.start < padding:
            bases += "N"*(padding-intrv.start)
        if intrv.end > intrv.chr_length-padding:
            bases += "N"*(intrv.chr_length-intrv.end)

        return bases


class Coverage(NamedTuple):
    watson: np.array
    crick: np.array

def reverse_read(read) -> bool:
    return read.is_reverse

def forward_read(read) -> bool:
    return not read.is_reverse

class MyAlignmentFile(pysam.AlignmentFile):

    # def __init__(self, )

    def Watson_Crick_coverage(self, intrv: GenomicInterval, quality_threshold: int) -> Coverage:
        cov_watson = self.count_coverage(contig=intrv.chr, 
                                         start=intrv.start, 
                                         stop=intrv.end, 
                                         quality_threshold=quality_threshold,
                                         read_callback=forward_read
                                         )
        cov_crick = self.count_coverage(contig=intrv.chr, 
                                         start=intrv.start, 
                                         stop=intrv.end, 
                                         quality_threshold=quality_threshold,
                                         read_callback=reverse_read
                                         )
        return Coverage(watson=np.array(cov_watson), crick=np.array(cov_crick))



def methylExtractor(params: NamedTuple) -> None:
    
    fa = MyFastaFile(params.fafile)
    bam = MyAlignmentFile(params.bamfile, 'rb')


    # outputs
    outfile_atcg = gzip.open(params.out_atcg, 'wt')

    intervals = iter(GenomicIntervalGenerator(fa, 
                                              chrs= "chr1", 
                                              start = 100000, 
                                              end = 100000+ 55,
                                              step=10))

    for intrv in intervals:

        # context size
        consize = params.context_size
        # ref sequences
        bases = fa.rich_fetch(intrv, padding=consize-1).upper()


        # bam coverages
        covs = bam.Watson_Crick_coverage(intrv, quality_threshold=params.quality_threshold)

        cov_sum_W = np.sum(covs.watson, axis=0)
        cov_sum_C = np.sum(covs.crick, axis=0)

        chr = intrv.chr
        # pos
        for i in range(intrv.end-intrv.start):
            if cov_sum_W[i]+cov_sum_C[i] == 0:
                continue
            

            j = i+2
            base = bases[j]
            if base == 'C':
                # CG/CHG/CHH
                CG_context = CG_CONTEXT_FORWARD_HASH[bases[j:(j+consize)]]
                # dinucleatide context CA/CT/...
                dicontext = bases[j:(j+2)]
            elif base == 'G':
                CG_context = CG_CONTEXT_REVERSE_HASH[bases[(j-consize+1):(j+1)]]
                dicontext = DI_CONTEXT_REVERSE_HASH[bases[(j-1):(j+1)]]
            else:
                CG_context = dicontext = "--"

            # write files
            outfile_atcg.write(f'{intrv.chr}\t{base}\t{intrv.start+i}\t{CG_context}\t{dicontext}\t{covs.watson[0,i]}\t{covs.watson[3,i]}\t{covs.watson[1,i]}\t{covs.watson[2,i]}\t{covs.crick[0,i]}\t{covs.crick[3,i]}\t{covs.crick[1,i]}\t{covs.crick[2,i]}\n')

    
    # close file handers

    fa.close()
    bam.close()
    outfile_atcg.close()



In [135]:

class Parameters(NamedTuple):
    fafile: str
    bamfile: str
    out_atcg: str
    quality_threshold: int
    context_size: int
    
params = Parameters(fafile='./hg38.fa', 
                    bamfile='./A549.bam',
                    out_atcg='./A549.ATCGmap.gz',
                    quality_threshold=15, # base seq quality
                    context_size=3, # CHG etc.
                    )
methylExtractor(params)

In [102]:


params = Parameters(fafile='./hg38.fa', bamfile='./A549.bam')

fa = pysam.FastaFile(params.fafile)
interval = iter(GenomicIntervalGenerator(fa, 
                                         chrs = ["chr1", "chr2"],
                                         start= 100_000_001, 
                                         end = 200_000_002, 
                                         step=50_000_000)
                                         )

interval

<generator object GenomicIntervalGenerator.__iter__ at 0x7f9586f30260>

In [None]:
next(interval)

In [38]:
fa.close()

In [141]:
'N' in "CNG"

True