In [1]:
%load_ext autoreload
%autoreload 2
%pylab inline

import os.path as op
import findspark
import os
findspark.init()

import pyspark
sc = pyspark.SparkContext()

Populating the interactive namespace from numpy and matplotlib


## Utility functions

In [2]:
assembly = 'hg19'

In [15]:
data_dir = op.expanduser("~/data")
output_dir = op.join(data_dir, assembly)   # where all of the intermediate output will be stored
base_ucsc_dir = op.join(data_dir, 'ucsc-data/{}'.format(assembly))  # where all of the files downloaded from UCSC will be stored

import shutil

# create a directory to store intermediate output files
def get_outfile(table_name):
    outfile = op.join(output_dir, 'genbank-output/{}'.format(table_name))
    if op.exists(outfile):
        shutil.rmtree(outfile)
    return outfile

## Load the chromosome lengths

In [16]:
def get_chrom_lengths(base_dir):
    '''
    Get the cumulative start positions for the chromosomes in an assembly. The chromosomes
    will be sorted alphabetically by their names.
    
    :param base_dir: A directory containing meta data about a genome assembly
    :return: A dictionary of the from { 'chr2': 234323432 }, showing at which position
             chromosomes start.
    '''
    chromLengths = (sc.textFile(op.join(base_dir, 'chromInfo.txt.gz'))
                    .map(lambda x: x.split('\t'))
                    .map(lambda x: {'chrom': x[0], 'length': int(x[1]) })
                    .collect())
    
    cum_chrom_lengths = {}
    curr_cum_lengths = 0
    
    for x in sorted(chromLengths, key=lambda x: -x['length']):
        cum_chrom_lengths[x['chrom']] = curr_cum_lengths
        curr_cum_lengths += x['length']
        
    return cum_chrom_lengths

cum_chrom_lengths = get_chrom_lengths(base_ucsc_dir)

print cum_chrom_lengths['chr1'], cum_chrom_lengths['chr2'], cum_chrom_lengths['chr3']

0 249250621 492449994


## Loading the refgene data

In [17]:
from pyspark import SparkContext, SparkConf


def parse_exon_positions(exon_positions_str):
    return map(int, exon_positions_str.strip(",").split(','))

def load_refgene_data(base_dir):
    '''
    Load the UCSC refgene data for a particular assembly.
    
    :param base_dir: The directory which contains the refGene.txt.gz file.
    '''
    refGene = (sc.textFile(op.join(base_dir, 'refGene.txt.gz'))
               .map(lambda x: x.split('\t'))
               .map(lambda x: {'name': x[1],
                               'chrom': x[2],
                               'strand': x[3],
                               'txStart': x[4],
                               'txEnd': x[5],
                               'cdsStart': x[6],
                               'cdsEnd': x[7],
                               'exonCount': x[8],
                               'exonStarts': x[9].strip(','),
                               'exonEnds': x[10].strip(','),
                               'chromOffset': cum_chrom_lengths[x[2]],
                               'genomeTxStart': cum_chrom_lengths[x[2]] + int(x[4]),
                               'genomeTxEnd': cum_chrom_lengths[x[2]] + int(x[5]),
                               'geneName': x[12],
                               'geneLength': int(x[5]) - int(x[4]),
                               })
            )
    
    return refGene

refGene = load_refgene_data(base_ucsc_dir)
### add the genomic position
print refGene.take(1)

[{'exonEnds': u'15469389,15471514,15473730,15476045,15478082,15484120', 'geneName': u'EAF1', 'chromOffset': 492449994, 'name': u'NM_033083', 'txStart': u'15469063', 'exonCount': u'6', 'strand': u'+', 'cdsEnd': u'15480662', 'genomeTxStart': 507919057, 'geneLength': 15057, 'cdsStart': u'15469286', 'chrom': u'chr3', 'genomeTxEnd': 507934114, 'txEnd': u'15484120', 'exonStarts': u'15469063,15471419,15473593,15475854,15477848,15480615'}]


## Count the references

For this exercise, the result should be a tsv with the following columns:

```
<taxId> <geneId> <refSeqID> <citation count>
```

