In [1]:
# Import the libraries we will need
import subprocess  # The library that will actually be talking to the shell and
                   # tell it to what to run and when.
from itertools import islice  # Important tool that will allow us to split up our
                              # commands for each output.
import random  # This will determine how long each process will take.
random.seed(1)  # Feel free to change this, but useful in the notebook so the author can explain
                # the output even if the output is 'random'

In [2]:
# Set the number of threads
threads = 5  # Of the 200 commands, five will be running at any one time.

# We need to have 5 separate output files to stop each running command from 
# over writing the work of a simultaneous command.
output_files = ["output.file.%d" % i for i in range(0, threads)]  #output.file.0 to output.file.4

file_handlers = [None]*threads  # Generates a list of NULL variables of length 5.

# This assigns the file handler for each file.
for index, output_file in enumerate(output_files):
    file_handlers[index] = open(output_file, 'w')

for handler in file_handlers:  # Print the file handler so we know what they look like.
    print (handler)



<_io.TextIOWrapper name='output.file.0' mode='w' encoding='UTF-8'>
<_io.TextIOWrapper name='output.file.1' mode='w' encoding='UTF-8'>
<_io.TextIOWrapper name='output.file.2' mode='w' encoding='UTF-8'>
<_io.TextIOWrapper name='output.file.3' mode='w' encoding='UTF-8'>
<_io.TextIOWrapper name='output.file.4' mode='w' encoding='UTF-8'>


In [351]:
random_number_list = [random.uniform(0.1,1) for i in range(0,10)]
commands = ["sleep %d && echo Command number - %d. Slept for %f." % (j, i, j)
            for i, j in enumerate(random_number_list)]

In [None]:
processes = (subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            for cmd in commands)

In [355]:
processes = (cmd for cmd in commands)
running_processes = list(islice(processes, threads))

In [357]:
for i, process in enumerate(running_processes):
    print(process)
    print(i)
    running_processes[i] = next(processes, None) #this line moves along the content of the running_processes list one at a time 
    #pulling 
    print('what is running_processes[i]? it is %s' % (running_processes[i]))
    #running_processes[i] = next(processes, None)


sleep 0 && echo Command number - 5. Slept for 0.111144.
0
what is running_processes[i]? it is None
sleep 0 && echo Command number - 6. Slept for 0.703370.
1
what is running_processes[i]? it is None
sleep 0 && echo Command number - 7. Slept for 0.182515.
2
what is running_processes[i]? it is None
sleep 0 && echo Command number - 8. Slept for 0.203592.
3
what is running_processes[i]? it is None
sleep 0 && echo Command number - 9. Slept for 0.896554.
4
what is running_processes[i]? it is None


In [330]:
running_processes = islice(processes, threads)

In [None]:
running_processes[1]

In [None]:
while running_processes:
    for i, process in enumerate(running_processes):
        #print(i)
        if process.poll() is not None:  # Means that the process is complete!
            stdout, stderr = process.communicate()  # Get the output of the completed process
            file_handlers[i].write(str(stdout) + "\n")  # Write the output to handler that
            running_processes[i] = next(processes, None)
            # Run the next number in the list.
            if running_processes[i] is None:  # No more commands waiting to be processed.
                del running_processes[i]  # Not a valid process.
                break

In [None]:

# By closing the file_handler this prints everything accumulated in the handler to the file.
for handler in file_handlers:
    handler.close()



In [None]:
# Now let's have a look at the first few lines of each each file.
number_of_lines = 3
for output_file in output_files:
    with open(output_file) as output_handler:
        head = list(islice(output_handler, number_of_lines))
    print ("### " + output_file + " ###")
    print (head)

## Set playing around with multiprocessing using the TE files

In [2]:
%matplotlib inline
import pandas as pd
import os
import re
from Bio import SeqIO
import pysam
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
from Bio import SearchIO
from pybedtools import BedTool
import numpy as np
import pybedtools
import multiprocessing
import re
import time
import matplotlib.pyplot as plt

In [38]:
genome = 'Pst_104E_v12_p_ctg'
p_genome_file = 'Pst_104E_v12_p_ctg.genome_file'

In [9]:
out_dir = '/home/benjamin/genome_assembly/PST79/FALCON/p_assemblies/v9_1/Pst_104E_v12/TE_analysis'
TE_post_analysis_p = '/home/benjamin/genome_assembly/PST79/FALCON/p_assemblies/v9_1/REPET/Pst79_p/Pst79_p_full_annotate/postanalysis'
TE_post_analysis_p_header = 'TE      length  covg    frags   fullLgthFrags   copies  fullLgthCopies  meanId  sdId    minId   q25Id   medId   q75Id   maxId   meanLgth        sdLgth  minLgth q25Lgth medLgth q75Lgth maxLgth meanLgthPerc    sdLgthPerc      minLgthPerc  q25LgthPerc     medLgthPerc     q75LgthPerc     maxLgthPerc'.split(' ')
TE_post_analysis_p_header = [x for x in TE_post_analysis_p_header if x != '']

In [5]:
#generate the directory structure to safe specific coverage files
os.chdir(out_dir)
TE_types = ['Retrotransposon', 'DNA_transposon', 'noCat', 'SSR']
TE_path = [os.path.join(out_dir, x) for x in TE_types]
TE_path_dict = dict(zip(TE_types, TE_path))
for TE_type in TE_types:
    new_path = os.path.join(out_dir, TE_type)
    if not os.path.exists(new_path):
        os.mkdir(new_path)

