Commit

Merge pull request #13 from bmds-lab/dev/fix-max-open-files-offtarget-extraction

bug: when merging the sorted off-target sites files an OS error could…
SystemsResearch committed Mar 27, 2023
2 parents b00de36 + 4ad77dd commit bb50af9
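The error arises because the original merge step opened every sorted intermediate file at once before handing the handles to heapq.merge, which can exceed the per-process limit on open file descriptors. The change below merges the sorted chunks in bounded batches instead. A minimal sketch of that idea (the function and variable names here are illustrative, not the committed code):

import heapq, shutil, tempfile

def merge_in_batches(sorted_paths, out_path, max_open=400):
    # Merge at most `max_open` already-sorted chunk files at a time; each partial
    # result is appended back onto the work list until a single file remains.
    while len(sorted_paths) > 1:
        batch, sorted_paths = sorted_paths[:max_open], sorted_paths[max_open:]
        handles = [open(p) for p in batch]
        merged = tempfile.NamedTemporaryFile(mode='w', delete=False)
        # heapq.merge streams the sorted inputs line by line, so memory use
        # stays flat regardless of chunk size.
        merged.writelines(heapq.merge(*handles))
        merged.close()
        for h in handles:
            h.close()
        sorted_paths.append(merged.name)
    shutil.move(sorted_paths[0], out_path)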
Showing 1 changed file with 85 additions and 55 deletions.
140 changes: 85 additions & 55 deletions src/crackling/utils/extractOfftargets.py
@@ -15,27 +15,14 @@
'''

import glob, multiprocessing, os, re, shutil, string, sys, tempfile, heapq
import glob, multiprocessing, os, re, shutil, sys, tempfile, heapq, argparse
from crackling.Helpers import *
from crackling.Paginator import Paginator

# Defining the patterns used to detect sequences
pattern_forward_offsite = r"(?=([ACG][ACGT]{19}[ACGT][AG]G))"
pattern_reverse_offsite = r"(?=(C[CT][ACGT][ACGT]{19}[TGC]))"
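# Illustrative aside, not part of the committed file: each pattern is a
# zero-width lookahead wrapping a capture group, so re.findall returns every
# overlapping 23 bp candidate rather than skipping past each hit. For example,
#   re.findall(pattern_forward_offsite, 'G' * 21 + 'AGGG')
# yields three overlapping 23-mers, each a 20 bp protospacer followed by an
# NGG or NAG PAM.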

# The off-target sites need to be sorted so the ISSL index is space-optimised.
# In some cases, there are many files to sort, we can paginate the files.
# How many files should we sort per page?
# This is dictated by the number of arguments that we can pass to `sort`
# Default: 50
SORT_PAGE_SIZE = 500

# Set the number of processes to generate.
# This sets the --threads argument for `sort`, and
# the process count for multiprocessing.Map(..)
# Default: os.cpu_count()
PROCESSES_COUNT = os.cpu_count()

def explodeMultiFastaFile(fpInput, fpOutputTempDir):
newFilesPaths = []

@@ -103,7 +90,6 @@ def processingNode(fpInputs, fpOutputTempDir = None):
seqsByHeader[header].append(line.rstrip().upper())

# For each FASTA sequence
lineNumber = 0
offtargets = []
for header in seqsByHeader:
seq = ''.join(seqsByHeader[header])
@@ -118,10 +104,9 @@ def processingNode(fpInputs, fpOutputTempDir = None):
offtargets.append(
seqModifier(match_chr[i][0:20])
)

lineNumber += 1

outFile.write(''.join(f'{offTarget}\n' for offTarget in offtargets))
return len(offtargets)

# Node function that sorts a file for multiprocessing pool
def sortingNode(fileToSort, sortedTempDir):
@@ -133,16 +118,16 @@ def sortingNode(fileToSort, sortedTempDir):
)
# Sort input file and store in new output dir
with open(fileToSort, 'r') as input:
# Read 'page'
page = input.readlines()
# Sort Page
page.sort()
# Write sorted page to file
sortedFile.writelines(page)
# Close sorted file
sortedFile.close()

def paginatedSort(filesToSort, fpOutput, mpPool):
# Read 'page'
page = input.readlines()
# Sort Page
page.sort()
# Write sorted page to file
sortedFile.writelines(page)
# Close sorted file
sortedFile.close()

def paginatedSort(filesToSort, fpOutput, mpPool, maxNumOpenFiles=400):
# Create temp file directory
sortedTempDir = tempfile.TemporaryDirectory()
printer(f'Created temp directory {sortedTempDir.name} for sorting')
@@ -161,6 +146,8 @@ def paginatedSort(filesToSort, fpOutput, mpPool):
args
)

printer('Sorting of each file completed')

# Collect sorted files to merge
sortedFiles = glob.glob(
os.path.join(
@@ -170,20 +157,43 @@ def paginatedSort(filesToSort, fpOutput, mpPool):
)

# Open all the sorted files to merge
sortedFilesPointers = [open(file, 'r') for file in sortedFiles]

# Merge all the sorted files
with open(fpOutput, 'w') as f:
f.writelines(heapq.merge(*sortedFilesPointers))
printer(f'Beginning to merge sorted files, {maxNumOpenFiles:,} at a time')
while len(sortedFiles) > 1:
# A file to write the merged sequences to
mergedFile = tempfile.NamedTemporaryFile(delete = False)

# Select the files to merge
while True:
try:
sortedFilesPointers = [open(file, 'r') for file in sortedFiles[:maxNumOpenFiles]]
break
except OSError as e:
if e.errno == 24:
printer(f'Attempted to open too many files at once (OSError errno 24)')
maxNumOpenFiles = max(1, int(maxNumOpenFiles / 2))
printer(f'Reducing the number of files that can be opened by half to {maxNumOpenFiles}')
continue
raise e
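# Aside, not part of the commit: errno 24 is EMFILE ("too many open files");
# the standard-library constant errno.EMFILE names the same value.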

printer(f'Merging {len(sortedFilesPointers):,}')

# Merge and write
with open(mergedFile.name, 'w') as f:
f.writelines(heapq.merge(*sortedFilesPointers))

# Close all of the open files
for file in sortedFilesPointers:
file.close()

# prepare for the next set to be merged
sortedFiles = sortedFiles[maxNumOpenFiles:] + [mergedFile.name]

# Close all of the sorted files
for file in sortedFilesPointers:
file.close()
shutil.move(mergedFile.name, fpOutput)

def startMultiprocessing(fpInputs, fpOutput, mpPool):
def startMultiprocessing(fpInputs, fpOutput, mpPool, numThreads, maxOpenFiles):
printer('Extracting off-targets using multiprocessing approach')

printer(f'Allowed processes: {PROCESSES_COUNT}')
printer(f'Allowed processes: {numThreads}')

fpTempDir = tempfile.TemporaryDirectory()
printer(f'Created a temporary directory for intermediate files: {fpTempDir.name}')
@@ -218,17 +228,16 @@ def startMultiprocessing(fpInputs, fpOutput, mpPool):
) for fpInput in fpInputs
]

printer(f'Beginning to process {len(args)} files...')
printer(f'Beginning to process {len(args)} files')

mpPool.starmap(
numTargets = mpPool.starmap(
processingNode,
args
)

printer('Processing completed')
printer(f'Processing completed. Found {sum(numTargets):,} targets.')

printer('Preparing for ISSL by sorting all intermediate files')
printer('Then, writing to user-specified output file')

# sort in batches
paginatedSort(
@@ -239,30 +248,51 @@ def startMultiprocessing(fpInputs, fpOutput, mpPool):
)
),
fpOutput,
mpPool
mpPool,
maxNumOpenFiles=maxOpenFiles
)

def main():
if (len(sys.argv) < 3):
print('Error!')
print('Expecting: ExtractOfftargets.py <output-file> [<input-file-1> <input-file-2> <input-file-n> | <input-dir>]')
print('\n')
exit()
parser = argparse.ArgumentParser(description='Extract CRISPR target sites for Crackling.')

parser.add_argument('output',
help='A file to write the off-targets to.'
)
parser.add_argument('inputs',
help='A space-separated list of paths or a path containing wildcards to FASTA files.',
nargs='+'
)
parser.add_argument('--maxOpenFiles',
help='The number of files allowed to be opened by a single process. Use `ulimit -n` to find out your system setting.',
type=int,
default=1000,
required=False
)
parser.add_argument('--threads',
help='The number of threads to use. Default is `os.cpu_count()`.',
type=int,
default=os.cpu_count(),
required=False
)

args = parser.parse_args()

# Create multiprocessing pool
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap
mpPool = multiprocessing.Pool(PROCESSES_COUNT)

fpOutput = sys.argv[1]

fpInputs = sys.argv[2:]

startMultiprocessing(fpInputs, fpOutput, mpPool)
mpPool = multiprocessing.Pool(args.threads)

startMultiprocessing(
args.inputs,
args.output,
mpPool,
args.threads,
args.maxOpenFiles
)

# Clean up. Close multiprocessing pool
mpPool.close()

printer('Goodbye.')

if __name__ == '__main__':
main()
main()
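With the new argparse interface, the extractor can be invoked along these lines (file names are illustrative, and this assumes the crackling package is importable so the script can be run directly):

python extractOfftargets.py offtargets.txt genome1.fa genome2.fa --threads 8 --maxOpenFiles 500

`ulimit -n` reports the per-process open-file limit that --maxOpenFiles should stay under.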