In [18]:
def load_gene_counts(genbank_dir):
    gene2pubmed = (sc.textFile(op.join(genbank_dir, "gene2pubmed"))
                     .filter(lambda x: x[0] != '#')
                     .map(lambda x: x.split('\t'))
                     .map(lambda p: {'taxid': int(p[0]), 'geneid': int(p[1]), 'pmid': int(p[2]), 'count': 1})
                     .map(lambda x: ((x['taxid'], x['geneid']), {'count': x['count']}))
                     )
    
    def reduce_count(r1, r2):
        '''
        A reduce function that simply counts the number of elements in the table.
        
        @param r1: A Row
        @param r2: A Row
        @return: A new Row, equal to the first Row with a summed count.
        '''
        #print >>sys.stderr, "r1:", r1
        r1['count'] += r2['count']
        return r1

    print gene2pubmed.take(1)
    reduced_gene2pubmed = gene2pubmed.reduceByKey(reduce_count)
    
    outfile = get_outfile('taxid-geneid-count')

    (reduced_gene2pubmed
        .map(lambda x: "{}\t{}\t{}".format(x[0][0], x[0][1], x[1]['count']))
        .saveAsTextFile(outfile)
        )
    
    return reduced_gene2pubmed


taxid_geneid_count = load_gene_counts(op.join(data_dir, 'genbank-data/'))
print taxid_geneid_count.take(1)

[((9, 1246500), {'count': 1})]
[((497978, 12113554), {'count': 1})]


## Load the gene2refseq annotations

In [23]:
def load_refseq2gene(genbank_base_dir):
    '''
    Get the mapping from refseq IDs to gene IDs
    
    :param genbank_base_dir: The directory that contains all of the genbank files.
    :return: A set of tuples of the form (refseq_id, (taxid, geneid))
    '''
    gene2refseq = (sc.textFile(op.join(genbank_base_dir, 'gene2refseq'))
                   .filter(lambda x: x[0] != '#')
                   .map(lambda x: x.split('\t'))
                   .map(lambda p: {'taxid': int(p[0]), 'geneid': int(p[1]), 'refseqid': p[3] })
                   .map(lambda x: (x['refseqid'].split('.')[0], (x['taxid'], x['geneid'])))
                   )
    
    def reduce_by_refseq_id(r1, r2):
        # because we're just looking for a mapping from geneId to refseqId, we just need to throw
        # away single entries with identical refseq ids
        return r1
    
    print gene2refseq.take(10)
    refseq2gene = gene2refseq.reduceByKey(reduce_by_refseq_id)
    print refseq2gene.take(1)
    
    outfile = get_outfile('refseqid-taxid-geneid')

    (refseq2gene.map(lambda x: "{}\t{}\t{}".format(x[0], x[1][0], x[1][1]))
         .saveAsTextFile(outfile)
    )
    return refseq2gene

refseqid_taxid_geneid = load_refseq2gene(op.join(data_dir, 'genbank-data'))

[(u'-', (9, 1246500)), (u'-', (9, 1246501)), (u'-', (9, 1246502)), (u'-', (9, 1246503)), (u'-', (9, 1246504)), (u'-', (9, 1246505)), (u'-', (9, 1246509)), (u'-', (9, 1246510)), (u'-', (9, 3722426)), (u'-', (9, 8655732))]
[(u'XM_009628531', (4098, 104117469))]


## Join the reference counts and refseq to geneid translations

In [25]:
print refseqid_taxid_geneid.take(1)

[(u'XM_009054708', (225164, 20243866))]


In [27]:
def join_counts_and_ids(refseqid_taxid_geneid, taxid_geneid_count):
    taxid_geneid_refseq = refseqid_taxid_geneid.map(lambda x: (x[1], x[0]))
    
    
    '''    
    taxid_geneid_refseq = (sc.textFile(op.join(output_dir, 'genbank-output/refseqid-taxid-geneid'))
                   .map(lambda x: x.split())
                   .map(lambda x: ((int(x[1]), int(x[2])), x[0]))
                        )
    '''
    print taxid_geneid_refseq.take(1)
    
    '''
    (sc.textFile(op.join(output_dir, 'genbank-output/taxid-geneid-count'))
                          .map(lambda x: x.split())
                          .map(lambda x: ((int(x[0]), int(x[1])), int(x[2])))
                          )
    '''
    taxid_geneid_count_refseq = taxid_geneid_count.join(taxid_geneid_refseq)
    print taxid_geneid_count_refseq.take(1)
    print taxid_geneid_count.take(1)
    
    outfile = get_outfile('taxid-geneid-refseqid-count')

    (taxid_geneid_count_refseq.map(lambda x: "{}\t{}\t{}\t{}".format(x[0][0],
                                                                  x[0][1],
                                                                  x[1][1],
                                                                  x[1][0]))
     .saveAsTextFile(outfile)
     )
    return taxid_geneid_count_refseq

