In [0]:
from IPython.parallel import Client
import os, time

In [0]:
# Connect to the IPython.parallel cluster running under the "huge" profile.
rc = Client(profile="huge")
# Direct view addressing every engine the client knows about.
dview = rc[:]
# Load-balanced view over the same cluster.
# Both views are rebound to idle-only views by the later get_idle_engines(rc) cell.
lview = rc.load_balanced_view()
# Bare expression: displayed as the cell output.
len(rc)

In [0]:
def get_async_progress(jobs):
    """Summarise how many of the given jobs have finished.

    jobs: a sized collection of objects exposing a ``ready()`` method.

    Returns a ``"finished/total"`` string, e.g. ``"3/12"``.
    """
    finished = sum(1 for job in jobs if job.ready())
    return "%d/%d" % (finished, len(jobs))

In [0]:
def get_idle_engines(rc, delay=10):
    """Build views restricted to the engines whose queue is empty.

    rc: a connected client exposing ``queue_status()``, item access by a
        list of engine ids, and ``load_balanced_view(targets=...)``.
    delay: seconds to pause after the status snapshot (default 10, the
        value that used to be hard-coded).

    Returns ``(direct_view, load_balanced_view)``, both limited to the engines
    whose ``'queue'`` entry was falsy in the snapshot.
    """
    qs = rc.queue_status()
    # NOTE(review): the pause happens after the snapshot is taken, so the
    # views below are built from status that is `delay` seconds old. Kept in
    # the original position; confirm whether it was meant to precede the query.
    time.sleep(delay)
    # The status mapping carries a non-engine 'unassigned' entry next to the
    # per-engine dicts. Exclude it by name instead of slicing off the last
    # sorted key: that relied on Python 2 ordering ints before strings and
    # would drop a real engine whenever the entry is missing.
    engine_ids = sorted(eid for eid in qs if eid != 'unassigned')
    active = [eid for eid in engine_ids if not qs[eid]['queue']]
    d = rc[active]
    l = rc.load_balanced_view(targets=active)
    return d, l

In [0]:
# Rebind both views to only the engines reported idle, replacing the
# all-engine views created right after connecting.
dview, lview = get_idle_engines(rc)

In [0]:
# Bare expression: the length of the idle-only direct view is shown as the cell output.
len(dview)

In [0]:
# Perform these imports locally and on the engines behind dview, so functions
# shipped to the engines can refer to the modules as globals.
with dview.sync_imports():
    import os
    import sys
    import socket
    # Not part of the standard library; call_snps uses stopwatch.Timer.
    import stopwatch

In [0]:
# Directory that is globbed for the *_sorted.bam inputs.
bam_dir = "/data7/eckertlab/projects/ethan/analysis"
# Output directory handed to the rmdup and SNP-calling steps.
analysis_dir = os.path.join(bam_dir, "samtools0.1.19")
# Fail early, and name the missing path, before any job is submitted.
assert os.path.exists(analysis_dir), "analysis directory not found: %s" % analysis_dir