In [10]:
#this needs to be fixed up to pick the proper summary table
p_repet_summary_df = pd.read_csv(TE_post_analysis_p+'/'+'Pst79p_anno_chr_allTEs_nr_noSSR_join_path.annotStatsPerTE.tab' ,\
                                names = TE_post_analysis_p_header, header=None, sep='\t', skiprows=1 )

#check if I can filter the tab files for removing all TEs that are on the 2000 plus contigs
#remove tRNAs TEs with infernal

p_repet_summary_df['Code'] = p_repet_summary_df['TE'].apply(lambda x: x.split('_')[0])

code_keys = p_repet_summary_df['Code'].unique()

code_keys.sort()

code_long = ['DNA_transposon Helitron', 'DNA_transposon Helitron', 'DNA_transposon Helitron', 'DNA_transposon Maverick',\
            'DNA_transposon TIR', 'DNA_transposon TIR', 'DNA_transposon TIR', 'DNA_transposon TIR', 'DNA_transposon noCat',\
             'DNA_transposon MITE','DNA_transposon MITE', 'Potential Host Gene', 'Retrotransposon LINE', 'Retrotransposon LINE',\
             'Retrotransposon LINE','Retrotransposon LTR','Retrotransposon LTR', 'Retrotransposon LTR', 'Retrotransposon LTR', 'Retrotransposon PLE', \
             'Retrotransposon SINE',  'Retrotransposon SINE', 'Retrotransposon noCat', 'Retrotransposon LARD',\
             'Retrotransposon LARD', 'Retrotransposon TRIM', 'Retrotransposon TRIM', 'Retrotransposon noCat',  \
             'Retrotransposon DIRS','Retrotransposon DIRS','Retrotransposon DIRS','Retrotransposon DIRS',\
             'noCat', 'noCat']

code_dict = dict(zip(code_keys, code_long))

In [32]:
REPET_ID_df = pd.read_csv(out_dir+'/'+genome+'.REPET.ID_column.gff', header=None, sep='\t')
_id = pd.read_csv(out_dir+'/'+genome+'.REPET.ID_column.gff', header=None, sep='\t')[8].unique()
REPET_ID_bed = pybedtools.BedTool(out_dir+'/'+genome+'.REPET.ID_column.gff')
REPET_ID_bed = REPET_ID_bed.remove_invalid().saveas(out_dir+'/'+genome+'.REPET.ID_column.bedobject')
print(len(_id))

48659


In [25]:
len(REPET_ID_df[REPET_ID_df[8].str.contains("MCL")][8].unique())

1494

In [35]:
# Next, we create a function to pass only features for a particular
# featuretype.  This is similar to a "grep" operation when applied to every
# feature in a BedTool
def id_filter(feature, _id):
    if feature[8] == _id:
        return True
    return False

In [39]:
# subset the id and safe in specific folder
# return the subsetted file as bedtool
def subset_id_m(_id_list, _dict):
    repet_prefix = 'Pst_104E_v12_p_ctg.REPET.TE_level'
    for _id in _id_list:
    
        #ClassI are retrotransposon form blast
        if 'ClassI:' in _id:
            out_path = TE_path_dict['Retrotransposon']   
        #ClassII are DNA_transponson
        elif 'ClassII' in _id:
            out_path = TE_path_dict['DNA_transposon'] 
        #The rest with '_' should be REPET_TEs
        elif _id.split('_')[0] in list(code_dict.keys()):
            key = code_dict[_id.split('_')[0]].split(' ')[0]
            out_path = TE_path_dict[key]
        #everything without '_' at the end should be SSR
        elif '_' not in _id:
            out_path = TE_path_dict['SSR']
        out_fn = out_path+'/'+repet_prefix+'.'+_id+'.gff'
        result = REPET_ID_bed.filter(id_filter, _id).saveas(out_fn)
        cov_fn = out_fn.replace('gff','cov')
        cov = result.genome_coverage(dz=True,g=p_genome_file)
        cov.saveas(cov_fn)
        _len = len(pd.read_csv(cov_fn, header=None, sep='\t'))
        _dict[_id] = _len
    #return pybedtools.BedTool(result.fn)

In [40]:
#use multiprocessing to do the bedcov genome coverage per classification. Keep track if everything is already done.
import math
nproces = 30 #number of processors
chunksize = int(math.ceil(len(_id) / float(nproces)))
jobs = []
job_count = 0
manager = multiprocessing.Manager()
TE_cov_df  = manager.dict()
for i in range(nproces):
        p = multiprocessing.Process(target=subset_id_m, args=(_id[chunksize * i:chunksize * (i + 1)],TE_cov_df))
        jobs.append(p)
        p.start()
        job_count += 1
while set([j.is_alive() for j in jobs]) != {False}:
    running_process = [j.is_alive() for j in jobs].count(True)
    time.sleep(15)
    print("Waiting for Subset_TE_level classificatio to finish!"% running_process)
print("All Subset_id_classifications done! Totaling %i"% (job_count))

TypeError: not all arguments converted during string formatting

In [None]:
while set([j.is_alive() for j in jobs]) != {False}:
    running_process = [j.is_alive() for j in jobs].count(True)
    time.sleep(15)
    print("Waiting for Subset_TE_level classificatio to finish! %i jobs still running"% running_process)
print("All Subset_id_classifications done! Totaling %i"% (job_count))

Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_level classificatio to finish! 30 jobs still running
Waiting for Subset_TE_lev