In []:
import gzip, os, sys
from Bio.SeqIO.QualityIO import FastqGeneralIterator
from IPython.parallel import Client

In []:
# Directory that holds the gzipped input files for this run.
# NOTE(review): absolute, user-specific path -- adjust before running elsewhere.
file_dir = "/home/cfriedline/eckertlab/gypsy_indiv/HiSeq_140425"
# Later cells list "*.gz" relative to the current directory, so switch into it.
os.chdir(file_dir)

In []:
# IPython shell capture: gz_files holds the lines printed by `ls *.gz`,
# i.e. the names of the .gz files in the current working directory.
gz_files = !ls *.gz

In []:
# Connect to the IPython.parallel cluster using the "sge" profile.
rc = Client(profile="sge")
# Direct view over every engine; used below to sync imports and push functions.
dview = rc[:]
# Load-balanced view over all engines; used for the lightweight remote tasks.
lview = rc.load_balanced_view()
# Cell output: size of the client (presumably the number of connected engines).
len(rc)

In []:
@dview.remote(block=True)
def get_cpu_count():
    """Report the host an engine runs on and that host's CPU count.

    Runs on every engine of ``dview`` and blocks until all have answered. The
    imports are local because the body executes in the engine's namespace.

    Returns:
        tuple: ``(hostname, cpu_count)`` for the engine's machine.
    """
    from multiprocessing import cpu_count
    from socket import gethostname
    host = gethostname()
    return host, cpu_count()
# One (hostname, cpu_count) pair per engine of dview.
cpu_counts = get_cpu_count()
from collections import defaultdict

# hostname -> positions (within cpu_counts) of the engines on hosts with >= 16 CPUs.
# NOTE(review): the position is later used as an engine id -- confirm they coincide.
cpu_dict = defaultdict(list)
for engine_idx, (host, n_cpu) in enumerate(cpu_counts):
    if n_cpu >= 16:
        cpu_dict[host].append(engine_idx)
print(cpu_dict)

# Keep only the first engine found on each qualifying host; process_single
# spawns its own multiprocessing pool, so one task per host is enough.
first_engine_per_host = [engine_ids[0] for engine_ids in cpu_dict.values()]
cview = rc.load_balanced_view(targets=first_engine_per_host)

In []:
with dview.sync_imports():
    import stopwatch
    import numpy
    import numpy as np
    import scipy
    import pandas
    import gzip
    import os
    import tempfile
    import shutil
    import socket
    from Bio.SeqIO.QualityIO import FastqGeneralIterator
    from collections import deque, defaultdict
    import multiprocessing 
    from multiprocessing import Pool, Manager
    import traceback
    from itertools import izip

In []:
@lview.remote()
def get_num_seqs(f):
    """Count the reads in one gzipped FASTQ file.

    Submitted through the load-balanced view; the notebook keeps the returned
    handle and polls it with ``ready()`` before reading the value.

    Args:
        f: Path of the gzipped FASTQ file.

    Returns:
        tuple: ``(f, count)`` where ``count`` is the number of non-empty lines
        divided by 4 (one FASTQ record is written as four lines).
    """
    # IPython shell capture: `zgrep -c .` prints how many lines contain at least
    # one character. NOTE(review): $f is substituted unquoted, so a path with
    # spaces or shell metacharacters would break the command.
    count = !zgrep -c . $f
    # This notebook is Python 2 (print statements elsewhere), so "/" on ints floors.
    return (f, int(count[0])/4)

In []:
# Submit one read-counting task per input file. Paths are made absolute first,
# presumably so the engines do not depend on their own working directory.
count_jobs = [get_num_seqs(os.path.abspath(f)) for f in gz_files]

In []:
# Poll the read-counting jobs: one flag per job, taken from its ready() status.
[j.ready() for j in count_jobs]

In []:
# Collect the (path, read_count) result of every counting job, keyed by path,
# and save the same records to seq_nums.txt as tab-separated lines.
# get() blocks until the job has finished and re-raises remote errors, so the
# table is always complete; skipping unfinished jobs would silently leave
# files out and make the R1/R2 pairing step fail on the missing entries.
seq_nums = {}
with open("seq_nums.txt", "w") as o:
    for j in count_jobs:
        result = j.get()
        seq_nums[result[0]] = result
        o.write("%s\n" % "\t".join([str(x) for x in result]))

In []:
# Display the path -> (path, read_count) mapping collected from the counting jobs.
seq_nums

