Consider using Cairo for plotting...

In [57]:
import math
import cairo # see https://www.cairographics.org/samples/
from IPython.display import Image

In [58]:
# Report the Cairo version, but only when this build offers both SVG surfaces
# and PNG functions.
if cairo.HAS_SVG_SURFACE and cairo.HAS_PNG_FUNCTIONS:
    print('Cairo: {c}'.format(c=cairo.CAIRO_VERSION_STRING))

Cairo: 1.14.12


In [59]:
# Draw a curve plus two straight guide lines into a 100x100 SVG file.
# NOTE(review): assumes the ./img directory already exists -- confirm.
with cairo.SVGSurface("./img/example.svg", 100, 100) as surface:
    context = cairo.Context(surface)
    # (x, y) is the curve's start point; (x1, y1), (x2, y2), (x3, y3) are the
    # arguments handed to curve_to below. All are unit-square coordinates.
    x, y, x1, y1 = 0.1, 0.5, 0.4, 0.9
    x2, y2, x3, y3 = 0.6, 0.1, 0.9, 0.5
    # Scale by 100 so the 0..1 coordinates above span the 100x100 surface.
    context.scale(100, 100)
    # First pass: the curve itself, stroked with the wider line.
    context.set_line_width(0.04)
    context.move_to(x, y)
    context.curve_to(x1, y1, x2, y2, x3, y3)
    context.stroke()
    # Second pass: thinner, semi-transparent red segments joining
    # (x, y)-(x1, y1) and (x2, y2)-(x3, y3).
    context.set_source_rgba(1, 0.2, 0.2, 0.6)
    context.set_line_width(0.02)
    context.move_to(x, y)
    context.line_to(x1, y1)
    context.move_to(x2, y2)
    context.line_to(x3, y3)
    context.stroke()

![a plot of the above code](./img/example.svg)

In [42]:
# Name of the pickled annotation file. load_obj() appends '.pkl' on its own,
# so the extension is trimmed off before the call.
anno_str = 'Illumina-450k-Anno.{rev}.{ext}'.format(rev='hg19', ext='pkl')
annotation = load_obj(anno_str[:-len('.pkl')])  # load the saved annotation file

print(len(annotation.probe))

482421


In [44]:
def _range(probes, interval):
    '''
    Slice a probe mapping by position, keeping its current (unsorted) order.

    Similar to df.head(), except the window can begin anywhere: the entries
    at positions interval.start up to (not including) interval.end are
    returned as a new dict.
    '''
    entries = list(probes.items())
    window = entries[interval.start:interval.end]
    return dict(window)

def sort_range_by_refid(probes):
    '''
    Return a new dict holding the same probes, ordered by ascending refid
    (the dict key). Call _range first to limit scope.
    '''
    return {refid: probes[refid] for refid in sorted(probes)}

def sort_range_by_coordinate(probes):
    '''
    Return a new dict holding the same probes, ordered by ascending
    coordinate (each probe's .cord). Call _range first to limit scope.
    '''
    by_cord = sorted(probes, key=lambda refid: probes[refid].cord)
    return {refid: probes[refid] for refid in by_cord}

def get_probes_by_chr(probes, interval):
    """
    Return a dict of the probes whose .chr equals interval.chr.
    Call _range function first to limit scope.
    """
    return {refid: probe for refid, probe in probes.items()
            if probe.chr == interval.chr}

def get_probes_by_location(probes, interval):
    """
    Return a dict of the probes that lie inside a genomic interval.

    A probe is kept when its .chr equals interval.chr and its .cord is
    strictly between interval.start and interval.end (both bounds are
    exclusive). Call _range function first to limit scope.

    probes:   mapping of refid -> probe (each with .chr and .cord).
    interval: object with .chr, .start and .end.
    """
    chrom = interval.chr
    # Take the bounds from the interval itself rather than from whatever
    # `start`/`end` happen to be bound in the notebook's global namespace.
    start = int(interval.start)
    end = int(interval.end)

    return {refid: probe for refid, probe in probes.items()
            if probe.chr == chrom and start < probe.cord < end}

