Skip to content

Commit

Permalink
remove cooc_collector from decision of num_threads in collection parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSolotky committed Feb 3, 2019
1 parent 43fd44c commit 10b2fe7
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
5 changes: 1 addition & 4 deletions src/artm/core/collection_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -767,10 +767,7 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
num_threads = 1;
LOG(INFO) << "CollectionParserConfig.num_threads is set to 1 (default)";
} else {
// number of threads shouldn't be higher than maximal allowable number of open files in a process
// because else there could be a situation when all the threads are trying to dump it's own batch
// to the external storage
num_threads = std::min(n, cooc_collector.config_.max_num_of_open_files_in_a_process());
num_threads = n;
LOG(INFO) << "CollectionParserConfig.num_threads is automatically set to " << num_threads;
}
} else {
Expand Down
20 changes: 13 additions & 7 deletions src/artm/core/cooccurrence_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ void CooccurrenceCollector::ReadAndMergeCooccurrenceBatches() {

std::cerr << "\nMerging co-occurrence batches. Stage 1: parallel agglomerative merge" << std::endl;
// This magic constant isn't the best and can be set to another value found in experiments
// This value should be lower than maximal number of open files in a process to avoid frequent
// openning-closing of the files
const unsigned min_num_of_batches_to_be_merged_in_parallel = 32;
while (NumOfCooccurrenceBatches() > min_num_of_batches_to_be_merged_in_parallel) {
FirstStageOfMerging(); // number of files is decreasing here
Expand All @@ -277,6 +279,8 @@ void CooccurrenceCollector::ReadAndMergeCooccurrenceBatches() {
open_files_counter_ -= buffer_for_output_files.open_files_counter_;
}

// ToDo (MichaelSolotky): find out why it doesn't use the whole power of all the cores
// and sometimes runs a long time with a single thread
void CooccurrenceCollector::FirstStageOfMerging() {
// Stage 1: merging portions of batches into intermediate batches
// The strategy is the folowing: divide the vector of batches into as much buckets as it's possible
Expand Down Expand Up @@ -304,12 +308,12 @@ void CooccurrenceCollector::FirstStageOfMerging() {
// 1 is subtracted here because 1 file will be needed for writing in it
const int optimal_portion_size = std::min(static_cast<int>(vector_of_batches_.size() / num_of_threads),
config_.max_num_of_open_files_in_a_thread() - 1);
// Portion size lower than 2 would lead to cycling or an error
const int portion_size = std::max(optimal_portion_size, 2);

// Portion size < 2 would lead to cycling or an error, if it would be > 10, much RAM space would be needed
const int portion_size = std::min(std::max(optimal_portion_size, 2), 10);
std::shared_ptr<std::mutex> open_close_file_mutex_ptr(new std::mutex);

auto func = [&open_close_file_mutex_ptr, portion_size, this](int i) { // Wrapper around KWayMerge
auto func = [&open_close_file_mutex_ptr, portion_size, this](int portion_index) { // Wrapper around KWayMerge
const int total_num_of_batches = vector_of_batches_.size();
std::shared_ptr<CooccurrenceBatch> batch(CreateNewCooccurrenceBatch());
OpenBatchOutputFile(batch);
// vocab will be coppied here into buffer_for_a_batch
Expand All @@ -320,9 +324,10 @@ void CooccurrenceCollector::FirstStageOfMerging() {
// Stage 1: take i-th portion from pull of batches
{
std::unique_lock<std::mutex> vector_of_batches_access_lock(vector_of_batches_access_mutex_);
for (int j = i * portion_size; j < (i + 1) * portion_size &&
j < static_cast<int>(vector_of_batches_.size()); ++j) {
portion_of_batches.push_back(std::move(vector_of_batches_[j]));
for (int batch_index = portion_index * portion_size;
batch_index < (portion_index + 1) * portion_size && batch_index < total_num_of_batches;
++batch_index) {
portion_of_batches.push_back(std::move(vector_of_batches_[batch_index]));
}
}
KWayMerge(&buffer_for_a_batch, BATCH, &portion_of_batches, batch, open_close_file_mutex_ptr);
Expand Down Expand Up @@ -702,6 +707,7 @@ void CooccurrenceBatch::ReadRecords() {
// ********************************* Methods of class BufferOfCooccurrences *********************************

// ToDo (MichaeSolotky): logic of this class isn't obvious, it would be cool to express thoughts clearer
// ToDo (MichaeSolotky): don't copy vocab if there's no need (like in CooccurrenceCollector)

// The main purpose of this class is to store statistics of co-occurrences and some
// variables calculated on base of them, perform that calculations, write that in
Expand Down

0 comments on commit 10b2fe7

Please sign in to comment.