In []:
# Group every R1 file with its R2 mate when the mate exists on disk.
# Each entry of seq_pairs is a list of (path, read_count) records:
# [R1, R2] for a pair, [R1] for an unpaired file. Files without "_R1_" in
# their path are not added on their own.
seq_pairs = []
for v in seq_nums.values():
    r1_path = v[0]
    if "_R1_" not in r1_path:
        continue
    read2 = r1_path.replace("_R1_", "_R2_")
    pair = [v, seq_nums[read2]] if os.path.exists(read2) else [v]
    seq_pairs.append(pair)
for p in seq_pairs:
    print(p)

In []:
def format_fastq_tuple(title, seq, qual):
    """Render one read as a four-line FASTQ record.

    Args:
        title: Record title, written after the leading "@".
        seq: Base sequence.
        qual: Quality string; must have the same length as ``seq``.

    Returns:
        str: The record text, terminated by a newline.

    Raises:
        AssertionError: If ``seq`` and ``qual`` differ in length.
    """
    assert len(seq) == len(qual)
    record_fields = (title, seq, qual)
    return "@%s\n%s\n+\n%s\n" % record_fields

def convert_qual(q):
    """Convert a single quality character to its integer score.

    Args:
        q: One character of a FASTQ quality string.

    Returns:
        int: The character's code point minus the ASCII offset 33.
    """
    ascii_offset = 33
    code_point = ord(q)
    return code_point - ascii_offset

def get_qual_scores(q):
    """Decode a quality string (ASCII offset 33) into scores and their mean.

    Args:
        q: Quality string of one read, one character per base.

    Returns:
        numpy.ndarray: Two-element object array ``[scores, mean]`` where
        ``scores`` is the list of per-base integer scores and ``mean`` is
        their average.
    """
    qual = [ord(x) - 33 for x in q]  # a list comprehension was the fastest variant here
    # Fill an object array slot by slot: passing the ragged [list, float] pair
    # straight to numpy.array() raises ValueError on current NumPy releases.
    result = numpy.empty(2, dtype=object)
    result[0] = qual
    result[1] = numpy.mean(qual)
    return result

def eval_quality(q):
    """Quality-filter one read and trim it at the first low-quality window.

    Scores are decoded from ``q`` with ASCII offset 33. The read is rejected when
    - it has no bases at all,
    - its mean score is below 30,
    - the first 5-base window whose mean drops below 30 ends before half of
      the read length, or
    - more than 20% of all bases score below 30.
    Otherwise the scores are returned, cut just before the last base of the
    first low-quality window (untrimmed if no window is low quality).

    Args:
        q: Quality string of the read, one character per base.

    Returns:
        list or bool: The (possibly trimmed) list of integer scores, or
        ``False`` when the read is rejected.
    """
    win_size = 5
    qual_cutoff = 30
    len_cutoff = 0.5
    qual_perc_cutoff = 0.20

    # Decode directly into a plain list; no container array is needed here.
    all_scores = [ord(x) - 33 for x in q]
    if not all_scores:
        return False  # an empty read would otherwise divide by zero below
    if numpy.mean(all_scores) < qual_cutoff:
        return False

    below_cutoff = 0.0
    window = deque(maxlen=win_size)
    last_good = None  # 1-based end position of the first low-quality window
    for pos, s in enumerate(all_scores, 1):
        window.append(s)
        if s < qual_cutoff:
            below_cutoff += 1  # keep track of scores below the quality cutoff
        # Only the first bad window matters, so stop averaging once it is found.
        if last_good is None and len(window) == win_size:
            if numpy.mean(window) < qual_cutoff:
                last_good = pos
                if float(last_good) / len(all_scores) < len_cutoff:
                    return False  # the trimmed read would be too short

    # Drop reads with too many low-quality bases, even if the average is fine.
    # The fraction is taken over the full, untrimmed read.
    if below_cutoff / len(all_scores) > qual_perc_cutoff:
        return False
    if last_good is not None:
        return all_scores[0:(last_good - 1)]
    return all_scores

