Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster Index Building. #467

Merged
merged 7 commits into from
Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/index/ConstantsIndexBuilding.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ static const size_t PARSER_BATCH_SIZE = 1000000;
static const size_t PARSER_MIN_TRIPLES_AT_ONCE = 100000;

// When reading from a file, Chunks of this size will
// be fed to the parser at once (100 << 20 is exactly 100 MiB
static const size_t FILE_BUFFER_SIZE = 100 << 20;
// be fed to the parser at once (100 MiB)
static const size_t FILE_BUFFER_SIZE = 100 * (1ul << 20);

// When the BZIP2 parser encounters a parsing exception it will increase its
// buffer and try again (we have no other way currently to determine if the
Expand All @@ -55,11 +55,11 @@ static const std::string PARTIAL_MMAP_IDS = ".partial-ids-mmap";
static const std::string TMP_BASENAME_COMPRESSION = ".tmp.compression_index";

// _________________________________________________________________
// The degree of parallelism that is used for IndexBuilding step where the
// unique elements of the vocabulary are identified via hash maps. Typically, 4
// The degree of parallelism that is used for the index building step, where the
// unique elements of the vocabulary are identified via hash maps. Typically, 6
// is a good value. On systems with very few CPUs, a lower value might be
// beneficial.
constexpr size_t NUM_PARALLEL_ITEM_MAPS = 4;
constexpr size_t NUM_PARALLEL_ITEM_MAPS = 6;

