Skip to content

Commit

Permalink
Merge pull request #318 from joka921/f.FlexibleCache
Browse files Browse the repository at this point in the history
Implementation of a More Flexible Cache
  • Loading branch information
niklas88 committed Apr 13, 2020
2 parents 0c0f5de + faab2ec commit ec8c914
Show file tree
Hide file tree
Showing 24 changed files with 1,845 additions and 334 deletions.
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ if (USE_PARALLEL)
add_definitions("-D_PARALLEL_SORT")
endif()

if (USE_TREE_BASED_CACHE)
add_definitions("-D_QLEVER_USE_TREE_BASED_CACHE")
endif()

################################
# STXXL
################################
Expand Down
3 changes: 2 additions & 1 deletion src/SparqlEngineMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,15 @@ int main(int argc, char** argv) {
Engine engine;
Index index;
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
PinnedSizes pinnedSizes;
index.setUsePatterns(usePatterns);
index.setOnDiskLiterals(onDiskLiterals);
index.createFromOnDiskIndex(indexName);
if (text) {
index.addTextFromOnDiskIndex();
}

QueryExecutionContext qec(index, engine, &cache);
QueryExecutionContext qec(index, engine, &cache, &pinnedSizes);
if (costFactosFileName.size() > 0) {
qec.readCostFactorsFromTSVFile(costFactosFileName);
}
Expand Down
3 changes: 2 additions & 1 deletion src/WriteIndexListsMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ int main(int argc, char** argv) {

Engine engine;
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
QueryExecutionContext qec(index, engine, &cache);
PinnedSizes pinnedSizes;
QueryExecutionContext qec(index, engine, &cache, &pinnedSizes);
ParsedQuery q;
if (!freebase) {
q = SparqlParser("SELECT ?x WHERE {?x <is-a> <Scientist>}").parse();
Expand Down
3 changes: 2 additions & 1 deletion src/engine/CountAvailablePredicates.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class CountAvailablePredicates : public Operation {
virtual vector<size_t> resultSortedOn() const override;

vector<QueryExecutionTree*> getChildren() override {
return {_subtree.get()};
using R = vector<QueryExecutionTree*>;
return _subtree != nullptr ? R{_subtree.get()} : R{};
}

ad_utility::HashMap<string, size_t> getVariableColumns() const;
Expand Down
7 changes: 7 additions & 0 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ size_t IndexScan::computeSizeEstimate() {

// We have to do a simple scan anyway so might as well do it now
if (getResultWidth() == 1) {
auto key = asString();
{
auto rlock = getExecutionContext()->getPinnedSizes().rlock();
if (rlock->count(key)) {
return rlock->at(key);
}
}
return getResult()->size();
}
if (_type == SPO_FREE_P || _type == SOP_FREE_O) {
Expand Down
115 changes: 115 additions & 0 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,30 @@
#include "Operation.h"
#include "QueryExecutionTree.h"

template <typename F>
void Operation::forAllDescendants(F f) {
static_assert(
std::is_same_v<void, std::invoke_result_t<F, QueryExecutionTree*>>);
for (auto ptr : getChildren()) {
if (ptr) {
f(ptr);
ptr->forAllDescendants(f);
}
}
}

template <typename F>
void Operation::forAllDescendants(F f) const {
static_assert(
std::is_same_v<void, std::invoke_result_t<F, const QueryExecutionTree*>>);
for (auto ptr : getChildren()) {
if (ptr) {
f(ptr);
ptr->forAllDescendants(f);
}
}
}

// __________________________________________________________________________________________________________
vector<string> Operation::collectWarnings() const {
vector<string> res = getWarnings();
Expand All @@ -19,3 +43,94 @@ vector<string> Operation::collectWarnings() const {

return res;
}

// Get the result for the subtree rooted at this element.
// Use existing results if they are already available, otherwise
// trigger computation.
shared_ptr<const ResultTable> Operation::getResult(bool isRoot) {
ad_utility::Timer timer;
timer.start();
auto& cache = _executionContext->getQueryTreeCache();
const string cacheKey = asString();
const bool pinChildIndexScanSizes = _executionContext->_pinResult && isRoot;
const bool pinResult =
_executionContext->_pinSubtrees || pinChildIndexScanSizes;
LOG(TRACE) << "Check cache for Operation result" << endl;
LOG(TRACE) << "Using key: \n" << cacheKey << endl;
auto [newResult, existingResult] = (pinResult)
? cache.tryEmplacePinned(cacheKey)
: cache.tryEmplace(cacheKey);

if (pinChildIndexScanSizes) {
auto lock = getExecutionContext()->getPinnedSizes().wlock();
forAllDescendants([&lock](QueryExecutionTree* child) {
if (child->getType() == QueryExecutionTree::OperationType::SCAN) {
(*lock)[child->getRootOperation()->asString()] =
child->getSizeEstimate();
}
});
}

if (newResult) {
LOG(TRACE) << "Not in the cache, need to compute result" << endl;
// Passing the raw pointer here is ok as the result shared_ptr remains
// in scope
try {
computeResult(newResult->_resTable.get());
} catch (const ad_semsearch::AbortException& e) {
// A child Operation was aborted, abort this Operation
// as well. The child already printed
abort(newResult, false);
// Continue unwinding the stack
throw;
} catch (const std::exception& e) {
// We are in the innermost level of the exception, so print
abort(newResult, true);
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(e);
} catch (...) {
// We are in the innermost level of the exception, so print
abort(newResult, true);
LOG(ERROR) << "WEIRD_EXCEPTION not inheriting from std::exception"
<< endl;
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException("WEIRD_EXCEPTION");
}
timer.stop();
_runtimeInfo.setRows(newResult->_resTable->size());
_runtimeInfo.setCols(getResultWidth());
_runtimeInfo.setDescriptor(getDescriptor());
_runtimeInfo.setColumnNames(getVariableColumns());

_runtimeInfo.setTime(timer.msecs());
_runtimeInfo.setWasCached(false);
// cache the runtime information for the execution as well
newResult->_runtimeInfo = _runtimeInfo;
// Only now we can let other threads access the result
// and runtime information
newResult->_resTable->finish();
return newResult->_resTable;
}

existingResult->_resTable->awaitFinished();
if (existingResult->_resTable->status() == ResultTable::ABORTED) {
LOG(ERROR) << "Operation aborted while awaiting result" << endl;
AD_THROW(ad_semsearch::Exception::BAD_QUERY,
"Operation aborted while awaiting result");
}
timer.stop();
_runtimeInfo = existingResult->_runtimeInfo;
// We need to update column names and descriptor as we may have cached with
// different variable names
_runtimeInfo.setDescriptor(getDescriptor());
_runtimeInfo.setColumnNames(getVariableColumns());
_runtimeInfo.setTime(timer.msecs());
_runtimeInfo.setWasCached(true);
_runtimeInfo.addDetail("original_total_time",
existingResult->_runtimeInfo.getTime());
_runtimeInfo.addDetail("original_operation_time",
existingResult->_runtimeInfo.getOperationTime());
return existingResult->_resTable;
}
92 changes: 13 additions & 79 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,85 +52,6 @@ class Operation {
// recursively collect all Warnings generated by all descendants
vector<string> collectWarnings() const;

// Get the result for the subtree rooted at this element.
// Use existing results if they are already available, otherwise
// trigger computation.
shared_ptr<const ResultTable> getResult() {
ad_utility::Timer timer;
timer.start();
auto& cache = _executionContext->getQueryTreeCache();
const string cacheKey = asString();
const bool pinResult = _executionContext->pin;
LOG(TRACE) << "Check cache for Operation result" << endl;
LOG(TRACE) << "Using key: \n" << cacheKey << endl;
auto [newResult, existingResult] = (pinResult)
? cache.tryEmplacePinned(cacheKey)
: cache.tryEmplace(cacheKey);

if (newResult) {
LOG(TRACE) << "Not in the cache, need to compute result" << endl;
// Passing the raw pointer here is ok as the result shared_ptr remains
// in scope
try {
computeResult(newResult->_resTable.get());
} catch (const ad_semsearch::AbortException& e) {
// A child Operation was aborted, abort this Operation
// as well. The child already printed
abort(newResult, false);
// Continue unwinding the stack
throw;
} catch (const std::exception& e) {
// We are in the innermost level of the exception, so print
abort(newResult, true);
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(e);
} catch (...) {
// We are in the innermost level of the exception, so print
abort(newResult, true);
LOG(ERROR) << "WEIRD_EXCEPTION not inheriting from std::exception"
<< endl;
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException("WEIRD_EXCEPTION");
}
timer.stop();
_runtimeInfo.setRows(newResult->_resTable->size());
_runtimeInfo.setCols(getResultWidth());
_runtimeInfo.setDescriptor(getDescriptor());
_runtimeInfo.setColumnNames(getVariableColumns());

_runtimeInfo.setTime(timer.msecs());
_runtimeInfo.setWasCached(false);
// cache the runtime information for the execution as well
newResult->_runtimeInfo = _runtimeInfo;
// Only now we can let other threads access the result
// and runtime information
newResult->_resTable->finish();
return newResult->_resTable;
}

existingResult->_resTable->awaitFinished();
if (existingResult->_resTable->status() == ResultTable::ABORTED) {
LOG(ERROR) << "Operation aborted while awaiting result" << endl;
AD_THROW(ad_semsearch::Exception::BAD_QUERY,
"Operation aborted while awaiting result");
}
timer.stop();
_runtimeInfo = existingResult->_runtimeInfo;
// We need to update column names and descriptor as we may have cached with
// different variable names
_runtimeInfo.setDescriptor(getDescriptor());
_runtimeInfo.setColumnNames(getVariableColumns());
_runtimeInfo.setTime(timer.msecs());
_runtimeInfo.setWasCached(true);
_runtimeInfo.addDetail("original_total_time",
existingResult->_runtimeInfo.getTime());
_runtimeInfo.addDetail("original_operation_time",
existingResult->_runtimeInfo.getOperationTime());
return existingResult->_resTable;
}

/**
* Abort this Operation. Removes the Operation's result from the cache so
* that it can be retried. The result must be owned meaning only the
Expand Down Expand Up @@ -182,6 +103,11 @@ class Operation {

RuntimeInformation& getRuntimeInfo() { return _runtimeInfo; }

// Get the result for the subtree rooted at this element.
// Use existing results if they are already available, otherwise
// trigger computation.
shared_ptr<const ResultTable> getResult(bool isRoot = false);

protected:
QueryExecutionContext* getExecutionContext() const {
return _executionContext;
Expand Down Expand Up @@ -225,4 +151,12 @@ class Operation {
/// collect all the warnings that were created during the creation or
/// execution of this operation
std::vector<std::string> _warnings;

// recursively call a function on all children
template <typename F>
void forAllDescendants(F f);

// recursively call a function on all children
template <typename F>
void forAllDescendants(F f) const;
};
24 changes: 20 additions & 4 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
#pragma once

#include <memory>
#include <shared_mutex>
#include <string>
#include <vector>
#include "../global/Constants.h"
#include "../index/Index.h"
#include "../util/LRUCache.h"
#include "../util/Cache.h"
#include "../util/Log.h"
#include "../util/Synchronized.h"
#include "./Engine.h"
#include "./ResultTable.h"
#include "QueryPlanningCostFactors.h"
Expand All @@ -24,25 +26,37 @@ struct CacheValue {
CacheValue() : _resTable(std::make_shared<ResultTable>()), _runtimeInfo() {}
std::shared_ptr<ResultTable> _resTable;
RuntimeInformation _runtimeInfo;
[[nodiscard]] size_t size() const {
return _resTable ? _resTable->size() * _resTable->width() : 0;
}
};

typedef ad_utility::LRUCache<string, CacheValue> SubtreeCache;
using PinnedSizes =
ad_utility::Synchronized<ad_utility::HashMap<std::string, size_t>,
std::shared_mutex>;

// Execution context for queries.
// Holds references to index and engine, implements caching.
class QueryExecutionContext {
public:
QueryExecutionContext(const Index& index, const Engine& engine,
SubtreeCache* const cache,
const bool pinSubtrees = false)
: pin(pinSubtrees),
PinnedSizes* const pinnedSizes,
const bool pinSubtrees = false,
const bool pinResult = false)
: _pinSubtrees(pinSubtrees),
_pinResult(pinResult),
_index(index),
_engine(engine),
_subtreeCache(cache),
_pinnedSizes(pinnedSizes),
_costFactors() {}

SubtreeCache& getQueryTreeCache() { return *_subtreeCache; }

PinnedSizes& getPinnedSizes() { return *_pinnedSizes; }

const Engine& getEngine() const { return _engine; }

const Index& getIndex() const { return _index; }
Expand All @@ -57,11 +71,13 @@ class QueryExecutionContext {
return _costFactors.getCostFactor(key);
};

const bool pin;
const bool _pinSubtrees;
const bool _pinResult;

private:
const Index& _index;
const Engine& _engine;
SubtreeCache* const _subtreeCache;
PinnedSizes* const _pinnedSizes;
QueryPlanningCostFactors _costFactors;
};

0 comments on commit ec8c914

Please sign in to comment.