diff --git a/pull_into_place/big_jobs.py b/pull_into_place/big_jobs.py
index 989c6af..8759be6 100644
--- a/pull_into_place/big_jobs.py
+++ b/pull_into_place/big_jobs.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python2
 
-import sys, os, re, json, time, subprocess
+import sys, os, re, json, subprocess, gzip
+from klab.process import tee
 from . import pipeline
 
 def submit(script, workspace, **params):
@@ -22,10 +23,9 @@ def submit(script, workspace, **params):
 
     if test_run:
         nstruct = 50
-        max_runtime = '2:00:00'
 
     if nstruct is None:
-        raise TypeError("sumbit() requires the keyword argument 'nstruct' for production runs.")
+        raise TypeError("submit() requires the keyword argument 'nstruct' for production runs.")
 
     # Submit the job and put it immediately into the hold state.
@@ -50,7 +50,7 @@ def submit(script, workspace, **params):
 
     job_id = status_match.group(1)
 
-    with open(workspace.job_params_path(job_id), 'w') as file:
+    with open(workspace.job_info_path(job_id), 'w') as file:
         json.dump(params, file)
 
     # Release the hold on the job.
@@ -61,20 +61,105 @@ def submit(script, workspace, **params):
 
 def initiate():
     """Return some relevant information about the currently running job."""
+    print_debug_header()
+
     workspace = pipeline.workspace_from_dir(sys.argv[1])
     workspace.cd_to_root()
 
-    job_id = int(os.environ['JOB_ID'])
-    task_id = int(os.environ['SGE_TASK_ID']) - 1
-    job_params = read_params(workspace.job_params_path(job_id))
+    job_info = read_job_info(workspace.job_info_path(os.environ['JOB_ID']))
+    job_info['job_id'] = int(os.environ['JOB_ID'])
+    job_info['task_id'] = int(os.environ['SGE_TASK_ID']) - 1
+
+    return workspace, job_info
+
+def debrief():
+    """
+    Report the amount of memory used by this job, among other things.
+    """
+    job_number = os.environ['JOB_ID'] + '.' + os.environ['SGE_TASK_ID']
+    run_command(['/usr/local/sge/bin/linux-x64/qstat', '-j', job_number])
+
+def run_rosetta(workspace, job_info,
+        use_resfile=False, use_restraints=False, use_fragments=False):
+
+    test_run = job_info.get('test_run', False)
+    rosetta_cmd = [
+            workspace.rosetta_scripts_path,
+            '-database', workspace.rosetta_database_path,
+            '-in:file:s', workspace.input_path(job_info),
+            '-in:file:native', workspace.input_path(job_info),
+            '-out:prefix', workspace.output_prefix(job_info),
+            '-out:suffix', workspace.output_suffix(job_info),
+            '-out:no_nstruct_label',
+            '-out:overwrite',
+            '-out:pdb_gz',
+            '-out:mute', 'protocols.loops.loops_main',
+            '-parser:protocol', workspace.protocol_path,
+            '-parser:script_vars',
+                'wts_file=' + workspace.scorefxn_path,
+                'cst_file=' + workspace.restraints_path,
+                'loop_file=' + workspace.loops_path,
+                'loop_start=' + str(workspace.loop_boundaries[0]),
+                'loop_end=' + str(workspace.loop_boundaries[1]),
+                'outputs_folder=' + workspace.seqprof_dir,
+                'design_number=' + workspace.output_basename(job_info),
+                'vall_path=' + workspace.rosetta_vall_path(test_run),
+                'fragment_weights=' + workspace.fragment_weights_path,
+                'fast=' + ('yes' if test_run else 'no'),
+    ]
+    if use_resfile: rosetta_cmd += [
+            '-packing:resfile', workspace.resfile_path,
+    ]
+    if use_restraints: rosetta_cmd += [
+            '-constraints:cst_fa_file', workspace.restraints_path,
+    ]
+    if use_fragments: rosetta_cmd += \
+            workspace.fragments_flags(workspace.input_path(job_info))
+
+    rosetta_cmd += [
+            '@', workspace.flags_path,
+    ]
+
+    run_command(rosetta_cmd)
+    run_external_metrics(workspace, job_info)
+
+def run_external_metrics(workspace, job_info):
+    pdb_path = workspace.output_path(job_info)
+
+    for metric in workspace.metric_scripts:
+        command = metric, pdb_path
+
+        print "Working directory:", os.getcwd()
+        print "Command:", ' '.join(command)
+        sys.stdout.flush()
+
+        stdout, stderr = tee([metric, pdb_path])
+        file = gzip.open(pdb_path, 'a')
+
+        for line in stdout.strip().split('\n'):
+            if line.strip():
+                file.write('EXTRA_METRIC {0}\n'.format(line))
+
+        file.close()
+
+def run_command(command):
+    print "Working directory:", os.getcwd()
+    print "Command:", ' '.join(command)
+    sys.stdout.flush()
+
+    process = subprocess.Popen(command)
+
+    print "Process ID:", process.pid
+    print
+    sys.stdout.flush()
 
-    return workspace, job_id, task_id, job_params
+    process.wait()
 
-def read_params(params_path):
-    with open(params_path) as file:
+def read_job_info(json_path):
+    with open(json_path) as file:
         return json.load(file)
 
-def print_debug_info():
+def print_debug_header():
     from datetime import datetime
     from socket import gethostname
@@ -86,17 +171,5 @@ def print_debug_info():
     print
     sys.stdout.flush()
 
-def run_command(command):
-    print "Working directory:", os.getcwd()
-    print "Command:", ' '.join(command)
-    sys.stdout.flush()
-
-    process = subprocess.Popen(command)
-
-    print "Process ID:", process.pid
-    print
-    sys.stdout.flush()
-
-    process.wait()
-
-    jobnumber = os.environ['JOB_ID'] + '.' + os.environ['SGE_TASK_ID']
-    print 'Job Number:', jobnumber
-    subprocess.call(['/usr/local/sge/bin/linux-x64/qstat','-j',jobnumber])
+
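For context on `run_external_metrics()` above: each metric script's stdout is appended to the gzipped output PDB as `EXTRA_METRIC` lines. Nothing in this diff reads those records back, so the following is only a sketch of how a downstream step might recover them (the helper name is hypothetical; the `EXTRA_METRIC ` prefix is the only convention taken from the code above):

```python
import gzip

def read_extra_metrics(pdb_path):
    # gzip.open(path, 'a') appends a second gzip member to the file, but
    # GzipFile reads concatenated members back transparently, so a plain
    # read sees the PDB records followed by the EXTRA_METRIC lines.
    metrics = []
    with gzip.open(pdb_path) as file:
        for line in file:
            if line.startswith('EXTRA_METRIC '):
                metrics.append(line[len('EXTRA_METRIC '):].strip())
    return metrics
```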
diff --git a/pull_into_place/big_jobs/pip_build.py b/pull_into_place/big_jobs/pip_build.py
index 78a5dab..86c1fe6 100755
--- a/pull_into_place/big_jobs/pip_build.py
+++ b/pull_into_place/big_jobs/pip_build.py
@@ -6,38 +6,13 @@
 #$ -l netapp=1G
 #$ -cwd
 
-import os, sys, subprocess
 from pull_into_place import big_jobs
 
-workspace, job_id, task_id, parameters = big_jobs.initiate()
-output_prefix = '{0}/{1}_{2:06d}_'.format(workspace.output_dir, job_id, task_id)
-test_run = parameters.get('test_run', False)
-
-big_jobs.print_debug_info()
-big_jobs.run_command([
-    workspace.rosetta_scripts_path,
-    '-database', workspace.rosetta_database_path,
-    '-in:file:s', workspace.input_pdb_path,
-    '-in:file:native', workspace.input_pdb_path,
-    '-out:prefix', output_prefix,
-    '-out:no_nstruct_label',
-    '-out:overwrite',
-    '-out:pdb_gz',
-    '-out:mute', 'protocols.loops.loops_main',
-    '-parser:protocol', workspace.build_script_path,
-    '-parser:script_vars',
-        'wts_file=' + workspace.scorefxn_path,
-        'cst_file=' + workspace.restraints_path,
-        'loop_file=' + workspace.loops_path,
-        'fast=' + ('yes' if test_run else 'no'),
-        'loop_start=' + str(workspace.loop_boundaries[0]),
-        'loop_end=' + str(workspace.loop_boundaries[1]),
-        'outputs_folder=' + workspace.seqprof_dir,
-        'design_number=' + '{0}_{1:06d}'.format(job_id,task_id),
-        'vall_path=' + (workspace.rosetta_vall_path(test_run)),
-        'fragment_weights=' + workspace.fragment_weights_path,
-    '-packing:resfile', workspace.resfile_path,
-    '-constraints:cst_fa_file', workspace.restraints_path,
-] + workspace.fragments_flags(workspace.input_pdb_path) + [
-    '@', workspace.flags_path,
-])
+workspace, job_info = big_jobs.initiate()
+big_jobs.run_rosetta(
+        workspace, job_info,
+        use_resfile=True,
+        use_restraints=True,
+        use_fragments=True,
+)
+big_jobs.debrief()
diff --git a/pull_into_place/big_jobs/pip_design.py b/pull_into_place/big_jobs/pip_design.py
index 5c223b6..5d579ae 100755
--- a/pull_into_place/big_jobs/pip_design.py
+++ b/pull_into_place/big_jobs/pip_design.py
@@ -7,38 +7,14 @@
 #$ -l h_core=0
 #$ -cwd
 
-
-import os, sys, subprocess
 from pull_into_place import big_jobs
 
-workspace, job_id, task_id, parameters = big_jobs.initiate()
-test_run = parameters.get('test_run', False)
-
-bb_models = parameters['inputs']
-bb_model = bb_models[task_id % len(bb_models)]
-design_id = task_id // len(bb_models)
+workspace, job_info = big_jobs.initiate()
 
-big_jobs.print_debug_info()
-big_jobs.run_command([
-    workspace.rosetta_scripts_path,
-    '-database', workspace.rosetta_database_path,
-    '-in:file:s', workspace.input_path(bb_model),
-    '-in:file:native', workspace.input_pdb_path,
-    '-out:prefix', workspace.output_dir + '/',
-    '-out:suffix', '_{0:03}'.format(design_id),
-    '-out:no_nstruct_label',
-    '-out:overwrite',
-    '-out:pdb_gz',
-    '-parser:protocol', workspace.design_script_path,
-    '-parser:script_vars',
-        'wts_file=' + workspace.scorefxn_path,
-        'cst_file=' + workspace.restraints_path,
-        'loop_start=' + str(workspace.loop_boundaries[0]),
-        'loop_end=' + str(workspace.loop_boundaries[1]),
-        'outputs_folder=' + workspace.seqprof_dir,
-        'design_number=' + bb_model + '_{0:03}'.format(design_id),
-        'vall_path=' + (workspace.rosetta_vall_path(test_run)),
-        'fragment_weights=' + workspace.fragment_weights_path,
-    '-packing:resfile', workspace.resfile_path,
-    '@', workspace.flags_path,
-])
+# I wasn't able to get PackRotamers to respect any restraints set on the
+# command line, so instead the restraints are set in the protocol itself.
+big_jobs.run_rosetta(
+        workspace, job_info,
+        use_resfile=True,
+)
+big_jobs.debrief()
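Note that the rewritten cluster scripts no longer slice up their own input lists; that bookkeeping moves into the `input_path()` and `output_suffix()` overrides in the `pipeline.py` diff below. The mapping from a flat SGE task array onto (input, design) pairs is: task IDs cycle through the inputs, and the integer quotient becomes the design number. A worked example with made-up inputs:

```python
# Hypothetical job_info, as assembled by big_jobs.initiate() from the
# <job_id>.json file plus the JOB_ID/SGE_TASK_ID environment variables.
job_info = {
    'task_id': 7,
    'inputs': ['model_A.pdb.gz', 'model_B.pdb.gz', 'model_C.pdb.gz'],
}

# The same arithmetic as FixbbDesigns.input_path() and output_suffix():
bb_model = job_info['inputs'][job_info['task_id'] % len(job_info['inputs'])]
design_id = job_info['task_id'] // len(job_info['inputs'])

assert bb_model == 'model_B.pdb.gz'   # input index: 7 % 3 == 1
assert design_id == 2                 # design number: 7 // 3 == 2
```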
diff --git a/pull_into_place/big_jobs/pip_validate.py b/pull_into_place/big_jobs/pip_validate.py
index b38dc86..d98dcdd 100755
--- a/pull_into_place/big_jobs/pip_validate.py
+++ b/pull_into_place/big_jobs/pip_validate.py
@@ -6,38 +6,11 @@
 #$ -l netapp=1G
 #$ -cwd
 
-import os, sys, subprocess
 from pull_into_place import big_jobs
 
-workspace, job_id, task_id, parameters = big_jobs.initiate()
-
-designs = parameters['inputs']
-design = designs[task_id % len(designs)]
-test_run = parameters.get('test_run', False)
-
-big_jobs.print_debug_info()
-big_jobs.run_command([
-    workspace.rosetta_scripts_path,
-    '-database', workspace.rosetta_database_path,
-    '-in:file:s', workspace.input_path(design),
-    '-in:file:native', workspace.input_pdb_path,
-    '-out:prefix', workspace.output_subdir(design) + '/',
-    '-out:suffix', '_{0:03d}'.format(task_id / len(designs)),
-    '-out:no_nstruct_label',
-    '-out:overwrite',
-    '-out:pdb_gz',
-    '-out:mute', 'protocols.loops.loops_main',
-    '-parser:protocol', workspace.validate_script_path,
-    '-parser:script_vars',
-        'wts_file=' + workspace.scorefxn_path,
-        'loop_file=' + workspace.loops_path,
-        'fast=' + ('yes' if test_run else 'no'),
-        'loop_start=' + str(workspace.loop_boundaries[0]),
-        'loop_end=' + str(workspace.loop_boundaries[1]),
-        'outputs_folder=' + workspace.seqprof_dir,
-        'design_number=' + design + '_{0:03d}'.format(task_id / len(designs)),
-        'vall_path=' + workspace.rosetta_vall_path(test_run),
-        'fragment_weights=' + workspace.fragment_weights_path,
-] + workspace.fragments_flags(design) + [
-    '@', workspace.flags_path,
-])
+workspace, job_info = big_jobs.initiate()
+big_jobs.run_rosetta(
+        workspace, job_info,
+        use_fragments=True,
+)
+big_jobs.debrief()
diff --git a/pull_into_place/big_jobs/standard_params/filters.xml b/pull_into_place/big_jobs/standard_params/filters.xml
index e6df943..c55a04f 100644
--- a/pull_into_place/big_jobs/standard_params/filters.xml
+++ b/pull_into_place/big_jobs/standard_params/filters.xml
@@ -1,8 +1,44 @@
-
-
-
-
-
+
+
+
+
+
diff --git a/pull_into_place/commands/07_setup_design_fragments.py b/pull_into_place/commands/07_setup_design_fragments.py
index 05091e2..454483a 100644
--- a/pull_into_place/commands/07_setup_design_fragments.py
+++ b/pull_into_place/commands/07_setup_design_fragments.py
@@ -20,13 +20,8 @@
         Print out the command-line that would be used to generate fragments,
         but don't actually run it.
 
-    -x, --clear
-        Remove any previously generated fragment files.
-
-Simply rerun this command if some of your fragment generation jobs fail. By
-default it will only submit jobs for inputs that are missing valid fragment
-files. You can force the fragments to be regenerated from scratch by passing
-the '--clear' flag.
+Simply rerun this command if some of your fragment generation jobs fail. It
+will only submit jobs for inputs that are missing valid fragment files.
 """
 
 import subprocess
@@ -43,16 +38,12 @@ def main():
     workspace.check_rosetta()
     workspace.make_dirs()
 
-    # Do this before working out the 'klab_generate_fragments' command, because
-    # it may affect which inputs are picked.
-    if args['--clear'] and not args['--dry-run']:
-        workspace.clear_fragments()
-
     generate_fragments = [
             'klab_generate_fragments',
             '--loops_file', workspace.loops_path,
            '--outdir', workspace.fragments_dir,
            '--memfree', args['--mem-free'],
+           '--overwrite',
    ] + pick_inputs(workspace)
 
     if args['--dry-run']:
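One piece of bookkeeping worth spelling out before the `pipeline.py` diff: the `<job_id>.json` files written by `submit()` double as claim records, and `unclaimed_inputs` (switched below to read from `all_job_info`) subtracts every input already claimed by a submitted job. A toy illustration of that subtraction, with made-up file contents:

```python
# Hypothetical contents of two <job_id>.json files in one focus directory.
all_job_info = [
    {'inputs': ['model_A.pdb.gz', 'model_B.pdb.gz']},
    {'inputs': ['model_C.pdb.gz']},
]
input_names = ['model_A.pdb.gz', 'model_B.pdb.gz',
               'model_C.pdb.gz', 'model_D.pdb.gz']

# The same subtraction as BigJobWorkspace.unclaimed_inputs:
unclaimed = set(input_names)
for params in all_job_info:
    unclaimed -= set(params['inputs'])

print sorted(unclaimed)   # ['model_D.pdb.gz']
```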
diff --git a/pull_into_place/pipeline.py b/pull_into_place/pipeline.py
index bc4c3b1..6a64f29 100644
--- a/pull_into_place/pipeline.py
+++ b/pull_into_place/pipeline.py
@@ -13,7 +13,7 @@
 from klab import scripting
 from pprint import pprint
 
-class Workspace (object):
+class Workspace(object):
     """
     Provide paths to every file used in the design pipeline.
@@ -186,14 +186,22 @@ def restraints_path(self):
     def scorefxn_path(self):
         return self.find_path('scorefxn.wts')
 
-    @property
-    def build_script_path(self):
-        return self.find_path('build_models.xml')
-
     @property
     def filters_path(self):
         return self.find_path('filters.xml')
 
+    @property
+    def metrics_dir(self):
+        return self.find_path('metrics')
+
+    @property
+    def metric_scripts(self):
+        return glob.glob(os.path.join(self.metrics_dir, '*'))
+
+    @property
+    def build_script_path(self):
+        return self.find_path('build_models.xml')
+
     @property
     def design_script_path(self):
         return self.find_path('design_models.xml')
@@ -329,7 +337,7 @@ def exists(self):
         return os.path.exists(self.focus_dir)
 
 
-class BigJobWorkspace (Workspace):
+class BigJobWorkspace(Workspace):
     """
     Provide paths needed to run big jobs on the cluster.
@@ -339,6 +347,10 @@ class BigJobWorkspace (Workspace):
     parameters files, and several other things like that.
     """
 
+    @property
+    def protocol_path(self):
+        raise NotImplementedError
+
     @property
     def input_dir(self):
         return os.path.join(self.focus_dir, 'inputs')
@@ -347,8 +359,11 @@ def input_dir(self):
     def input_paths(self):
         return glob.glob(os.path.join(self.input_dir, '*.pdb.gz'))
 
-    def input_path(self, name):
-        return os.path.join(self.input_dir, name)
+    def input_path(self, job_info):
+        raise NotImplementedError
+
+    def input_basename(self, job_info):
+        return os.path.basename(self.input_path(job_info))
 
     @property
     def input_names(self):
@@ -366,6 +381,21 @@ def output_subdirs(self):
     def output_paths(self):
         return glob.glob(os.path.join(self.input_dir, '*.pdb.gz'))
 
+    def output_path(self, job_info):
+        prefix = self.output_prefix(job_info)
+        basename = os.path.basename(self.input_path(job_info)[:-len('.pdb.gz')])
+        suffix = self.output_suffix(job_info)
+        return prefix + basename + suffix + '.pdb.gz'
+
+    def output_basename(self, job_info):
+        return os.path.basename(self.output_path(job_info))
+
+    def output_prefix(self, job_info):
+        return self.output_dir + '/'
+
+    def output_suffix(self, job_info):
+        return ''
+
     @property
     def io_dirs(self):
         return [self.input_dir] + self.output_subdirs
@@ -387,22 +417,22 @@ def rsync_exclude_patterns(self):
         parent_patterns = super(BigJobWorkspace, self).rsync_exclude_patterns
         return parent_patterns + ['stderr/', 'stdout/', '*.sc']
 
-    def job_params_path(self, job_id):
+    def job_info_path(self, job_id):
         return os.path.join(self.focus_dir, '{0}.json'.format(job_id))
 
     @property
-    def all_job_params_paths(self):
+    def all_job_info_paths(self):
         return glob.glob(os.path.join(self.focus_dir, '*.json'))
 
     @property
-    def all_job_params(self):
+    def all_job_info(self):
         from . import big_jobs
-        return [big_jobs.read_params(x) for x in self.all_job_params_paths]
+        return [big_jobs.read_job_info(x) for x in self.all_job_info_paths]
 
     @property
     def unclaimed_inputs(self):
         inputs = set(self.input_names)
-        for params in self.all_job_params:
+        for params in self.all_job_info:
             inputs -= set(params['inputs'])
         return sorted(inputs)
@@ -421,11 +451,11 @@ def clear_outputs(self):
         scripting.clear_directory(self.stdout_dir)
         scripting.clear_directory(self.stderr_dir)
 
-        for path in self.all_job_params_paths:
+        for path in self.all_job_info_paths:
             os.remove(path)
 
 
-class WithFragmentLibs (object):
+class WithFragmentLibs(object):
     """
     Provide paths needed to interact with fragment libraries.
@@ -513,7 +543,7 @@ def clear_fragments(self):
         scripting.clear_directory(self.fragments_dir)
 
 
-class RestrainedModels (BigJobWorkspace, WithFragmentLibs):
+class RestrainedModels(BigJobWorkspace, WithFragmentLibs):
 
     def __init__(self, root):
         BigJobWorkspace.__init__(self, root)
@@ -524,22 +554,35 @@ def from_directory(directory):
 
     @property
     def focus_name(self):
-        return os.path.join(self.root_dir, 'restrained_models')
+        return 'restrained_models'
 
     @property
     def focus_dir(self):
         return os.path.join(self.root_dir, '01_{0}'.format(self.focus_name))
 
+    @property
+    def protocol_path(self):
+        return self.build_script_path
+
     @property
     def input_dir(self):
         return self.root_dir
 
+    def input_path(self, parameters):
+        return self.input_pdb_path
+
     @property
     def input_paths(self):
         return [self.input_pdb_path]
 
+    def output_prefix(self, job_info):
+        return os.path.join(
+                self.output_dir,
+                '{0}_{1:06d}_'.format(job_info['job_id'], job_info['task_id']),
+        )
+
 
-class FixbbDesigns (BigJobWorkspace):
+class FixbbDesigns(BigJobWorkspace):
 
     def __init__(self, root, round):
         BigJobWorkspace.__init__(self, root)
@@ -569,8 +612,20 @@ def focus_dir(self):
         subdir = '{0:02}_{1}_round_{2}'.format(prefix, self.focus_name, self.round)
         return os.path.join(self.root_dir, subdir)
 
+    @property
+    def protocol_path(self):
+        return self.design_script_path
+
+    def input_path(self, job_info):
+        bb_models = job_info['inputs']
+        bb_model = bb_models[job_info['task_id'] % len(bb_models)]
+        return os.path.join(self.input_dir, bb_model)
+
+    def output_suffix(self, job_info):
+        design_id = job_info['task_id'] // len(job_info['inputs'])
+        return '_{0:03}'.format(design_id)
+
 
-class ValidatedDesigns (BigJobWorkspace, WithFragmentLibs):
+class ValidatedDesigns(BigJobWorkspace, WithFragmentLibs):
 
     def __init__(self, root, round):
         BigJobWorkspace.__init__(self, root)
@@ -597,6 +652,15 @@ def focus_dir(self):
         subdir = '{0:02}_{1}_round_{2}'.format(prefix, self.focus_name, self.round)
         return os.path.join(self.root_dir, subdir)
 
+    @property
+    def protocol_path(self):
+        return self.validate_script_path
+
+    def input_path(self, job_info):
+        designs = job_info['inputs']
+        design = designs[job_info['task_id'] % len(designs)]
+        return os.path.join(self.input_dir, design)
+
     @property
     def output_subdirs(self):
         return sorted(glob.glob(os.path.join(self.output_dir, '*/')))
@@ -605,6 +669,13 @@ def output_subdir(self, input_name):
         basename = os.path.basename(input_name[:-len('.pdb.gz')])
         return os.path.join(self.output_dir, basename)
 
+    def output_prefix(self, job_info):
+        input_model = self.input_basename(job_info)[:-len('.pdb.gz')]
+        return os.path.join(self.output_dir, input_model) + '/'
+
+    def output_suffix(self, job_info):
+        return '_{0:03d}'.format(job_info['task_id'] / len(job_info['inputs']))
+
 
 def big_job_dir():
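To make the new output-path plumbing concrete: `BigJobWorkspace.output_path()` composes `output_prefix(job_info)` + the input basename + `output_suffix(job_info)` + `'.pdb.gz'`. With the `ValidatedDesigns` overrides above, that works out roughly as follows (the design name and task numbers are hypothetical):

```python
# Hypothetical values for one validation task.
output_dir = 'validated_designs/outputs'
input_name = 'design_0042.pdb.gz'        # input_basename(job_info)
task_id, num_inputs = 12, 10

basename = input_name[:-len('.pdb.gz')]
prefix = output_dir + '/' + basename + '/'         # output_prefix(): one subdir per design
suffix = '_{0:03d}'.format(task_id / num_inputs)   # output_suffix(): Python 2 integer division

print prefix + basename + suffix + '.pdb.gz'
# validated_designs/outputs/design_0042/design_0042_001.pdb.gz
```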