In [0]:
# Capture the shell listing of every *_sorted.bam directly under bam_dir.
# NOTE(review): if the glob matches nothing, the captured lines may contain
# ls's error text rather than being empty - confirm before relying on it.
bam_files = !ls $bam_dir/*_sorted.bam

In [0]:
@lview.remote()
def remove_duplicates(args):
    """Run ``samtools rmdup -s`` on one sorted BAM, then index the result.

    Wrapped by ``lview.remote()``, so calling it submits the work through the
    load-balanced view instead of running it in the notebook process.

    args: 3-tuple ``(samtools, sorted_bam, analysis_dir)`` - path of the
        samtools executable, the BAM to process, and the output directory.

    Returns the output path,
    ``<analysis_dir>/<basename of sorted_bam>_rmdup.bam``.

    Raises subprocess.CalledProcessError if either samtools step exits with a
    non-zero status, so a failed rmdup is no longer indexed and returned as if
    it had succeeded.
    """
    # Function-local imports: the body runs on an engine, where only the
    # modules from the sync_imports cell are guaranteed to be global.
    import subprocess
    import sys

    samtools, sorted_bam, analysis_dir = args
    out = os.path.join(analysis_dir, "%s_rmdup.bam" % os.path.basename(sorted_bam))
    steps = (
        [samtools, "rmdup", "-s", sorted_bam, out],
        [samtools, "index", out],
    )
    for cmd in steps:
        # Argument lists bypass the shell, so paths containing spaces or shell
        # metacharacters are passed through intact.
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        # Echo the tool output so it still shows up in the job's stdout.
        sys.stdout.write(output if isinstance(output, str) else output.decode())
    return out

In [0]:
# samtools 0.1.19 executable used for the rmdup/index step.
# The same path is assigned again in the later tool-path cell.
samtools = "/home/cfriedline/data7/src/samtools-0.1.19/samtools"

In [0]:
# Submit one de-duplication job per sorted BAM and keep the returned handles
# in submission order.
rmdup_jobs = [
    remove_duplicates((samtools, b, analysis_dir))
    for b in bam_files
]

In [0]:
# Bare expression: shows "finished/total" for the rmdup jobs as the cell output.
get_async_progress(rmdup_jobs)

In [0]:
# Fetch the result of the first rmdup job (the output BAM path returned by
# remove_duplicates); shown as the cell output.
rmdup_jobs[0].get()

In [0]:
# FASTA file passed to call_snps as the reference for samtools mpileup.
assembly = "/data7/cfriedline/assemblies/foxtail2/Green_26_ATCGCGCAA.fastq_31_data_31/contigs.fa_in_map.fa"

In [0]:
# Capture the shell listing of the de-duplicated BAMs written to analysis_dir.
# NOTE(review): if the glob matches nothing, the captured lines may contain
# ls's error text rather than being empty - confirm before relying on it.
bam_rmdup_files = !ls $analysis_dir/*_rmdup.bam

In [0]:
# "-b <file>" repeated for every de-duplicated BAM, space separated.
# Not referenced by any other cell visible in this notebook.
bam_string = "-b %s" % " -b ".join(bam_rmdup_files)

In [0]:
def create_ploidy_file(bam_files):
    """Write a sample/ploidy table next to the first BAM and return its path.

    The table is written to ``all.ploidy`` in the directory of
    ``bam_files[0]`` (overwriting any existing file). It holds one
    tab-separated ``name<TAB>ploidy`` line per entry, in input order.

    bam_files: non-empty sequence of BAM paths. Each sample name is the file's
        basename up to its first ".fastq", with ".fastq" appended again.

    Returns the path of the written file.

    Raises IndexError if ``bam_files`` is empty.
    """
    ploidy = 2  # every sample is written with ploidy 2
    ploidy_file = os.path.join(os.path.dirname(bam_files[0]), "all.ploidy")
    with open(ploidy_file, "w") as o:
        for b in bam_files:
            name = "%s.fastq" % os.path.basename(b).split(".fastq")[0]
            o.write("%s\t%d\n" % (name, ploidy))
    return ploidy_file
# Push the helper to the engines behind dview under the same name; call_snps
# looks it up as a global when it runs remotely.
dview['create_ploidy_file'] = create_ploidy_file

In [0]:
def call_snps(args):
    print socket.gethostname()
    timer = stopwatch.Timer()
    samtools, reference, bam_sorted, bcftools, raw_bcf, perl, vcfutils, raw_vcf, out_dir = args 
    if not out_dir:
        out_dir = os.environ['TMPDIR']
    raw_bcf = os.path.join(out_dir, raw_bcf)
    raw_vcf = os.path.join(out_dir, raw_vcf)
    ploidy_file = create_ploidy_file(bam_sorted)
    pileup = "%s mpileup -uf %s %s | %s view -s %s -bvcg - > %s" % (samtools, reference, ' '.join(bam_sorted), bcftools, ploidy_file, raw_bcf) 
    view_filter = "%s view -s %s %s | %s %s varFilter -D100 > %s" % (bcftools, ploidy_file, raw_bcf, perl, vcfutils, raw_vcf)
    print pileup
    #!$pileup
    print view_filter
    !$view_filter
    timer.stop()
    return raw_vcf, args, pileup, view_filter, timer.elapsed
# Push call_snps to the engines behind dview under the same name.
dview['call_snps'] = call_snps

In [0]:
# Executables and scripts handed to call_snps.
# samtools is re-assigned to the same path that was set before the rmdup step.
samtools = "/home/cfriedline/data7/src/samtools-0.1.19/samtools"
bcftools = "/home/cfriedline/data7/src/samtools-0.1.19/bcftools/bcftools"
# NOTE(review): vcfutils.pl is taken from a separate src/bcftools tree, not the
# samtools-0.1.19 tree that provides bcftools - confirm the two are compatible.
vcfutils = "/home/cfriedline/data7/src/bcftools/vcfutils.pl"
perl = "/home/cfriedline/data7/opt/ActivePerl-5.16/bin/perl"

In [0]:
# Positional bundle, in exactly the order call_snps unpacks it.
args = [
    samtools, assembly, bam_rmdup_files,   # samtools executable, reference, input BAMs
    bcftools, "samtools.bcf",              # bcftools executable, raw BCF file name
    perl, vcfutils, "samtools.vcf",        # perl interpreter, vcfutils script, raw VCF file name
    analysis_dir,                          # output directory
]
# The whole list travels as call_snps's single positional argument.
samtools_job = lview.apply_async(call_snps, args)

In [0]:
# Show what the SNP-calling job wrote to stdout on its engine; call_snps prints
# the hostname and both command strings there.
print samtools_job.stdout