Change multiprocessing methodology from Process to Pool.starmap. #9

Open · wants to merge 1 commit into base: master
74 changes: 36 additions & 38 deletions mantis_ml/modules/supervised_learn/pu_learn/pu_learning.py
@@ -9,7 +9,9 @@
 warnings.filterwarnings("ignore", message="numpy.ufunc size changed")
 
 import xgboost as xgb
-from multiprocessing import Process, Manager
+from multiprocessing import Manager
+from multiprocessing.pool import Pool
+from itertools import repeat
 
 from mantis_ml.config_class import Config
 from mantis_ml.modules.supervised_learn.core.prepare_train_test_sets import PrepareTrainTestSets
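Aside, for readers unfamiliar with the idiom these imports enable: `Pool.starmap` unpacks each tuple yielded by its iterable into positional arguments, and `itertools.repeat` supplies the arguments that stay constant across calls, so only the per-subset arguments vary. A minimal, self-contained sketch of the pattern used in the hunk below; the function and data names here are hypothetical stand-ins, not mantis-ml code:

```python
from itertools import repeat
from multiprocessing.pool import Pool

def train_on_subset(train_df, test_df, shared_cfg):
    # Hypothetical stand-in for a per-subset training routine.
    return len(train_df) + len(test_df) + shared_cfg['offset']

if __name__ == '__main__':
    train_dfs = [[1, 2, 3], [4, 5]]   # two "balanced subsets"
    test_dfs = [[6], [7, 8, 9]]
    cfg = {'offset': 100}             # constant across all calls

    with Pool(processes=2) as pool:
        # zip(...) yields (train_dfs[0], test_dfs[0], cfg) and (train_dfs[1], test_dfs[1], cfg);
        # starmap unpacks each tuple, calling train_on_subset(...) once per subset.
        results = pool.starmap(train_on_subset, zip(train_dfs, test_dfs, repeat(cfg)))
    print(results)  # [104, 105]
```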
@@ -266,60 +268,56 @@ def run_pu_learning(self, selected_base_classifiers=None, final_level_classifier
         train_acc_list = manager.list()
         test_acc_list = manager.list()
         auc_score_list = manager.list()
-        process_jobs = []
 
         total_subsets = None
         for i in range(1, self.cfg.iterations + 1):
             print('-----------------------------------------------> Iteration:', i)
 
-            process_jobs = []
-
             # get random partition of the entire dataset
             iter_random_state = random.randint(0, 1000000000)
 
             set_generator = PrepareTrainTestSets(self.cfg)
             train_dfs, test_dfs = set_generator.get_balanced_train_test_sets(self.data, random_state=iter_random_state)
             if total_subsets is None:
                 total_subsets = len(train_dfs)
 
-            # Loop through all balanced datasets from the entire partitioning of current iteration
-            for i in range(len(train_dfs)):
-            #for i in range(10): # Debugging only
-                train_data = train_dfs[i]
-                test_data = test_dfs[i]
-                #print(f"Training set size: {train_data.shape[0]}")
-                #print(f"Test set size: {test_data.shape[0]}")
-
-                p = None
-
+            train_data = train_dfs
+            test_data = test_dfs
+
+            # debug only
+            # train_data = train_dfs[:10]
+            # test_data = test_dfs[:10]
+
+            with Pool(processes=self.cfg.nthreads) as pool:
+                # Train models on each of the balanced datasets in our current iteration
+
                 if self.clf_id == 'DNN':
-                    p = Process(target=self.train_dnn_on_subset, args=(train_data, test_data,
-                                                                       pos_genes_dict, neg_genes_dict,
-                                                                       train_acc_list, test_acc_list, auc_score_list, pos_y_prob_dict))
+                    # if a = [1, 2] and b = 3, zip(a, repeat(b)) = [(1, 3), (2, 3)]
+                    # so starmap(f, zip(a, repeat(b))) will call f(1, 3) and f(2, 3)
+                    pool.starmap(self.train_dnn_on_subset,
+                                 zip(train_data, test_data,
+                                     repeat(pos_genes_dict), repeat(neg_genes_dict),
+                                     repeat(train_acc_list), repeat(test_acc_list),
+                                     repeat(auc_score_list), repeat(pos_y_prob_dict)))
 
                 elif self.clf_id == 'Stacking':
                     # Define Stacking classifiers
-                    p = Process(target=self.train_stacking_on_subset, args=(selected_base_classifiers, final_level_classifier,
-                                                                            train_data, test_data,
-                                                                            pos_genes_dict, neg_genes_dict,
-                                                                            feature_dataframe_list, train_acc_list, test_acc_list, auc_score_list, pos_y_prob_dict))
+                    pool.starmap(self.train_stacking_on_subset,
+                                 zip(repeat(selected_base_classifiers), repeat(final_level_classifier),
+                                     train_data, test_data,
+                                     repeat(pos_genes_dict), repeat(neg_genes_dict),
+                                     repeat(feature_dataframe_list),
+                                     repeat(train_acc_list), repeat(test_acc_list),
+                                     repeat(auc_score_list), repeat(pos_y_prob_dict)))
 
                 elif self.clf_id in sklearn_extended_classifiers:
-                    p = Process(target=self.train_extended_sklearn_clf_on_subset, args=(train_data, test_data,
-                                                                                        pos_genes_dict, neg_genes_dict,
-                                                                                        feature_dataframe_list, train_acc_list,
-                                                                                        test_acc_list, auc_score_list, pos_y_prob_dict))
-
-                process_jobs.append(p)
-                p.start()
-
-                if len(process_jobs) > self.cfg.nthreads:
-                    for p in process_jobs:
-                        p.join()
-                    process_jobs = []
-
-            for p in process_jobs:
-                p.join()
+                    pool.starmap(self.train_extended_sklearn_clf_on_subset,
+                                 zip(train_data, test_data,
+                                     repeat(pos_genes_dict), repeat(neg_genes_dict),
+                                     repeat(feature_dataframe_list),
+                                     repeat(train_acc_list), repeat(test_acc_list),
+                                     repeat(auc_score_list), repeat(pos_y_prob_dict)))
 
 
         if self.clf_id in feature_imp_classifiers:
             feat_list_file = str(self.cfg.superv_out / ('PU_' + self.clf_id + '.feature_dfs_list.txt'))
             # cleanup
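One behavioral note on the change, illustrated with a sketch that is not from this PR (all names in it are hypothetical): the `Manager` proxies remain necessary because `Pool` workers run in separate processes, so appends to a plain Python list would happen in a worker's copy and be lost, whereas a manager list is a shared proxy whose mutations are visible to the parent. `Pool(processes=self.cfg.nthreads)` also caps concurrency on its own, which is what makes the old join-every-nthreads bookkeeping around `Process` objects unnecessary.

```python
from itertools import repeat
from multiprocessing import Manager
from multiprocessing.pool import Pool

def record_score(subset_id, scores):
    # Appending to the Manager proxy is visible in the parent process;
    # a plain list would be pickled into the worker and silently discarded.
    scores.append(subset_id * 10)

if __name__ == '__main__':
    with Manager() as manager:
        scores = manager.list()
        # The pool itself limits concurrency to `processes` workers,
        # so no manual batching/joining of Process objects is needed.
        with Pool(processes=2) as pool:
            pool.starmap(record_score, zip(range(4), repeat(scores)))
        print(sorted(scores))  # [0, 10, 20, 30]
```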