taxid_geneid_count_refseq = join_counts_and_ids(refseqid_taxid_geneid, taxid_geneid_count)

[((559292, 852335), u'NM_001178394')]
[((670483, 19307480), ({'count': 1}, u'XM_007869167'))]
[((530564, 8707716), {'count': 1})]


## Join the refgene data with the count data

In [28]:
def join_refgene_and_counts(refGene, taxid_geneid_count_refseq):
    '''
    Combine the refGene information about the genes with the citation
    count information.
    '''
    refseqid_refgene = refGene.map(lambda x: (x['name'], x))
    
    print refseqid_refgene.take(1)
    
    refseqid_count = taxid_geneid_count_refseq.map(lambda x: (x[1][1], x[1][0]))
    
    print refseqid_count.take(1)
    
    refseqid_refgene_count = refseqid_refgene.join(refseqid_count)

    print refseqid_refgene_count.take(1)
    

    return refseqid_refgene_count

refseqid_refgene_count = join_refgene_and_counts(refGene, taxid_geneid_count_refseq)

outfile = get_outfile('refgene-count')
(refseqid_refgene_count.map(lambda x: "{name}\t{chrom}\t{strand}\t{txStart}\t{txEnd}\t{genomeTxStart}\t{genomeTxEnd}\t{cdsStart}\t{cdsEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{geneName}\t{count}"
                            .format(count=x[1][1], **x[1][0]))
 .saveAsTextFile(outfile))

