# Run eQTL Analysis

This notebook coordinates and executes the eQTL analysis. This notebook is
specialized for the Frazer lab cluster. Since running the entire analysis is 
time consuming, I generally run it "by hand," starting jobs for groups of
genes at different times.

In [1]:
import cPickle
import datetime
import glob
import gzip
import os
import random
import shutil
import subprocess
import time
import uuid

import cdpybio as cpb
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pybedtools as pbt
import scipy.stats as stats
import seaborn as sns
import statsmodels.api as sm

import cardipspy as cpy
import ciepy

# Render matplotlib figures inline in the notebook output.
%matplotlib inline
# Load the rpy2 IPython extension.
%load_ext rpy2.ipython

# Seeds Python's `random` module only; numpy's generator is not seeded here.
random.seed(20150605)

In [2]:
# Directory that holds this notebook's results (<root>/output/run_eqtl_analysis).
outdir = os.path.join(ciepy.root, 'output', 'run_eqtl_analysis')
cpy.makedir(outdir)

# Matching directory under <root>/private_output for this notebook.
private_outdir = os.path.join(ciepy.root, 'private_output', 'run_eqtl_analysis')
cpy.makedir(private_outdir)

In [19]:
# Gene annotation table, indexed by its first column.
gene_info = pd.read_table(cpy.gencode_gene_info, index_col=0)

# Output of the eqtl_methods_exploration notebook.
gold_eqtls = pd.read_table(os.path.join(ciepy.root, 'output', 
                                        'eqtl_methods_exploration',
                                        'gold_eqtls.tsv'), index_col=0)

# Mapping from gene ID to a list of region strings (joined with ',' when the
# EMMAX command line is built).
# NOTE: unpickling can execute arbitrary code; only load files produced by
# this project. The file is opened in a `with` block so the handle is closed
# as soon as the object has been loaded.
fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'gene_to_regions.p')
with open(fn, 'rb') as f:
    gene_to_regions = cPickle.load(f)

# Expression table handed to EMMAX as the phenotype file; its index provides
# the gene IDs to analyze.
exp = pd.read_table(os.path.join(ciepy.root, 'output', 'eqtl_input', 
                                 'tpm_log_filtered_phe_std_norm_peer_resid.tsv'), index_col=0)

In [18]:
def run_emmax(gene_id):
    """
    Run scripts/run_emmax.py for one gene in the current process.

    The full command is written to <outdir>/results/<gene_id>/<gene_id>.sh
    and then executed with bash. If the gene's output directory already
    exists the function does nothing, so an existing directory marks the
    gene as already attempted.

    Raises subprocess.CalledProcessError if the script exits non-zero.
    """
    os.chdir('/raid3/projects/CARDIPS/analysis/cardips-ipsc-eqtl/notebooks')
    toutdir = os.path.join(outdir, 'results', gene_id)
    if os.path.exists(toutdir):
        return
    cpy.makedir(toutdir)
    script_path = os.path.join(toutdir, '{}.sh'.format(gene_id))
    with open(script_path, 'w') as script:
        eqtl_input = os.path.join(ciepy.root, 'output', 'eqtl_input')
        # Positional arguments for run_emmax.py followed by the covariate option.
        arguments = [
            gene_id,
            os.path.join(ciepy.root, 'private_data', 'wgs', 'biallelic_snvs.vcf.gz'),
            ','.join(gene_to_regions[gene_id]),
            os.path.join(eqtl_input, 'tpm_log_filtered_phe_std_norm_peer_resid.tsv'),
            os.path.join(eqtl_input, 'emmax.ind'),
            os.path.join(eqtl_input, 'wgs.kin'),
            toutdir,
            '-c {}'.format(os.path.join(eqtl_input, 'emmax_sex_only.cov')),
        ]
        # One argument per line, joined with shell line continuations.
        command = 'python {} \\\n\t'.format(
            os.path.join(ciepy.root, 'scripts', 'run_emmax.py'))
        command += ' \\\n\t'.join(arguments)
        script.write(command + '\n')
    subprocess.check_call('bash {}'.format(script_path), shell=True)

## PBS

Run jobs using PBS queue system.

In [689]:
# Every entry under <outdir>/results counts as "already attempted": a gene's
# directory is created as soon as a job for it is written.
attempted = set(os.path.basename(x)
                for x in glob.glob(os.path.join(outdir, 'results', '*')))
