Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

F.memory limited cache #348

Merged
merged 11 commits into from
Apr 16, 2021
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ endif()
## Build targets for address sanitizer
# AddressSanitize
set(CMAKE_C_FLAGS_ASAN
"-fsanitize=address -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1"
"-fsanitize=address -fsanitize=undefined -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1"
CACHE STRING "Flags used by the C compiler during AddressSanitizer builds."
FORCE)
set(CMAKE_CXX_FLAGS_ASAN
"-fsanitize=address -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1"
"-fsanitize=address -fsanitize=undefined -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1"
CACHE STRING "Flags used by the C++ compiler during AddressSanitizer builds."
FORCE)

Expand Down
6 changes: 5 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ EXPOSE 7001
VOLUME ["/input", "/index"]

ENV INDEX_PREFIX index
ENV MEMORY_FOR_QUERIES 70
ENV CACHE_MAX_SIZE_GB 30
ENV CACHE_MAX_SIZE_GB_SINGLE_ENTRY 5
ENV CACHE_MAX_NUM_ENTRIES 1000
# Need the shell to get the INDEX_PREFIX envirionment variable
ENTRYPOINT ["/bin/sh", "-c", "exec ServerMain -i \"/index/${INDEX_PREFIX}\" -p 7001 \"$@\"", "--"]
ENTRYPOINT ["/bin/sh", "-c", "exec ServerMain -i \"/index/${INDEX_PREFIX}\" -j 8 -m ${MEMORY_FOR_QUERIES} -c ${CACHE_MAX_SIZE_GB} -e ${CACHE_MAX_SIZE_GB_SINGLE_ENTRY} -k ${CACHE_MAX_NUM_ENTRIES} -p 7001 \"$@\"", "--"]

# docker build -t qlever-<name> .
# # When running with user namespaces you may need to make the index folder accessible
Expand Down
59 changes: 47 additions & 12 deletions src/ServerMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>

#include <iomanip>
#include <iostream>
#include <string>
#include <vector>

#include "engine/Server.h"
#include "global/Constants.h"
#include "util/ReadableNumberFact.h"

using std::cerr;
Expand All @@ -24,16 +26,20 @@ using std::vector;
#define EMPH_OFF "\033[22m"

// Available options.
struct option options[] = {{"help", no_argument, NULL, 'h'},
{"index", required_argument, NULL, 'i'},
{"worker-threads", required_argument, NULL, 'j'},
{"memory-for-queries", required_argument, NULL, 'm'},
{"on-disk-literals", no_argument, NULL, 'l'},
{"port", required_argument, NULL, 'p'},
{"no-patterns", no_argument, NULL, 'P'},
{"no-pattern-trick", no_argument, NULL, 'T'},
{"text", no_argument, NULL, 't'},
{NULL, 0, NULL, 0}};
struct option options[] = {
{"help", no_argument, NULL, 'h'},
{"index", required_argument, NULL, 'i'},
{"worker-threads", required_argument, NULL, 'j'},
{"memory-for-queries", required_argument, NULL, 'm'},
{"cache-max-size-gb", required_argument, NULL, 'c'},
{"cache-max-size-gb-single-entry", required_argument, NULL, 'e'},
{"cache-max-num-entries", required_argument, NULL, 'k'},
{"on-disk-literals", no_argument, NULL, 'l'},
{"port", required_argument, NULL, 'p'},
{"no-patterns", no_argument, NULL, 'P'},
{"no-pattern-trick", no_argument, NULL, 'T'},
{"text", no_argument, NULL, 't'},
{NULL, 0, NULL, 0}};

