In [7]:
# imports
import AmpliPy
import pysam
import multiprocessing

MAIN_EXAMPLE_INPUT_FILE = "./example/example_untrimmed_sorted.bam"
data = []

In [8]:
# counts total reads we have
import AmpliPy
in_align,_ = AmpliPy.create_AlignmentFile_objects(MAIN_EXAMPLE_INPUT_FILE)
count = 0
for read in in_align:
	count += 1

print("total reads", str(count)) # last count was 201600

total reads 201600


In [15]:
# simple function to take time
import time
from random import randrange

# Test function for testing IO & read manipulation
# Attempts to simulate *some* runtime, so it's less instant
def old_func_to_run(read):
	i = 0
	for c in read.query_sequence:
		if c == 'A':
			i += 1
		elif c == 'C':
			i += 2
		elif c == 'G':
			i += 3
		elif c == 'T':
			i += 4
	type = i % 4
	out = ""
	
	for i in range(len(read.query_sequence)):
		if type == 0:
			char = "A"
		elif type == 1:
			char = "C"
		elif type == 2:
			char = "G"
		elif type == 3:
			char = "T"
		out += char
		type = (type + randrange(0, 4)) % 4
	read.query_sequence = out

def func_to_run(x):
	# The rough estimate of runtime that the real algorithm takes
	time.sleep(.0001)

In [36]:
# simple test to check time for a linear approach
new_output_file = "test.linear.bam"
if os.path.exists(new_output_file):
	os.remove(new_output_file)
inter_align,outer_align = AmpliPy.create_AlignmentFile_objects(MAIN_EXAMPLE_INPUT_FILE, new_output_file)

linear_start = time.time()
for read in inter_align:
	func_to_run(read)
	outer_align.write(read)
linear_end = time.time()

linear_time = linear_end - linear_start

print("total time for linear approach: ", str(linear_time)) # took 50.36s on last run
data = []
data.append(["linear", -1, linear_time])

total time for linear approach:  39.00561714172363


In [51]:
# simple multiprocessing test
import multiprocessing as mp
multiprocessed_output_file = "test.multi.bam"
if os.path.exists(multiprocessed_output_file):
	os.remove(multiprocessed_output_file)
inter_align,outer_align = AmpliPy.create_AlignmentFile_objects(MAIN_EXAMPLE_INPUT_FILE, multiprocessed_output_file)

# Maximum Queue size
MAX_QUEUE = 1024
input_queue = mp.JoinableQueue(MAX_QUEUE)
output_queue = mp.JoinableQueue(MAX_QUEUE)

# Constant to signal the end of the Queue to workers
SIGNAL_QUEUE_END = -1
# The number of processes to test
NUM_PROCESSES = 16

# Worker logic
def worker(test):
	while True:
		read = input_queue.get()
		if (read == SIGNAL_QUEUE_END):
			test.value = mp.current_process().pid
			input_queue.task_done()
			break

		read = pysam.AlignedSegment.fromstring(read, header=inter_align.header)
		func_to_run(read)
		output_queue.put(read.to_string())
		input_queue.task_done()

# Writer logic	
def writer():
	while True:
		read = output_queue.get()
		if (read == SIGNAL_QUEUE_END):
			output_queue.task_done()
			break

		read = pysam.AlignedSegment.fromstring(read, header=inter_align.header)
		outer_align.write(read)
		output_queue.task_done()
	# Close the output file when done, so it's actually written to disk properly
	outer_align.close()

if __name__ == '__main__':
	multi_start = time.time()
	
	worker_processes = NUM_PROCESSES-2
	processes = []
	test = []
	for _ in range(worker_processes):
		v = mp.Value("I", 0)
		p = mp.Process(target=worker, args=(v,))
		p.start()
		processes.append(p)
		test.append(v)

	writer = mp.Process(target=writer)
	writer.start()

	print("STARTING READING")

	for read in inter_align:
		print("in: %i, out: %i"%(input_queue.qsize(), output_queue.qsize()))
		input_queue.put(read.to_string())

	print("FINISHED READING ALL INPUT")

	# Signal on the queue that we are done getting input
	for _ in range(worker_processes):
		input_queue.put(SIGNAL_QUEUE_END)
		
	# Join the queue, so we wait until all processors are done
	input_queue.close()
	input_queue.join()

	print("FINISHED PROCESSING ALL INPUT")
	
	# Should be redundant, but join each processess too, so it can be closed
	for p in processes:
		p.join()

	# Now that processing is done, signal the output is done and join it
	output_queue.put(SIGNAL_QUEUE_END)
	output_queue.close()
	output_queue.join()

	print("FINISHED OUTPUTTING ALL READS")

	multi_end = time.time()
	multi_time = multi_end - multi_start
	data.append([NUM_PROCESSES, MAX_QUEUE, multi_time])

STARTING READING
in: 0, out: 0
in: 1, out: 0
in: 2, out: 0
in: 1, out: 0
in: 2, out: 0
in: 3, out: 1
in: 4, out: 0
in: 4, out: 0
in: 3, out: 1
in: 3, out: 1
in: 3, out: 1
in: 4, out: 2
in: 4, out: 3
in: 5, out: 3
in: 6, out: 3
in: 7, out: 2
in: 7, out: 2
in: 8, out: 2
in: 9, out: 2
in: 10, out: 2
in: 7, out: 5
in: 8, out: 5
in: 9, out: 5
in: 10, out: 5
in: 10, out: 6
in: 7, out: 9
in: 7, out: 9
in: 8, out: 3
in: 8, out: 0
in: 9, out: 1
in: 10, out: 1
in: 10, out: 0
in: 11, out: 0
in: 10, out: 0
in: 11, out: 1
in: 11, out: 0
in: 5, out: 5
in: 6, out: 4
in: 7, out: 0
in: 8, out: 0
in: 1, out: 0
in: 2, out: 0
in: 3, out: 0
in: 4, out: 0
in: 1, out: 0
in: 2, out: 1
in: 2, out: 0
in: 1, out: 1
in: 1, out: 0
in: 1, out: 0
in: 2, out: 1
in: 3, out: 0
in: 4, out: 1
in: 4, out: 0
in: 5, out: 1
in: 5, out: 0
in: 6, out: 0
in: 6, out: 1
in: 7, out: 1
in: 8, out: 0
in: 3, out: 3
in: 1, out: 4
in: 2, out: 7
in: 1, out: 0
in: 2, out: 1
in: 1, out: 0
in: 2, out: 0
in: 3, out: 1
in: 4, out: 0
in: 1, o

KeyboardInterrupt: 

In [49]:
import pandas as pd
pd.DataFrame(data, columns=["Processes", "Max Queue", "Runtime"])

Unnamed: 0,Processes,Max Queue,Runtime
0,linear,-1,39.005617
1,16,1,44.050176
2,16,2,20.717345
3,16,4,15.95957
4,16,8,15.018509
5,16,16,13.126452
6,16,32,14.561757
7,16,64,13.815433
8,16,128,13.483323
9,16,256,9.883868