todo = list(set(exp.index) - attempted)
# Drop genes on chrX, chrY and chrM. `.loc` replaces the deprecated `.ix`
# indexer; the lookup is purely label-based either way.
todo = [x for x in todo if gene_info.loc[x, 'chrom'] not in ['chrX', 'chrY', 'chrM']]

In [207]:
def run_emmax_pbs(gene_ids, n=10):
    """
    Write and submit one PBS job that runs EMMAX for up to n genes.

    gene_ids is scanned in order and the first n genes that do not yet have a
    directory under <outdir>/results are selected. An output directory is
    created for each selected gene right away (which marks it as taken), a PBS
    script named after the current timestamp is written to
    <outdir>/results/pbs_scripts, and the script is submitted with
    cpy.submit_job.

    Parameters
    ----------
    gene_ids : list
        Candidate gene IDs.
    n : int
        Maximum number of genes to put into the job.

    Returns
    -------
    None. If no gene is left to run, nothing is written or submitted.
    """
    results_dir = os.path.join(outdir, 'results')
    pbs_dir = os.path.join(results_dir, 'pbs_scripts')

    # Bounded scan: stop at n genes or at the end of gene_ids, whichever comes
    # first, instead of indexing past the end of the list.
    genes_todo = []
    for gene_id in gene_ids:
        if len(genes_todo) >= n:
            break
        if not os.path.exists(os.path.join(results_dir, gene_id)):
            genes_todo.append(gene_id)
    if len(genes_todo) == 0:
        return

    if not os.path.exists(pbs_dir):
        cpy.makedir(pbs_dir)

    # Script and job names have one-second resolution. Wait for a free name so
    # back-to-back calls cannot overwrite each other's script.
    while True:
        date = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
        fn = os.path.join(pbs_dir, '{}.pbs'.format(date))
        if not os.path.exists(fn):
            break
        time.sleep(1)

    eqtl_input = os.path.join(ciepy.root, 'output', 'eqtl_input')
    with open(fn, 'w') as f:
        f.write('#!/bin/bash\n#PBS -q high\n')
        f.write('#PBS -N emmax_{}\n'.format(date))
        f.write('#PBS -l nodes=1:ppn=2\n')
        f.write('#PBS -o {}/emmax_{}.out\n'.format(pbs_dir, date))
        f.write('#PBS -e {}/emmax_{}.err\n\n'.format(pbs_dir, date))
        f.write('source activate cardips\n')
        f.write('source /raid3/projects/CARDIPS/pipeline/'
                'cardips-data-software/environment.sh\n\n')
        for gene_id in genes_todo:
            toutdir = os.path.join(results_dir, gene_id)
            cpy.makedir(toutdir)
            # One argument per line. The gene ID sits alone on its own line,
            # which is how the failed-job check recovers the genes of a job.
            c = 'python {} \\\n\t'.format(os.path.join(ciepy.root, 'scripts', 'run_emmax.py'))
            c += ' \\\n\t'.join([
                    gene_id,
                    os.path.join(ciepy.root, 'private_data', 'wgs', 'biallelic_snvs.vcf.gz'),
                    ','.join(gene_to_regions[gene_id]),
                    os.path.join(eqtl_input, 'tpm_log_filtered_phe_std_norm_peer_resid.tsv'),
                    os.path.join(eqtl_input, 'emmax.ind'),
                    os.path.join(eqtl_input, 'wgs.kin'),
                    toutdir,
                    '-c {}'.format(os.path.join(eqtl_input, 'emmax_sex_only.cov')),
                ])
            f.write(c + '\n\n')
    cpy.submit_job(fn)

In [None]:
# Ten rounds; each round submits five jobs of 20 genes back to back and then
# pauses for 30 seconds.
if len(todo) > 0:
    for i in range(10):
        for k in range(5):
            run_emmax_pbs(todo, n=20)
        time.sleep(30)

### Look for failed jobs

Sometimes jobs I submit don't actually start. The code below checks for such
jobs.

Some genes have errors when they are running. These genes won't have `minimum_pvalues.tsv`
files even when their job finishes. I want to identify these genes and remove their output
directory so they will be run in a new job. I think these genes often leave behind their
temp directory so I'll have to go delete those.

