Exploring multiprocessing to get things really going here.

In [11]:
import multiprocessing
import functools
import time
import parmap
#time.sleep(5) # delays for 5 seconds

In [13]:
def f(obj, outdir):
    i = obj[0]
    j = obj[1]
    out = "{i}:{j}:{outdir}".format(**locals())
    print(out)
    time.sleep(2)
    return out

In [5]:
mglist = ['a', 'b', 'c', 'd']
saglist = ['e', 'f', 'g', 'h']

In [51]:
parmap.map(f, mglist, saglist)

a:['e', 'f', 'g', 'h']
d:['e', 'f', 'g', 'h']
b:['e', 'f', 'g', 'h']
c:['e', 'f', 'g', 'h']


["a:['e', 'f', 'g', 'h']",
 "b:['e', 'f', 'g', 'h']",
 "c:['e', 'f', 'g', 'h']",
 "d:['e', 'f', 'g', 'h']"]

In [6]:
pairs = []
for m in mglist:
    for s in saglist:
        pairs.append((m, s))
pairs

[('a', 'e'),
 ('a', 'f'),
 ('a', 'g'),
 ('a', 'h'),
 ('b', 'e'),
 ('b', 'f'),
 ('b', 'g'),
 ('b', 'h'),
 ('c', 'e'),
 ('c', 'f'),
 ('c', 'g'),
 ('c', 'h'),
 ('d', 'e'),
 ('d', 'f'),
 ('d', 'g'),
 ('d', 'h')]

In [18]:
cmds = []

for m in mglist:
    for s in saglist:
        cmd = "{m}**{s}".format(**locals())
        cmds.append(cmd)
print(cmds)

['a**e', 'a**f', 'a**g', 'a**h', 'b**e', 'b**f', 'b**g', 'b**h', 'c**e', 'c**f', 'c**g', 'c**h', 'd**e', 'd**f', 'd**g', 'd**h']


In [9]:
p = multiprocessing.Pool(8)

In [2]:
def split_threads_by_runs(threads, runs):
    p = multiprocessing.Pool(processes=threads)
    cores = threads//runs
    if cores == 0:
        raise IOError("number of threads must be greater than or equal to the number of runs")
    return p, cores

def bed_cov_it(fqref, outdir, pctid, cores, cleanup, pe):
    '''This function takes a fastq file and a reference, entered in the first input as a tuple, 
    1. aligns them using bwa
    2. filters them based on pctid alignment threshold
    3. creates a counts file of how many reads passed the filter
    4. creates a genome coverage table using bedtools
    5. returns a tuple of the coverage file and the count file
    Args:
        fqref: tuple as (path to metagenome fastq, path to reference SAG)
        outdir (str): output directory
        pctid (int): percent identity of aligned reads to keep
        cores (int): # threads to use 
        cleanup (boolean): if true, delete output bam and bai file after running
        pe (boolean): if true, reads are paired 
    Output:
        bedtools coverage file, alignment count file
    '''
    fastq = fqref[0]
    reference = fqref[1]
    fqpre = "_".join(op.basename(fastq).split(".")[:-1])
    ref_pre = "_".join(op.basename(reference).split(".")[:-1])
    outbam = op.join(os.path.abspath(outdir), fqpre+"_vs_"+ref_pre+".bam")
    
    if pe:
        bam = bwa_mem(fastq, outbam, reference, options='-p', cores=cores)
    else:
        bam = bwa_mem(fastq, outbam, reference, options=None, cores=cores)             # run bwa mem 
    bam, goodcount = filter_bam(bam, bam.replace(".bam", "_{pctid}.bam".format(**locals())), pctid=pctid)   # filter aligned reads based on pctid
    bed = get_coverage(bam)                         # create per base coverage table
    print("coverage_table_created, called:", bed)
    if cleanup:
        #idx_files = [reference + x for x in ['.amb', '.ann', '.bwt', '.pac', '.sa']]
        #for f in idx_files+[bam, bam+".bai"]:
        for f in [bam, bam+".bai"]:
            os.remove(f)
    return bed

