Implemented BufferedVector (std::vector/MmapVector hybrid)
- Dynamic array that handles both small and large data efficiently
- internally holds a std::vector and an MmapVector
- As long as the vector is small, the std::vector is used (fast)
- If the size grows beyond a user-defined threshold, the data is shifted
  to the MmapVector (probably slower, but saves RAM)

- We use this while creating the single relations for the permutations.
- Most of them are really small, but previously some big relations caused
  RAM problems in this step.
- The BufferedVector deals with this problem efficiently (see the sketch
  below).
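To make the hybrid concrete, here is a minimal sketch of the idea. This is
illustrative only: the real interface lives in src/util/BufferedVector.h, and
the MmapVector calls are assumptions based on how that class is used elsewhere
in this commit (e.g. the (size, filename) constructor in dumpExtVecToMmap).

// Sketch of a std::vector/MmapVector hybrid (NOT the actual implementation).
#include <cstddef>
#include <string>
#include <utility>
#include <vector>
#include "../util/MmapVector.h"

template <class T>
class BufferedVectorSketch {
 public:
  // elements beyond `threshold` are kept on disk in the file `filename`
  BufferedVectorSketch(size_t threshold, std::string filename)
      : _threshold(threshold), _filename(std::move(filename)) {}

  size_t size() const { return _onDisk ? _extVec.size() : _vec.size(); }

  void push_back(const T& el) {
    if (!_onDisk && _vec.size() >= _threshold) {
      // Threshold crossed: spill the in-memory contents to disk exactly once.
      ad_utility::MmapVector<T> tmp(_vec.size(), _filename);
      for (size_t i = 0; i < _vec.size(); ++i) tmp[i] = _vec[i];
      _extVec = std::move(tmp);  // assumes MmapVector is movable
      _vec.clear();
      _onDisk = true;
    }
    if (_onDisk) {
      _extVec.push_back(el);  // probably slower, but RAM usage stays bounded
    } else {
      _vec.push_back(el);  // fast in-memory path for the common small case
    }
  }

  const T& operator[](size_t i) const {
    return _onDisk ? _extVec[i] : _vec[i];
  }

 private:
  size_t _threshold;
  std::string _filename;
  bool _onDisk = false;
  std::vector<T> _vec;
  ad_utility::MmapVector<T> _extVec;  // assumes default constructibility
};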
joka921 committed Dec 16, 2018
1 parent 916432f commit 441f999
Showing 15 changed files with 496 additions and 155 deletions.
6 changes: 3 additions & 3 deletions .gitmodules
@@ -1,3 +1,6 @@
[submodule "third_party/stxxl"]
path = third_party/stxxl
url = https://github.com/bingmann/stxxl
[submodule "third_party/googletest"]
path = third_party/googletest
url = https://github.com/google/googletest.git
@@ -7,6 +10,3 @@
[submodule "third_party/re2"]
path = third_party/re2
url = https://github.com/google/re2.git
[submodule "third_party/stxxl"]
path = third_party/stxxl
url = https://github.com/bingmann/stxxl
3 changes: 2 additions & 1 deletion CMakeLists.txt
@@ -67,7 +67,7 @@ include_directories(third_party/json/include/)
################################
# Disable GNU parallel as it prevents build on Ubuntu 14.04
set(USE_GNU_PARALLEL OFF CACHE BOOL "Don't use gnu parallel" FORCE)
set(USE_OPENMP OFF CACHE BOOL "Don't use OpenMP" FORCE)
set(USE_OPENMP OFF CACHE BOOL "Don't use OPENMP as default" FORCE)
add_subdirectory(third_party/stxxl)
# apply STXXL CXXFLAGS
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${STXXL_CXX_FLAGS}")
@@ -161,5 +161,6 @@ add_test(HashMapTest test/HashMapTest)
add_test(HashSetTest test/HashSetTest)
add_test(VocabularyGeneratorTest test/VocabularyGeneratorTest)
add_test(MmapVectorTest test/MmapVectorTest)
add_test(BufferedVectorTest test/BufferedVectorTest)
add_test(TokenTest test/TokenTest)
add_test(TurtleParserTest test/TurtleParserTest)
2 changes: 1 addition & 1 deletion src/global/Constants.h
@@ -70,4 +70,4 @@ static constexpr uint8_t NUM_COMPRESSION_PREFIXES = 127;
// if this is the first character of a compressed string, this means that no
// compression has been applied to a word
static const uint8_t NO_PREFIX_CHAR =
MIN_COMPRESSION_PREFIX + NUM_COMPRESSION_PREFIXES;
MIN_COMPRESSION_PREFIX + NUM_COMPRESSION_PREFIXES;
8 changes: 8 additions & 0 deletions src/index/ConstantsIndexCreation.h
@@ -16,6 +16,14 @@ const std::string EXTERNAL_LITS_TEXT_FILE_NAME = ".externalized-text";
// Reduce to save RAM
static const int NUM_TRIPLES_PER_PARTIAL_VOCAB = 100000000;

// How many triples the buffer is supposed to parse ahead.
// If too big, the memory consumption is high; if too low, we possibly lose
// speed.
static const size_t PARSER_BATCH_SIZE = 10000000;

// If a single relation has more than this number of triples, it will be
// buffered into an MmapVector during the creation of the relations.
static const size_t THRESHOLD_RELATION_CREATION = 2 << 20;

// ________________________________________________________________
static const std::string PARTIAL_VOCAB_FILE_NAME = ".partial-vocabulary";
static const std::string PARTIAL_MMAP_IDS = ".partial-ids-mmap";
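For a rough sense of scale (assuming 8-byte Ids, i.e. 16 bytes per
array<Id, 2> entry): THRESHOLD_RELATION_CREATION = 2 << 20 = 2,097,152
entries, so a single relation can occupy about 32 MiB of RAM before its
buffer spills over to an MmapVector on disk.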
204 changes: 97 additions & 107 deletions src/index/Index.cpp

Large diffs are not rendered by default.

35 changes: 22 additions & 13 deletions src/index/Index.h
@@ -17,6 +17,7 @@
#include "../parser/NTriplesParser.h"
#include "../parser/TsvParser.h"
#include "../parser/TurtleParser.h"
#include "../util/BufferedVector.h"
#include "../util/File.h"
#include "../util/HashMap.h"
#include "../util/MmapVector.h"
@@ -28,6 +29,7 @@
#include "./TextMetaData.h"
#include "./Vocabulary.h"

using ad_utility::BufferedVector;
using ad_utility::MmapVector;
using ad_utility::MmapVectorView;
using std::array;
Expand All @@ -37,7 +39,6 @@ using std::vector;

using json = nlohmann::json;

// using IdPairMMapVec = ad_utility::MmapVector<std::array<Id, 2>>;
// a simple struct for better naming
struct LinesAndWords {
typedef stxxl::vector<array<Id, 3>> ExtVec;
@@ -362,9 +363,6 @@ class Index {
*/
CompactStringVector<Id, Id> _hasPredicate;

size_t passTsvFileForVocabulary(const string& tsvFile);

void passTsvFileIntoIdVector(const string& tsvFile, ExtVec& data);

// Create Vocabulary and directly write it to disk. Create ExtVec which can be
// used for creating permutations
@@ -377,14 +375,11 @@
template <class Parser>
LinesAndWords passFileForVocabulary(const string& ntFile,
size_t linesPerPartial = 100000000);
template <class Parser>
static std::pair<bool, std::vector<array<string, 3>>> parseBatch(
Parser* parser, size_t maxLines);

template <class Parser>
void passFileIntoIdVector(ExtVec& data,
const vector<size_t>& actualLinesPerPartial,
size_t linesPerPartial);
void convertPartialToGlobalIds(ExtVec& data,
const vector<size_t>& actualLinesPerPartial,
size_t linesPerPartial);

// ___________________________________________________________________________
template <class Map>
@@ -482,14 +477,15 @@
// Careful: only the multiplicity for the first column is valid in the return value
static pair<FullRelationMetaData, BlockBasedRelationMetaData> writeRel(
ad_utility::File& out, off_t currentOffset, Id relId,
const MmapVector<array<Id, 2>>& data, size_t distinctC1, bool functional);
const BufferedVector<array<Id, 2>>& data, size_t distinctC1,
bool functional);

static void writeFunctionalRelation(
const MmapVector<array<Id, 2>>& data,
const BufferedVector<array<Id, 2>>& data,
pair<FullRelationMetaData, BlockBasedRelationMetaData>& rmd);

static void writeNonFunctionalRelation(
ad_utility::File& out, const MmapVector<array<Id, 2>>& data,
ad_utility::File& out, const BufferedVector<array<Id, 2>>& data,
pair<FullRelationMetaData, BlockBasedRelationMetaData>& rmd);

void openFileHandles();
@@ -579,4 +575,17 @@

// initialize the index-build-time settings for the vocabulary
void initializeVocabularySettingsBuild();


// Helper function for debugging during the index build.
// ExtVecs are not persistent, so we dump them to an MmapVector in a file
// with the given filename.
static void dumpExtVecToMmap(const ExtVec& vec, std::string filename) {
LOG(INFO) << "Dumping ext vec to mmap" << std::endl;
MmapVector<ExtVec::value_type> mmapVec(vec.size(), filename);
for (size_t i = 0; i < vec.size(); ++i) {
mmapVec[i] = vec[i];
}
LOG(INFO) << "Done" << std::endl;
}
};
40 changes: 17 additions & 23 deletions src/index/VocabularyGenerator.cpp
@@ -18,15 +18,18 @@
#include "./ConstantsIndexCreation.h"
#include "./Vocabulary.h"

// Helper struct used in the priority queue for merging.
// Represents a token/word from a certain partial vocabulary.
struct QueueValue {
QueueValue() = default;
QueueValue(const string& v, size_t file, Id word)
: _value(v), _partialFileId(file), _partialWordId(word) {}
string _value;
size_t _partialFileId;
Id _partialWordId;
string _value;          // the word itself
size_t _partialFileId;  // the partial vocabulary this word came from
Id _partialWordId;      // the word's id within that partial vocabulary
};

// we sort alphabetically by the token
class QueueCompare {
public:
bool operator()(const QueueValue& p1, const QueueValue& p2) {
@@ -37,22 +40,26 @@
// ___________________________________________________________________
size_t mergeVocabulary(const std::string& basename, size_t numFiles) {
std::vector<std::fstream> infiles;

// we will store pairs of <partialId, globalId>
std::vector<IdPairMMapVec> idVecs;
std::ofstream outfile(basename + ".vocabulary");
AD_CHECK(outfile.is_open());
std::ofstream outfileExternal(basename + EXTERNAL_LITS_TEXT_FILE_NAME);
AD_CHECK(outfileExternal.is_open());
std::vector<bool> endOfFile(numFiles, false);

ad_utility::HashMap<string, Id> langtagMap;

// Priority queue for the k-way merge
std::priority_queue<QueueValue, std::vector<QueueValue>, QueueCompare> queue;

// open and prepare all infiles and mmap output vectors
for (size_t i = 0; i < numFiles; i++) {
infiles.emplace_back(basename + PARTIAL_VOCAB_FILE_NAME + std::to_string(i),
std::ios_base::in | std::ios_base::out);
idVecs.emplace_back(0, basename + PARTIAL_MMAP_IDS + std::to_string(i));
AD_CHECK(infiles.back().is_open());

// read the first entry of the vocabulary and add it to the queue
endOfFile[i] = true;

uint32_t len;
@@ -66,7 +73,10 @@ size_t mergeVocabulary(const std::string& basename, size_t numFiles) {
}
}

// keep track of the last seen word to correctly handle duplicates
std::string lastWritten = "";
// the number of words we have written. This is also the global Id of the
// next word we see, unless it is equal to the previous word.
size_t totalWritten = 0;

// start k-way merge
@@ -78,6 +88,7 @@
if (top._value != lastWritten) {
lastWritten = top._value;

// write the new word to the vocabulary
if (top._value < string({EXTERNALIZED_LITERALS_PREFIX})) {
outfile << top._value << std::endl;
} else {
@@ -91,7 +102,7 @@
totalWritten++;
} else {
// this is a duplicate which already occurred in another partial vocabulary
// in the last step.
// we have already increased totalWritten, so for the duplicate
// we have to subtract one again
size_t minusOne = totalWritten - 1;
@@ -119,23 +130,6 @@
return totalWritten;
}
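Stripped of the language-tag handling, the externalization split, and the
<partialId, globalId> bookkeeping, the loop above is a textbook k-way merge.
A self-contained sketch of that pattern (not the production code; plain
in-memory string vectors stand in for the partial vocabulary files):

// k-way merge of sorted word lists with a min-priority-queue, skipping
// duplicates -- the same pattern mergeVocabulary uses above.
#include <queue>
#include <string>
#include <utility>
#include <vector>

size_t kWayMergeUnique(const std::vector<std::vector<std::string>>& inputs,
                       std::vector<std::string>* out) {
  using Entry = std::pair<std::string, size_t>;  // <word, input index>
  // greater-than comparison turns the default max-heap into a min-heap
  auto cmp = [](const Entry& a, const Entry& b) { return a.first > b.first; };
  std::priority_queue<Entry, std::vector<Entry>, decltype(cmp)> queue(cmp);

  // seed the queue with the first word of every input
  std::vector<size_t> pos(inputs.size(), 0);
  for (size_t i = 0; i < inputs.size(); ++i) {
    if (!inputs[i].empty()) queue.emplace(inputs[i][0], i);
  }

  size_t totalWritten = 0;
  std::string lastWritten;
  while (!queue.empty()) {
    Entry top = queue.top();
    queue.pop();
    // equal words from different inputs come out of the queue consecutively,
    // so comparing against the last written word filters all duplicates
    if (totalWritten == 0 || top.first != lastWritten) {
      out->push_back(top.first);
      lastWritten = std::move(top.first);
      ++totalWritten;
    }
    // advance the input this word came from and re-add its next word
    size_t i = top.second;
    if (++pos[i] < inputs[i].size()) {
      queue.emplace(inputs[i][pos[i]], i);
    }
  }
  return totalWritten;
}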

// ____________________________________________________________________________________________
ad_utility::HashMap<string, Id> vocabMapFromPartialIndexedFile(
const string& partialFile) {
std::ifstream file(partialFile, std::ios_base::binary);
AD_CHECK(file.is_open());
ad_utility::HashMap<string, Id> vocabMap;
uint32_t len;
while (file.read((char*)&len, sizeof(len))) {
std::string word(len, '\0');
file.read(&(word[0]), len);
size_t idx;
file.read((char*)&idx, sizeof(idx));
vocabMap[word] = idx;
}
return vocabMap;
}

// ______________________________________________________________________________________________
void writePartialIdMapToBinaryFileForMerging(
const ad_utility::HashMap<string, Id>& map, const string& fileName) {
5 changes: 0 additions & 5 deletions src/index/VocabularyGenerator.h
@@ -25,11 +25,6 @@ using std::string;
// Returns the total number of words merged.
size_t mergeVocabulary(const std::string& basename, size_t numFiles);

// __________________________________________________________________________________________
// read the words and indices from the file and create hash map from it.
ad_utility::HashMap<string, Id> vocabMapFromPartialIndexedFile(
const string& partialFile);

// _________________________________________________________________________________________
void writePartialIdMapToBinaryFileForMerging(
const ad_utility::HashMap<string, Id>& map, const string& fileName);
2 changes: 1 addition & 1 deletion src/index/VocabularyImpl.h
@@ -197,7 +197,7 @@ template <typename>
string Vocabulary<S>::expandPrefix(const CompressedString& word) const {
assert(!word.empty());
auto idx = static_cast<uint8_t>(word[0]) - MIN_COMPRESSION_PREFIX;
if (idx < NUM_COMPRESSION_PREFIXES) {
if (idx >= 0 && idx < NUM_COMPRESSION_PREFIXES) {
return _prefixMap[idx] + word.toStringView().substr(1);
} else {
return string(word.toStringView().substr(1));
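A note on why the added idx >= 0 check matters: static_cast<uint8_t>(word[0])
is promoted to int before the subtraction, so idx is a signed int and becomes
negative for characters smaller than MIN_COMPRESSION_PREFIX. Without the
guard, idx < NUM_COMPRESSION_PREFIXES would still hold for such values and
_prefixMap[idx] would read out of bounds.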
3 changes: 2 additions & 1 deletion src/parser/CMakeLists.txt
@@ -6,5 +6,6 @@ add_library(parser
NTriplesParser.h NTriplesParser.cpp
TurtleParser.h TurtleParser.cpp
Tokenizer.h Tokenizer.cpp
ContextFileParser.cpp ContextFileParser.h)
ContextFileParser.cpp ContextFileParser.h
ParallelParseBuffer.h)
target_link_libraries(parser re2 -lbz2)
110 changes: 110 additions & 0 deletions src/parser/ParallelParseBuffer.h
@@ -0,0 +1,110 @@
// Copyright 2018, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Johannes Kalmbach(joka921) <johannes.kalmbach@gmail.com>

#pragma once

#include <array>
#include <future>
#include <string>
#include <vector>
#include "../util/Log.h"

using std::array;
using std::string;
using std::vector;

// A class that holds a parser for a knowledge base file (.nt, .tsv, .ttl,
// etc.) and asynchronously retrieves triples from the file.
// Can be used to parallelize the parsing and processing of kb files.
// Its external interface is similar to that of a parser, which keeps the
// using code simple.
template <class Parser>
class ParallelParseBuffer {
public:

// Parse from the file at filename.
// A batch of bufferSize triples is always parsed ahead in parallel.
// Bigger bufferSizes increase the memory usage, whereas smaller sizes might
// be inefficient.
ParallelParseBuffer(size_t bufferSize, const std::string& filename)
: _bufferSize(bufferSize), _p(filename) {
// parse the initial batch, before this we cannot do anything
std::tie(_isParserValid, _buffer) = parseBatch();

// already submit the next batch for parallel computation
if (_isParserValid) {
_fut = std::async(std::launch::async,
&ParallelParseBuffer<Parser>::parseBatch, this);
}
}

// Retrieve a single triple from the internal buffer, write it to the
// argument spo, and return true. If the buffer is exhausted, this blocks
// until the (asynchronous) call to parseBatch has finished. Returns false
// if the parser has completely parsed the file; in this case the argument
// is untouched.
bool getline(std::array<string, 3>& spo) {
// Return the triples in the order the parser hands them to us;
// this makes debugging easier.
if (_buffer.size() == _bufferPosition && _isParserValid) {
// we have to wait for the next parallel batch to be completed.
std::tie(_isParserValid, _buffer) = std::move(_fut.get());
_bufferPosition = 0;
if (_isParserValid) {
// if possible, directly submit the next parsing job
_fut = std::async(std::launch::async,
&ParallelParseBuffer<Parser>::parseBatch, this);
}
}

// we now should have some triples in our buffer
if (_bufferPosition < _buffer.size()) {
spo = _buffer[_bufferPosition];
_bufferPosition++;
return true;
} else {
// we can only reach this if the buffer is exhausted and there is nothing
// more to parse
return false;
}
}

private:
const size_t _bufferSize;
// the current read position within the buffer
size_t _bufferPosition = 0;
Parser _p;
// becomes false when the parser is done. In this case we still have to
// empty our buffer
bool _isParserValid = true;
std::vector<array<string, 3>> _buffer;
// this future handles the asynchronous parser calls
std::future<std::pair<bool, std::vector<array<string, 3>>>> _fut;

// This function extracts up to _bufferSize triples from the parser.
// If the returned bool is false, the parser is exhausted and further calls
// to parseBatch are useless. In this case the returned vector may still
// contain triples that were parsed before the parser finished, and these
// still have to be consumed.
std::pair<bool, std::vector<array<string, 3>>> parseBatch() {
LOG(INFO) << "Parsing next batch in parallel" << std::endl;
std::vector<array<string, 3>> buf;
// For small knowledge bases that fit into one batch (e.g. during tests)
// on systems with little memory, the reserve may fail, which is not a
// problem in this case.
try {
buf.reserve(_bufferSize);
} catch (const std::bad_alloc& b) {
buf = std::vector<array<string, 3>>();
}
while (buf.size() < _bufferSize) {
buf.emplace_back();
if (!_p.getLine(buf.back())) {
buf.pop_back();
return {false, std::move(buf)};
}
if (buf.size() % 10000000 == 0) {
LOG(INFO) << "Parsed " << buf.size() << " triples." << std::endl;
}
}
return {true, std::move(buf)};
}
};
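A usage sketch for the class above (assumptions: the Parser type, e.g.
TurtleParser, is constructible from a filename and exposes the
getLine(array<string, 3>&) interface the buffer relies on; the file name
is made up):

// Count all triples of a .ttl file while parsing ahead in the background.
#include <array>
#include <string>
#include "../util/Log.h"
#include "ParallelParseBuffer.h"
#include "TurtleParser.h"

void countTriples() {
  // parse batches of 10 million triples ahead of the consumer
  ParallelParseBuffer<TurtleParser> buffer(10000000, "knowledge-base.ttl");
  std::array<std::string, 3> spo;
  size_t numTriples = 0;
  while (buffer.getline(spo)) {
    // spo now holds <subject, predicate, object>
    ++numTriples;
  }
  LOG(INFO) << "Total number of triples: " << numTriples << std::endl;
}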