def split_file(seqs):
    """Split gzipped FASTQ files into temporary chunks, roughly one per local CPU.

    Each input file is cut into uncompressed chunks of
    ``num_reads // cpu_count`` reads (a remainder ends up in one extra chunk;
    if that quotient is 0 the whole file goes into a single chunk). Chunks are
    created with ``delete=False``, so the caller must remove them.

    Args:
        seqs: Iterable of ``(path, num_reads)`` records, one per input file.

    Returns:
        defaultdict: Maps each input path to the list of its chunk file names,
        in the order they were written. Files without reads get no entry.
    """
    d = defaultdict(list)
    num_cpu = multiprocessing.cpu_count()
    for seq in seqs:
        print("seq= %s" % (seq,))
        f, num = seq
        print(f)
        reads_per_file = int(num) // num_cpu
        read_idx = 0
        file_num = 0
        chunk = None
        handle = gzip.open(f)
        try:
            # Distinct names for the record fields so the outer loop variable
            # `seq` is not overwritten.
            for title, bases, qual in FastqGeneralIterator(handle):
                if read_idx == 0:
                    chunk = tempfile.NamedTemporaryFile(delete=False)
                    print("%s %s %s / %s" % (socket.gethostname(), chunk.name, file_num + 1, num_cpu))
                    d[f].append(chunk.name)
                chunk.write(format_fastq_tuple(title, bases, qual))
                read_idx += 1

                if read_idx == reads_per_file:
                    # Chunk is full: release its handle right away instead of
                    # keeping every chunk open until the whole split is done.
                    chunk.close()
                    chunk = None
                    read_idx = 0
                    file_num += 1
        finally:
            # Close the last (partial) chunk and the gzip stream even on errors.
            if chunk is not None:
                chunk.close()
            handle.close()
    return d

def collapse_results(source, results):
    """Concatenate processed chunk files into one output file next to ``source``.

    The chunks are first merged into a local temporary file, which is then
    copied to the destination; afterwards the temporary file and every chunk
    are removed.

    Args:
        source: Path of the original input file. A trailing ".gz" is replaced
            by "_processed.fastq"; any other name gets "_processed.fastq"
            appended, so the output can never overwrite ``source`` itself.
        results: Paths of the chunk files to concatenate, in output order.

    Returns:
        str: Path of the merged output file.
    """
    gz_suffix = ".gz"
    # Only strip the extension itself; ".gz" elsewhere in the path is left alone.
    if source.endswith(gz_suffix):
        stem = source[:-len(gz_suffix)]
    else:
        stem = source
    out = stem + "_processed.fastq"

    temp = tempfile.NamedTemporaryFile(delete=False)
    try:
        with temp:
            for r in results:
                # Binary copy; the context manager closes each chunk promptly.
                with open(r, "rb") as chunk:
                    shutil.copyfileobj(chunk, temp)
        shutil.copy(temp.name, out)
    finally:
        # The merged temp file is never needed once copied (or on failure).
        os.remove(temp.name)
    for r in results:
        os.remove(r)
    return out

def process_single_file(f):
    """Filter and trim the reads of one uncompressed FASTQ chunk.

    For every read: a leading "N" is stripped (base and quality), reads that
    still contain an "N" are dropped and counted, and the rest go through
    ``eval_quality``. Accepted reads are written to a new temporary file,
    shortened to the length of the returned scores when they were trimmed.

    Args:
        f: Path of the FASTQ chunk to process.

    Returns:
        str: Name of the temporary file holding the accepted reads. It is
        created with ``delete=False``; the caller is responsible for removal.
    """
    basename = os.path.basename(f)
    count = 0    # reads seen
    n = 0        # reads dropped because they contain "N"
    trimmed = 0  # reads written in shortened form
    tmp = tempfile.NamedTemporaryFile(delete=False)
    try:
        with open(f) as reads:
            for title, seq, qual in FastqGeneralIterator(reads):
                if seq.startswith("N"):
                    seq = seq[1:]
                    qual = qual[1:]

                if "N" in seq:
                    n += 1
                elif seq:
                    # Reads left without any base (e.g. a lone "N") are skipped;
                    # there is nothing to score.
                    scores = eval_quality(qual)
                    if scores:
                        if len(scores) != len(seq):
                            seq = seq[0:len(scores)]
                            qual = qual[0:len(scores)]
                            trimmed += 1
                        tmp.write(format_fastq_tuple(title, seq, qual))

                count += 1

                if count % 10000 == 0:
                    print("%s, %s, %d, %d, %d" % (socket.gethostname(), basename, count, n, trimmed))
    except Exception:
        # Do not leave a half-written temp file behind when processing fails.
        tmp.close()
        os.remove(tmp.name)
        raise
    tmp.close()
    return tmp.name