void printUsage(char* execName) {
std::ios coutState(nullptr);
Expand All @@ -55,6 +61,22 @@ void printUsage(char* execName) {
"query processing and caching. If exceeded, query will return with "
"an error, but the engine will not crash."
<< endl;
cout << " " << std::setw(20) << "c, cache-size-in-gb" << std::setw(1)
<< "Maximum memory size in GB for all cache entries (pinned and "
"non-pinned). Note that the cache is part of the amount of memory "
"limited by --memory-for-queries."
<< endl;
cout << " " << std::setw(20) << "e, cache-max-size-single-element"
<< std::setw(1)
<< "Maximum size in GB for a single cache entry. In other words, "
"results larger than this will never be cached."
<< endl;
cout << " " << std::setw(20) << "k, cache-max-num-values" << std::setw(1)
<< "Maximum number of entries in the cache. If exceeded, remove "
"least-recently used entries from the cache if possible. Note that "
"this condition and the size limit specified via --cache-max-size-gb "
"both have to hold (logical AND)."
<< endl;
cout << " " << std::setw(20) << "no-patterns" << std::setw(1) << " "
<< "Disable the use of patterns. This disables ql:has-predicate."
<< endl;
Expand Down Expand Up @@ -88,11 +110,14 @@ int main(int argc, char** argv) {
bool enablePatternTrick = true;

size_t memLimit = DEFAULT_MEM_FOR_QUERIES_IN_GB;
size_t cacheMaxSizeGB = DEFAULT_CACHE_MAX_SIZE_GB;
size_t cacheMaxSizeGBSingleEntry = DEFAULT_CACHE_MAX_SIZE_GB_SINGLE_ENTRY;
size_t cacheMaxNumEntries = DEFAULT_CACHE_MAX_NUM_ENTRIES;

optind = 1;
// Process command line arguments.
while (true) {
int c = getopt_long(argc, argv, "i:p:j:tauhm:lT", options, NULL);
int c = getopt_long(argc, argv, "i:p:j:tauhm:lc:e:k:T", options, NULL);
if (c == -1) break;
switch (c) {
case 'i':
Expand Down Expand Up @@ -125,6 +150,15 @@ int main(int argc, char** argv) {
"will be ignored for ServerMain. The correct setting for "
"this flag is read directly from the index\n";
break;
case 'c':
cacheMaxSizeGB = atoi(optarg);
break;
case 'e':
cacheMaxSizeGBSingleEntry = atoi(optarg);
break;
case 'k':
cacheMaxNumEntries = atoi(optarg);
break;
default:
cout << endl
<< "! ERROR in processing options (getopt returned '" << c
Expand Down Expand Up @@ -154,7 +188,8 @@ int main(int argc, char** argv) {
cout << "Set locale LC_CTYPE to: " << locale << endl;

try {
Server server(port, numThreads, memLimit * 1 << 30u);
Server server(port, numThreads, memLimit, cacheMaxSizeGB,
cacheMaxSizeGBSingleEntry, cacheMaxNumEntries);
server.initialize(index, text, usePatterns, enablePatternTrick);
server.run();
} catch (const std::exception& e) {
Expand Down
2 changes: 1 addition & 1 deletion src/SparqlEngineMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ int main(int argc, char** argv) {
try {
Engine engine;
Index index;
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
ConcurrentLruCache cache(DEFAULT_CACHE_MAX_NUM_ENTRIES);
PinnedSizes pinnedSizes;
index.setUsePatterns(usePatterns);
index.setOnDiskLiterals(onDiskLiterals);
Expand Down
4 changes: 3 additions & 1 deletion src/WriteIndexListsMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
#include <getopt.h>
#include <libgen.h>
#include <stdlib.h>

#include <iomanip>
#include <iostream>
#include <string>

#include "engine/QueryPlanner.h"
#include "global/Constants.h"
#include "parser/SparqlParser.h"
#include "util/ReadableNumberFact.h"
#include "util/Timer.h"
Expand Down Expand Up @@ -87,7 +89,7 @@ int main(int argc, char** argv) {
index.dumpAsciiLists(lists, decodeGapsAndFrequency);

Engine engine;
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
ConcurrentLruCache cache(DEFAULT_CACHE_MAX_NUM_ENTRIES);
PinnedSizes pinnedSizes;
ad_utility::AllocatorWithLimit<Id> allocator{
ad_utility::makeAllocationMemoryLeftThreadsafeObject(
Expand Down
113 changes: 44 additions & 69 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,86 +55,61 @@ shared_ptr<const ResultTable> Operation::getResult(bool isRoot) {
const bool pinChildIndexScanSizes = _executionContext->_pinResult && isRoot;
const bool pinResult =
_executionContext->_pinSubtrees || pinChildIndexScanSizes;
LOG(TRACE) << "Check cache for Operation result" << endl;
LOG(TRACE) << "Using key: \n" << cacheKey << endl;
auto [newResult, existingResult] =
(pinResult)
? cache.tryEmplacePinned(cacheKey, _executionContext->getAllocator())
: cache.tryEmplace(cacheKey, _executionContext->getAllocator());

// When we pin a final result only, we also need to remember the sizes of all
// involved IndexScans with two bound columns. If we don't do this, the query
// planner will otherwise trigger their computation even if it is uneeded
// because the final result can be found in the cache.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// When we pin the final result but no subtrees, we need to remember the sizes of all involved index scans that have only one free variable. Note that these index scans are executed already during query planning because they have to be executed anyway, for any query plan. If we don't remember these sizes here, future queries that take the result from the cache would redo these index scans. Note that we do not need to remember the multiplicity (and distinctness) because the multiplicity for an index scan with a single free variable is always 1.

if (pinChildIndexScanSizes) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename variable to pinFinalResultButNotSubtrees

auto lock = getExecutionContext()->getPinnedSizes().wlock();
forAllDescendants([&lock](QueryExecutionTree* child) {
if (child->getType() == QueryExecutionTree::OperationType::SCAN) {
if (child->getType() == QueryExecutionTree::OperationType::SCAN &&
child->getResultWidth() == 1) {
(*lock)[child->getRootOperation()->asString()] =
child->getSizeEstimate();
}
});
}

if (newResult) {
LOG(TRACE) << "Not in the cache, need to compute result" << endl;
LOG(DEBUG) << "Available memory (in MB) before operation: "
<< (_executionContext->getAllocator().numFreeBytes() >> 20)
<< std::endl;
// Passing the raw pointer here is ok as the result shared_ptr remains
// in scope
try {
computeResult(newResult->_resTable.get());
} catch (const ad_semsearch::AbortException& e) {
// A child Operation was aborted, abort this Operation
// as well. The child already printed
abort(newResult, false);
// Continue unwinding the stack
throw;
} catch (const std::exception& e) {
// We are in the innermost level of the exception, so print
abort(newResult, true);
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(e);
} catch (...) {
// We are in the innermost level of the exception, so print
abort(newResult, true);
LOG(ERROR) << "WEIRD_EXCEPTION not inheriting from std::exception"
<< endl;
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException("WEIRD_EXCEPTION");
}
timer.stop();
_runtimeInfo.setRows(newResult->_resTable->size());
_runtimeInfo.setCols(getResultWidth());
_runtimeInfo.setDescriptor(getDescriptor());
_runtimeInfo.setColumnNames(getVariableColumns());
try {
auto computeLambda = [this] {
CacheValue val(getExecutionContext()->getAllocator());
computeResult(val._resultTable.get());
return val;
};

_runtimeInfo.setTime(timer.msecs());
_runtimeInfo.setWasCached(false);
// cache the runtime information for the execution as well
newResult->_runtimeInfo = _runtimeInfo;
// Only now we can let other threads access the result
// and runtime information
newResult->_resTable->finish();
return newResult->_resTable;
}
auto result = (pinResult) ? cache.computeOncePinned(cacheKey, computeLambda)
: cache.computeOnce(cacheKey, computeLambda);

existingResult->_resTable->awaitFinished();
if (existingResult->_resTable->status() == ResultTable::ABORTED) {
LOG(ERROR) << "Operation aborted while awaiting result" << endl;
AD_THROW(ad_semsearch::Exception::BAD_QUERY,
"Operation aborted while awaiting result");
timer.stop();
createRuntimeInformation(result, timer.msecs());
return result._resultPointer->_resultTable;
} catch (const ad_semsearch::AbortException& e) {
// A child Operation was aborted, do not print the information again.
throw;
} catch (const ad_utility::WaitedForResultWhichThenFailedException& e) {
LOG(ERROR) << "Waited for a result from another thread which then failed"
<< endl;
LOG(ERROR) << asString();
throw ad_semsearch::AbortException(e);
} catch (const std::exception& e) {
// We are in the innermost level of the exception, so print
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << asString() << endl;
LOG(ERROR) << e.what() << endl;
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(e);
} catch (...) {
// We are in the innermost level of the exception, so print
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << asString() << endl;
LOG(ERROR)
<< "Unexpected exception that is not a subclass of std::exception"
<< endl;
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(
"Unexpected expection that is not a subclass of std::exception");
}
timer.stop();
_runtimeInfo = existingResult->_runtimeInfo;
// We need to update column names and descriptor as we may have cached with
// different variable names
_runtimeInfo.setDescriptor(getDescriptor());
_runtimeInfo.setColumnNames(getVariableColumns());
_runtimeInfo.setTime(timer.msecs());
_runtimeInfo.setWasCached(true);
_runtimeInfo.addDetail("original_total_time",
existingResult->_runtimeInfo.getTime());
_runtimeInfo.addDetail("original_operation_time",
existingResult->_runtimeInfo.getOperationTime());
return existingResult->_resTable;
}
51 changes: 32 additions & 19 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,6 @@ class Operation {
// recursively collect all Warnings generated by all descendants
vector<string> collectWarnings() const;

/**
* Abort this Operation. Removes the Operation's result from the cache so
* that it can be retried. The result must be owned meaning only the
* computing thread can abort an Operation. Retrying may succeed for example
* when memory pressure was lowered in the meantime. When print is true the
* Operation is printed to the ERROR LOG
*/
void abort(const shared_ptr<CacheValue>& cachedResult, bool print) {
const std::string opString = asString();
if (print) {
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << opString << endl;
}
// Remove Operation from cache so we may retry it later. Anyone with a live
// pointer will be waiting and register the abort.
_executionContext->getQueryTreeCache().erase(opString);
cachedResult->_resTable->abort();
}

/**
* @return A list of columns on which the result of this operation is sorted.
*/
Expand Down Expand Up @@ -143,6 +124,38 @@ class Operation {
//! Computes both, an EntityList and a HitList.
virtual void computeResult(ResultTable* result) = 0;

// Create and store the complete runtime Information for this operation.
// All data that was previously stored in the runtime information will be
// deleted.
virtual void createRuntimeInformation(
const ConcurrentLruCache ::ResultAndCacheStatus& resultAndCacheStatus,
size_t timeInMilliseconds) final {
// reset
_runtimeInfo = RuntimeInformation();
// the column names might differ between a cached result and this operation,
// so we have to take the local ones.
_runtimeInfo.setColumnNames(getVariableColumns());

_runtimeInfo.setCols(getResultWidth());
_runtimeInfo.setDescriptor(getDescriptor());

// Only the result that was actually computed (or read from cache) knows
// the correct information about the children computations.
_runtimeInfo.children() =
resultAndCacheStatus._resultPointer->_runtimeInfo.children();

_runtimeInfo.setTime(timeInMilliseconds);
_runtimeInfo.setRows(
resultAndCacheStatus._resultPointer->_resultTable->size());
_runtimeInfo.setWasCached(resultAndCacheStatus._wasCached);
_runtimeInfo.addDetail(
"original_total_time",
resultAndCacheStatus._resultPointer->_runtimeInfo.getTime());
_runtimeInfo.addDetail(
"original_operation_time",
resultAndCacheStatus._resultPointer->_runtimeInfo.getOperationTime());
}

vector<size_t> _resultSortedColumns;
RuntimeInformation _runtimeInfo;

Expand Down