Commit
Experimental support for parallelization (Theano only) in train_allele_specific_models_command
timodonnell committed Nov 28, 2017
1 parent c699066 commit 5b4dfbd
Showing 2 changed files with 112 additions and 26 deletions.
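
With this change, training for each (allele, replicate) pair can fan out across a multiprocessing worker pool. A sketch of an invocation, assuming mhcflurry's usual console entry point (the entry-point name is illustrative and other required arguments are omitted; the flags shown appear in the diff and test below):

    mhcflurry-class1-train-allele-specific-models \
        --allele HLA-A*02:01 HLA-A*01:01 \
        --out-models-dir models/ \
        --parallelization-num-jobs 0

Setting --parallelization-num-jobs to 0 uses one worker per core; 1 runs serially; any other N uses N worker processes.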
@@ -9,8 +9,10 @@
import time
import signal
import traceback
from multiprocessing import Pool

import pandas
from mhcnames import normalize_allele_name

from .class1_affinity_predictor import Class1AffinityPredictor
from ..common import configure_logging
@@ -78,7 +80,14 @@
type=int,
help="Keras verbosity. Default: %(default)s",
default=1)

parser.add_argument(
"--parallelization-num-jobs",
default=1,
type=int,
metavar="N",
help="Parallelization jobs. Experimental. Does NOT work with tensorflow. "
"Set to 1 for serial run. Set to 0 to use number of cores. "
"Default: %(default)s.")

def run(argv=sys.argv[1:]):
    # On SIGUSR1, print a stack trace.
@@ -110,7 +119,9 @@ def run(argv=sys.argv[1:]):
allele_counts = df.allele.value_counts()

if args.allele:
alleles = args.allele
alleles = [normalize_allele_name(a) for a in args.allele]

# Allele names in data are assumed to be already normalized.
df = df.ix[df.allele.isin(alleles)]
else:
alleles = list(allele_counts.ix[
@@ -121,6 +132,15 @@
print("Training data: %s" % (str(df.shape)))

predictor = Class1AffinityPredictor()
if args.parallelization_num_jobs == 1:
# Serial run
worker_pool = None
else:
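        # processes=None makes multiprocessing.Pool use os.cpu_count()
        # workers, which is what --parallelization-num-jobs 0 requests.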
worker_pool = Pool(
processes=(
args.parallelization_num_jobs
if args.parallelization_num_jobs else None))
print("Using worker pool: %s" % str(worker_pool))

if args.out_models_dir and not os.path.exists(args.out_models_dir):
print("Attempting to create directory: %s" % args.out_models_dir)
@@ -139,29 +159,42 @@ def run(argv=sys.argv[1:]):
if args.max_epochs:
hyperparameters['max_epochs'] = args.max_epochs

for model_group in range(n_models):
for (i, allele) in enumerate(alleles):
print(
"[%2d / %2d hyperparameters] "
"[%2d / %2d replicates] "
"[%4d / %4d alleles]: %s" % (
h + 1,
len(hyperparameters_lst),
model_group + 1,
n_models,
i + 1,
len(alleles), allele))

train_data = df.ix[df.allele == allele].dropna().sample(
frac=1.0)

predictor.fit_allele_specific_predictors(
n_models=1,
architecture_hyperparameters=hyperparameters,
allele=allele,
peptides=train_data.peptide.values,
affinities=train_data.measurement_value.values,
models_dir_for_save=args.out_models_dir)
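        # Build one work item per (allele, replicate) pair. Expressing the
        # training work as independent dicts lets the same code path run
        # serially or in a multiprocessing pool.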
work_items = []
for (i, (allele, sub_df)) in enumerate(df.groupby("allele")):
for model_group in range(n_models):
work_dict = {
'model_group': model_group,
'n_models': n_models,
'allele_num': i,
'n_alleles': len(alleles),
'hyperparameter_set_num': h,
'num_hyperparameter_sets': len(hyperparameters_lst),
'allele': allele,
'sub_df': sub_df,
'hyperparameters': hyperparameters,
'predictor': predictor if not worker_pool else None,
'save_to': args.out_models_dir if not worker_pool else None,
}
work_items.append(work_dict)

if worker_pool:
print("Processing %d work items in parallel." % len(work_items))
predictors = worker_pool.map(work_entrypoint, work_items, chunksize=1)
print("Merging %d predictors fit in parallel." % (len(predictors)))
predictor = Class1AffinityPredictor.merge([predictor] + predictors)
print("Saving merged predictor to: %s" % args.out_models_dir)
predictor.save(args.out_models_dir)
else:
            # Run in serial. Every work item receives the same predictor,
            # which accumulates models in place, so no merging is needed.
            # The predictor also saves as it goes, so no final save is
            # required.
for item in work_items:
work_predictor = work_entrypoint(item)
assert work_predictor is predictor

if worker_pool:
worker_pool.close()
worker_pool.join()

if args.percent_rank_calibration_num_peptides_per_length > 0:
start = time.time()
@@ -173,5 +206,50 @@ def run(argv=sys.argv[1:]):
predictor.save(args.out_models_dir, model_names_to_write=[])


def work_entrypoint(item):
return process_work(**item)


def process_work(
model_group,
n_models,
allele_num,
n_alleles,
hyperparameter_set_num,
num_hyperparameter_sets,
allele,
sub_df,
hyperparameters,
predictor,
save_to):

if predictor is None:
predictor = Class1AffinityPredictor()
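        # Parallel mode: this worker process builds its own fresh predictor;
        # the parent merges all returned predictors when the pool finishes.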

print(
"[%2d / %2d hyperparameters] "
"[%2d / %2d replicates] "
"[%4d / %4d alleles]: %s" % (
hyperparameter_set_num + 1,
num_hyperparameter_sets,
model_group + 1,
n_models,
allele_num + 1,
n_alleles,
allele))

train_data = sub_df.dropna().sample(frac=1.0)
predictor.fit_allele_specific_predictors(
n_models=1,
architecture_hyperparameters=hyperparameters,
allele=allele,
peptides=train_data.peptide.values,
affinities=train_data.measurement_value.values,
models_dir_for_save=save_to)

return predictor



if __name__ == '__main__':
run()
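
For reference, the fan-out/merge pattern used above, reduced to a minimal runnable sketch. The toy fitting function and dict merge are hypothetical stand-ins for fit_allele_specific_predictors and Class1AffinityPredictor.merge; only the multiprocessing structure mirrors the command:

    from multiprocessing import Pool

    def train_one(item):
        # Runs in a worker process: "fit" a model from the work item and
        # return it. No state is shared with the parent.
        allele, values = item['allele'], item['values']
        return (allele, sum(values) / len(values))  # stand-in for fitting

    if __name__ == '__main__':
        work_items = [
            {'allele': 'HLA-A*02:01', 'values': [120.0, 80.0, 100.0]},
            {'allele': 'HLA-A*01:01', 'values': [300.0, 500.0]},
        ]
        pool = Pool(processes=2)
        # chunksize=1 dispatches one work item per task, as in run() above.
        results = pool.map(train_one, work_items, chunksize=1)
        pool.close()
        pool.join()
        merged = dict(results)  # stand-in for Class1AffinityPredictor.merge
        print(merged)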
10 changes: 9 additions & 1 deletion test/test_train_allele_specific_models_command.py
Expand Up @@ -50,7 +50,7 @@
]


def test_run():
def run_and_check(n_jobs=0):
models_dir = tempfile.mkdtemp(prefix="mhcflurry-test-models")
hyperparameters_filename = os.path.join(
models_dir, "hyperparameters.yaml")
@@ -63,6 +63,7 @@ def test_run():
"--allele", "HLA-A*02:01", "HLA-A*01:01", "HLA-A*03:01",
"--out-models-dir", models_dir,
"--percent-rank-calibration-num-peptides-per-length", "10000",
"--parallelization-num-jobs", str(n_jobs),
]
print("Running with args: %s" % args)
train_allele_specific_models_command.run(args)
@@ -81,3 +82,10 @@

print("Deleting: %s" % models_dir)
shutil.rmtree(models_dir)

def Xtest_run_parallel():
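    # The leading 'X' keeps test runners from collecting this test: parallel
    # mode is experimental and does not work with the tensorflow backend.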
run_and_check(n_jobs=3)


def test_run_serial():
run_and_check(n_jobs=1)
