Skip to content

Commit

Permalink
Parallel parsing of clean turtle Files. (#466)
Browse files Browse the repository at this point in the history
* Implementation of a parallel Turtle Parser that assumes the following additional constraints on the .ttl format:
** All `PREFIX` and `BASE` declarations appear at the beginning of the .ttl file before any triples
** Whenever the regex `. *\n` matches, we have reached the end of a triple (in the standard .ttl format this could also match in the middle of a multiline literal)

* This parallel parser is active in the relaxed parsing mode (`ascii-prefixes-only`)
  • Loading branch information
joka921 committed Sep 8, 2021
1 parent ac5c3b8 commit 766843e
Show file tree
Hide file tree
Showing 18 changed files with 424 additions and 59 deletions.
3 changes: 3 additions & 0 deletions src/global/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ static const std::string WARNING_ASCII_ONLY_PREFIXES =
"explicitly in TurtleParserMain). This means that the input Turtle data "
"may only use characters from the ASCII range and that no escape sequences "
"may be used in prefixed names (e.g., rdfs:label\\,el is not allowed). "
" Additionally, multiline literals are not allowed and triples have to end "
"at line boundaries (The regex \". *\\n\" must safely identify the end of "
"a triple)."
"This is stricter than the SPARQL standard but makes parsing faster. It "
"works for many Turtle dumps, e.g. that from Wikidata.";
static const std::string LOCALE_DEFAULT_LANG = "en";
Expand Down
2 changes: 1 addition & 1 deletion src/index/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ add_library(index
Index.h Index.cpp Index.Text.cpp
Vocabulary.h Vocabulary.cpp
VocabularyGenerator.h VocabularyGeneratorImpl.h
ConstantsIndexCreation.h
ConstantsIndexBuilding.h
ExternalVocabulary.h ExternalVocabulary.cpp
IndexMetaData.h IndexMetaDataImpl.h
MetaDataTypes.h MetaDataTypes.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,17 @@ static const std::string TMP_BASENAME_COMPRESSION = ".tmp.compression_index";
// is a good value. On systems with very few CPUs, a lower value might be
// beneficial.
constexpr size_t NUM_PARALLEL_ITEM_MAPS = 4;

// The number of threads that are parsing in parallel, when the parallel Turtle
// parser is used.
constexpr size_t NUM_PARALLEL_PARSER_THREADS = 5;

// Increasing the following two constants increases the RAM usage without much
// benefit to the performance.

// The number of unparsed blocks of triples that may wait for parsing at the
// same time.
constexpr size_t QUEUE_SIZE_BEFORE_PARALLEL_PARSING = 10;
// The number of parsed blocks of triples that may be queued at the same time
// after parsing.
// NOTE(review): the previous wording said these parsed blocks "wait for
// parsing", which is self-contradictory; presumably they wait for further
// processing by the consumer of the parser -- confirm.
constexpr size_t QUEUE_SIZE_AFTER_PARALLEL_PARSING = 10;
2 changes: 1 addition & 1 deletion src/index/CreatePatternsMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include "../util/File.h"
#include "../util/ReadableNumberFact.h"
#include "../util/StringUtils.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./Index.h"

using std::cerr;
Expand Down
6 changes: 3 additions & 3 deletions src/index/Index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ void Index::createFromFile(const string& filename) {
if constexpr (std::is_same_v<std::decay_t<Parser>, TurtleParserDummy>) {
if (_onlyAsciiTurtlePrefixes) {
LOG(INFO) << "Using the CTRE library for Tokenization\n";
vocabData =
createIdTriplesAndVocab<TurtleStreamParser<TokenizerCtre>>(filename);
vocabData = createIdTriplesAndVocab<TurtleParallelParser<TokenizerCtre>>(
filename);
} else {
LOG(INFO) << "Using the Google Re2 library for Tokenization\n";
vocabData =
createIdTriplesAndVocab<TurtleStreamParser<Tokenizer>>(filename);
createIdTriplesAndVocab<TurtleParallelParser<Tokenizer>>(filename);
}

} else {
Expand Down
2 changes: 1 addition & 1 deletion src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "../util/HashMap.h"
#include "../util/MmapVector.h"
#include "../util/Timer.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./DocsDB.h"
#include "./IndexBuilderTypes.h"
#include "./IndexMetaData.h"
Expand Down
2 changes: 1 addition & 1 deletion src/index/IndexBuilderMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "../util/File.h"
#include "../util/ReadableNumberFact.h"
#include "../util/StringUtils.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./Index.h"

using std::cerr;
Expand Down
2 changes: 1 addition & 1 deletion src/index/IndexBuilderTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "../util/Conversions.h"
#include "../util/HashMap.h"
#include "../util/TupleHelpers.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./StringSortComparator.h"

#ifndef QLEVER_INDEXBUILDERTYPES_H
Expand Down
2 changes: 1 addition & 1 deletion src/index/Vocabulary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "../util/File.h"
#include "../util/HashMap.h"
#include "../util/HashSet.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"

using std::string;

Expand Down
2 changes: 1 addition & 1 deletion src/index/VocabularyGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include "../global/Id.h"
#include "../util/HashMap.h"
#include "../util/MmapVector.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./IndexBuilderTypes.h"
#include "Vocabulary.h"

Expand Down
2 changes: 1 addition & 1 deletion src/index/VocabularyGeneratorImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "../util/Exception.h"
#include "../util/HashMap.h"
#include "../util/Log.h"
#include "./ConstantsIndexCreation.h"
#include "./ConstantsIndexBuilding.h"
#include "./Vocabulary.h"
#include "./VocabularyGenerator.h"

Expand Down
2 changes: 1 addition & 1 deletion src/parser/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ add_library(parser
ContextFileParser.cpp ContextFileParser.h
ParallelParseBuffer.h
PropertyPathParser.h PropertyPathParser.cpp
SparqlLexer.h SparqlLexer.cpp TokenizerCtre.h TurtleTokenId.h)
SparqlLexer.h SparqlLexer.cpp TokenizerCtre.h TurtleTokenId.h ParallelBuffer.cpp)
target_link_libraries(parser rdfEscaping re2 absl::flat_hash_map)

add_subdirectory(sparqlParser)
112 changes: 112 additions & 0 deletions src/parser/ParallelBuffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2021, University of Freiburg, Chair of Algorithms and Data
// Structures. Author: Johannes Kalmbach <kalmbacj@cs.uni-freiburg.de>

#include "./ParallelBuffer.h"

// _________________________________________________________________________
void ParallelFileBuffer::open(const string& filename) {
_file.open(filename, "r");
_eof = false;
_buf.resize(_blocksize);
auto task = [&file = this->_file, bs = this->_blocksize,
&buf = this->_buf]() { return file.read(buf.data(), bs); };
_fut = std::async(task);
}

// ___________________________________________________________________________
std::optional<std::vector<char>> ParallelFileBuffer::getNextBlock() {
if (_eof) {
return std::nullopt;
}

AD_CHECK(_file.isOpen() && _fut.valid());
auto numBytesRead = _fut.get();
if (numBytesRead == 0) {
_eof = true;
return std::nullopt;
}
_buf.resize(numBytesRead);
std::optional<std::vector<char>> ret = std::move(_buf);

_buf.resize(_blocksize);
auto getNextBlock = [&file = this->_file, bs = this->_blocksize,
&buf = this->_buf]() {
return file.read(buf.data(), bs);
};
_fut = std::async(getNextBlock);

return ret;
}

// ____________________________________________________________________________
// Search for `regex` close to the end of `vec`. The search first looks at the
// last 1000 bytes and doubles that window until a match is found or the whole
// input has been searched. Within the window in which the search succeeds,
// the match reported by `RE2::PartialMatch` is used.
//
// Returns the number of bytes from the beginning of `vec` up to and including
// the end of the match, or `std::nullopt` if `vec` is empty or the regex does
// not match anywhere in `vec`.
//
// In contrast to the previous implementation, inputs of at most 1000 bytes
// are searched as well, and the final window covers the complete input
// (including its first byte).
//
// NOTE(review): one output argument is passed to `RE2::PartialMatch`, so
// `regex` presumably has to contain a capturing group around the part whose
// end marks the split position -- confirm against the RE2 documentation and
// the callers.
std::optional<size_t> ParallelBufferWithEndRegex::findRegexNearEnd(
    const std::vector<char>& vec, const re2::RE2& regex) {
  const size_t inputSize = vec.size();
  if (inputSize == 0) {
    // Nothing to search; also avoids pointer arithmetic on a possibly null
    // `vec.data()`.
    return std::nullopt;
  }

  re2::StringPiece regexResult;
  // Initial window: the last 1000 bytes, or everything for short inputs.
  size_t chunkSize = std::min<size_t>(1000, inputSize);
  while (true) {
    const size_t startIdx = inputSize - chunkSize;
    const re2::StringPiece regexInput{vec.data() + startIdx, chunkSize};

    if (RE2::PartialMatch(regexInput, regex, &regexResult)) {
      // regexResult.data() is a pointer to the beginning of the match,
      // vec.data() is a pointer to the beginning of the total input.
      return static_cast<size_t>(regexResult.data() + regexResult.size() -
                                 vec.data());
    }

    if (chunkSize == inputSize) {
      // The complete input was searched without success.
      return std::nullopt;
    }
    // Double the window, clamped to the input size (written such that the
    // multiplication cannot overflow).
    chunkSize = (chunkSize > inputSize / 2) ? inputSize : chunkSize * 2;
  }
}

// _____________________________________________________________________________
// Return the next block that ends with a match of `_endRegex`. The bytes of a
// raw block that come after the match are stored in `_remainder` and are
// prepended to the block returned by the next call. When the raw input is
// used up, the remaining bytes (if any) are returned as the final block, and
// `std::nullopt` afterwards.
//
// Throws `std::runtime_error` if no match is found in a raw block that is
// followed by another raw block.
std::optional<std::vector<char>> ParallelBufferWithEndRegex::getNextBlock() {
  auto nextRaw = _rawBuffer.getNextBlock();
  const bool inputIsDone = _exhausted || !nextRaw.has_value();
  if (inputIsDone) {
    _exhausted = true;
    if (_remainder.empty()) {
      return std::nullopt;
    }
    // Swapping with a fresh vector hands out the leftover bytes and
    // guarantees that `_remainder` is empty for all subsequent calls.
    std::vector<char> leftover;
    leftover.swap(_remainder);
    return leftover;
  }

  const std::vector<char>& block = *nextRaw;
  size_t splitAt;
  if (auto found = findRegexNearEnd(block, _endRegex)) {
    splitAt = *found;
  } else {
    // No statement end in this block. This is only acceptable if it was the
    // last block of the input.
    if (_rawBuffer.getNextBlock().has_value()) {
      throw std::runtime_error(
          "The regex which marks the end of a statement was not found at "
          "all within a single batch that was not the last one. Please "
          "increase the FILE_BUFFER_SIZE "
          "or choose a different parser");
    }
    // This was the last (possibly incomplete) block, simply concatenate
    splitAt = block.size();
    _exhausted = true;
  }

  const auto splitIt = block.begin() + splitAt;
  // Result = leftover of the previous call + current block up to the split.
  std::vector<char> merged;
  merged.reserve(_remainder.size() + splitAt);
  merged.insert(merged.end(), _remainder.begin(), _remainder.end());
  merged.insert(merged.end(), block.begin(), splitIt);
  // Everything behind the split position is kept for the next call.
  _remainder.assign(splitIt, block.end());
  return merged;
}
59 changes: 34 additions & 25 deletions src/parser/ParallelBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
//

#pragma once
#include <re2/re2.h>

#include <future>
#include <optional>
#include <string>
Expand Down Expand Up @@ -44,6 +46,8 @@ class ParallelBuffer {
*/
virtual std::optional<std::vector<char>> getNextBlock() = 0;

// Read-only access to the block size (`_blocksize`) of this buffer.
const size_t& blocksize() const { return _blocksize; }

protected:
size_t _blocksize = 100 * (2 << 20);
};
Expand All @@ -60,37 +64,42 @@ class ParallelFileBuffer : public ParallelBuffer {
ParallelFileBuffer() : ParallelBuffer(){};
ParallelFileBuffer(size_t blocksize) : ParallelBuffer(blocksize) {}

virtual void open(const string& filename) override {
_file.open(filename, "r");
_eof = false;
_buf.resize(_blocksize);
auto task = [&file = this->_file, bs = this->_blocksize,
&buf = this->_buf]() { return file.read(buf.data(), bs); };
_fut = std::async(task);
}
// _________________________________________________________________________
virtual void open(const string& filename) override;

// _____________________________________________________
virtual std::optional<std::vector<char>> getNextBlock() override {
if (_eof) {
return std::nullopt;
}

AD_CHECK(_file.isOpen() && _fut.valid());
auto numBytesRead = _fut.get();
_buf.resize(numBytesRead);
std::optional<std::vector<char>> ret = std::move(_buf);

_buf.resize(_blocksize);
auto task = [&file = this->_file, bs = this->_blocksize,
&buf = this->_buf]() { return file.read(buf.data(), bs); };
_fut = std::async(task);

return ret;
}
virtual std::optional<std::vector<char>> getNextBlock() override;

private:
ad_utility::File _file;
bool _eof = true;
std::vector<char> _buf;
std::future<size_t> _fut;
};

/// A parallel buffer, where each of the blocks except for the last one has to
/// end with a certain regex (e.g. a full stop followed by whitespace and a
/// newline to denote the end of a triple in a .ttl file).
/// The raw bytes are read via a `ParallelFileBuffer` that uses the same block
/// size.
class ParallelBufferWithEndRegex : public ParallelBuffer {
public:
// Construct a buffer with the given `blocksize`; `endRegex` is compiled into
// the RE2 object that is used to find the end of each block.
ParallelBufferWithEndRegex(size_t blocksize, std::string endRegex)
: ParallelBuffer{blocksize}, _endRegex{endRegex} {}

// __________________________________________________________________________
std::optional<std::vector<char>> getNextBlock() override;

// Open the file from which the blocks are read.
void open(const string& filename) override { _rawBuffer.open(filename); }

private:
// Find `regex` near the end of `vec` by searching in blocks of 1000, 2000,
// 4000... bytes. We have to do this, because "reverse" regex matching is not
// trivial. Returns the number of bytes in `vec` until the end of the regex
// match, or std::nullopt if the regex was not found at all.
static std::optional<size_t> findRegexNearEnd(const std::vector<char>& vec,
const re2::RE2& regex);
// The underlying buffer that delivers the raw blocks from the file.
ParallelFileBuffer _rawBuffer{_blocksize};
// The bytes of the previous raw block that came after the regex match; they
// are prepended to the next block.
std::vector<char> _remainder;
// The regex that marks the end of a block.
re2::RE2 _endRegex;
// Set to true as soon as the raw input has been completely consumed.
bool _exhausted = false;
};
19 changes: 19 additions & 0 deletions src/parser/ParallelParseBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,25 @@ class ParserBatcher {
}
}

// Return the next batch of triples from the underlying parser, or
// `std::nullopt` if the maximum number of triples has already been reached or
// the parser has no more batches (in the latter case the "exhausted" callback
// is invoked).
// The inner `requires(Parser p) {...}` expression is `true` only if the
// `Parser` type has a getBatch() member function; the outer `requires` clause
// enables this function only in that case.
std::optional<std::vector<Triple>> getBatch() requires requires(Parser p) {
  p.getBatch();
}
{
  const bool limitReached = m_numTriplesAlreadyParsed >= m_maxNumTriples;
  if (limitReached) {
    return std::nullopt;
  }
  auto batch = m_parser->getBatch();
  if (batch) {
    // Keep track of how many triples have been handed out so far.
    m_numTriplesAlreadyParsed += batch->size();
  } else {
    m_exhaustedCallback();
  }
  return batch;
}

std::shared_ptr<Parser> m_parser;
size_t m_maxNumTriples;
size_t m_numTriplesAlreadyParsed = 0;
Expand Down

0 comments on commit 766843e

Please sign in to comment.