In [1]:
import subprocess
from tempfile import NamedTemporaryFile

import numpy as np
import pysam
# Significance level for calling an alignment significant; samstats.readstats
# divides it by the total read count before comparing it with a read's P-value.
SIGNIFICANCE = 0.05

In [2]:
class samstats:
    """Generate summary statistics for the alignments in a SAM/BAM file.

    The constructor fills two attributes:
        file_stats: dict with the file-wide counters "total_reads",
            "mapped_reads" and "total_nts".
        read_stats: list with one dict per mapped read, as built by readstats().
    """

    def __init__(self, samfile, ref_seq):
        """
        Args:
            samfile: pysam.AlignmentFile
            ref_seq: (str) reference sequence

        Raises:
            subprocess.CalledProcessError: if ``samtools view -c`` exits non-zero.
            OSError: if the ``samtools`` executable cannot be started.
        """
        self.samfile = samfile
        self.ref_seq = ref_seq
        # file_stats must exist before read_stats is built: readstats() reads
        # file_stats["total_reads"] for its significance threshold.
        self.file_stats = {
            "total_reads": self._count_records(samfile.filename),
            "mapped_reads": samfile.mapped,
            # NOTE(review): fetch() is called without until_eof, so this total may
            # only cover mapped reads -- confirm against the pysam documentation.
            "total_nts": sum(r.get_tag("ZQ") for r in samfile.fetch()),
        }
        self.read_stats = [self.readstats(r) for r in samfile.fetch() if not r.is_unmapped]

    @staticmethod
    def _count_records(filename):
        """Count all records of `filename` by running ``samtools view -c``.

        samtools is used because, according to the original author,
        samfile.unmapped did not work for these files. The command is passed as
        an argument list, so the file name never goes through a shell.
        """
        count = subprocess.check_output(["samtools", "view", "-c", filename])
        return int(count.strip())

    def readstats(self, read):
        """Collect the statistics of a single mapped read.

        Args:
            read: pysam.AlignedRead

        Returns:
            dict with the keys "read_len", "mapping_quality", "aln_len",
            "aln_score", "aln_evalue", "aln_editdistance", "mapped_nts",
            "ins", "del", "subst" and "is_significant".

        Raises:
            AssertionError: if the read length (ZQ tag), the alignment length
                and the number of mapped nucleotides are inconsistent.
        """
        # Summed length per CIGAR operation code, gathered in a single pass.
        op_lengths = {}
        for op, length in read.cigartuples:
            op_lengths[op] = op_lengths.get(op, 0) + length

        # Read the sequence property once instead of once per aligned pair.
        query_seq = read.query_sequence
        evalue = read.get_tag("ZE")

        s = {
            "read_len": read.get_tag("ZQ"),
            "mapping_quality": read.mapping_quality,
            "aln_len": read.alen,
            "aln_score": read.get_tag("AS"),
            "aln_evalue": evalue,
            "aln_editdistance": read.get_tag("NM"),
            # CIGAR operation codes: 0 = alignment match (M), 1 = insertion (I),
            # 2 = deletion (D).
            "mapped_nts": op_lengths.get(0, 0),
            "ins": op_lengths.get(1, 0),
            "del": op_lengths.get(2, 0),
            # Aligned positions whose read base differs from the reference base.
            "subst": sum(1 for query, ref in read.get_aligned_pairs(matches_only=True)
                         if query_seq[query] != self.ref_seq[ref]),
            # The significance level is divided by the number of reads in the file.
            "is_significant": e2p(evalue) < SIGNIFICANCE / self.file_stats["total_reads"]
        }
        assert s["read_len"] >= s["mapped_nts"]
        assert s["aln_len"] >= s["mapped_nts"]
        assert len(query_seq) == s["read_len"]
        return s

    def sumstat(self, stat):
        """sum of the stat <stat> for all reads."""
        return sum(r[stat] for r in self.read_stats)

    def print_summary(self):
        """Build the summary table (despite the name, nothing is printed).

        Returns:
            list of rows; each row is a list of four strings:
            [label, numerator, denominator, numerator/denominator as percentage].
            The percentage is "nan%" when the denominator is 0 (for example when
            no read is mapped) instead of raising ZeroDivisionError.
        """
        total_reads = self.file_stats["total_reads"]
        aln_len = self.sumstat("aln_len")
        mapped_nts = self.sumstat("mapped_nts")
        lines = [
            ["mapped_reads/total_reads", self.file_stats["mapped_reads"], total_reads],
            ["significant_reads/total_reads", self.sumstat("is_significant"), total_reads],
            ["mapped_nts/total_nts", mapped_nts, self.file_stats["total_nts"]],
            ["editdistance/alignment_length", self.sumstat("aln_editdistance"), aln_len],
            ["alignment_score/alignment_length", self.sumstat("aln_score"), aln_len],
            ["SNPs/mapped_nts", self.sumstat("subst"), mapped_nts],
            ["ins/mapped_nts", self.sumstat("ins"), mapped_nts],
            ["del/mapped_nts", self.sumstat("del"), mapped_nts],
        ]

        def process_line(line):
            numerator, denominator = line[1], line[2]
            # An empty denominator has no meaningful ratio; report NaN.
            ratio = float(numerator) / denominator if denominator else float("nan")
            return [str(x) for x in (line + ["{0:%}".format(ratio)])]

        return [process_line(line) for line in lines]

In [15]:
def mk_consensus(bam_file, ref_file): 
    """calculate the consensus sequence of a bam file"""
    cons_fq = !samtools mpileup -uf {ref_file} {bam_file} | \
        bcftools view -cg - | \
        /home/ibis/gregor.sturm/nanopore/tools/bcftools/vcfutils.pl vcf2fq
    i = [i for i, line in enumerate(cons_fq) if line[0] == "@"][0]
    return cons_fq, i

In [12]:
def needle(ref_seq, target_seq):    
    """needleman-wunsch global alignment of two sequences"""
    with NamedTemporaryFile('w') as ref_file:
        with NamedTemporaryFile('w') as target_file: 
            with NamedTemporaryFile('r+') as output_file: 
                ref_file.write(ref_seq)
                target_file.write(target_seq)
                target_file.flush()
                ref_file.flush()
                !needle -asequence {ref_file.name} -bsequence {target_file.name} -aformat score -outfile {output_file.name}
                out = output_file.readlines()
    
    return out
    

In [13]:
def e2p(e):
    """Convert the E-value of an alignment to a P-value.

    Computes P = 1 - exp(-E). expm1 is used so that very small E-values keep
    their precision instead of being rounded to exactly 0.

    Args:
        e: E-value (number or numpy array)

    Returns:
        the corresponding P-value(s)
    """
    return -np.expm1(-e)