def run_bed_cov_mp(fqref_list, outdir, pctid, total_cores, cleanup, pe):
    p, core_pr = split_threads_by_runs(total_cores, 3) # set up pool and number of cores per run
    rbc = partial(bed_cov_it, outdir=outdir, pctid=pctid, 
                  core_pr=core_pr, cleanup=True)
    for i in range(0, len(fqref_list), 3):
        results += p.map(rbc, fqref[i:i+3])
    p.close()
    
    return results

### Dummy run on a similar and less complicated function...

In [34]:
def f(obj, outdir):
    i = obj[0]
    j = obj[1]
    out = "{i}:{j}:{outdir}".format(**locals())
    print(out)
    time.sleep(3)
    return out

In [35]:
def split_threads_by_runs(threads, runs):
    p = multiprocessing.Pool(processes=threads)
    cores = threads//runs
    if cores == 0:
        raise IOError("number of threads must be greater than or equal to the number of runs")
    return p, cores

In [40]:
def run_f_mp(objlist, outdir, total_cores):
    reslist = []
    p, core_pr = split_threads_by_runs(total_cores, 3)
    funk = functools.partial(f, outdir=outdir)
    for i in range(0, len(objlist), 3):
        out = p.map(funk, objlist[i:i+3])
        reslist += out
    p.close()
    p.join()
    return reslist, out

In [41]:
yea, yah = run_f_mp(pairs, "out!", 8)

a:g:out!
a:f:out!
a:e:out!
b:f:out!
a:h:out!
b:e:out!
c:e:out!
b:g:out!
b:h:out!
c:f:out!
c:h:out!
c:g:out!
d:g:out!
d:e:out!
d:f:out!
d:h:out!


In [42]:
yea

['a:e:out!',
 'a:f:out!',
 'a:g:out!',
 'a:h:out!',
 'b:e:out!',
 'b:f:out!',
 'b:g:out!',
 'b:h:out!',
 'c:e:out!',
 'c:f:out!',
 'c:g:out!',
 'c:h:out!',
 'd:e:out!',
 'd:f:out!',
 'd:g:out!',
 'd:h:out!']

In [43]:
yah

['d:h:out!']

In [1]:
!scp charlie:/mnt/stepanauskas_nfs/julia/*.txt ../data

allsakinawsags.txt                            100% 2032     2.0KB/s   00:00    
firstlist.txt                                 100%  305     0.3KB/s   00:00    
mgs.txt                                       100%  181     0.2KB/s   00:00    
thenthese.txt                                 100% 1673     1.6KB/s   00:00    


In [6]:
loi =  [i for i in open("../data/allsakinawsags.txt").read().split("\n") if len(i)>0]


In [7]:
loi

['/mnt/stepanauskas_nfs/julia/Test_FragRecruitment/Sakinaw/Hallam_SAGs_AAA255/Masked_genomes_AAA255/AAA255A6_96912.fasta',
 '/mnt/stepanauskas_nfs/julia/Test_FragRecruitment/Sakinaw/Hallam_SAGs_AAA255/Masked_genomes_AAA255/AAA255B16_97682.fasta',
 '/mnt/stepanauskas_nfs/julia/Test_FragRecruitment/Sakinaw/Hallam_SAGs_AAA255/Masked_genomes_AAA255/AAA255C20_98068.fasta',
 '/mnt/stepanauskas_nfs/julia/Test_FragRecruitment/Sakinaw/Hallam_SAGs_AAA255/Masked_genomes_AAA255/AAA255E10_95567.fasta',
 '/mnt/stepanauskas_nfs/julia/Test_FragRecruitment/Sakinaw/Hallam_SAGs_AAA255/Masked_genomes_AAA255/AAA255E4_97381.fasta',
 '/mnt/stepanauskas_nfs/julia/Test_FragRecruitment/Sakinaw/Hallam_SAGs_AAA255/Masked_genomes_AAA255/AAA255E5_97489.fasta',
 '/mnt/stepanauskas_nfs/julia/Test_FragRecruitment/Sakinaw/Hallam_SAGs_AAA255/Masked_genomes_AAA255/AAA255F10_96652.fasta',
 '/mnt/stepanauskas_nfs/julia/Test_FragRecruitment/Sakinaw/Hallam_SAGs_AAA255/Masked_genomes_AAA255/AAA255F6_96485.fasta',
 '/mnt/stepa