// The number of threads that are parsing in parallel, when the parallel Turtle
// parser is used.
Expand Down
53 changes: 37 additions & 16 deletions src/index/Index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ VocabularyData Index::passFileForVocabulary(const string& filename,
size_t linesPerPartial) {
auto parser = std::make_shared<Parser>(filename);
std::unique_ptr<TripleVec> idTriples(new TripleVec());
TripleVec::bufwriter_type writer(*idTriples);
ad_utility::Synchronized<TripleVec::bufwriter_type> writer(*idTriples);
bool parserExhausted = false;

size_t i = 0;
Expand All @@ -152,7 +152,9 @@ VocabularyData Index::passFileForVocabulary(const string& filename,
// we add extra triples
std::vector<size_t> actualPartialSizes;

std::future<void> sortFuture;
// Each of these futures corresponds to the processing and writing of one
// batch of triples and partial vocabulary.
std::array<std::future<void>, 3> writePartialVocabularyFuture;
while (!parserExhausted) {
size_t actualCurrentPartialSize = 0;

Expand All @@ -162,7 +164,7 @@ VocabularyData Index::passFileForVocabulary(const string& filename,
std::array<ItemMapManager, NUM_PARALLEL_ITEM_MAPS> itemArray;

{
auto p = ad_pipeline::setupParallelPipeline<1, NUM_PARALLEL_ITEM_MAPS>(
auto p = ad_pipeline::setupParallelPipeline<3, NUM_PARALLEL_ITEM_MAPS>(
_parserBatchSize,
// when called, returns an optional to the next triple. If
// `linesPerPartial` triples were parsed, return std::nullopt. when
Expand Down Expand Up @@ -210,27 +212,40 @@ VocabularyData Index::passFileForVocabulary(const string& filename,
// to control the number of threads and the amount of memory used at the
// same time. typically sorting is finished before we reach again here so
// it is not a bottleneck.
if (sortFuture.valid()) {
sortFuture.get();
ad_utility::Timer sortFutureTimer;
sortFutureTimer.start();
if (writePartialVocabularyFuture[0].valid()) {
writePartialVocabularyFuture[0].get();
}
sortFutureTimer.stop();
LOG(TIMING)
<< "Time spent waiting for the writing of a previous vocabulary: "
<< sortFutureTimer.msecs() << "ms." << std::endl;
std::array<ItemMap, NUM_PARALLEL_ITEM_MAPS> convertedMaps;
for (size_t j = 0; j < NUM_PARALLEL_ITEM_MAPS; ++j) {
convertedMaps[j] = std::move(itemArray[j]).moveMap();
}
auto oldItemPtr = std::make_unique<ItemMapArray>(std::move(convertedMaps));
sortFuture = writeNextPartialVocabulary(
i, numFiles, actualCurrentPartialSize, std::move(oldItemPtr),
std::move(localIdTriples), &writer);
for (auto it = writePartialVocabularyFuture.begin() + 1;
it < writePartialVocabularyFuture.end(); ++it) {
*(it - 1) = std::move(*it);
}
writePartialVocabularyFuture[writePartialVocabularyFuture.size() - 1] =
writeNextPartialVocabulary(i, numFiles, actualCurrentPartialSize,
std::move(oldItemPtr),
std::move(localIdTriples), &writer);
numFiles++;
// Save the information about how many triples this partial vocabulary
// actually deals with; we will use this later for mapping from partial to
// global ids.
actualPartialSizes.push_back(actualCurrentPartialSize);
}
if (sortFuture.valid()) {
sortFuture.get();
for (auto& future : writePartialVocabularyFuture) {
if (future.valid()) {
future.get();
}
}
writer.finish();
writer.wlock()->finish();
LOG(INFO) << "Pass done." << endl;

if (_vocabPrefixCompressed) {
Expand Down Expand Up @@ -1526,7 +1541,7 @@ void Index::initializeVocabularySettingsBuild() {
std::future<void> Index::writeNextPartialVocabulary(
size_t numLines, size_t numFiles, size_t actualCurrentPartialSize,
std::unique_ptr<ItemMapArray> items, std::unique_ptr<TripleVec> localIds,
TripleVec::bufwriter_type* globalWritePtr) {
ad_utility::Synchronized<TripleVec::bufwriter_type>* globalWritePtr) {
LOG(INFO) << "Lines (from KB-file) processed: " << numLines << '\n';
LOG(INFO) << "Actual number of Triples in this section (include "
"langfilter triples): "
Expand All @@ -1540,7 +1555,7 @@ std::future<void> Index::writeNextPartialVocabulary(

auto lambda = [localIds = std::move(localIds), globalWritePtr,
items = std::move(items), vocab = &_vocab, partialFilename,
partialCompressionFilename,
partialCompressionFilename, numFiles,
vocabPrefixCompressed = _vocabPrefixCompressed]() mutable {
auto vec = vocabMapsToVector(std::move(items));
const auto identicalPred = [&c = vocab->getCaseComparator()](
Expand All @@ -1562,9 +1577,15 @@ std::future<void> Index::writeNextPartialVocabulary(
return a.second.m_id == b.second.m_id;
}),
vec.end());
LOG(INFO) << "Removed " << sz - vec.size()
<< " Duplicates from the local partial vocabularies\n";
writeMappedIdsToExtVec(*localIds, mapping, globalWritePtr);
LOG(TRACE) << "Removed " << sz - vec.size()
<< " Duplicates from the local partial vocabularies\n";
// The writing to the STXXL vector has to be done in order, to
// make the update from local to global ids work.
globalWritePtr->withWriteLockAndOrdered(
[&](auto& writerPtr) {
writeMappedIdsToExtVec(*localIds, mapping, &writerPtr);
},
numFiles);
writePartialVocabularyToFile(vec, partialFilename);
if (vocabPrefixCompressed) {
// sort according to the actual byte values
Expand Down
2 changes: 1 addition & 1 deletion src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ class Index {
std::future<void> writeNextPartialVocabulary(
size_t numLines, size_t numFiles, size_t actualCurrentPartialSize,
std::unique_ptr<ItemMapArray> items, std::unique_ptr<TripleVec> localIds,
TripleVec::bufwriter_type* globalWritePtr);
ad_utility::Synchronized<TripleVec::bufwriter_type>* globalWritePtr);

void convertPartialToGlobalIds(TripleVec& data,
const vector<size_t>& actualLinesPerPartial,
Expand Down
4 changes: 3 additions & 1 deletion src/index/IndexBuilderTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ using ItemVec = std::vector<std::pair<std::string, IdAndSplitVal>>;
* Ids are assigned in an adjacent range starting with a configurable
* minimum Id. That way multiple maps can be used with non overlapping ranges.
*/
struct ItemMapManager {
// Align each ItemMapManager on its own cache line to avoid false sharing.
struct alignas(256) ItemMapManager {
/// Construct by assigning the minimum Id that shall be returned by the map
explicit ItemMapManager(Id minId, const TripleComponentComparator* cmp)
: _map(), _minId(minId), m_comp(cmp) {}
Expand Down Expand Up @@ -111,6 +112,7 @@ auto getIdMapLambdas(std::array<ItemMapManager, Parallelism>* itemArrayPtr,
LANGUAGE_PREDICATE); // not really needed here, but also not harmful
// and needed to make some completely unrelated
// unit tests pass.
itemArray[j]._map.reserve(2 * maxNumberOfTriples);
}
using OptionalIds = std::array<std::optional<std::array<Id, 3>>, 3>;

Expand Down
22 changes: 22 additions & 0 deletions src/util/OnDestruction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2021, University of Freiburg, Chair of Algorithms and Data
// Structures. Author: Johannes Kalmbach <kalmbacj@cs.uni-freiburg.de>

#ifndef QLEVER_ONDESTRUCTION_H
#define QLEVER_ONDESTRUCTION_H

#include <type_traits>
#include <utility>

namespace ad_utility {

/// A simple RAII scope guard that executes a specified action at the time it
/// is destroyed, i.e. on every exit path of the enclosing scope (including
/// exceptions). `F` must be invocable without arguments, return void and be
/// noexcept, because the action is run from a destructor.
template <typename F>
class OnDestruction {
 private:
  // Enforced via static_assert (instead of a C++20 requires-clause) so the
  // header also compiles under C++17; the constraint is identical.
  static_assert(std::is_nothrow_invocable_r_v<void, F>,
                "The action of an OnDestruction must be invocable without "
                "arguments, return void, and be noexcept");
  F f_;

 public:
  /// Store the action `f`. It will be called exactly once, when this object
  /// is destroyed.
  explicit OnDestruction(F f) : f_{std::move(f)} {}
  // Copying or moving would lead to the action being executed more than
  // once; the intended usage is direct in-place construction, so disable
  // both.
  OnDestruction(const OnDestruction&) = delete;
  OnDestruction& operator=(const OnDestruction&) = delete;
  ~OnDestruction() { f_(); }
};
}  // namespace ad_utility

#endif  // QLEVER_ONDESTRUCTION_H
47 changes: 36 additions & 11 deletions src/util/Synchronized.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <atomic>
#include <shared_mutex>

#include "./OnDestruction.h"

namespace ad_utility {

/// Does type M have a lock() and unlock() member function (behaves like a
Expand Down Expand Up @@ -84,7 +86,7 @@ class Synchronized {
/// Constructor that is not copy or move, tries to instantiate the underlying
/// type via perfect forwarding (this includes the default constructor)
template <typename... Args>
Synchronized(Args&&... args) : t_{std::forward<Args>(args)...}, m_{} {}
Synchronized(Args&&... args) : data_{std::forward<Args>(args)...}, m_{} {}

/** @brief Obtain an exclusive lock and then call f() on the underlying data
* type, return the result.
Expand All @@ -96,14 +98,33 @@ class Synchronized {
template <typename F>
auto withWriteLock(F f) {
std::lock_guard l(m_);
return f(t_);
return f(data_);
}

/// const overload of with WriteLock
template <typename F>
auto withWriteLock(F f) const {
std::lock_guard l(m_);
return f(t_);
return f(data_);
}

/// Similar to `withWriteLock`, but additionally guarantees that the request
/// with requestNumber 0 is performed first, then comes requestNumber 1 etc.
/// If a request number in the range [0...k] is missing, then the program will
/// deadlock. See `/src/index/Index.cpp` for an example.
/// \param f Callable that is invoked with the (exclusively locked) data.
/// \param requestNumber Position of this call in the global order; the call
///        blocks until all previous request numbers have completed.
template <typename F>
auto withWriteLockAndOrdered(F f, size_t requestNumber) {
std::unique_lock l(m_);
// It is important to create this AFTER the lock, s.t. the
// nextOrderedRequest_ update is still protected. We must give it a name,
// s.t. it is not destroyed immediately. Because it runs from a destructor,
// it also fires during stack unwinding if `f` throws, so later requests
// are still woken up and cannot deadlock.
OnDestruction od{[&]() mutable noexcept {
++nextOrderedRequest_;
l.unlock();
requestCv_.notify_all();
}};
// Block until it is this request's turn; the predicate is evaluated while
// holding the lock, so reading nextOrderedRequest_ is safe.
requestCv_.wait(l, [&]() { return requestNumber == nextOrderedRequest_; });
return f(data_);
}

/** @brief Obtain a shared lock and then call f() on the underlying data type,
Expand All @@ -119,7 +140,7 @@ class Synchronized {
typename Res = std::invoke_result_t<F, const T&>>
std::enable_if_t<s, Res> withReadLock(F f) const {
std::shared_lock l(m_);
return f(t_);
return f(data_);
}

/**
Expand Down Expand Up @@ -179,11 +200,15 @@ class Synchronized {
};

private:
T t_; // the actual payload
mutable Mutex m_; // the used mutex
T data_; // The data to which we synchronize the access.
mutable Mutex m_; // The used mutex

// These are used for the withWriteLockAndOrdered function
size_t nextOrderedRequest_ = 0;
std::condition_variable_any requestCv_;

template <class S, bool b, bool c>
friend class LockPtr; // the LockPtr implementation requires private access
friend class LockPtr; // The LockPtr implementation requires private access
};

/// handle to a locked Synchronized class
Expand Down Expand Up @@ -224,21 +249,21 @@ class LockPtr {
/// locks that are not const.
template <bool s = isConst>
std::enable_if_t<!s, value_type&> operator*() {
return s_->t_;
return s_->data_;
}

/// Access to underlying data.
const value_type& operator*() const { return s_->t_; }
const value_type& operator*() const { return s_->data_; }

/// Access to underlying data. Non const access is only allowed for exclusive
/// locks that are not const.
template <bool s = isConst>
std::enable_if_t<!s, value_type*> operator->() {
return &s_->t_;
return &s_->data_;
}

/// Access to underlying data.
const value_type* operator->() const { return &s_->t_; }
const value_type* operator->() const { return &s_->data_; }

private:
ptr_type s_;
Expand Down