In [680]:
# Take two snapshots of the queue over ssh and store them next to the PBS
# scripts: the Job_Name lines of `qstat -f`, and the output of `qstat -fn1`.
c = ('ssh cdeboever@flc.ucsd.edu \'qstat -f | grep Job_Name > /raid3/projects/'
     'CARDIPS/analysis/cardips-ipsc-eqtl/output/run_eqtl_analysis/'
     'results/pbs_scripts/jnames.txt\'')
subprocess.check_call(c, shell=True)

c = ('ssh cdeboever@flc.ucsd.edu \'qstat -fn1 > /raid3/projects/'
     'CARDIPS/analysis/cardips-ipsc-eqtl/output/run_eqtl_analysis/'
     'results/pbs_scripts/qstatfn1.txt\'')
subprocess.check_call(c, shell=True)

# The job name is the last whitespace-separated token of each Job_Name line.
with open('/raid3/projects/CARDIPS/analysis/cardips-ipsc-eqtl/output/'
          'run_eqtl_analysis/results/pbs_scripts/jnames.txt') as f:
    jnames = [x.strip().split()[-1] for x in f.readlines()]

# The first five lines are skipped (presumably the qstat header -- verify
# against the cluster's output); every remaining line becomes one row.
with open('/raid3/projects/CARDIPS/analysis/cardips-ipsc-eqtl/output/'
          'run_eqtl_analysis/results/pbs_scripts/qstatfn1.txt') as f:
    qstat = [x.split() for x in f.readlines()[5:]]
qstat = pd.DataFrame(qstat)
qstat.columns = ['job_id', 'username', 'queue', 'short_name',
                  'sessid', 'nds', 'tsk', 'mem', 'time', 
                  'status', 'elap_time', 'node']
# Keep the part of the job id before the first '.' and the part of the node
# field before the first '/'.
qstat['job_id'] = qstat.job_id.apply(lambda x: x.split('.')[0])
qstat['node'] = qstat.node.apply(lambda x: x.split('/')[0])
# NOTE(review): names are attached by position, so both snapshots must list
# the same jobs in the same order. They come from two separate ssh calls, so
# a job starting or finishing in between would misalign them.
qstat['name'] = jnames
qstat.index = qstat.name
# Keep rows whose username column is 'cdeboeve'. The ssh login above is
# 'cdeboever'; presumably qstat truncates the name -- confirm.
qstat = qstat[qstat.username == 'cdeboeve']

In [681]:
# Collect the ids of queued jobs whose name does not map to a
# <timestamp>.pbs script in the pbs_scripts directory. run_emmax_pbs names
# jobs 'emmax_<timestamp>' and writes '<timestamp>.pbs', so removing the
# prefix recovers the script name.
bad_jobs = []
for j in qstat.index:
    fn = ('/raid3/projects/CARDIPS/analysis/cardips-ipsc-eqtl/output/'
          'run_eqtl_analysis/results/pbs_scripts/{}.pbs'.format(j.replace('emmax_', '')))
    if not os.path.exists(fn):
        # `.loc` replaces the deprecated `.ix` indexer (same label lookup).
        bad_jobs.append(qstat.loc[j, 'job_id'])

In [682]:
# Print a qdel command line for the jobs found above.
print('qdel ' + ' '.join(bad_jobs))

qdel 


In [683]:
# Give every job row its own three (initially empty) sets.
qstat['genes'] = [set() for i in range(qstat.shape[0])]
qstat['started_genes'] = [set() for i in range(qstat.shape[0])]
qstat['finished_genes'] = [set() for i in range(qstat.shape[0])]