In [45]:
# Interval handed to the helper calls in the following cells; _range() reads
# its start/end as positional slice bounds.
chrom, start, end = 'Y', 0, 5
_slice = Interval(chrom, start, end, '+')

In [46]:
probe_slice = _range(annotation.probe, _slice) # dict of the first 5 annotation entries (_slice spans positions 0-5).
# Cord    Chr   ID*
# 8553009   Y   cg00035864
# 9363356   Y   cg00050873
# 25314171  Y   cg00061679
# 22741795  Y   cg00063477
# 21664296  Y   cg00121626

In [47]:
# Order probe_slice by refid. Expected order, shown as refid and coordinate:
sort_range_by_refid(probe_slice)
# 'cg00035864'  8553009
# 'cg00050873'  9363356
# 'cg00061679' 25314171
# 'cg00063477' 22741795
# 'cg00121626' 21664296

{'cg00035864': <__main__.Probe at 0x141df3b70>,
 'cg00050873': <__main__.Probe at 0x141df3b38>,
 'cg00061679': <__main__.Probe at 0x141df39e8>,
 'cg00063477': <__main__.Probe at 0x141df3828>,
 'cg00121626': <__main__.Probe at 0x141df3470>}

In [48]:
# Order probe_slice by coordinate. Expected order, shown as refid and coordinate:
sort_range_by_coordinate(probe_slice)
# 'cg00035864'  8553009
# 'cg00050873'  9363356
# 'cg00121626' 21664296
# 'cg00063477' 22741795
# 'cg00061679' 25314171

{'cg00035864': <__main__.Probe at 0x141df3b70>,
 'cg00050873': <__main__.Probe at 0x141df3b38>,
 'cg00121626': <__main__.Probe at 0x141df3470>,
 'cg00063477': <__main__.Probe at 0x141df3828>,
 'cg00061679': <__main__.Probe at 0x141df39e8>}

In [37]:
# Keep only the probes whose chromosome matches _slice.chr.
# NOTE(review): this cell's execution count precedes the cells that define
# probe_slice and _slice -- re-run in order to confirm the output shown.
get_probes_by_chr(probe_slice, _slice)

{'cg00035864': <__main__.Probe at 0x106bda710>,
 'cg00050873': <__main__.Probe at 0x108c2aeb8>,
 'cg00061679': <__main__.Probe at 0x108c2acc0>,
 'cg00063477': <__main__.Probe at 0x108cbe748>,
 'cg00121626': <__main__.Probe at 0x108cbe898>}

In [124]:
# Ascending list of probe coordinates.
# NOTE(review): `probes` must already exist in the kernel when this cell runs.
sorted_locations = sorted(probe.cord for probe in probes.values())

In [90]:
# Ascending list of probe coordinates.
sorted_locations = sorted(probe.cord for probe in probes.values())

# Midpoint of the interval, truncated to an int.
# NOTE(review): `interval` is not defined in any visible cell -- confirm.
midway = int((interval.start + interval.end) / 2)

# True when midway lies in [smallest coordinate, largest coordinate).
if midway in range(sorted_locations[0], sorted_locations[-1]):
    print ('found')




#    
#probes_by_key = sorted([probe for probe in probes]) # sorts on REF_ID by default
#probes_by_key


67905522 X
73754570 X
70400600 X
30265202 X
118370171 X
21901558 X
147062909 X
2847549 X
152912242 X
40015196 X
118892768 X


In [117]:
# Ascending list of probe coordinates.
sorted_locations = sorted(probe.cord for probe in probes.values())

# Midpoint of the interval, truncated to an int.
# NOTE(review): `interval` is not defined in any visible cell -- confirm.
midway = int((interval.start + interval.end) / 2)

# True when midway lies in [smallest coordinate, largest coordinate).
if midway in range(sorted_locations[0], sorted_locations[-1]):
    print ('found')

found


