Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster parallel Vocabulary Reading #425

Merged
merged 2 commits into from
Jul 18, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 50 additions & 13 deletions src/index/Vocabulary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "../parser/RdfEscaping.h"
#include "../parser/Tokenizer.h"
#include "../util/BatchedPipeline.h"
#include "../util/File.h"
#include "../util/HashMap.h"
#include "../util/HashSet.h"
Expand All @@ -27,27 +28,63 @@ void Vocabulary<S, C>::readFromFile(const string& fileName,
std::fstream in(fileName.c_str(), std::ios_base::in);
string line;
[[maybe_unused]] bool first = true;
std::string lastExpandedString;
while (std::getline(in, line)) {
if constexpr (_isCompressed) {
// when we read from file it means that all preprocessing has been done
// and the prefixes are already stripped in the file
auto str = RdfEscaping::unescapeNewlinesAndBackslashes(
expandPrefix(CompressedString::fromString(line)));

_words.push_back(compressPrefix(str));
if constexpr (_isCompressed) {
// return a std::optional<string> with the next word and nullopt in case of
// exhaustion.
auto creator = [&]() -> std::optional<std::string> {
if (std::getline(in, line)) {
return std::optional(std::move(line));
} else {
return std::nullopt;
}
};

auto expand = [this](std::string&& s) {
return expandPrefix(CompressedString::fromString(s));
};

auto normalize = [](std::string&& s) {
return RdfEscaping::unescapeNewlinesAndBackslashes(s);
};

auto compress = [this](std::string&& s) { return compressPrefix(s); };

auto push = [this](CompressedString&& s) {
_words.push_back(std::move(s));
if (_words.size() % 50'000'000 == 0) {
LOG(INFO) << "Read " << _words.size() << " words." << std::endl;
}
return std::move(s);
};

auto check = [&, lastExpandedString = std::string{},
first = true](std::string&& s) mutable {
if (!first) {
if (!(_caseComparator.compare(lastExpandedString, str,
if (!(_caseComparator.compare(lastExpandedString, s,
SortLevel::TOTAL))) {
LOG(ERROR) << "Vocabulary is not sorted in ascending order for words "
<< lastExpandedString << " and " << str << std::endl;
<< lastExpandedString << " and " << s << std::endl;
// AD_CHECK(false);
}
} else {
first = false;
}
lastExpandedString = std::move(str);
} else {
lastExpandedString = std::move(s);
return lastExpandedString;
};

auto pipeline = ad_pipeline::setupParallelPipeline<2, 3, 1, 2, 1>(
100000, creator, expand, normalize, check, compress, push);
while ([[maybe_unused]] auto opt = pipeline.getNextValue()) {
// run to exhaustion
}
LOG(INFO) << "WaitTimes for Pipeline in msecs\n";
for (const auto& t : pipeline.getWaitingTime()) {
LOG(INFO) << t << " msecs\n";
}

} else {
while (std::getline(in, line)) {
_words.push_back(line);
}
}
Expand Down