Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First draft of a timeout for operations #346

Merged
merged 17 commits into from
Apr 25, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 56 additions & 22 deletions src/engine/GroupBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,27 @@ struct resizeIfVec<vector<C>, C> {
* its already allocated storage.
*/
template <int IN_WIDTH, int OUT_WIDTH>
void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
size_t blockEnd, const IdTableView<IN_WIDTH>& input,
const vector<ResultTable::ResultType>& inputTypes,
IdTableStatic<OUT_WIDTH>* result, size_t resultRow,
const ResultTable* inTable, ResultTable* outTable,
const Index& index,
ad_utility::HashSet<size_t>& distinctHashSet) {

void GroupBy::processGroup(const GroupBy::Aggregate& a, size_t blockStart,
size_t blockEnd, const IdTableView<IN_WIDTH>& input,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's with the two empty lines here?

const vector<ResultTable::ResultType>& inputTypes,
IdTableStatic<OUT_WIDTH>* result, size_t resultRow,
const ResultTable* inTable, ResultTable* outTable,
const Index& index,
ad_utility::HashSet<size_t>& distinctHashSet) const {
auto check = [this](size_t i) {
if (i % 32768 == 0) {
checkTimeout();
}
};
switch (a._type) {
case ParsedQuery::AggregateType::AVG: {
float res = 0;
if (inputTypes[a._inCol] == ResultTable::ResultType::VERBATIM) {
if (a._distinct) {
for (size_t i = blockStart; i <= blockEnd; i++) {
check(i);
const auto it = distinctHashSet.find(input(i, a._inCol));
if (it == distinctHashSet.end()) {
distinctHashSet.insert(input(i, a._inCol));
Expand All @@ -196,6 +204,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
distinctHashSet.clear();
} else {
for (size_t i = blockStart; i <= blockEnd; i++) {
check(i);
res += input(i, a._inCol);
}
}
Expand All @@ -204,6 +213,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
float tmpF;
if (a._distinct) {
for (size_t i = blockStart; i <= blockEnd; i++) {
check(i);
const auto it = distinctHashSet.find(input(i, a._inCol));
if (it == distinctHashSet.end()) {
distinctHashSet.insert(input(i, a._inCol));
Expand All @@ -214,6 +224,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
distinctHashSet.clear();
} else {
for (size_t i = blockStart; i <= blockEnd; i++) {
check(i);
std::memcpy(&tmpF, &input(i, a._inCol), sizeof(float));
res += tmpF;
}
Expand All @@ -224,6 +235,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
} else {
if (a._distinct) {
for (size_t i = blockStart; i <= blockEnd; i++) {
check(i);
const auto it = distinctHashSet.find(input(i, a._inCol));
if (it == distinctHashSet.end()) {
distinctHashSet.insert(input(i, a._inCol));
Expand All @@ -242,6 +254,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
distinctHashSet.clear();
} else {
for (size_t i = blockStart; i <= blockEnd; i++) {
check(i);
// load the string, parse it as an xsd::int or float
// TODO(schnelle): What's the correct way to handle OPTIONAL here
std::string entity =
Expand All @@ -265,6 +278,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
if (a._distinct) {
size_t count = 0;
for (size_t i = blockStart; i <= blockEnd; i++) {
check(i);
const auto it = distinctHashSet.find(input(i, a._inCol));
if (it == distinctHashSet.end()) {
count++;
Expand All @@ -283,6 +297,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
if (inputTypes[a._inCol] == ResultTable::ResultType::VERBATIM) {
if (a._distinct) {
for (size_t i = blockStart; i + 1 <= blockEnd; i++) {
check(i);
const auto it = distinctHashSet.find(input(i, a._inCol));
if (it == distinctHashSet.end()) {
distinctHashSet.insert(input(i, a._inCol));
Expand All @@ -296,6 +311,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
distinctHashSet.clear();
} else {
for (size_t i = blockStart; i + 1 <= blockEnd; i++) {
check(i);
out << input(i, a._inCol) << *delim;
}
out << input(blockEnd, a._inCol);
Expand All @@ -304,6 +320,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
float f;
if (a._distinct) {
for (size_t i = blockStart; i + 1 <= blockEnd; i++) {
check(i);
const auto it = distinctHashSet.find(input(i, a._inCol));
if (it == distinctHashSet.end()) {
distinctHashSet.insert(input(i, a._inCol));
Expand All @@ -319,6 +336,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
distinctHashSet.clear();
} else {
for (size_t i = blockStart; i + 1 <= blockEnd; i++) {
check(i);
std::memcpy(&f, &input(i, a._inCol), sizeof(float));
out << f << *delim;
}
Expand All @@ -328,6 +346,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
} else if (inputTypes[a._inCol] == ResultTable::ResultType::TEXT) {
if (a._distinct) {
for (size_t i = blockStart; i + 1 <= blockEnd; i++) {
check(i);
const auto it = distinctHashSet.find(input(i, a._inCol));
if (it == distinctHashSet.end()) {
distinctHashSet.insert(input(i, a._inCol));
Expand All @@ -341,13 +360,15 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
distinctHashSet.clear();
} else {
for (size_t i = blockStart; i + 1 <= blockEnd; i++) {
check(i);
out << index.getTextExcerpt(input(i, a._inCol)) << *delim;
}
out << index.getTextExcerpt(input(blockEnd, a._inCol));
}
} else if (inputTypes[a._inCol] == ResultTable::ResultType::LOCAL_VOCAB) {
if (a._distinct) {
for (size_t i = blockStart; i + 1 <= blockEnd; i++) {
check(i);
const auto it = distinctHashSet.find(input(i, a._inCol));
if (it == distinctHashSet.end()) {
distinctHashSet.insert(input(i, a._inCol));
Expand All @@ -366,6 +387,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
distinctHashSet.clear();
} else {
for (size_t i = blockStart; i + 1 <= blockEnd; i++) {
check(i);
// TODO(schnelle): What's the correct way to handle OPTIONAL here
out << inTable->idToOptionalString(input(i, a._inCol)).value_or("")
<< *delim;
Expand All @@ -376,6 +398,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
} else {
if (a._distinct) {
for (size_t i = blockStart; i + 1 <= blockEnd; i++) {
check(i);
const auto it = distinctHashSet.find(input(i, a._inCol));
if (it == distinctHashSet.end()) {
distinctHashSet.insert(input(i, a._inCol));
Expand Down Expand Up @@ -405,6 +428,7 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
distinctHashSet.clear();
} else {
for (size_t i = blockStart; i + 1 <= blockEnd; i++) {
check(i);
// TODO(schnelle): What's the correct way to handle OPTIONAL here
std::string entity =
index.idToOptionalString(input(i, a._inCol)).value_or("");
Expand Down Expand Up @@ -583,12 +607,12 @@ void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
}

template <int IN_WIDTH, int OUT_WIDTH>
void doGroupBy(const IdTable& dynInput,
const vector<ResultTable::ResultType>& inputTypes,
const vector<size_t>& groupByCols,
const vector<GroupBy::Aggregate>& aggregates, IdTable* dynResult,
const ResultTable* inTable, ResultTable* outTable,
const Index& index) {
void GroupBy::doGroupBy(const IdTable& dynInput,
const vector<ResultTable::ResultType>& inputTypes,
const vector<size_t>& groupByCols,
const vector<GroupBy::Aggregate>& aggregates,
IdTable* dynResult, const ResultTable* inTable,
ResultTable* outTable, const Index& index) const {
LOG(DEBUG) << "Group by input size " << dynInput.size() << std::endl;
if (dynInput.size() == 0) {
return;
Expand Down Expand Up @@ -621,6 +645,9 @@ void doGroupBy(const IdTable& dynInput,
size_t blockStart = 0;
size_t blockEnd = 0;
for (size_t pos = 1; pos < input.size(); pos++) {
if (pos % 32768 == 0) {
checkTimeout();
}
bool rowMatchesCurrentBlock = true;
for (size_t i = 0; i < currentGroupBlock.size(); i++) {
if (input(pos, currentGroupBlock[i].first) !=
Expand Down Expand Up @@ -764,16 +791,23 @@ void GroupBy::computeResult(ResultTable* result) {

int inWidth = subresult->_data.cols();
int outWidth = result->_data.cols();
CALL_FIXED_SIZE_2(inWidth, outWidth, doGroupBy, subresult->_data,
inputResultTypes, groupByCols, aggregates, &result->_data,
subresult.get(), result, getIndex());

// Free the user data used by GROUP_CONCAT aggregates.
for (Aggregate& a : aggregates) {
if (a._type == ParsedQuery::AggregateType::GROUP_CONCAT) {
delete static_cast<std::string*>(a._userData);

auto cleanup = [&]() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specify the variables in the capture explicitly when it's a short list and/or a short lambda?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general you are right. In this case I would argue that the default capture by reference is not problematic, because we do not pass the lambda outside and only use it as a "copied code block". But with the rule of NEVER using default captures you agree with one of my favourite C++ authors, so I am totally convinced.

// Free the user data used by GROUP_CONCAT aggregates.
for (Aggregate& a : aggregates) {
if (a._type == ParsedQuery::AggregateType::GROUP_CONCAT) {
delete static_cast<std::string*>(a._userData);
}
joka921 marked this conversation as resolved.
Show resolved Hide resolved
}
};
try {
CALL_FIXED_SIZE_2(inWidth, outWidth, doGroupBy, subresult->_data,
inputResultTypes, groupByCols, aggregates, &result->_data,
subresult.get(), result, getIndex());
} catch (...) {
cleanup();
throw;
}

cleanup();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you briefly explain which case is handled by this which was not properly handled before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whenever we had a groupConcat operation (which requires manual cleanup of the _userData) and the CALL_FIXED_SIZE_2 (basically a call to doGroupBy) threw an exception, the _userData memory would leak. I don't know if this was previously safe, but in the meantime we at least have Timeout and OutOfMemory exceptions, which might very well occur here and can lead to the memory leak.
As stated before: this type-erasure-style _userData should become a type with a proper constructor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(which I would do in a separate, small PR)

LOG(DEBUG) << "GroupBy result computation done." << std::endl;
}
28 changes: 19 additions & 9 deletions src/engine/GroupBy.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,23 @@ class GroupBy : public Operation {
ad_utility::HashMap<string, size_t> _varColMap;

virtual void computeResult(ResultTable* result) override;
};

// This method is declared here solely for unit testing purposes
template <int IN_WIDTH, int OUT_WIDTH>
void doGroupBy(const IdTable& dynInput,
const vector<ResultTable::ResultType>& inputTypes,
const vector<size_t>& groupByCols,
const vector<GroupBy::Aggregate>& aggregates, IdTable* dynResult,
const ResultTable* inTable, ResultTable* outTable,
const Index& index);
template <int IN_WIDTH, int OUT_WIDTH>
void processGroup(const GroupBy::Aggregate& a, size_t blockStart,
size_t blockEnd, const IdTableView<IN_WIDTH>& input,
const vector<ResultTable::ResultType>& inputTypes,
IdTableStatic<OUT_WIDTH>* result, size_t resultRow,
const ResultTable* inTable, ResultTable* outTable,
const Index& index,
ad_utility::HashSet<size_t>& distinctHashSet) const;
Comment on lines +103 to +109
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks new, but processGroup was used in GroupBy.cpp before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processGroup was previously a free function, which was only defined in GroupBy.cpp and not declared in the header. (In other classes such functions were previously often static.) Now it has to be a member function because it has to check the timeout status. So this is simply the declaration of a function which was previously only defined in the .cpp file.


template <int IN_WIDTH, int OUT_WIDTH>
void doGroupBy(const IdTable& dynInput,
const vector<ResultTable::ResultType>& inputTypes,
const vector<size_t>& groupByCols,
const vector<GroupBy::Aggregate>& aggregates,
IdTable* dynResult, const ResultTable* inTable,
ResultTable* outTable, const Index& index) const;

FRIEND_TEST(GroupByTest, doGroupBy);
};
10 changes: 5 additions & 5 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ void IndexScan::computePSOfreeS(ResultTable* result) const {
result->_resultTypes.push_back(ResultTable::ResultType::KB);
result->_sortedBy = {0, 1};
const auto& idx = _executionContext->getIndex();
idx.scan(_predicate, &result->_data, idx._PSO);
idx.scan(_predicate, &result->_data, idx._PSO, _timeoutTimer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed as an argument here (and in the following)? Does idx.scan not have access to _timeoutTimer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the Index is global (the KB index on disk that is used by the whole QLever instance). The _timeoutTimer is per Query [Execution].

The real question behind this is:
Should the scan function be implemented in the Index or in the IndexScan class?
There are other examples of this: there is also Engine::sort (global), which is then called by the OrderBy operation class. However, this has nothing to do with the timeout, but rather with software architecture.

}

// _____________________________________________________________________________
Expand All @@ -299,7 +299,7 @@ void IndexScan::computePOSfreeO(ResultTable* result) const {
result->_resultTypes.push_back(ResultTable::ResultType::KB);
result->_sortedBy = {0, 1};
const auto& idx = _executionContext->getIndex();
idx.scan(_predicate, &result->_data, idx._POS);
idx.scan(_predicate, &result->_data, idx._POS, _timeoutTimer);
}

// _____________________________________________________________________________
Expand Down Expand Up @@ -338,7 +338,7 @@ void IndexScan::computeSPOfreeP(ResultTable* result) const {
result->_resultTypes.push_back(ResultTable::ResultType::KB);
result->_sortedBy = {0, 1};
const auto& idx = _executionContext->getIndex();
idx.scan(_subject, &result->_data, idx._SPO);
idx.scan(_subject, &result->_data, idx._SPO, _timeoutTimer);
}

// _____________________________________________________________________________
Expand All @@ -357,7 +357,7 @@ void IndexScan::computeSOPfreeO(ResultTable* result) const {
result->_resultTypes.push_back(ResultTable::ResultType::KB);
result->_sortedBy = {0, 1};
const auto& idx = _executionContext->getIndex();
idx.scan(_subject, &result->_data, idx._SOP);
idx.scan(_subject, &result->_data, idx._SOP, _timeoutTimer);
}

// _____________________________________________________________________________
Expand All @@ -377,7 +377,7 @@ void IndexScan::computeOSPfreeS(ResultTable* result) const {
result->_resultTypes.push_back(ResultTable::ResultType::KB);
result->_sortedBy = {0, 1};
const auto& idx = _executionContext->getIndex();
idx.scan(_object, &result->_data, idx._OSP);
idx.scan(_object, &result->_data, idx._OSP, _timeoutTimer);
}

// _____________________________________________________________________________
Expand Down
18 changes: 17 additions & 1 deletion src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Join::Join(QueryExecutionContext* qec, std::shared_ptr<QueryExecutionTree> t1,
size_t t2JoinCol, bool keepJoinColumn)
: Operation(qec) {
// Make sure subtrees are ordered so that identical queries can be identified.
if (t1.get()->asString() < t2.get()->asString()) {
if (t1 && t2 && t1.get()->asString() < t2.get()->asString()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throw exception if t1 or t2 is null

_left = t1;
_leftJoinCol = t1JoinCol;
_right = t2;
Expand Down Expand Up @@ -336,6 +336,8 @@ void Join::doComputeJoinWithFullScanDummyLeft(const IdTable& ndr,
// Do a scan.
LOG(TRACE) << "Inner scan with ID: " << currentJoinId << endl;
IdTable jr(2, _executionContext->getAllocator());
checkTimeout(); // the scan is a disk operation, so we can check the
// timeout frequently

joka921 marked this conversation as resolved.
Show resolved Hide resolved
scan(currentJoinId, &jr);
LOG(TRACE) << "Got #items: " << jr.size() << endl;
// Build the cross product.
Expand Down Expand Up @@ -376,6 +378,8 @@ void Join::doComputeJoinWithFullScanDummyRight(const IdTable& ndr,
} else {
// Do a scan.
LOG(TRACE) << "Inner scan with ID: " << currentJoinId << endl;
checkTimeout(); // the scan is a disk operation, so we can check the
// timeout frequently
IdTable jr(2, _executionContext->getAllocator());
scan(currentJoinId, &jr);
LOG(TRACE) << "Got #items: " << jr.size() << endl;
Expand Down Expand Up @@ -525,13 +529,19 @@ void Join::join(const IdTable& dynA, size_t jc1, const IdTable& dynB,
while (i < a.size() && j < b.size()) {
while (a(i, jc1) < b(j, jc2)) {
++i;
if (i % (1024 * 16) == 0) {
checkTimeout();
}
if (i >= a.size()) {
goto finish;
}
}

while (b(j, jc2) < a(i, jc1)) {
++j;
if (j % (1024 * 16) == 0) {
checkTimeout();
}
joka921 marked this conversation as resolved.
Show resolved Hide resolved
if (j >= b.size()) {
goto finish;
}
Expand Down Expand Up @@ -559,12 +569,18 @@ void Join::join(const IdTable& dynA, size_t jc1, const IdTable& dynB,
}

++j;
if (j % (1024 * 4) == 0) {
checkTimeout();
}
if (j >= b.size()) {
// The next i might still match
break;
}
}
++i;
if (i % (1024 * 4) == 0) {
checkTimeout();
}
if (i >= a.size()) {
goto finish;
}
Expand Down
4 changes: 2 additions & 2 deletions src/engine/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ class Join : public Operation {
* the result in dynRes. Creates a cross product for matching rows
**/
template <int L_WIDTH, int R_WIDTH, int OUT_WIDTH>
static void join(const IdTable& dynA, size_t jc1, const IdTable& dynB,
size_t jc2, IdTable* dynRes);
void join(const IdTable& dynA, size_t jc1, const IdTable& dynB, size_t jc2,
IdTable* dynRes);

class RightLargerTag {};
class LeftLargerTag {};
Expand Down