In [108]:
# NOTE(review): `mid` is not defined in any visible cell; this cell relies on
# leftover kernel state.
# The membership test's result is discarded; only the final expression (`mid`)
# is displayed as the cell output.
mid in range(sorted_locations[0],sorted_locations[-1])
mid

23444

In [101]:
# NOTE(review): `range_tup` is not defined in any visible cell; this cell
# relies on leftover kernel state.
# The len(...) value is discarded; only the final expression is displayed.
len(range(range_tup[0], range_tup[1]))

# Midpoint of the two bounds, truncated to an int.
int((range_tup[0] + range_tup[1])/2)

23444

In [None]:
def get_probes_in_range(start, stop, probes=None):
    """
    Return a dict of the probes at positions start..stop-1, sorted by refid.

    The positional slice is taken first (in the mapping's current order) and
    the selected entries are then sorted by key.

    start, stop: slice bounds into the probe mapping's items.
    probes:      mapping of refid -> probe. When omitted, the notebook's
                 loaded `annotation.probe` is used.
    """
    if probes is None:
        # The notebook binds the loaded object to `annotation`; no cell
        # defines `annotate`.
        probes = annotation.probe
    selected = list(probes.items())[start:stop]
    return dict(sorted(selected))

probes = get_probes_in_range(0, 5)  # dict of the first 5 probe entries

# Chromosome and (exclusive) coordinate bounds to filter on.
chrom, start, stop = 'Y', 8443000, 8572220

# key:  cg00035864 cg00050873 cg00061679 cg00063477 cg00121626
# cord: 8553009    9363356    25314171   22741795   21664296
# chr:  Y          Y          Y          Y          Y

[probe for probe in probes.values()
 if probe.chr == chrom and start < probe.cord < stop]


In [6]:
# %load '../methylator/annotation/annotate_450k.py'

import os

class Probe:
    """
    Plain record of Illumina 450k probe info for one CpG site.

    Every attribute starts as None; Annotate_450k.__run__ fills them from one
    comma-separated annotation line (column indexes noted below).
    """
    def __init__(self):
        self.id = None      # column 0
        self.seq = None     # column 13
        self.name = None    # column 1
        self.chr = None     # column 11, stored as str
        self.cord = None    # column 12, stored as int
        self.strand = None  # column 16
        self.gene = None    # column 21, split on ";" into a list
        self.refseq = None  # column 22
        self.tour = None    # column 25
        self.loc = None     # column 23, split on ";" with repeats dropped
        
class Interval:
    """
    A genomic interval: chromosome, start, end and strand orientation.
    """
    def __init__(self, chromosome, start, end, strand):
        # Stored under the short attribute names the lookup helpers read.
        self.chr, self.start = chromosome, start
        self.end, self.strand = end, strand

class Location:
    """
    String constants naming probe locations.

    NOTE(review): presumably these match the labels stored in Probe.loc and
    are meant to be passed to Annotate_450k.get_probes_by_location -- confirm.
    """
    BODY = "Body"
    TSS200 = "TSS200"
    TSS1500 = "TSS1500"
    UTR5 = "5'UTR"
    UTR3 = "3'UTR"
    EXON = "Exon"

class CpG_location:
    """
    String constants naming CpG locations.

    NOTE(review): presumably these match the labels stored in Probe.tour and
    are meant to be passed to Annotate_450k.get_probes_by_cpg -- confirm.
    """
    ISLAND = "Island"
    NSHORE = "N_Shore"
    SSHORE = "S_Shore"
    NSHELF = "N_Shelf"
    SSHELF = "S_Shelf"
    
class SNP:
    """
    One probe/SNP pairing; such pairings are used to filter probes.

    Both fields start as None and are filled in by the code that reads the
    SNP table.
    """

    def __init__(self):
        # probeid: id of the affected probe; snpid: id of the SNP.
        self.probeid = self.snpid = None
    