def process_single(seqs):
    """Split, quality-filter and re-assemble the given FASTQ files on this host.

    The inputs are split into chunks, every chunk is filtered in a local
    multiprocessing pool, and the filtered chunks are merged into one output
    file by ``collapse_results``.

    NOTE(review): when ``seqs`` holds more than one file (an R1/R2 pair), the
    chunks of all files are merged into a single output named after the last
    source iterated -- confirm that this is intended.

    Args:
        seqs: Iterable of ``(path, num_reads)`` records, as expected by
            ``split_file``.

    Returns:
        tuple: ``(hostname, source, output_path, elapsed)`` where ``source`` is
        the input path the output was named after and ``elapsed`` is the
        stopwatch timer reading.

    Raises:
        ValueError: If no chunk was produced, i.e. the inputs hold no reads.
    """
    timer = stopwatch.Timer()
    pool = Pool()
    hostname = socket.gethostname()
    print("processing on %s" % hostname)
    splits = {}
    try:
        splits = split_file(seqs)
        results = []
        source = None
        for k, temp_files in splits.items():
            source = k
            for f in temp_files:
                # Submission errors propagate: swallowing them would silently
                # drop a chunk from the final output.
                results.append(pool.apply_async(process_single_file, (f,)))
        pool.close()
        pool.join()

        if source is None:
            raise ValueError("no reads found in %r" % (seqs,))

        # Collapse processed temp files; get() re-raises any worker failure.
        res = collapse_results(source, [x.get() for x in results])
    finally:
        # Release the workers and remove the temp split source files, whether
        # or not processing succeeded.
        pool.terminate()
        for chunk_names in splits.values():
            for chunk_name in chunk_names:
                if os.path.exists(chunk_name):
                    os.remove(chunk_name)
    timer.stop()
    return hostname, source, res, timer.elapsed

# Push the helper functions into every engine's namespace under their own
# names, so that process_single can find them when it runs remotely.
for _name, _func in (
    ('format_fastq_tuple', format_fastq_tuple),
    ('eval_quality', eval_quality),
    ('get_qual_scores', get_qual_scores),
    ('convert_qual', convert_qual),
    ('split_file', split_file),
    ('process_single', process_single),
    ('collapse_results', collapse_results),
    ('process_single_file', process_single_file),
):
    dview[_name] = _func

In []:
# Submit one processing task per entry of seq_pairs to the view restricted to
# the large hosts; each task receives the whole entry as its single argument.
process_jobs = [cview.apply_async(process_single, s) for s in seq_pairs]

In []:
# Poll the processing jobs: one flag per job, taken from its ready() status.
[j.ready() for j in process_jobs]

In []:
@lview.remote()
def run_cmd(cmd, dirname):
    """Run a shell command from a given directory through the load-balanced view.

    Args:
        cmd: Command line to execute.
        dirname: Directory to change into before running the command.

    Returns:
        tuple: ``(cmd, dirname)``, echoed back unchanged.
    """
    print "running %s on %s" % (cmd, socket.gethostname())
    # Changes the working directory of the executing process; it is not restored.
    os.chdir(dirname)
    # IPython shell escape: the value of cmd is substituted and handed to the shell.
    # NOTE(review): the exit status is not inspected here, so a failing command
    # is presumably indistinguishable from success for the caller -- confirm.
    !$cmd
    return cmd, dirname

In []:
# Handles of the FastQC tasks submitted so far.
fastqc_jobs = []
# Processed files that already have a FastQC task, so re-running the
# submission cell does not queue the same file twice.
fastqc_submitted = set()

In []:
# Display the set of processed files that already have a FastQC task.
fastqc_submitted

In []:
# Poll the FastQC jobs: one flag per job, taken from its ready() status.
[r.ready() for r in fastqc_jobs]

In []:
# Queue a FastQC run for every processing job that has finished and whose
# output file has not been submitted yet. The cell can be re-run as more
# processing jobs complete.
for j in process_jobs:
    if not j.ready():
        continue
    proc_file = j.r[2]  # third element of process_single's result: the output path
    if proc_file in fastqc_submitted:
        continue
    fastqc_submitted.add(proc_file)
    fastqc_cmd = "/home/cfriedline/src/FastQC/fastqc %s" % proc_file
    fastqc_jobs.append(run_cmd(fastqc_cmd, os.path.dirname(proc_file)))

In []:
from IPython.display import Image

In []:
# Show the per-base quality plot from the FastQC output folder of the lane 1
# processed reads (path is relative to the current working directory).
Image("lane1_Undetermined_L001_R1_001.fastq_processed_fastqc/Images/per_base_quality.png")

In []:
# Show the per-base quality plot from the FastQC output folder of the lane 2
# processed reads (path is relative to the current working directory).
Image("lane2_Undetermined_L002_R1_001.fastq_processed_fastqc/Images/per_base_quality.png")