In [0]:
import os, sys
from ipyparallel import Client
import matplotlib.pyplot as plt
%matplotlib inline
from subprocess import Popen, PIPE
from Bio import SeqIO
import pandas as pd
import pickle
import scandir
import sqlite3
from sqlalchemy import create_engine, MetaData, Table, Column, VARCHAR
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from ipyparallel import Client
import filelock
from pymongo import MongoClient
from collections import Counter
import shutil
import numpy as np
import stopwatch
import multiprocessing
import pysam
from itertools import zip_longest

In [0]:
# Connection settings for the MongoDB instance that holds the read index.
host = "godel96"
port = 27018
mongo = MongoClient(host, port)
# Item access is equivalent to attribute access for databases and collections.
db = mongo["fastq_index"]
reads = db["reads"]

In [0]:
filedir = "/gpfs_fs/home/eckertlab/Mitra/mapping_bwa/trimmed/mapped/split_parallel"

In [0]:
# Create the working directory together with any missing parents.
# exist_ok=True avoids failing when the directory already exists or is created
# by another process between the existence check and the creation.
os.makedirs(filedir, exist_ok=True)

In [0]:
cd $filedir

In [0]:
sam = "/gpfs_fs/home/eckertlab/Mitra/mapping_bwa/trimmed/mapped/all_fastq.sam"

In [0]:
samtools = "/home/cfriedline/bin/samtools"

In [0]:
header = !$samtools view -H $sam

In [0]:
# Save the captured header lines to header.sam, one per line.
with open("header.sam", "w") as header_out:
    header_out.writelines(h + "\n" for h in header)

In [0]:
# Connect to the ipyparallel cluster started under the "sge" profile.
rc = Client(profile="sge")
# View obtained by slicing the client with [:]; used below to push names and
# run imports on the engines.
dv = rc[:]
# Load-balanced view used below to submit the per-file jobs.
lv = rc.load_balanced_view()
# Cell output: the number of engines in the direct view.
len(dv)

In [0]:
dv.push({'host':host, 'port':port}).r;

In [0]:
# Imports made inside sync_imports() are performed locally and on the engines
# of dv, so the functions shipped to the engines below can use these names.
with dv.sync_imports():
    import os
    import sys
    import shutil
    import socket
    from collections import Counter

In [0]:
# Mirror the client-side setup on the engines: each engine opens its own
# MongoDB connection (host and port were pushed to the engines earlier),
# changes into the working directory and binds the reads collection.
%px from pymongo import MongoClient
%px mongo = MongoClient(host, port)
%px db = mongo.fastq_index
%px cd $filedir
%px reads = db.reads

In [0]:
def get_writer(sample):
    """Return the output handle for ``sample``, opening it on first use.

    Handles are cached in the module-level ``writers`` dict. The file name
    embeds the host name and the process id, so every process that calls this
    function writes to its own ``<sample>_<hostname>_<pid>.sam`` file in the
    current working directory.

    Args:
        sample: sample name, used as the cache key and the file-name prefix.

    Returns:
        An open, writable text file object.
    """
    writer = writers.get(sample)
    if writer is None:
        # Build the file name only when a new file is actually needed: this
        # function is called once per alignment line, and the host name and
        # pid lookups are wasted work on every cache hit.
        f = "%s_%s_%d.sam" % (sample, socket.gethostname(), os.getpid())
        writer = writers[sample] = open(f, "w")
    return writer

def get_sample(read):
    """Look up the sample that a read belongs to.

    Queries the module-level ``reads`` collection for a document whose
    ``read`` field equals ``read`` and returns that document's ``sample``
    value.

    Args:
        read: read name to look up.

    Returns:
        The value stored under ``sample`` in the matching document.

    Raises:
        KeyError: if no document matches ``read`` or the matching document
            has no ``sample`` field.
    """
    doc = reads.find_one({"read": read})
    if doc is None:
        # find_one returns None when nothing matches; report the read name
        # instead of failing with "'NoneType' object is not subscriptable".
        raise KeyError("no entry for read %r in the reads collection" % read)
    return doc['sample']

In [0]:
def close_writers():
    """Close every file handle stored in the module-level ``writers`` dict."""
    for handle in writers.values():
        handle.close()

In [0]:
# Ship the helper functions, plus an initially empty writer cache, to the
# engines and wait for the push to finish.
dv.push(dict(
    get_writer=get_writer,
    get_sample=get_sample,
    close_writers=close_writers,
    writers={},
)).r;

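## Split the alignments into 5M-line files

Run the following in a shell from the `split_parallel` directory. Header lines (those starting with `@`) are dropped, and `split` writes numbered chunk files named `x000`, `x001`, ...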

```
cat ../all_fastq.sam | grep -v "^@" | split -l 5000000 -d -a 3
```

In [0]:
split_sam_files = !ls x*

In [0]:
len(split_sam_files)

In [0]:
@lv.remote()
def split_sam(f):
    """Distribute the lines of one chunk file into per-sample SAM files.

    Runs on an engine. For every line, the first whitespace-delimited field is
    taken as the read name, its sample is looked up with ``get_sample``, and
    the line is written to that sample's file obtained from ``get_writer``.
    A progress message is printed every 100000 lines.

    Args:
        f: path of a file of SAM alignment lines.

    Returns:
        Tuple ``(counts, read_count)``: a ``Counter`` of lines written per
        sample and the total number of lines processed.
    """
    counts = Counter()
    read_count = 0
    # Context manager so the chunk file is closed even if a lookup fails.
    with open(f) as chunk:
        for line in chunk:
            # Only the first field is needed, so stop splitting after it.
            read = line.split(None, 1)[0]
            sample = get_sample(read)
            writer = get_writer(sample)
            writer.write(line)
            read_count += 1
            counts[sample] += 1
            if read_count % 100000 == 0:
                print("wrote %d" % read_count)
    return counts, read_count

In [0]:
# Submit one split_sam job per chunk file and keep the returned result handles.
jobs = [split_sam(chunk_file) for chunk_file in split_sam_files]

In [0]:
# Latest progress line printed by each job ("" for a job that has not printed
# anything yet, instead of raising IndexError), alongside the chunk count.
[(x.stdout.splitlines() or [""])[-1] for x in jobs], len(split_sam_files)

In [0]:
np.sum([x.ready() for x in jobs]), len(split_sam_files)

In [0]:
dv.apply_async(close_writers).r

In [0]:
pwd

In [0]:
sample_sams = !ls *.sam | grep godel

In [0]:
len(sample_sams)

In [0]:
def get_name(filename):
    """Extract the sample name from a per-engine SAM file name.

    The name is located relative to the ``.fastq`` marker(s) in ``filename``:

    * one ``.fastq``: the text before the first ``_`` of the part preceding it;
    * two ``.fastq``: the third ``_``-separated token of the part between them.

    Args:
        filename: file name to parse.

    Returns:
        The extracted sample name.

    Raises:
        ValueError: if ``filename`` contains neither one nor two ``.fastq``
            markers.
    """
    parts = filename.split(".fastq")
    if len(parts) == 2:
        return parts[0].split("_")[0]
    if len(parts) == 3:
        return parts[1].split("_")[2]
    # ValueError is still an Exception, so existing handlers keep working, but
    # the message now identifies the offending file.
    raise ValueError("cannot derive a sample name from %r: expected one or two "
                     "'.fastq' markers" % filename)

In [0]:
# Group the per-engine SAM files by the sample name parsed from each file name.
sam_dict = {}
for s in sample_sams:
    name = get_name(s)
    sam_dict.setdefault(name, []).append(s)

In [0]:
dv['sam_dict'] = sam_dict

In [0]:
pwd

In [0]:
!mkdir collapsed

In [0]:
%px cd collapsed

In [0]:
cd collapsed

In [0]:
@lv.remote()
def collapse(key):
    """Build ``<key>.sam`` from the shared header plus the sample's part files.

    Runs on an engine. ``../header.sam`` is copied to ``<key>.sam`` in the
    current directory, and every file listed in ``sam_dict[key]`` (resolved
    relative to the parent directory) is appended to it in list order.

    Args:
        key: sample name; must be a key of the ``sam_dict`` available on the
            engine.
    """
    s = "%s.sam" % key
    shutil.copy("../header.sam", s)
    with open(s, "a") as o:
        for f in sam_dict[key]:
            # Close each part file as soon as it has been appended instead of
            # leaving the handles open until garbage collection.
            with open("../%s" % f) as part:
                shutil.copyfileobj(part, o)

In [0]:
# Submit one collapse job per sample name.
jobs = [collapse(sample_key) for sample_key in sam_dict]

In [0]:
np.sum([x.ready() for x in jobs])

In [0]:
@lv.remote()
def convert_sam_to_bam(sam):
    import stopwatch, multiprocessing, os
    timer = stopwatch.Timer()
    cpus = multiprocessing.cpu_count()
    bam = sam.replace(".sam", ".bam")
    bam_sorted = "%s_sorted.bam" % bam.replace(".bam", "")
    bam_index = bam_sorted.replace(".bam", ".bai")
    if not os.path.exists(bam):
        !/home/cfriedline/bin/samtools view -bS $sam > $bam
        !/home/cfriedline/bin/samtools sort -@ $cpus $bam -o $bam_sorted
    timer.stop()
    return bam, bam_sorted, bam_index, timer.elapsed

In [0]:
combined_sams = !ls *.sam

In [0]:
def get_flowcell(name, sample):
    """Build a read-group id from a read name and a sample name.

    ``name`` is split on ``:``; its third and fourth fields are joined with
    ``sample``, using ``.`` as the separator.
    """
    fields = name.split(":")
    return "{}.{}.{}".format(fields[2], fields[3], sample)

In [0]:
# Map each combined SAM file's sample name (the text before the first "." of
# the file name) to a read-group id derived from the read name found on the
# file's last line.
sample_rgids = {}
for c in combined_sams:
    # Shell capture: `line` is a list holding the last line of the file.
    line = !tail -n1 $c
    # The read name is the first tab-separated field of that line.
    name = line[0].split("\t")[0]
    sample = c.split(".")[0]
    sample_rgids[sample] = get_flowcell(name, sample)

In [0]:
dv['sample_rgids'] = sample_rgids

In [0]:
def add_rg_to_sam(sam):
    """Write a copy of ``sam`` with an ``@RG`` header and ``RG:Z`` tags.

    The read-group id is looked up in the ``sample_rgids`` global, using the
    part of ``sam`` before the first ``.`` as the sample name. Header lines
    (starting with ``@``) are copied unchanged, the ``@RG`` line is written
    right after them, and every alignment line gets ``RG:Z:<id>`` appended as
    an extra tab-separated field.

    Args:
        sam: path of the input SAM file. The output path is ``sam`` with
            ``.sam`` replaced by ``_rg.sam``.

    Returns:
        Tuple ``(sam_rg, rg_string)``: the output path and the ``@RG`` line
        (without a trailing newline).

    Raises:
        KeyError: if the sample has no entry in ``sample_rgids``.
    """
    sam_rg = sam.replace(".sam", "_rg.sam")
    sample = sam.split(".")[0]
    rg_z = sample_rgids[sample]
    # Library and platform unit both use the id without its last "." field.
    rg_prefix = ".".join(rg_z.split(".")[0:-1])
    rg_string = "\t".join(["@RG",
                           "ID:%s" % rg_z,
                           "LB:%s" % rg_prefix,
                           "SM:%s" % sample,
                           "PU:%s" % rg_prefix])
    rg_tag = "RG:Z:%s" % rg_z

    rg_written = False
    with open(sam) as src, open(sam_rg, "w") as o:
        for line in src:
            if line.startswith("@"):
                o.write(line)
                continue
            if not rg_written:
                # First alignment line: the header is complete, so emit @RG.
                o.write(rg_string + "\n")
                rg_written = True
            # Tag every alignment, including the first one. Previously the
            # first alignment was replaced by the @RG line and lost.
            o.write(line.strip() + "\t" + rg_tag + "\n")
        if not rg_written:
            # Input without alignment lines: still record the read group.
            o.write(rg_string + "\n")
    return sam_rg, rg_string

In [0]:
dv['add_rg_to_sam'] = add_rg_to_sam

In [0]:
# Submit one add_rg_to_sam task per combined SAM file through the load-balanced view.
jobs = [lv.apply_async(add_rg_to_sam, combined) for combined in combined_sams]

In [0]:
np.sum([x.ready() for x in jobs])

In [0]:
rg_sams = !ls *rg.sam

In [0]:
len(rg_sams)

In [0]:
# Submit one SAM-to-BAM conversion job per read-group-tagged SAM file.
jobs = [convert_sam_to_bam(rg_sam) for rg_sam in rg_sams]

In [0]:
np.sum([x.ready() for x in jobs])

In [0]:
bams = !ls *rg_sorted.bam

In [0]:
len(bams)

In [0]:
def index_bam(bam):
    """Run ``<samtools> index`` on ``bam`` through the IPython shell escape.

    The executable path is read from the ``samtools`` global.

    Returns:
        ``bam``, unchanged.
    """
    cmd = "%s index %s" % (samtools, bam)
    # `!$cmd` expands the local variable cmd and runs it as a shell command.
    !$cmd
    return bam

In [0]:
def get_mapped(bam):
    """Filter ``bam`` with ``samtools view -b -F 4`` and index the result.

    The output file is ``<prefix>_mapped.bam``, where ``<prefix>`` is the part
    of ``bam`` before its first ``.``. Filtering is skipped when the output
    file already exists; ``index_bam`` is called on the output either way.
    The executable path is read from the ``samtools`` global.

    Returns:
        The input path ``bam`` (not the path of the filtered file).
    """
    import os
    out = "%s_mapped.bam" % bam.split(".")[0]
    if not os.path.exists(out):                                       
        cmd = "%s view -b -F 4 %s > %s" % (samtools, bam, out)
        # Shell capture; the captured output in `res` is not used afterwards.
        res = !$cmd
    index_bam(out)
    return bam

In [0]:
# Make the samtools path and the two helper functions available by name on
# the engines (get_mapped looks up samtools and index_bam as globals).
dv['samtools'] = samtools
dv['index_bam'] = index_bam
dv['get_mapped'] = get_mapped

In [0]:
# NOTE(review): this path (.../Mitra/mapping/split_parallel/collapsed) differs
# from the "collapsed" directory under filedir
# (.../Mitra/mapping_bwa/trimmed/mapped/split_parallel) that the engines were
# moved into earlier. Confirm which location is intended.
%px cd /gpfs_fs/home/eckertlab/Mitra/mapping/split_parallel/collapsed

In [0]:
# get_mapped is a plain function: calling it directly runs it locally and
# returns a str, which has no `.r`. Submit it to an engine through the
# load-balanced view and wait for the result instead.
lv.apply_async(get_mapped, bams[0]).r

In [0]:
bams[0]

In [0]:
# Submit one get_mapped task per BAM to the engines. Calling get_mapped(b)
# directly would run locally and collect plain strings, which do not provide
# the .ready() method used to poll the jobs.
jobs = []
for b in bams:
    jobs.append(lv.apply_async(get_mapped, b))

In [0]:
np.sum([x.ready() for x in jobs])

In [0]:
mapped = !ls *mapped.bam

In [0]:
len(mapped)

In [0]:
# Record the mapped BAM file names, one per line, in bams.txt.
with open("bams.txt", "w") as bam_list:
    bam_list.writelines("%s\n" % b for b in mapped)

In [0]:
def add_rg_info_to_bam(bam):
    """Run Picard AddOrReplaceReadGroups on ``bam`` with path-derived fields.

    The read-group fields are derived from ``bam``:

    * RGLB: name of the directory two levels above the file;
    * RGSM: the file's base name up to its first ``.``;
    * RGPU: the last ``_``-separated token of RGSM;
    * RGID: ``FLOWCELL.LANE<n>.<RGSM>``, with ``<n>`` taken from
      ``lane_map[RGLB]``.

    The output is written to ``bam`` with ``.bam`` replaced by ``_rg.bam``.

    Returns:
        Tuple ``(bam_rg, rg_string, cmd)``: the output path, the read-group
        argument string and the full command line that was run.

    Raises:
        KeyError: if the RGLB directory name is not a key of ``lane_map``.
    """
    import os
    # Maps the RGLB directory name to the lane number used in RGID.
    lane_map = {"I1": 1, "I3":2}
    cmd = "java -jar /home/cfriedline/gpfs/src/picard-tools-1.112/AddOrReplaceReadGroups.jar"
    base = os.path.basename(bam).split(".")
    bam_rg = bam.replace(".bam", "_rg.bam")
    rglb = os.path.basename(os.path.dirname(os.path.dirname(bam)))
    rgpu = base[0].split("_")[-1]
    rgsm = base[0]
    rgid = "FLOWCELL.LANE%d.%s" % (lane_map[rglb],rgsm)
    rg_string = "RGID=%s RGLB=%s RGPL=illumina RGPU=%s RGSM=%s" % (rgid,
                                                                   rglb,
                                                                   rgpu,
                                                                   rgsm)
    cmd = "%s INPUT=%s OUTPUT=%s %s CREATE_INDEX=true" % (cmd,
                                                          bam,
                                                          bam_rg,
                                                          rg_string)
    # The existence guard below is disabled, so the command runs on every call
    # even when bam_rg already exists.
#     if not os.path.exists(bam_rg):
    !$cmd
    return bam_rg, rg_string, cmd