class Annotate_450k:
    """
    Parse and hold information about Illumina probes.

    Parsed probes live in ``self.probe``, a dict mapping probe id -> Probe.
    """

    # Default SNP table read by remove_snp_probes().
    SNP_TABLE = "../../data/humanmethylation450_dbsnp137.snpupdate.table.v2.sorted.txt"

    def __init__(self, config_file=None):
        """
        Find the annotation file through a config file, then parse it.

        config_file: path of a text file whose last non-blank line names the
            annotation file inside ../../data/. Defaults to the module-level
            ``anno_file``.

        Raises ValueError when the config file names no annotation file.
        """
        if config_file is None:
            # Module-level default, looked up when the object is created.
            config_file = anno_file

        manifest_name = None
        with open(config_file, mode="r") as config:
            for line in config:
                entry = line.strip("\n").strip("\r")
                if entry:
                    # Later entries override earlier ones.
                    manifest_name = entry
        if manifest_name is None:
            raise ValueError("No annotation file listed in %s" % (config_file,))

        self.ann = os.path.join("../../data/", manifest_name)
        self.probe = {}
        self.__run__()

    def __run__(self):
        """
        Fill ``self.probe`` from the annotation file at ``self.ann``.

        Only lines starting with "cg" are parsed. Each such line is split on
        "," and selected columns are copied onto a new Probe, which is stored
        under its id (a repeated id replaces the earlier probe).
        """
        with open(self.ann, mode="r") as manifest:
            for line in manifest:
                if not line.startswith("cg"):
                    continue
                # NOTE(review): a plain split does not honour quoted fields
                # that contain commas -- confirm the file never has any.
                data = line.split(",")
                # Assign probe information.
                new_probe = Probe()
                new_probe.id = data[0]
                new_probe.name = data[1]
                new_probe.seq = data[13]
                new_probe.chr = str(data[11])
                new_probe.cord = int(data[12])
                new_probe.strand = data[16]
                new_probe.gene = data[21].split(";")
                new_probe.refseq = data[22]
                # Drop repeated location labels, keeping first-seen order.
                new_probe.loc = list(dict.fromkeys(data[23].split(";")))
                new_probe.tour = data[25]
                self.probe[new_probe.id] = new_probe

    def get_probe(self, probe_id):
        """
        Return the probe stored under ``probe_id``.

        When no such probe exists a warning is printed and None is returned.
        """
        try:
            return self.probe[probe_id]
        except (KeyError, TypeError):
            # KeyError: unknown id. TypeError: unhashable id such as a list.
            print("WARNING: No probe with ref-id of %s found." % (probe_id,))
            return None

    def get_all_probes(self):
        """
        Return list of all probes.
        """
        return list(self.probe.values())

    def get_probes_by_list(self, list_of_ids):
        """
        Return a list of probes for a list of probe ids.

        Ids with no probe yield None entries (and a printed warning), so the
        result always lines up with ``list_of_ids``.
        """
        return [self.get_probe(probe_id) for probe_id in list_of_ids]

    def get_probe_refs_by_gene(self, gene_name):
        """
        Return the ids of all probes whose gene list contains ``gene_name``.
        """
        return [probe_id for probe_id, probe in self.probe.items()
                if gene_name in probe.gene]

    def get_probe_refs_by_location(self, probe_loc):
        """
        Return the ids of all probes whose location list contains ``probe_loc``.
        """
        return [probe_id for probe_id, probe in self.probe.items()
                if probe_loc in probe.loc]

    def get_probe_refs_by_cpg(self, cpg_loc):
        """
        Return the ids of all probes whose ``tour`` equals ``cpg_loc``.

        NOTE(review): assumes Probe.tour holds a single CpG label such as
        CpG_location.ISLAND -- confirm against the annotation file.
        """
        return [probe_id for probe_id, probe in self.probe.items()
                if probe.tour == cpg_loc]

    def get_keys(self, dic_keys):
        """
        Return the given keys (or any iterable) as a list.
        """
        return list(dic_keys)

    def get_probes_by_gene(self, gene_name):
        """
        Return list of probes for an associated gene.
        """
        return self.get_probes_by_list(self.get_probe_refs_by_gene(gene_name))

    def get_probes_by_location(self, loc):
        """
        Return list of probes whose location list contains ``loc``.
        """
        return self.get_probes_by_list(self.get_probe_refs_by_location(loc))

    def get_probes_by_cpg(self, cpg_loc):
        """
        Return list of probes for a CpG location.

        Looks the ids up with get_probe_refs_by_cpg, mirroring the gene and
        location lookups, instead of calling itself.
        """
        return self.get_probes_by_list(self.get_probe_refs_by_cpg(cpg_loc))

    def get_probes_by_chr(self, chr_loc):
        """
        Return a dict (probe id -> probe) of the probes on ``chr_loc.chr``.
        """
        return {probe_id: probe for probe_id, probe in self.probe.items()
                if probe.chr == chr_loc.chr}

    def get_probes_by_chr_and_loc(self, chr_loc):
        """
        Return a dict (probe id -> probe) of the probes inside a region.

        A probe is kept when its chromosome equals ``chr_loc.chr`` and its
        coordinate is strictly between ``chr_loc.start`` and ``chr_loc.end``.
        """
        chrom = chr_loc.chr
        start = int(chr_loc.start)
        end = int(chr_loc.end)

        return {probe_id: probe for probe_id, probe in self.probe.items()
                if probe.chr == chrom and start < probe.cord < end}

    def get_probe_keys_by_chr_and_loc(self, chr_loc):
        """
        Return a list of the probe ids inside a genomic region.

        Uses the same selection rule as get_probes_by_chr_and_loc.
        """
        return self.get_keys(self.get_probes_by_chr_and_loc(chr_loc))

    def get_number(self):
        """
        Return total number of probes.
        """
        return len(self.probe)

    def get_coord(self, probe):
        """
        Get genomic coordinate of a single probe.
        """
        return probe.cord

    def get_sorted_probes_by_id(self):
        """
        Return the probe ids (not the probes) in ascending order.
        """
        return sorted(self.probe)

    def get_sorted_probes_by_chr(self):
        """
        Return all probes sorted by chromosome.

        Chromosomes are stored as strings, so the order is plain string
        order (for example "10" sorts before "2").
        """
        return sorted(self.get_all_probes(), key=lambda probe: probe.chr)

    def remove_snp_probes(self, snp_file=None):
        """
        Remove every probe that is listed in the SNP table.

        snp_file: path of a tab-separated table whose "cg" lines begin with a
            probe id. Defaults to ``SNP_TABLE``.

        Ids listed more than once, or not present in ``self.probe``, are
        skipped silently.
        """
        if snp_file is None:
            snp_file = self.SNP_TABLE

        flagged_ids = []
        with open(snp_file, "r") as table:
            for line in table:
                if line.startswith("cg"):
                    fields = line.strip("\n").strip("\r").split("\t")
                    flagged_ids.append(fields[0])

        for probe_id in flagged_ids:
            # A default avoids KeyError for repeated or unknown ids.
            self.probe.pop(probe_id, None)

anno_file = os.path.abspath("../../data/config.ini") # Config file read by Annotate_450k.__init__; its last line names the
                                                     # Illumina probe manifest inside ../../data/. Note: the (large)
                                                     # manifest file is not in the repository.

# Functions to save/load dictionary objects. 

import _pickle as pickle

def save_obj(obj, name):
    """
    Pickle ``obj`` to ../../data/pickle/<name>.pkl.

    An existing file of that name is overwritten.
    """
    target = '../../data/pickle/' + name + '.pkl'
    with open(target, 'wb+') as handle:
        pickle.dump(obj, handle)
        
def load_obj(name):
    """
    Return the object unpickled from ../../data/pickle/<name>.pkl.
    """
    # NOTE(review): unpickling can run arbitrary code; only load files that
    # this project wrote itself.
    source = '../../data/pickle/' + name + '.pkl'
    with open(source, 'rb') as handle:
        return pickle.load(handle)