[(u'NM_033083', {'exonEnds': u'15469389,15471514,15473730,15476045,15478082,15484120', 'geneName': u'EAF1', 'chromOffset': 492449994, 'name': u'NM_033083', 'txStart': u'15469063', 'exonCount': u'6', 'strand': u'+', 'cdsEnd': u'15480662', 'genomeTxStart': 507919057, 'geneLength': 15057, 'cdsStart': u'15469286', 'chrom': u'chr3', 'genomeTxEnd': 507934114, 'txEnd': u'15484120', 'exonStarts': u'15469063,15471419,15473593,15475854,15477848,15480615'})]
[(u'XM_006853603', {'count': 1})]
[(u'NM_001283037', ({'txEnd': u'55093942', 'geneLength': 50302, 'geneName': u'RTFDC1', 'chromOffset': 2814714882, 'name': u'NM_001283037', 'txStart': u'55043640', 'chrom': u'chr20', 'cdsEnd': u'55093148', 'genomeTxStart': 2869758522, 'strand': u'+', 'cdsStart': u'55043753', 'exonCount': u'8', 'genomeTxEnd': 2869808824, 'exonEnds': u'55043822,55048451,55049827,55052180,55059245,55088484,55092257,55093942', 'exonStarts': u'55043640,55048356,55049733,55052040,55059166,55088370,55092161,55093142'}, {'count': 13})

## Run the entire pipeline

In [29]:
assembly = 'hg19'
output_dir = op.join(data_dir, assembly)    # where all of the intermediate output will be stored
base_ucsc_dir = op.join(data_dir, 'ucsc-data/{}'.format(assembly))  # where all of the files downloaded from UCSC will be stored

cum_chrom_lengths = get_chrom_lengths(base_ucsc_dir)
refGene = load_refgene_data(base_ucsc_dir)
taxid_geneid_count = load_gene_counts(op.join(data_dir, 'genbank-data/'))
refseqid_taxid_geneid = load_refseq2gene(op.join(data_dir, 'genbank-data'))
taxid_geneid_count_refseq = join_counts_and_ids(refseqid_taxid_geneid, taxid_geneid_count)
refseqid_refgene_count = join_refgene_and_counts(refGene, taxid_geneid_count_refseq)

outfile = get_outfile('refgene-count')
(refseqid_refgene_count.map(lambda x: "{name}\t{chrom}\t{strand}\t{txStart}\t{txEnd}\t{genomeTxStart}\t{genomeTxEnd}\t{cdsStart}\t{cdsEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{geneName}\t{count}"
                            .format(count=x[1][1]['count'], **x[1][0]))
 .saveAsTextFile(outfile))

[((9, 1246500), {'count': 1})]
[(u'-', (9, 1246500)), (u'-', (9, 1246501)), (u'-', (9, 1246502)), (u'-', (9, 1246503)), (u'-', (9, 1246504)), (u'-', (9, 1246505)), (u'-', (9, 1246509)), (u'-', (9, 1246510)), (u'-', (9, 3722426)), (u'-', (9, 8655732))]
[(u'XM_009103278', (3711, 103827740))]
[((81985, 17878053), u'XM_006284442')]
[((670483, 19307480), ({'count': 1}, u'XM_007869167'))]
[((8030, 100195418), {'count': 1})]
[(u'NM_033083', {'exonEnds': u'15469389,15471514,15473730,15476045,15478082,15484120', 'geneName': u'EAF1', 'chromOffset': 492449994, 'name': u'NM_033083', 'txStart': u'15469063', 'exonCount': u'6', 'strand': u'+', 'cdsEnd': u'15480662', 'genomeTxStart': 507919057, 'geneLength': 15057, 'cdsStart': u'15469286', 'chrom': u'chr3', 'genomeTxEnd': 507934114, 'txEnd': u'15484120', 'exonStarts': u'15469063,15471419,15473593,15475854,15477848,15480615'})]
[(u'XM_007595264', {'count': 1})]
[(u'NM_004265', ({'exonEnds': u'61596069,61605360,61608003,61608197,61615756,61624543,616250

In [36]:
refseqid_refgene_count.filter(lambda x: x[1][0]['geneName'] == 'TP53').collect()

[(u'NM_000546',
  ({'cdsEnd': u'7579912',
    'cdsStart': u'7572926',
    'chrom': u'chr17',
    'chromOffset': 2655442424,
    'exonCount': u'11',
    'exonEnds': u'7573008,7574033,7576926,7577155,7577608,7578289,7578554,7579590,7579721,7579940,7590868',
    'exonStarts': u'7571719,7573926,7576852,7577018,7577498,7578176,7578370,7579311,7579699,7579838,7590694',
    'geneLength': 19149,
    'geneName': u'TP53',
    'genomeTxEnd': 2663033292,
    'genomeTxStart': 2663014143,
    'name': u'NM_000546',
    'strand': u'-',
    'txEnd': u'7590868',
    'txStart': u'7571719'},
   {'count': 6622})),
 (u'NM_001276761',
  ({'cdsEnd': u'7579569',
    'cdsStart': u'7572926',
    'chrom': u'chr17',
    'chromOffset': 2655442424,
    'exonCount': u'11',
    'exonEnds': u'7573008,7574033,7576926,7577155,7577608,7578289,7578554,7579590,7579721,7579937,7590868',
    'exonStarts': u'7571719,7573926,7576852,7577018,7577498,7578176,7578370,7579311,7579699,7579838,7590694',
    'geneLength': 19149,
    '

In [31]:
outfile = get_outfile('refgene-count')
(refseqid_refgene_count.map(lambda x: "{name}\t{chrom}\t{strand}\t{txStart}\t{txEnd}\t{genomeTxStart}\t{genomeTxEnd}\t{cdsStart}\t{cdsEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{geneName}\t{count}"
                            .format(count=x[1][1]['count'], **x[1][0]))
 .saveAsTextFile(outfile))

In [None]:
refseqid_refgene_count.take(1)

In [120]:
outfile = get_outfile('refgene-count-chr1')
(refseqid_refgene_count.filter(lambda x: x[1][0]['chrom'] == 'chr1').map(lambda x: "{name}\t{chrom}\t{strand}\t{txStart}\t{txEnd}\t{genomeTxStart}\t{genomeTxEnd}\t{cdsStart}\t{cdsEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{geneName}\t{count}"
                            .format(count=x[1][1]['count'], **x[1][0]))
 .saveAsTextFile(outfile))

In [119]:
print outfile

/data/hg19/genbank-output/refgene-count-chr1