for j in qstat.index:
    # PBS script for this job: the job name without its 'emmax_' prefix.
    # NOTE(review): open() raises if the script does not exist, i.e. for any
    # job listed in bad_jobs that is still in the queue.
    fn = ('/raid3/projects/CARDIPS/analysis/cardips-ipsc-eqtl/output/'
          'run_eqtl_analysis/results/pbs_scripts/{}.pbs'.format(j.replace('emmax_', '')))
    with open(fn) as f:
        # run_emmax_pbs writes each gene ID on its own continuation line, so
        # the first token of every line starting with 'ENS' is a gene ID.
        lines = [x.strip().split()[0] for x in f.readlines() if x.strip()[0:3] == 'ENS']
    # NOTE(review): `.ix` is deprecated, and storing a set in a single cell
    # may behave differently across pandas versions -- verify before porting.
    qstat.ix[j, 'genes'] = set(lines)
    for g in lines:
        # "Started": results/<gene>/<gene>.tsv exists.
        if os.path.exists('/raid3/projects/CARDIPS/analysis/cardips-ipsc-eqtl/output/'
                          'run_eqtl_analysis/results/{0}/{0}.tsv'.format(g)):
            qstat.ix[j, 'started_genes'].add(g)
        # "Finished": results/<gene>/minimum_pvalues.tsv exists.
        if os.path.exists('/raid3/projects/CARDIPS/analysis/cardips-ipsc-eqtl/output/'
                          'run_eqtl_analysis/results/{0}/minimum_pvalues.tsv'.format(g)):
            qstat.ix[j, 'finished_genes'].add(g)

In [684]:
# Pool the per-job sets into a single frozenset for each category.
started_genes = frozenset(gene for job_set in qstat.started_genes for gene in job_set)
finished_genes = frozenset(gene for job_set in qstat.finished_genes for gene in job_set)
job_genes = frozenset(gene for job_set in qstat.genes for gene in job_set)

In [685]:
# Gene result directories that do not contain minimum_pvalues.tsv.
gene_dys = glob.glob('/raid3/projects/CARDIPS/analysis/cardips-ipsc-eqtl/output/'
                     'run_eqtl_analysis/results/ENS*')
not_done = [os.path.split(dy)[1] for dy in gene_dys
            if not os.path.exists(os.path.join(dy, 'minimum_pvalues.tsv'))]

In [686]:
# Unfinished genes that do not belong to any job currently in the queue.
# Their directories can be deleted.
to_delete = set(not_done).difference(job_genes)
print(len(to_delete))

3


In [687]:
# Remove the orphaned result directories; a gene without a directory is
# treated as not yet run when `todo` is rebuilt.
results_dir = os.path.join(outdir, 'results')
for gene_id in to_delete:
    shutil.rmtree(os.path.join(results_dir, gene_id))

In [688]:
# I am specifying nodes individually because rcom is hanging up due to some 
# nodes that are offline.
#
# For every node that currently runs one of our jobs, list /dev/shm and
# remove the entries that are not a gene of a job on that node.
# The loop uses its own names (node_genes, stale) so it does not overwrite
# the notebook-level `job_genes` and `to_delete` that the result-directory
# clean-up cells depend on.
node_to_delete = {}
# '--' appears in the node column (presumably for jobs with no node assigned
# -- confirm); cn6 is skipped as well (reason not recorded here).
nodes = set(qstat.node) - set(['--'])
for node in nodes - set(['cn6']):
    c = ('ssh cdeboever@flc.ucsd.edu \'ssh {} \'ls /dev/shm\'\''.format(node))
    in_ram = set(subprocess.check_output(c, shell=True).strip().split('\n')) - set([''])
    node_genes = frozenset().union(*qstat[qstat.node == node].genes)
    stale = in_ram - node_genes
    node_to_delete[node] = stale
    if len(stale) > 0:
        # Write a throw-away script under <root>/sandbox and run it on the node.
        fn = os.path.join(ciepy.root, 'sandbox', '{}.sh'.format(node))
        with open(fn, 'w') as f:
            f.write('#!/bin/bash\n\n')
            f.write('\n'.join(['rm -r /dev/shm/{}'.format(x) for x in stale]) + '\n')
        c = ('ssh cdeboever@flc.ucsd.edu \'ssh {} \'bash {}\'\''.format(node, fn))
        try:
            subprocess.check_call(c, shell=True)
        finally:
            # Remove the script even when the remote command fails.
            os.remove(fn)

In [None]:
# NOTE(review): this cell is not valid Python (incomplete expression).
# Presumably it is a deliberate stop so that "Run All" halts before the
# Local section below -- confirm before removing.
2 +

## Local

Run jobs on fl1 using an IPython cluster. This code may need to be updated.

