Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel parsing of clean turtle Files. #466

Merged
merged 6 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/global/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ static const std::string WARNING_ASCII_ONLY_PREFIXES =
"explicitly in TurtleParserMain). This means that the input Turtle data "
"may only use characters from the ASCII range and that no escape sequences "
"may be used in prefixed names (e.g., rdfs:label\\,el is not allowed). "
" Additionally, multiline literals are not allowed and triples have to end "
"at line boundaries (The regex \". *\\n\" must safely identify the end of "
"a triple)."
"This is stricter than the SPARQL standard but makes parsing faster. It "
"works for many Turtle dumps, e.g. that from Wikidata.";
static const std::string LOCALE_DEFAULT_LANG = "en";
Expand Down
2 changes: 1 addition & 1 deletion src/index/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ add_library(index
Index.h Index.cpp Index.Text.cpp
Vocabulary.h Vocabulary.cpp
VocabularyGenerator.h VocabularyGeneratorImpl.h
ConstantsIndexCreation.h
ConstantsIndexBuilding.h
ExternalVocabulary.h ExternalVocabulary.cpp
IndexMetaData.h IndexMetaDataImpl.h
MetaDataTypes.h MetaDataTypes.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,17 @@ static const std::string TMP_BASENAME_COMPRESSION = ".tmp.compression_index";
// is a good value. On systems with very few CPUs, a lower value might be
// beneficial.
constexpr size_t NUM_PARALLEL_ITEM_MAPS = 4;

// The number of threads that parse in parallel when the parallel Turtle
// parser is used.
constexpr size_t NUM_PARALLEL_PARSER_THREADS = 5;

// Increasing the following two constants increases the RAM usage without much
// benefit to the performance.

// The maximal number of unparsed blocks of triples that may wait for parsing
// at the same time.
constexpr size_t QUEUE_SIZE_BEFORE_PARALLEL_PARSING = 10;
// The maximal number of already parsed blocks of triples that may wait at the
// same time (presumably for further processing after the parsing; the
// consumer of this queue is not defined in this file).
constexpr size_t QUEUE_SIZE_AFTER_PARALLEL_PARSING = 10;
2 changes: 1 addition & 1 deletion src/index/CreatePatternsMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include "../util/File.h"
#include "../util/ReadableNumberFact.h"
#include "../util/StringUtils.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./Index.h"

using std::cerr;
Expand Down
6 changes: 3 additions & 3 deletions src/index/Index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ void Index::createFromFile(const string& filename) {
if constexpr (std::is_same_v<std::decay_t<Parser>, TurtleParserDummy>) {
if (_onlyAsciiTurtlePrefixes) {
LOG(INFO) << "Using the CTRE library for Tokenization\n";
vocabData =
createIdTriplesAndVocab<TurtleStreamParser<TokenizerCtre>>(filename);
vocabData = createIdTriplesAndVocab<TurtleParallelParser<TokenizerCtre>>(
filename);
} else {
LOG(INFO) << "Using the Google Re2 library for Tokenization\n";
vocabData =
createIdTriplesAndVocab<TurtleStreamParser<Tokenizer>>(filename);
createIdTriplesAndVocab<TurtleParallelParser<Tokenizer>>(filename);
}

} else {
Expand Down
2 changes: 1 addition & 1 deletion src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "../util/HashMap.h"
#include "../util/MmapVector.h"
#include "../util/Timer.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./DocsDB.h"
#include "./IndexBuilderTypes.h"
#include "./IndexMetaData.h"
Expand Down
2 changes: 1 addition & 1 deletion src/index/IndexBuilderMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "../util/File.h"
#include "../util/ReadableNumberFact.h"
#include "../util/StringUtils.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./Index.h"

using std::cerr;
Expand Down
2 changes: 1 addition & 1 deletion src/index/IndexBuilderTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "../util/Conversions.h"
#include "../util/HashMap.h"
#include "../util/TupleHelpers.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./StringSortComparator.h"

#ifndef QLEVER_INDEXBUILDERTYPES_H
Expand Down
2 changes: 1 addition & 1 deletion src/index/Vocabulary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "../util/File.h"
#include "../util/HashMap.h"
#include "../util/HashSet.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"

using std::string;

Expand Down
2 changes: 1 addition & 1 deletion src/index/VocabularyGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include "../global/Id.h"
#include "../util/HashMap.h"
#include "../util/MmapVector.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./IndexBuilderTypes.h"
#include "Vocabulary.h"

Expand Down
2 changes: 1 addition & 1 deletion src/index/VocabularyGeneratorImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "../util/Exception.h"
#include "../util/HashMap.h"
#include "../util/Log.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./Vocabulary.h"
#include "./VocabularyGenerator.h"

Expand Down
2 changes: 1 addition & 1 deletion src/parser/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ add_library(parser
ContextFileParser.cpp ContextFileParser.h
ParallelParseBuffer.h
PropertyPathParser.h PropertyPathParser.cpp
SparqlLexer.h SparqlLexer.cpp TokenizerCtre.h TurtleTokenId.h)
SparqlLexer.h SparqlLexer.cpp TokenizerCtre.h TurtleTokenId.h ParallelBuffer.cpp)
target_link_libraries(parser rdfEscaping re2 absl::flat_hash_map)

add_subdirectory(sparqlParser)
112 changes: 112 additions & 0 deletions src/parser/ParallelBuffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2021, University of Freiburg, Chair of Algorithms and Data
// Structures. Author: Johannes Kalmbach <kalmbacj@cs.uni-freiburg.de>

#include "./ParallelBuffer.h"

// _________________________________________________________________________
void ParallelFileBuffer::open(const string& filename) {
_file.open(filename, "r");
_eof = false;
_buf.resize(_blocksize);
auto task = [&file = this->_file, bs = this->_blocksize,
&buf = this->_buf]() { return file.read(buf.data(), bs); };
_fut = std::async(task);
}

// ___________________________________________________________________________
std::optional<std::vector<char>> ParallelFileBuffer::getNextBlock() {
if (_eof) {
return std::nullopt;
}

AD_CHECK(_file.isOpen() && _fut.valid());
auto numBytesRead = _fut.get();
if (numBytesRead == 0) {
_eof = true;
return std::nullopt;
}
_buf.resize(numBytesRead);
std::optional<std::vector<char>> ret = std::move(_buf);

_buf.resize(_blocksize);
auto getNextBlock = [&file = this->_file, bs = this->_blocksize,
&buf = this->_buf]() {
return file.read(buf.data(), bs);
};
_fut = std::async(getNextBlock);

return ret;
}

// ____________________________________________________________________________
// Search for `regex` near the end of `vec`. The search first looks at the
// last 1000 bytes and doubles this window (1000, 2000, 4000, ... bytes) until
// a match is found or the window covers the complete input. Inputs that are
// shorter than the initial window are searched as a whole.
//
// Returns the number of bytes from the beginning of `vec` up to and including
// the end of the match, or `std::nullopt` if `vec` is empty or the regex does
// not match anywhere.
//
// NOTE: The match position is obtained via the single output argument of
// `RE2::PartialMatch`, so `regex` is expected to contain at least one
// capturing group (see the RE2 documentation).
std::optional<size_t> ParallelBufferWithEndRegex::findRegexNearEnd(
    const std::vector<char>& vec, const re2::RE2& regex) {
  const size_t inputSize = vec.size();
  if (inputSize == 0) {
    return std::nullopt;
  }

  // Never let the search window exceed the input, but make sure that short
  // inputs are searched at least once.
  size_t chunkSize = std::min<size_t>(1000, inputSize);
  re2::StringPiece regexResult;
  while (true) {
    const size_t startIdx = inputSize - chunkSize;
    const re2::StringPiece regexInput{vec.data() + startIdx, chunkSize};

    if (RE2::PartialMatch(regexInput, regex, &regexResult)) {
      // regexResult.data() is a pointer to the beginning of the match,
      // vec.data() is a pointer to the beginning of the total input.
      return static_cast<size_t>(regexResult.data() + regexResult.size() -
                                 vec.data());
    }

    // The window already covered the complete input, so there is no match.
    if (chunkSize == inputSize) {
      return std::nullopt;
    }
    chunkSize = std::min(chunkSize * 2, inputSize);
  }
}

// _____________________________________________________________________________
std::optional<std::vector<char>> ParallelBufferWithEndRegex::getNextBlock() {
  auto rawInput = _rawBuffer.getNextBlock();

  // No more raw input: hand out the bytes that are still pending (if any)
  // exactly once, afterwards always return `std::nullopt`.
  const bool noMoreInput = !rawInput || _exhausted;
  if (noMoreInput) {
    _exhausted = true;
    if (_remainder.empty()) {
      return std::nullopt;
    }
    std::vector<char> lastBlock = std::move(_remainder);
    // A moved-from vector is not guaranteed to be empty by the C++ standard,
    // but the check above relies on `_remainder` being empty from now on.
    _remainder.clear();
    return lastBlock;
  }

  std::vector<char>& block = *rawInput;
  std::optional<size_t> endPosition = findRegexNearEnd(block, _endRegex);
  if (!endPosition.has_value()) {
    // Not finding the end regex is only acceptable for the very last block.
    const bool anotherBlockFollows = _rawBuffer.getNextBlock().has_value();
    if (anotherBlockFollows) {
      throw std::runtime_error(
          "The regex which marks the end of a statement was not found at "
          "all within a single batch that was not the last one. Please "
          "increase the FILE_BUFFER_SIZE "
          "or choose a different parser");
    }
    // This was the last (possibly incomplete) block, so use all of it.
    endPosition = block.size();
    _exhausted = true;
  }

  // The result consists of the pending bytes from the previous call followed
  // by the current block up to the end of the regex match. Everything after
  // the match becomes the new remainder.
  const auto splitPoint = block.begin() + *endPosition;
  std::vector<char> result;
  result.reserve(_remainder.size() + *endPosition);
  result.insert(result.end(), _remainder.begin(), _remainder.end());
  result.insert(result.end(), block.begin(), splitPoint);
  _remainder.assign(splitPoint, block.end());
  return result;
}
59 changes: 34 additions & 25 deletions src/parser/ParallelBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
//

#pragma once
#include <re2/re2.h>

#include <future>
#include <optional>
#include <string>
Expand Down Expand Up @@ -44,6 +46,8 @@ class ParallelBuffer {
*/
virtual std::optional<std::vector<char>> getNextBlock() = 0;

const size_t& blocksize() const { return _blocksize; }

protected:
size_t _blocksize = 100 * (2 << 20);
};
Expand All @@ -60,37 +64,42 @@ class ParallelFileBuffer : public ParallelBuffer {
ParallelFileBuffer() : ParallelBuffer(){};
ParallelFileBuffer(size_t blocksize) : ParallelBuffer(blocksize) {}

virtual void open(const string& filename) override {
_file.open(filename, "r");
_eof = false;
_buf.resize(_blocksize);
auto task = [&file = this->_file, bs = this->_blocksize,
&buf = this->_buf]() { return file.read(buf.data(), bs); };
_fut = std::async(task);
}
// _________________________________________________________________________
virtual void open(const string& filename) override;

// _____________________________________________________
virtual std::optional<std::vector<char>> getNextBlock() override {
if (_eof) {
return std::nullopt;
}

AD_CHECK(_file.isOpen() && _fut.valid());
auto numBytesRead = _fut.get();
_buf.resize(numBytesRead);
std::optional<std::vector<char>> ret = std::move(_buf);

_buf.resize(_blocksize);
auto task = [&file = this->_file, bs = this->_blocksize,
&buf = this->_buf]() { return file.read(buf.data(), bs); };
_fut = std::async(task);

return ret;
}
virtual std::optional<std::vector<char>> getNextBlock() override;

private:
ad_utility::File _file;
bool _eof = true;
std::vector<char> _buf;
std::future<size_t> _fut;
};

/// A parallel buffer, where each of the blocks except for the last one has to
/// end with a certain regex (e.g. a full stop followed by whitespace and a
/// newline to denote the end of a triple in a .ttl file).
class ParallelBufferWithEndRegex : public ParallelBuffer {
public:
// Construct from the size of the raw blocks that are read from the file and
// from the regex (given as a string) that marks the end of a block.
ParallelBufferWithEndRegex(size_t blocksize, std::string endRegex)
: ParallelBuffer{blocksize}, _endRegex{endRegex} {}

// Return the next block, or `std::nullopt` if there are no more blocks.
std::optional<std::vector<char>> getNextBlock() override;

// Open the file from which the blocks are read.
void open(const string& filename) override { _rawBuffer.open(filename); }

private:
// Find `regex` near the end of `vec` by searching in blocks of 1000, 2000,
// 4000... bytes. We have to do this, because "reverse" regex matching is not
// trivial. Returns the number of bytes in `vec` until the end of the regex
// match, or std::nullopt if the regex was not found at all.
static std::optional<size_t> findRegexNearEnd(const std::vector<char>& vec,
const re2::RE2& regex);
// The underlying buffer that yields the raw blocks of the file.
ParallelFileBuffer _rawBuffer{_blocksize};
// The bytes of the previous raw block that came after the end of the regex
// match. They are prepended to the next block that is returned.
std::vector<char> _remainder;
// The regex that marks the end of a block.
re2::RE2 _endRegex;
// Set to true by `getNextBlock()` as soon as the last raw block was reached.
bool _exhausted = false;
};
19 changes: 19 additions & 0 deletions src/parser/ParallelParseBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,25 @@ class ParserBatcher {
}
}

// Get the next batch of triples from the underlying parser. This function
// only exists if the `Parser` type has a `getBatch()` member function: The
// inner `requires` expression is `true` only if `p.getBatch()` is a valid
// expression, and the outer `requires` clause enables this function only if
// the inner one evaluates to `true`.
// Returns `std::nullopt` if `m_maxNumTriples` triples have already been
// parsed or if the parser has no more batches. In the latter case
// `m_exhaustedCallback` is invoked.
std::optional<std::vector<Triple>> getBatch() requires requires(Parser p) {
  p.getBatch();
}
{
  const bool limitReached = m_numTriplesAlreadyParsed >= m_maxNumTriples;
  if (limitReached) {
    return std::nullopt;
  }

  auto batch = m_parser->getBatch();
  if (batch) {
    // Keep track of the total number of triples that were handed out.
    m_numTriplesAlreadyParsed += batch->size();
  } else {
    m_exhaustedCallback();
  }
  return batch;
}

std::shared_ptr<Parser> m_parser;
size_t m_maxNumTriples;
size_t m_numTriplesAlreadyParsed = 0;
Expand Down