Skip to content

Commit

Permalink
Memory Limit for the Cache
Browse files Browse the repository at this point in the history
- There is now also a configurable limit on the actual size
  of cache elements, including a limit on the maximal size
  of a single cache element
- Pinned elements also count towards the size.
- Results are only inserted into the cache once they are fully
  computed.
- If a result is requested from the cache, while it is still
  being computed, this computation can be awaited. This
  functionality existed before, but is now encapsulated
  in a separate CacheAdapter class. This encapsulation allows
  us to test this functionality which was previously untested.
- Removed thread-safety from the cache class itself,
  As it can be added easily using ad_utility::Synchronized
  • Loading branch information
joka921 committed Apr 10, 2021
1 parent d060088 commit 86a516c
Show file tree
Hide file tree
Showing 16 changed files with 673 additions and 373 deletions.
108 changes: 39 additions & 69 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,10 @@ shared_ptr<const ResultTable> Operation::getResult(bool isRoot) {
const bool pinChildIndexScanSizes = _executionContext->_pinResult && isRoot;
const bool pinResult =
_executionContext->_pinSubtrees || pinChildIndexScanSizes;
LOG(TRACE) << "Check cache for Operation result" << endl;
LOG(TRACE) << "Using key: \n" << cacheKey << endl;
auto [newResult, existingResult] =
(pinResult)
? cache.tryEmplacePinned(cacheKey, _executionContext->getAllocator())
: cache.tryEmplace(cacheKey, _executionContext->getAllocator());

// When pinning a final result only, we also need to remember all of the
// involved IndexScans' sizes, otherwise the queryPlanner will retrigger these
// computations when reading the result from the cache.
if (pinChildIndexScanSizes) {
auto lock = getExecutionContext()->getPinnedSizes().wlock();
forAllDescendants([&lock](QueryExecutionTree* child) {
Expand All @@ -72,69 +69,42 @@ shared_ptr<const ResultTable> Operation::getResult(bool isRoot) {
});
}

if (newResult) {
LOG(TRACE) << "Not in the cache, need to compute result" << endl;
LOG(DEBUG) << "Available memory (in MB) before operation: "
<< (_executionContext->getAllocator().numFreeBytes() >> 20)
<< std::endl;
// Passing the raw pointer here is ok as the result shared_ptr remains
// in scope
try {
computeResult(newResult->_resTable.get());
} catch (const ad_semsearch::AbortException& e) {
// A child Operation was aborted, abort this Operation
// as well. The child already printed
abort(newResult, false);
// Continue unwinding the stack
throw;
} catch (const std::exception& e) {
// We are in the innermost level of the exception, so print
abort(newResult, true);
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(e);
} catch (...) {
// We are in the innermost level of the exception, so print
abort(newResult, true);
LOG(ERROR) << "WEIRD_EXCEPTION not inheriting from std::exception"
<< endl;
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException("WEIRD_EXCEPTION");
}
timer.stop();
_runtimeInfo.setRows(newResult->_resTable->size());
_runtimeInfo.setCols(getResultWidth());
_runtimeInfo.setDescriptor(getDescriptor());
_runtimeInfo.setColumnNames(getVariableColumns());
try {
auto computeLambda = [&, this] {
CacheValue val(getExecutionContext()->getAllocator());
computeResult(val._resTable.get());
return val;
};

_runtimeInfo.setTime(timer.msecs());
_runtimeInfo.setWasCached(false);
// cache the runtime information for the execution as well
newResult->_runtimeInfo = _runtimeInfo;
// Only now we can let other threads access the result
// and runtime information
newResult->_resTable->finish();
return newResult->_resTable;
}
auto result = (pinResult) ? cache.computeOncePinned(cacheKey, computeLambda)
: cache.computeOnce(cacheKey, computeLambda);

existingResult->_resTable->awaitFinished();
if (existingResult->_resTable->status() == ResultTable::ABORTED) {
LOG(ERROR) << "Operation aborted while awaiting result" << endl;
AD_THROW(ad_semsearch::Exception::BAD_QUERY,
"Operation aborted while awaiting result");
timer.stop();
createRuntimeInformation(result, timer.msecs());
return result._resultPointer->_resTable;
} catch (const ad_semsearch::AbortException& e) {
// A child Operation was aborted, do not print the information again.
throw;
} catch (const ad_utility::AbortedByOtherThreadException& e) {
LOG(ERROR) << "Aborted operation was found in the cache:" << std::endl;
LOG(ERROR) << asString();
throw ad_semsearch::AbortException(e);
} catch (const std::exception& e) {
// We are in the innermost level of the exception, so print
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << asString() << endl;
LOG(ERROR) << e.what() << endl;
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(e);
} catch (...) {
// We are in the innermost level of the exception, so print
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << asString() << endl;
LOG(ERROR) << "WEIRD_EXCEPTION not inheriting from std::exception" << endl;
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException("WEIRD_EXCEPTION");
}
timer.stop();
_runtimeInfo = existingResult->_runtimeInfo;
// We need to update column names and descriptor as we may have cached with
// different variable names
_runtimeInfo.setDescriptor(getDescriptor());
_runtimeInfo.setColumnNames(getVariableColumns());
_runtimeInfo.setTime(timer.msecs());
_runtimeInfo.setWasCached(true);
_runtimeInfo.addDetail("original_total_time",
existingResult->_runtimeInfo.getTime());
_runtimeInfo.addDetail("original_operation_time",
existingResult->_runtimeInfo.getOperationTime());
return existingResult->_resTable;
}

}
54 changes: 43 additions & 11 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,19 @@ class Operation {
* when memory pressure was lowered in the meantime. When print is true the
* Operation is printed to the ERROR LOG
*/
void abort(const shared_ptr<CacheValue>& cachedResult, bool print) {
const std::string opString = asString();
if (print) {
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << opString << endl;
}
// Remove Operation from cache so we may retry it later. Anyone with a live
// pointer will be waiting and register the abort.
_executionContext->getQueryTreeCache().erase(opString);
cachedResult->_resTable->abort();
}
/*
// TODO<joka921>: change to pointer for styleguide?
void abort(SubtreeCache::Res& cachedResult, bool print) {
const std::string opString = asString();
if (print) {
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << opString << endl;
}
// Remove Operation from cache so we may retry it later. Anyone with a live
// pointer will be waiting and register the abort.
cachedResult.abort();
}
*/

/**
* @return A list of columns on which the result of this operation is sorted.
Expand Down Expand Up @@ -143,6 +145,36 @@ class Operation {
//! Computes both, an EntityList and a HitList.
virtual void computeResult(ResultTable* result) = 0;

// Create and store the complete runtime Information for this operation.
// All data that was previously stored in the runtime information will be
// deleted.
virtual void createRuntimeInformation(
const SubtreeCache::ResultAndCacheStatus& resAndCache,
size_t timeInMilliseconds) final {
// reset
_runtimeInfo = RuntimeInformation();
// the column names might differ between a cached result and this operation,
// so we have to take the local ones.
_runtimeInfo.setColumnNames(getVariableColumns());

_runtimeInfo.setCols(getResultWidth());
_runtimeInfo.setDescriptor(getDescriptor());

// Only the result that was actually computed (or read from cache) knows
// the correct information about the children computations.
_runtimeInfo.children() =
resAndCache._resultPointer->_runtimeInfo.children();

_runtimeInfo.setTime(timeInMilliseconds);
_runtimeInfo.setRows(resAndCache._resultPointer->_resTable->size());
_runtimeInfo.setWasCached(resAndCache._wasCached);
_runtimeInfo.addDetail("original_total_time",
resAndCache._resultPointer->_runtimeInfo.getTime());
_runtimeInfo.addDetail(
"original_operation_time",
resAndCache._resultPointer->_runtimeInfo.getOperationTime());
}

vector<size_t> _resultSortedColumns;
RuntimeInformation _runtimeInfo;

Expand Down
10 changes: 8 additions & 2 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "../global/Constants.h"
#include "../index/Index.h"
#include "../util/Cache.h"
#include "../util/CacheAdapter.h"
#include "../util/Log.h"
#include "../util/Synchronized.h"
#include "./Engine.h"
Expand All @@ -33,7 +34,12 @@ struct CacheValue {
}
};

typedef ad_utility::LRUCache<string, CacheValue> SubtreeCache;

// Threadsafe LRU cache for (partial) query results, that
// checks on insertion, if the result is currently being computed
// by another query.
using SubtreeCache =
ad_utility::CacheAdapter<ad_utility::LRUCache<string, CacheValue>>;
using PinnedSizes =
ad_utility::Synchronized<ad_utility::HashMap<std::string, size_t>,
std::shared_mutex>;
Expand Down Expand Up @@ -88,4 +94,4 @@ class QueryExecutionContext {
// allocators are copied but hold shared state
ad_utility::AllocatorWithLimit<Id> _alloc;
QueryPlanningCostFactors _costFactors;
};
};
12 changes: 6 additions & 6 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ string QueryExecutionTree::asString(size_t indent) {
void QueryExecutionTree::setOperation(QueryExecutionTree::OperationType type,
std::shared_ptr<Operation> op) {
_type = type;
_rootOperation = op;
_rootOperation = std::move(op);
_asString = "";
_sizeEstimate = std::numeric_limits<size_t>::max();
// with setting the operation the initialization is done and we can try to
Expand Down Expand Up @@ -145,7 +145,7 @@ nlohmann::json QueryExecutionTree::writeResultAsJson(

// _____________________________________________________________________________
size_t QueryExecutionTree::getCostEstimate() {
if (_cachedResult && _cachedResult->status() == ResultTable::FINISHED) {
if (_cachedResult) {
// result is pinned in cache. Nothing to compute
return 0;
}
Expand All @@ -159,7 +159,7 @@ size_t QueryExecutionTree::getCostEstimate() {
// _____________________________________________________________________________
size_t QueryExecutionTree::getSizeEstimate() {
if (_sizeEstimate == std::numeric_limits<size_t>::max()) {
if (_cachedResult && _cachedResult->status() == ResultTable::FINISHED) {
if (_cachedResult) {
_sizeEstimate = _cachedResult->size();
} else {
// if we are in a unit test setting and there is no QueryExecutionContest
Expand All @@ -173,7 +173,7 @@ size_t QueryExecutionTree::getSizeEstimate() {

// _____________________________________________________________________________
bool QueryExecutionTree::knownEmptyResult() {
if (_cachedResult && _cachedResult->status() == ResultTable::FINISHED) {
if (_cachedResult) {
return _cachedResult->size() == 0;
}
return _rootOperation->knownEmptyResult();
Expand All @@ -190,9 +190,9 @@ void QueryExecutionTree::readFromCache() {
return;
}
auto& cache = _qec->getQueryTreeCache();
std::shared_ptr<const CacheValue> res = cache[asString()];
std::shared_ptr<const CacheValue> res = cache.cacheAt(asString());
if (res) {
_cachedResult = cache[asString()]->_resTable;
_cachedResult = res->_resTable;
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/engine/ResultTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ ResultTable::ResultTable(ad_utility::AllocatorWithLimit<Id> allocator)
: _sortedBy(),
_data(std::move(allocator)),
_resultTypes(),
_localVocab(std::make_shared<std::vector<std::string>>()),
_status(ResultTable::IN_PROGRESS) {}
_localVocab(std::make_shared<std::vector<std::string>>()) {}

// _____________________________________________________________________________
void ResultTable::clear() {
_localVocab = nullptr;
_data.clear();
_status = IN_PROGRESS;
}

// _____________________________________________________________________________
Expand Down
28 changes: 0 additions & 28 deletions src/engine/ResultTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,29 +77,6 @@ class ResultTable {

virtual ~ResultTable();

void abort() {
lock_guard<mutex> lk(_cond_var_m);
clear();
_status = ResultTable::ABORTED;
_cond_var.notify_all();
}

void finish() {
lock_guard<mutex> lk(_cond_var_m);
_status = ResultTable::FINISHED;
_cond_var.notify_all();
}

Status status() const {
lock_guard<mutex> lk(_cond_var_m);
return _status;
}

void awaitFinished() const {
unique_lock<mutex> lk(_cond_var_m);
_cond_var.wait(lk, [&] { return _status != ResultTable::IN_PROGRESS; });
}

std::optional<std::string> idToOptionalString(Id id) const {
if (id < _localVocab->size()) {
return (*_localVocab)[id];
Expand All @@ -124,9 +101,4 @@ class ResultTable {
}

private:
// See this SO answer for why mutable is ok here
// https://stackoverflow.com/questions/3239905/c-mutex-and-const-correctness
mutable condition_variable _cond_var;
mutable mutex _cond_var_m;
Status _status;
};
6 changes: 6 additions & 0 deletions src/engine/RuntimeInformation.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ class RuntimeInformation {

void addChild(const RuntimeInformation& r) { _children.push_back(r); }

// direct access to the children
std::vector<RuntimeInformation>& children() { return _children; }
[[nodiscard]] const std::vector<RuntimeInformation>& children() const {
return _children;
}

template <typename T>
void addDetail(const std::string& key, const T& value) {
_details[key] = value;
Expand Down
6 changes: 4 additions & 2 deletions src/engine/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ class Server {
: _numThreads(numThreads),
_serverSocket(),
_port(port),
_cache(NOF_SUBTREES_TO_CACHE),
_cache(
NOF_SUBTREES_TO_CACHE,
CACHE_SIZE_IN_GB * (2ull << 30u) * sizeof(Id),
MAX_SIZE_SINGLE_CACHE_ELEMENT_IN_GB * (2ull << 30u) * sizeof(Id)),
_allocator(ad_utility::makeAllocationMemoryLeftThreadsafeObject(
maxMemInBytes)),

_index(),
_engine(),
_initialized(false) {}
Expand Down
2 changes: 2 additions & 0 deletions src/global/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ static const size_t STXXL_DISK_SIZE_INDEX_TEST = 10;
static constexpr size_t DEFAULT_MEM_FOR_QUERIES_IN_GB = 80;

static const size_t NOF_SUBTREES_TO_CACHE = 1000;
static const size_t CACHE_SIZE_IN_GB = 30;
static const size_t MAX_SIZE_SINGLE_CACHE_ELEMENT_IN_GB = 5;
static const size_t MAX_NOF_ROWS_IN_RESULT = 100000;
static const size_t MIN_WORD_PREFIX_SIZE = 4;
static const char PREFIX_CHAR = '*';
Expand Down

0 comments on commit 86a516c

Please sign in to comment.