Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correction of mistakes associated with co-occurrence gathering and collection parsing #947

Merged
merged 16 commits into from
Feb 8, 2019
Merged
6 changes: 4 additions & 2 deletions appveyor-mingw.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ environment:

cache:
# Cache MSYS2 packages to speed up builds. If version in repo changes, pacman will update it
- c:\msys64\mingw64
- c:\msys64\var\lib\pacman
# The lines below are commented out because disabling caching is a hot fix;
# generally it's a useful feature, and once we find a solution they can be uncommented
#- c:\msys64\mingw64
#- c:\msys64\var\lib\pacman
- C:\Miniconda36-x64\Lib\site-packages
- C:\Miniconda-x64\Lib\site-packages

Expand Down
2 changes: 1 addition & 1 deletion docs/download.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Downloads

* **Windows**

* Latest 64 bit release: `BigARTM_v0.9.0_win64 <https://github.com/bigartm/bigartm/releases/download/v0.9.0/BigARTM_v0.9.0_win64.7z>`_
* Latest 64 bit release: `BigARTM_v0.10.0_win64 <https://github.com/bigartm/bigartm/releases/download/v0.10.0/BigARTM_v0.10.0_win64.7z>`_
* Latest build from master branch: `BigARTM_master_win64.7z <https://ci.appveyor.com/api/projects/bigartm/bigartm/artifacts/BigARTM.7z?branch=master&job=Environment%3A%20PYTHON_VERSION%3D3.6%2C%20MINICONDA%3DC%3A%5CMiniconda36-x64>`_ (warning, use this with caution)
* All previous releases are available at https://github.com/bigartm/bigartm/releases

Expand Down
2 changes: 1 addition & 1 deletion python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def is_pure(self):

