Skip to content

Commit

Permalink
F.memory limited cache (#348)
Browse files Browse the repository at this point in the history
* Memory Limit for the Cache

- There is now also a configurable limit on the actual size
  of cache elements, including a limit on the maximal size
  of a single cache element.
- Pinned elements also count towards the size.
- Results are only inserted into the cache once they are fully
  computed.
- If a result is requested from the cache, while it is still
  being computed, this computation can be awaited. This
  functionality existed before, but is now encapsulated
  in a separate CacheAdapter class. This encapsulation allows
  us to test this functionality which was previously untested.
- Removed thread-safety from the cache class itself,
  as it can easily be added using ad_utility::Synchronized.
  • Loading branch information
joka921 committed Apr 16, 2021
1 parent d060088 commit 00a633a
Show file tree
Hide file tree
Showing 22 changed files with 1,032 additions and 478 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ endif()
## Build targets for address sanitizer
# AddressSanitize
set(CMAKE_C_FLAGS_ASAN
"-fsanitize=address -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1"
"-fsanitize=address -fsanitize=undefined -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1"
CACHE STRING "Flags used by the C compiler during AddressSanitizer builds."
FORCE)
set(CMAKE_CXX_FLAGS_ASAN
"-fsanitize=address -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1"
"-fsanitize=address -fsanitize=undefined -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1"
CACHE STRING "Flags used by the C++ compiler during AddressSanitizer builds."
FORCE)

Expand Down
6 changes: 5 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ EXPOSE 7001
VOLUME ["/input", "/index"]

ENV INDEX_PREFIX index
ENV MEMORY_FOR_QUERIES 70
ENV CACHE_MAX_SIZE_GB 30
ENV CACHE_MAX_SIZE_GB_SINGLE_ENTRY 5
ENV CACHE_MAX_NUM_ENTRIES 1000
# Need the shell to get the INDEX_PREFIX environment variable
ENTRYPOINT ["/bin/sh", "-c", "exec ServerMain -i \"/index/${INDEX_PREFIX}\" -p 7001 \"$@\"", "--"]
ENTRYPOINT ["/bin/sh", "-c", "exec ServerMain -i \"/index/${INDEX_PREFIX}\" -j 8 -m ${MEMORY_FOR_QUERIES} -c ${CACHE_MAX_SIZE_GB} -e ${CACHE_MAX_SIZE_GB_SINGLE_ENTRY} -k ${CACHE_MAX_NUM_ENTRIES} -p 7001 \"$@\"", "--"]

# docker build -t qlever-<name> .
# # When running with user namespaces you may need to make the index folder accessible
Expand Down
59 changes: 47 additions & 12 deletions src/ServerMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>

#include <iomanip>
#include <iostream>
#include <string>
#include <vector>

#include "engine/Server.h"
#include "global/Constants.h"
#include "util/ReadableNumberFact.h"

using std::cerr;
Expand All @@ -24,16 +26,20 @@ using std::vector;
#define EMPH_OFF "\033[22m"

// Available options.
struct option options[] = {{"help", no_argument, NULL, 'h'},
{"index", required_argument, NULL, 'i'},
{"worker-threads", required_argument, NULL, 'j'},
{"memory-for-queries", required_argument, NULL, 'm'},
{"on-disk-literals", no_argument, NULL, 'l'},
{"port", required_argument, NULL, 'p'},
{"no-patterns", no_argument, NULL, 'P'},
{"no-pattern-trick", no_argument, NULL, 'T'},
{"text", no_argument, NULL, 't'},
{NULL, 0, NULL, 0}};
struct option options[] = {
{"help", no_argument, NULL, 'h'},
{"index", required_argument, NULL, 'i'},
{"worker-threads", required_argument, NULL, 'j'},
{"memory-for-queries", required_argument, NULL, 'm'},
{"cache-max-size-gb", required_argument, NULL, 'c'},
{"cache-max-size-gb-single-entry", required_argument, NULL, 'e'},
{"cache-max-num-entries", required_argument, NULL, 'k'},
{"on-disk-literals", no_argument, NULL, 'l'},
{"port", required_argument, NULL, 'p'},
{"no-patterns", no_argument, NULL, 'P'},
{"no-pattern-trick", no_argument, NULL, 'T'},
{"text", no_argument, NULL, 't'},
{NULL, 0, NULL, 0}};

void printUsage(char* execName) {
std::ios coutState(nullptr);
Expand All @@ -55,6 +61,22 @@ void printUsage(char* execName) {
"query processing and caching. If exceeded, query will return with "
"an error, but the engine will not crash."
<< endl;
cout << " " << std::setw(20) << "c, cache-size-in-gb" << std::setw(1)
<< "Maximum memory size in GB for all cache entries (pinned and "
"non-pinned). Note that the cache is part of the amount of memory "
"limited by --memory-for-queries."
<< endl;
cout << " " << std::setw(20) << "e, cache-max-size-single-element"
<< std::setw(1)
<< "Maximum size in GB for a single cache entry. In other words, "
"results larger than this will never be cached."
<< endl;
cout << " " << std::setw(20) << "k, cache-max-num-values" << std::setw(1)
<< "Maximum number of entries in the cache. If exceeded, remove "
"least-recently used entries from the cache if possible. Note that "
"this condition and the size limit specified via --cache-max-size-gb "
"both have to hold (logical AND)."
<< endl;
cout << " " << std::setw(20) << "no-patterns" << std::setw(1) << " "
<< "Disable the use of patterns. This disables ql:has-predicate."
<< endl;
Expand Down Expand Up @@ -88,11 +110,14 @@ int main(int argc, char** argv) {
bool enablePatternTrick = true;

size_t memLimit = DEFAULT_MEM_FOR_QUERIES_IN_GB;
size_t cacheMaxSizeGB = DEFAULT_CACHE_MAX_SIZE_GB;
size_t cacheMaxSizeGBSingleEntry = DEFAULT_CACHE_MAX_SIZE_GB_SINGLE_ENTRY;
size_t cacheMaxNumEntries = DEFAULT_CACHE_MAX_NUM_ENTRIES;

optind = 1;
// Process command line arguments.
while (true) {
int c = getopt_long(argc, argv, "i:p:j:tauhm:lT", options, NULL);
int c = getopt_long(argc, argv, "i:p:j:tauhm:lc:e:k:T", options, NULL);
if (c == -1) break;
switch (c) {
case 'i':
Expand Down Expand Up @@ -125,6 +150,15 @@ int main(int argc, char** argv) {
"will be ignored for ServerMain. The correct setting for "
"this flag is read directly from the index\n";
break;
case 'c':
cacheMaxSizeGB = atoi(optarg);
break;
case 'e':
cacheMaxSizeGBSingleEntry = atoi(optarg);
break;
case 'k':
cacheMaxNumEntries = atoi(optarg);
break;
default:
cout << endl
<< "! ERROR in processing options (getopt returned '" << c
Expand Down Expand Up @@ -154,7 +188,8 @@ int main(int argc, char** argv) {
cout << "Set locale LC_CTYPE to: " << locale << endl;

try {
Server server(port, numThreads, memLimit * 1 << 30u);
Server server(port, numThreads, memLimit, cacheMaxSizeGB,
cacheMaxSizeGBSingleEntry, cacheMaxNumEntries);
server.initialize(index, text, usePatterns, enablePatternTrick);
server.run();
} catch (const std::exception& e) {
Expand Down
2 changes: 1 addition & 1 deletion src/SparqlEngineMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ int main(int argc, char** argv) {
try {
Engine engine;
Index index;
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
ConcurrentLruCache cache(DEFAULT_CACHE_MAX_NUM_ENTRIES);
PinnedSizes pinnedSizes;
index.setUsePatterns(usePatterns);
index.setOnDiskLiterals(onDiskLiterals);
Expand Down
4 changes: 3 additions & 1 deletion src/WriteIndexListsMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
#include <getopt.h>
#include <libgen.h>
#include <stdlib.h>

#include <iomanip>
#include <iostream>
#include <string>

#include "engine/QueryPlanner.h"
#include "global/Constants.h"
#include "parser/SparqlParser.h"
#include "util/ReadableNumberFact.h"
#include "util/Timer.h"
Expand Down Expand Up @@ -87,7 +89,7 @@ int main(int argc, char** argv) {
index.dumpAsciiLists(lists, decodeGapsAndFrequency);

Engine engine;
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
ConcurrentLruCache cache(DEFAULT_CACHE_MAX_NUM_ENTRIES);
PinnedSizes pinnedSizes;
ad_utility::AllocatorWithLimit<Id> allocator{
ad_utility::makeAllocationMemoryLeftThreadsafeObject(
Expand Down
124 changes: 52 additions & 72 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,89 +52,69 @@ shared_ptr<const ResultTable> Operation::getResult(bool isRoot) {
timer.start();
auto& cache = _executionContext->getQueryTreeCache();
const string cacheKey = asString();
const bool pinChildIndexScanSizes = _executionContext->_pinResult && isRoot;
const bool pinFinalResultButNotSubtrees =
_executionContext->_pinResult && isRoot;
const bool pinResult =
_executionContext->_pinSubtrees || pinChildIndexScanSizes;
LOG(TRACE) << "Check cache for Operation result" << endl;
LOG(TRACE) << "Using key: \n" << cacheKey << endl;
auto [newResult, existingResult] =
(pinResult)
? cache.tryEmplacePinned(cacheKey, _executionContext->getAllocator())
: cache.tryEmplace(cacheKey, _executionContext->getAllocator());
_executionContext->_pinSubtrees || pinFinalResultButNotSubtrees;

if (pinChildIndexScanSizes) {
// When we pin the final result but no subtrees, we need to remember the sizes
// of all involved index scans that have only one free variable. Note that
// these index scans are executed already during query planning because they
// have to be executed anyway, for any query plan. If we don't remember these
// sizes here, future queries that take the result from the cache would redo
// these index scans. Note that we do not need to remember the multiplicity
// (and distinctness) because the multiplicity for an index scan with a single
// free variable is always 1.
if (pinFinalResultButNotSubtrees) {
auto lock = getExecutionContext()->getPinnedSizes().wlock();
forAllDescendants([&lock](QueryExecutionTree* child) {
if (child->getType() == QueryExecutionTree::OperationType::SCAN) {
if (child->getType() == QueryExecutionTree::OperationType::SCAN &&
child->getResultWidth() == 1) {
(*lock)[child->getRootOperation()->asString()] =
child->getSizeEstimate();
}
});
}

if (newResult) {
LOG(TRACE) << "Not in the cache, need to compute result" << endl;
LOG(DEBUG) << "Available memory (in MB) before operation: "
<< (_executionContext->getAllocator().numFreeBytes() >> 20)
<< std::endl;
// Passing the raw pointer here is ok as the result shared_ptr remains
// in scope
try {
computeResult(newResult->_resTable.get());
} catch (const ad_semsearch::AbortException& e) {
// A child Operation was aborted, abort this Operation
// as well. The child already printed
abort(newResult, false);
// Continue unwinding the stack
throw;
} catch (const std::exception& e) {
// We are in the innermost level of the exception, so print
abort(newResult, true);
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(e);
} catch (...) {
// We are in the innermost level of the exception, so print
abort(newResult, true);
LOG(ERROR) << "WEIRD_EXCEPTION not inheriting from std::exception"
<< endl;
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException("WEIRD_EXCEPTION");
}
timer.stop();
_runtimeInfo.setRows(newResult->_resTable->size());
_runtimeInfo.setCols(getResultWidth());
_runtimeInfo.setDescriptor(getDescriptor());
_runtimeInfo.setColumnNames(getVariableColumns());
try {
auto computeLambda = [this] {
CacheValue val(getExecutionContext()->getAllocator());
computeResult(val._resultTable.get());
return val;
};

_runtimeInfo.setTime(timer.msecs());
_runtimeInfo.setWasCached(false);
// cache the runtime information for the execution as well
newResult->_runtimeInfo = _runtimeInfo;
// Only now we can let other threads access the result
// and runtime information
newResult->_resTable->finish();
return newResult->_resTable;
}
auto result = (pinResult) ? cache.computeOncePinned(cacheKey, computeLambda)
: cache.computeOnce(cacheKey, computeLambda);

existingResult->_resTable->awaitFinished();
if (existingResult->_resTable->status() == ResultTable::ABORTED) {
LOG(ERROR) << "Operation aborted while awaiting result" << endl;
AD_THROW(ad_semsearch::Exception::BAD_QUERY,
"Operation aborted while awaiting result");
timer.stop();
createRuntimeInformation(result, timer.msecs());
return result._resultPointer->_resultTable;
} catch (const ad_semsearch::AbortException& e) {
// A child Operation was aborted, do not print the information again.
throw;
} catch (const ad_utility::WaitedForResultWhichThenFailedException& e) {
LOG(ERROR) << "Waited for a result from another thread which then failed"
<< endl;
LOG(ERROR) << asString();
throw ad_semsearch::AbortException(e);
} catch (const std::exception& e) {
// We are in the innermost level of the exception, so print
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << asString() << endl;
LOG(ERROR) << e.what() << endl;
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(e);
} catch (...) {
// We are in the innermost level of the exception, so print
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << asString() << endl;
LOG(ERROR)
<< "Unexpected exception that is not a subclass of std::exception"
<< endl;
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(
"Unexpected expection that is not a subclass of std::exception");
}
timer.stop();
_runtimeInfo = existingResult->_runtimeInfo;
// We need to update column names and descriptor as we may have cached with
// different variable names
_runtimeInfo.setDescriptor(getDescriptor());
_runtimeInfo.setColumnNames(getVariableColumns());
_runtimeInfo.setTime(timer.msecs());
_runtimeInfo.setWasCached(true);
_runtimeInfo.addDetail("original_total_time",
existingResult->_runtimeInfo.getTime());
_runtimeInfo.addDetail("original_operation_time",
existingResult->_runtimeInfo.getOperationTime());
return existingResult->_resTable;
}
51 changes: 32 additions & 19 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,6 @@ class Operation {
// recursively collect all Warnings generated by all descendants
vector<string> collectWarnings() const;

/**
* Abort this Operation. Removes the Operation's result from the cache so
* that it can be retried. The result must be owned meaning only the
* computing thread can abort an Operation. Retrying may succeed for example
* when memory pressure was lowered in the meantime. When print is true the
* Operation is printed to the ERROR LOG
*/
void abort(const shared_ptr<CacheValue>& cachedResult, bool print) {
const std::string opString = asString();
if (print) {
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << opString << endl;
}
// Remove Operation from cache so we may retry it later. Anyone with a live
// pointer will be waiting and register the abort.
_executionContext->getQueryTreeCache().erase(opString);
cachedResult->_resTable->abort();
}

/**
* @return A list of columns on which the result of this operation is sorted.
*/
Expand Down Expand Up @@ -143,6 +124,38 @@ class Operation {
//! Computes both, an EntityList and a HitList.
virtual void computeResult(ResultTable* result) = 0;

// Rebuild the complete runtime information of this operation from scratch.
// Everything that was stored in `_runtimeInfo` before the call is discarded.
//
// `resultAndCacheStatus` holds the result (freshly computed or taken from the
// cache) together with the flag telling whether it came from the cache.
// `timeInMilliseconds` is the time that is recorded for this operation.
virtual void createRuntimeInformation(
    const ConcurrentLruCache ::ResultAndCacheStatus& resultAndCacheStatus,
    size_t timeInMilliseconds) final {
  // Shorthands for the stored value and the runtime information kept with it.
  const auto& storedValue = *resultAndCacheStatus._resultPointer;
  const auto& storedInfo = storedValue._runtimeInfo;

  // Start from a blank slate.
  _runtimeInfo = RuntimeInformation();

  // Column names may differ between a cached result and this operation, so
  // the local ones are used; width and descriptor are taken locally as well.
  _runtimeInfo.setColumnNames(getVariableColumns());
  _runtimeInfo.setCols(getResultWidth());
  _runtimeInfo.setDescriptor(getDescriptor());

  // Information about the children's computations is only known to the
  // result that was actually computed (or read from the cache).
  _runtimeInfo.children() = storedInfo.children();

  _runtimeInfo.setTime(timeInMilliseconds);
  _runtimeInfo.setRows(storedValue._resultTable->size());
  _runtimeInfo.setWasCached(resultAndCacheStatus._wasCached);

  // Keep the timings recorded with the stored result as extra details.
  _runtimeInfo.addDetail("original_total_time", storedInfo.getTime());
  _runtimeInfo.addDetail("original_operation_time",
                         storedInfo.getOperationTime());
}

vector<size_t> _resultSortedColumns;
RuntimeInformation _runtimeInfo;

Expand Down

0 comments on commit 00a633a

Please sign in to comment.