In [48]:
def run_emmax(gene_id):
    """
    Variant of run_emmax that is pushed to the IPython cluster engines.

    Writes the run_emmax.py command for gene_id to
    <outdir>/test_results/<gene_id>/<gene_id>.sh and executes it with bash.
    Does nothing if that directory already exists.

    `cpy`, `ppy`, `outdir` and `gene_to_regions` are looked up where the
    function runs. On the engines `cpy` and `ppy` are aliases of ciepy and
    projectpy (set with %px), and the other two names are pushed together
    with this function.

    NOTE(review): this definition shadows the run_emmax defined earlier in
    the notebook, and reads the kinship matrix from output/kinship_matrix
    rather than output/eqtl_input.
    """
    os.chdir('/raid3/projects/CARDIPS/analysis/cardips-ipsc-eqtl/notebooks')
    toutdir = os.path.join(outdir, 'test_results', gene_id)
    if os.path.exists(toutdir):
        return
    ppy.makedir(toutdir)
    script_path = os.path.join(toutdir, '{}.sh'.format(gene_id))
    with open(script_path, 'w') as script:
        eqtl_input = os.path.join(cpy.root, 'output', 'eqtl_input')
        # Positional arguments for run_emmax.py followed by the covariate option.
        arguments = [
            gene_id,
            os.path.join(cpy.root, 'private_data', 'wgs', 'biallelic_snvs.vcf.gz'),
            ','.join(gene_to_regions[gene_id]),
            os.path.join(eqtl_input, 'tpm_log_filtered_phe_std_norm_peer_resid.tsv'),
            os.path.join(eqtl_input, 'emmax.ind'),
            os.path.join(cpy.root, 'output', 'kinship_matrix', 'wgs.kin'),
            toutdir,
            '-c {}'.format(os.path.join(eqtl_input, 'emmax_sex_only.cov')),
        ]
        # One argument per line, joined with shell line continuations.
        command = 'python {} \\\n\t'.format(
            os.path.join(cpy.root, 'scripts', 'run_emmax.py'))
        command += ' \\\n\t'.join(arguments)
        script.write(command + '\n')
    subprocess.check_call('bash {}'.format(script_path), shell=True)

In [134]:
# Stop any IPython cluster running under the 'cardips' profile.
!ipcluster stop --profile=cardips

2015-07-18 08:16:45.402 [IPClusterStop] CRITICAL | Could not read pid file, cluster is probably not running.


In [99]:
# Start a 12-engine IPython cluster in the background under the 'cardips'
# profile, then wait 30 seconds (presumably so the engines can come up).
!ipcluster start -n 12 --daemon --profile=cardips
!sleep 30

In [101]:
from IPython.parallel import Client
# Connect to the cluster started above. Passing the profile explicitly keeps
# the connection from depending on which profile the notebook kernel itself
# was launched with.
parallel_client = Client(profile='cardips')

In [102]:
# View over all engines registered with the client at this point.
dview = parallel_client[:]

In [103]:
# Import these modules on the engines as well (see the
# "importing ... on engine(s)" output below).
with dview.sync_imports():
    import os
    import subprocess
    import time
    import ciepy
    import projectpy

importing os on engine(s)
importing subprocess on engine(s)
importing time on engine(s)
importing ciepy on engine(s)
importing projectpy on engine(s)


In [104]:
# Define on every engine the short names that the pushed run_emmax uses
# (cpy.root, ppy.makedir). Note that on the engines `cpy` is ciepy, whereas
# in this notebook's own namespace `cpy` is cardipspy.
%px cpy = ciepy
%px ppy = projectpy

In [105]:
# Send the function and the two globals it reads to every engine.
dview.push(dict(gene_to_regions=gene_to_regions, outdir=outdir, run_emmax=run_emmax))

<AsyncResult: _push>

In [129]:
# Split the gene list across the engines; each engine gets its share as `todo`.
dview.scatter('todo', todo);

In [130]:
# One start delay per engine in 10-second steps (0, 10, 20, ...), scattered
# to the engines under the name `sleep`.
sleep = 10 * np.arange(len(parallel_client.ids))
dview.scatter('sleep', sleep);

In [None]:
# On every engine: wait for the first value of its scattered `sleep` array
# (presumably one value per engine, which staggers the start), then run
# run_emmax for each gene in the engine's share of `todo`.
%px time.sleep(sleep[0]) ; [run_emmax(x) for x in todo]