setup_kwargs = dict(
name='bigartm',
version='0.9.0',
version='0.10.0',
packages=find_packages(),
install_requires=[
'pandas',
Expand Down
2 changes: 1 addition & 1 deletion src/artm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ add_custom_target(proto_generation ALL

# The version number
set(ARTM_VERSION_MAJOR 0)
set(ARTM_VERSION_MINOR 9)
set(ARTM_VERSION_MINOR 10)
set(ARTM_VERSION_PATCH 0)

# configure a header file to pass some of the CMake settings
Expand Down
86 changes: 46 additions & 40 deletions src/artm/core/collection_parser.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018, Additive Regularization of Topic Models.
// Copyright 2019, Additive Regularization of Topic Models.

#include "artm/core/collection_parser.h"

Expand Down Expand Up @@ -487,7 +487,7 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
std::istream& docword = stream_or_cin.get_stream();
utility::ProgressPrinter progress(stream_or_cin.size());

auto config = config_;
auto collection_parser_config = config_;

std::mutex read_access;
std::mutex cooc_config_access;
Expand All @@ -499,7 +499,7 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
std::unordered_map<Token, bool, TokenHasher> token_map;
CollectionParserInfo parser_info;

::artm::core::CooccurrenceCollector cooc_collector(config);
::artm::core::CooccurrenceCollector cooc_collector(collection_parser_config);
int64_t total_num_of_pairs = 0;
std::atomic_bool gather_transaction_cooc(false); // This flag will be needed for special
// gathering of co-occurrences on transaction data
Expand All @@ -508,13 +508,14 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
// 1. Acquire lock for reading from docword file
// 2. Read num_items_per_batch lines from docword file, and store them in a local buffer (vector<string>)
// 3. Release the lock
// 4. Parse strings, form a batch, and save it to disk
// 4. Parse strings, form a batch, and save it to the external storage
// During parsing it gathers co-occurrence counters for pairs of tokens (if the corresponding flag == true)
// Steps 1-4 are repeated in a while loop until there is no content left in docword file.
// Multiple copies of the function can work in parallel.
auto func = [&docword, &global_line_no, &progress, &batch_name_generator, &read_access,
&cooc_config_access, &token_map_access, &token_statistics_access, &parser_info,
&token_map, &total_num_of_pairs, &cooc_collector, &gather_transaction_cooc, config]() {
&token_map, &total_num_of_pairs, &cooc_collector, &gather_transaction_cooc,
collection_parser_config]() {
int64_t local_num_of_pairs = 0; // statistics for future ppmi calculation
while (true) {
// The following variable remembers at which line the batch has started.
Expand All @@ -533,7 +534,7 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
break;
}

while ((int64_t) all_strs_for_batch.size() < config.num_items_per_batch()) {
while ((int64_t) all_strs_for_batch.size() < collection_parser_config.num_items_per_batch()) {
std::string str;
std::getline(docword, str);
global_line_no++;
Expand All @@ -550,16 +551,17 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
}
}

// It will hold tf and df of pairs of tokens
// Every pair of valid tokens (both exist in vocab) is saved in this storage
// After walking through portion of documents all the statistics is dumped on disk
// and then this storage is destroyed
// This container holds tf and df of pairs of tokens
// Every pair of valid tokens (i.e. both exist in vocab) is saved in this storage
// After walking through a batch of documents all the statistics will be dumped to the external storage
// and then this storage will be destroyed
CooccurrenceStatisticsHolder cooc_stat_holder;
// For every token from vocab keep the information about the last document this token occurred in

// ToDo (MichaelSolotky): consider the case if there is no vocab
std::vector<int> num_of_last_document_token_occured(cooc_collector.vocab_.token_map_.size(), -1);
std::vector<int> num_of_the_last_document_token_occurred_in(cooc_collector.VocabSize(), -1);

// Loop through documents
for (int str_index = 0; str_index < (int64_t) all_strs_for_batch.size(); ++str_index) {
std::string str = all_strs_for_batch[str_index];
const int line_no = first_line_no_for_batch + str_index;
Expand All @@ -569,7 +571,7 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {

if (strs.size() <= 1) {
std::stringstream ss;
ss << "Error in " << config.docword_file_path() << ":" << line_no
ss << "Error in " << collection_parser_config.docword_file_path() << ":" << line_no
<< " has too few entries: " << str;
BOOST_THROW_EXCEPTION(InvalidOperation(ss.str()));
}
Expand All @@ -583,6 +585,7 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
std::vector<ClassId> class_ids;
std::vector<float> weights;

// Loop through tokens
for (unsigned elem_index = 1; elem_index < strs.size(); ++elem_index) {
std::string elem = strs[elem_index];
if (elem.size() == 0) {
Expand Down Expand Up @@ -622,7 +625,7 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
}

// Skip token when it is not among modalities that user has requested to parse
if (!useClassId(current_class_id, config)) {
if (!useClassId(current_class_id, collection_parser_config)) {
continue;
}

Expand All @@ -632,7 +635,7 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
if (split_index != std::string::npos) {
if (split_index == 0 || split_index == (elem.size() - 1)) {
std::stringstream ss;
ss << "Error in " << config.docword_file_path() << ":" << line_no
ss << "Error in " << collection_parser_config.docword_file_path() << ":" << line_no
<< ", entries can not start or end with colon: " << elem;
BOOST_THROW_EXCEPTION(InvalidOperation(ss.str()));
}
Expand All @@ -642,7 +645,7 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
}
catch (boost::bad_lexical_cast &) {
std::stringstream ss;
ss << "Error in " << config.docword_file_path() << ":" << line_no
ss << "Error in " << collection_parser_config.docword_file_path() << ":" << line_no
<< ", can not parse integer number of occurences: " << elem;
BOOST_THROW_EXCEPTION(InvalidOperation(ss.str()));
}
Expand All @@ -652,27 +655,27 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
class_ids.push_back(current_class_id);
weights.push_back(token_weight);

if (config.gather_cooc()) { // Co-occurence gathering starts here
const ClassId first_token_class_id = class_ids[0];
if (collection_parser_config.gather_cooc()) {  // Co-occurrence gathering starts here
const ClassId first_token_class_id = current_class_id;

int first_token_id = -1;
if (config.has_vocab_file_path()) {
if (collection_parser_config.has_vocab_file_path()) {
std::string first_token = DropWeightSuffix(elem);
first_token_id = cooc_collector.vocab_.FindTokenId(first_token, first_token_class_id);
first_token_id = cooc_collector.FindTokenIdInVocab(first_token, first_token_class_id);
if (first_token_id == TOKEN_NOT_FOUND) {
continue;
}
} else { // ToDo (MichaelSolotky): continue the case if there is no vocab
} else { // ToDo (MichaelSolotky): consider the case if there is no vocab
BOOST_THROW_EXCEPTION(InvalidOperation("No vocab file specified. Can't gather co-occurrences"));
}

if (num_of_last_document_token_occured[first_token_id] != str_index) {
num_of_last_document_token_occured[first_token_id] = str_index;
if (num_of_the_last_document_token_occurred_in[first_token_id] != str_index) {
num_of_the_last_document_token_occurred_in[first_token_id] = str_index;
std::unique_lock<std::mutex> lock(token_statistics_access);
++cooc_collector.num_of_documents_token_occurred_in_[first_token_id];
}
// Take window_width tokens (parameter) to the right of the current one
// If there are some words beginnig on '|' in the text the window should be extended
// If there are some words beginning with '|' in the text the window should be extended
// and it's extended using not_a_word_counter
ClassId second_token_class_id = first_token_class_id;
unsigned not_a_word_counter = 0;
Expand All @@ -695,13 +698,13 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
}
int second_token_id = -1;
const std::string neighbour = strs[elem_index + neighbour_index];
if (config.has_vocab_file_path()) {
if (collection_parser_config.has_vocab_file_path()) {
std::string second_token = DropWeightSuffix(neighbour);
second_token_id = cooc_collector.vocab_.FindTokenId(second_token, second_token_class_id);
second_token_id = cooc_collector.FindTokenIdInVocab(second_token, second_token_class_id);
if (second_token_id == TOKEN_NOT_FOUND) {
continue;
}
} else { // ToDo (MichaelSolotky): continue the case if there is no vocab
} else { // ToDo (MichaelSolotky): consider the case if there is no vocab
BOOST_THROW_EXCEPTION(InvalidOperation("No vocab file specified. Can't gather co-occurrences"));
}

Expand All @@ -728,12 +731,13 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
}

batch_collector.FinishItem(line_no, item_title);
} // End of items of 1 batch parsing
if (config.gather_cooc() && !cooc_stat_holder.Empty()) {
// This function saves gathered statistics on disk
// After saving on disk statistics from all the batches needs to be merged
} // End of parsing the items of 1 batch
if (collection_parser_config.gather_cooc() && !cooc_stat_holder.Empty()) {
// This function saves gathered statistics to the external storage
// After saving to the external storage, statistics from all the batches need to be merged
// This is implemented in ReadAndMergeCooccurrenceBatches(), so the next step is to call this method
// Sorting is needed before storing all pairs of tokens on disk (it's for future agregation)
// Sorting is needed before storing all pairs of tokens to the external storage
// (it's for the future aggregation)
cooc_collector.UploadOnDisk(cooc_stat_holder);
}

Expand All @@ -746,7 +750,7 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
token_map[artm::core::Token(batch.class_id(token_id), batch.token(token_id))] = true;
}
}
::artm::core::Helpers::SaveBatch(batch, config.target_folder(), batch_name);
::artm::core::Helpers::SaveBatch(batch, collection_parser_config.target_folder(), batch_name);
}
} // End of collection parsing

Expand All @@ -756,19 +760,21 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
}
};

int num_threads = config.num_threads();
if (!config.has_num_threads() || config.num_threads() < 0) {
unsigned int n = std::thread::hardware_concurrency();
int num_threads = 0;
if (!collection_parser_config.has_num_threads() || collection_parser_config.num_threads() < 0) {
int n = std::thread::hardware_concurrency();
if (n == 0) {
LOG(INFO) << "CollectionParserConfig.num_threads is set to 1 (default)";
num_threads = 1;
LOG(INFO) << "CollectionParserConfig.num_threads is set to 1 (default)";
} else {
LOG(INFO) << "CollectionParserConfig.num_threads is automatically set to " << n;
num_threads = n;
LOG(INFO) << "CollectionParserConfig.num_threads is automatically set to " << num_threads;
}
} else {
num_threads = collection_parser_config.num_threads();
}

Helpers::CreateFolderIfNotExists(config.target_folder());
Helpers::CreateFolderIfNotExists(collection_parser_config.target_folder());

// The func may throw an exception if docword is malformed.
// This exception will be re-thrown on the main thread.
Expand All @@ -789,8 +795,8 @@ CollectionParserInfo CollectionParser::ParseVowpalWabbit() {
cooc_collector.config_.set_total_num_of_documents(parser_info.num_items());

// Launch merging of co-occurrence batches and ppmi calculation
if (config.gather_cooc() && cooc_collector.VocabSize() >= 2) {
if (cooc_collector.CooccurrenceBatchesQuantity() != 0) {
if (collection_parser_config.gather_cooc() && cooc_collector.VocabSize() >= 2) {
if (cooc_collector.NumOfCooccurrenceBatches() != 0) {
cooc_collector.ReadAndMergeCooccurrenceBatches();
}
}
Expand Down
Loading