Runtime Parameters which can be set via the HTTP API #527

Merged · 12 commits · Dec 22, 2021
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -212,7 +212,7 @@ configure_file(src/web/script.js script.js)
add_executable(IndexBuilderMain src/index/IndexBuilderMain.cpp)
target_link_libraries(IndexBuilderMain index ${CMAKE_THREAD_LIBS_INIT})

add_executable(CreatePatternsMain src/index/CreatePatternsMain.cpp)
add_executable(CreatePatternsMain src/index/CreatePatternsMain.cpp src/util/ConstexprSmallString.h)
target_link_libraries(CreatePatternsMain index ${CMAKE_THREAD_LIBS_INIT})

add_executable(SparqlEngineMain src/SparqlEngineMain.cpp)
12 changes: 4 additions & 8 deletions src/ServerMain.cpp
@@ -110,9 +110,6 @@ int main(int argc, char** argv) {
bool enablePatternTrick = true;

size_t memLimit = DEFAULT_MEM_FOR_QUERIES_IN_GB;
size_t cacheMaxSizeGB = DEFAULT_CACHE_MAX_SIZE_GB;
size_t cacheMaxSizeGBSingleEntry = DEFAULT_CACHE_MAX_SIZE_GB_SINGLE_ENTRY;
size_t cacheMaxNumEntries = DEFAULT_CACHE_MAX_NUM_ENTRIES;

optind = 1;
// Process command line arguments.
@@ -151,13 +148,13 @@ int main(int argc, char** argv) {
"this flag is read directly from the index\n";
break;
case 'c':
cacheMaxSizeGB = atoi(optarg);
RuntimeParameters().set<"cache-max-size-gb">(atoi(optarg));
break;
case 'e':
cacheMaxSizeGBSingleEntry = atoi(optarg);
RuntimeParameters().set<"cache-max-size-gb-single-entry">(atoi(optarg));
break;
case 'k':
cacheMaxNumEntries = atoi(optarg);
RuntimeParameters().set<"cache-max-num-entries">(atoi(optarg));
break;
default:
cout << endl
@@ -188,8 +185,7 @@ int main(int argc, char** argv) {
cout << "Set locale LC_CTYPE to: " << locale << endl;

try {
Server server(port, numThreads, memLimit, cacheMaxSizeGB,
cacheMaxSizeGBSingleEntry, cacheMaxNumEntries);
Server server(port, numThreads, memLimit);
server.initialize(index, text, usePatterns, enablePatternTrick);
server.run();
} catch (const std::exception& e) {
2 changes: 1 addition & 1 deletion src/SparqlEngineMain.cpp
@@ -142,7 +142,7 @@ int main(int argc, char** argv) {
try {
Engine engine;
Index index;
QueryResultCache cache{DEFAULT_CACHE_MAX_NUM_ENTRIES};
QueryResultCache cache{1000u};
index.setUsePatterns(usePatterns);
index.setOnDiskLiterals(onDiskLiterals);
index.createFromOnDiskIndex(indexName);
2 changes: 1 addition & 1 deletion src/WriteIndexListsMain.cpp
@@ -90,7 +90,7 @@ int main(int argc, char** argv) {
index.dumpAsciiLists(lists, decodeGapsAndFrequency);

Engine engine;
QueryResultCache cache{DEFAULT_CACHE_MAX_NUM_ENTRIES};
QueryResultCache cache{1000u};
ad_utility::AllocatorWithLimit<Id> allocator{
ad_utility::makeAllocationMemoryLeftThreadsafeObject(
DEFAULT_MEM_FOR_QUERIES_IN_GB)};
4 changes: 3 additions & 1 deletion src/engine/CMakeLists.txt
@@ -35,6 +35,8 @@ add_library(engine
IdTable.h
../util/Random.h
Minus.h Minus.cpp
ResultType.h)
ResultType.h
../util/Parameters.h)


target_link_libraries(engine index parser sparqlExpressions httpServer SortPerformanceEstimator absl::flat_hash_set ${ICU_LIBRARIES})
6 changes: 4 additions & 2 deletions src/engine/OrderBy.cpp
@@ -82,16 +82,18 @@ void OrderBy::computeResult(ResultTable* result) {
double remainingSecs =
static_cast<double>(_timeoutTimer->wlock()->remainingMicroseconds()) /
1'000'000;
auto sortEstimateCancellationFactor =
RuntimeParameters().get<"sort-estimate-cancellation-factor">();
if (getExecutionContext()
->getSortPerformanceEstimator()
.estimatedSortTimeInSeconds(subRes->size(), subRes->width()) >
remainingSecs * SORT_ESTIMATE_CANCELLATION_FACTOR) {
remainingSecs * sortEstimateCancellationFactor) {
// The estimated time for this sort is much larger than the actually
// remaining time, cancel this operation
throw ad_utility::TimeoutException(
"OrderBy operation was canceled, because time estimate exceeded "
"remaining time by a factor of " +
std::to_string(SORT_ESTIMATE_CANCELLATION_FACTOR));
std::to_string(sortEstimateCancellationFactor));
}

RuntimeInformation& runtimeInfo = getRuntimeInfo();
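For intuition on the cancellation check above, a small fragment with illustrative numbers (not taken from a real query): with 2 s of remaining time and the default factor of 3.0, any estimated sort time above 6 s cancels the operation.

// Illustrative numbers only; the real values come from the timeout timer,
// the runtime parameter, and the SortPerformanceEstimator.
double remainingSecs = 2.0;
double sortEstimateCancellationFactor = 3.0;  // default declared in Constants.h
double estimatedSortSecs = 7.0;
bool cancel = estimatedSortSecs > remainingSecs * sortEstimateCancellationFactor;  // 7.0 > 6.0 -> cancel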
38 changes: 28 additions & 10 deletions src/engine/Server.cpp
@@ -90,31 +90,49 @@ boost::asio::awaitable<void> Server::process(
LOG(INFO) << "Supplying index stats..." << std::endl;
auto response = createJsonResponse(composeStatsJson(), request);
co_return co_await sendWithCors(std::move(response));
} else if (cmd == "cachestats") {
} else if (cmd == "cache-stats") {
LOG(INFO) << "Supplying cache stats..." << std::endl;
auto response = createJsonResponse(composeCacheStatsJson(), request);
co_return co_await sendWithCors(std::move(response));
} else if (cmd == "clearcache") {
} else if (cmd == "clear-cache") {
LOG(INFO) << "Clearing the cache, unpinned elements only" << std::endl;
_cache.clearUnpinnedOnly();
responseFromCommand = createOkResponse(
"Successfully cleared the cache (unpinned elements only)", request,
ad_utility::MediaType::textPlain);
} else if (cmd == "clearcachecomplete") {
responseFromCommand =
createJsonResponse(composeCacheStatsJson(), request);
} else if (cmd == "clear-cache-complete") {
LOG(INFO) << "Clearing the cache completely, including unpinned elements"
<< std::endl;
_cache.clearAll();
responseFromCommand = createOkResponse(
"Successfully cleared the cache (including the pinned elements)",
request, ad_utility::MediaType::textPlain);
responseFromCommand =
createJsonResponse(composeCacheStatsJson(), request);
} else if (cmd == "get-settings") {
LOG(INFO) << "Supplying settings..." << std::endl;
json settingsJson = RuntimeParameters().toMap();
co_await sendWithCors(createJsonResponse(settingsJson, request));
co_return;
} else {
co_await sendWithCors(createBadRequestResponse(
R"(unknown value for query paramter "cmd" : ")" + cmd + '\"',
R"(Unknown value for query parameter "cmd": ")" + cmd + '\"',
request));
co_return;
}
}

// TODO<joka921> Restrict this access by a token.
// TODO<joka921> Warn about unknown parameters
bool anyParamWasChanged = false;
for (const auto& [key, value] : params) {
if (RuntimeParameters().getKeys().contains(key)) {
RuntimeParameters().set(key, value);
anyParamWasChanged = true;
}
}

if (anyParamWasChanged) {
json settingsJson = RuntimeParameters().toMap();
responseFromCommand = createJsonResponse(settingsJson, request);
}

if (params.contains("query")) {
if (params.at("query").empty()) {
co_return co_await sendWithCors(createBadRequestResponse(
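The loop added above is what makes the runtime parameters settable via the HTTP API: every URL query parameter whose name matches a key in RuntimeParameters().getKeys() is applied, and the response echoes the full settings map (so, illustratively, appending cache-max-num-entries=500 to a request's query string shrinks the cache limit). A minimal standalone sketch of that whitelisting behaviour, using plain standard-library containers instead of the QLever types:

#include <iostream>
#include <map>
#include <set>
#include <string>

int main() {
  // Names of the declared runtime parameters (subset, for illustration).
  std::set<std::string> knownKeys{"cache-max-num-entries", "cache-max-size-gb"};
  // Current settings, kept as strings for simplicity.
  std::map<std::string, std::string> settings{
      {"cache-max-num-entries", "1000"}, {"cache-max-size-gb", "30"}};
  // Parsed URL query parameters of an incoming request (illustrative values).
  std::map<std::string, std::string> params{
      {"cache-max-num-entries", "500"}, {"query", "SELECT * WHERE { ?s ?p ?o }"}};

  bool anyParamWasChanged = false;
  for (const auto& [key, value] : params) {
    if (knownKeys.contains(key)) {
      settings[key] = value;
      anyParamWasChanged = true;
    }
  }
  // As in Server.cpp, a change is answered with the full settings map.
  if (anyParamWasChanged) {
    for (const auto& [key, value] : settings) {
      std::cout << key << " = " << value << "\n";
    }
  }
}

Unknown keys are silently ignored, which is what the TODO about warning on unknown parameters refers to.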
31 changes: 24 additions & 7 deletions src/engine/Server.h
@@ -26,16 +26,13 @@ using std::vector;

using ad_utility::Socket;

//! The HTTP Sever used.
//! The HTTP Server used.
class Server {
private:
public:
explicit Server(const int port, const int numThreads, size_t maxMemGB,
size_t cacheMaxSizeGB, size_t cacheMaxSizeGBSingleEntry,
size_t cacheMaxNumEntries)
explicit Server(const int port, const int numThreads, size_t maxMemGB)
: _numThreads(numThreads),
_port(port),
_cache(cacheMaxNumEntries, cacheMaxSizeGB * (1ull << 30u) / sizeof(Id),
cacheMaxSizeGBSingleEntry * (1ull << 30u) / sizeof(Id)),
_allocator{ad_utility::makeAllocationMemoryLeftThreadsafeObject(
maxMemGB * (1ull << 30u)),
[this](size_t numBytesToAllocate) {
@@ -46,7 +43,27 @@ class Server {
_sortPerformanceEstimator(),
_index(),
_engine(),
_initialized(false) {}
_initialized(false) {
// TODO<joka921> Write a strong type for KB, MB, GB etc and use it
// in the cache and the memory limit
// Convert a number of gigabytes to the number of Ids that fit in that
// amount of memory.
auto toNumIds = [](size_t gigabytes) -> size_t {
return gigabytes * (1ull << 30u) / sizeof(Id);
};
// This also directly triggers the update functions and propagates the
// values of the parameters to the cache.
RuntimeParameters().setOnUpdateAction<"cache-max-num-entries">(
[this](size_t newValue) { _cache.setMaxNumEntries(newValue); });
RuntimeParameters().setOnUpdateAction<"cache-max-size-gb">(
[this, toNumIds](size_t newValue) {
_cache.setMaxSize(toNumIds(newValue));
});
RuntimeParameters().setOnUpdateAction<"cache-max-size-gb-single-entry">(
[this, toNumIds](size_t newValue) {
_cache.setMaxSizeSingleEntry(toNumIds(newValue));
});
}

virtual ~Server() = default;

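The constructor above registers an on-update action per cache-related parameter, so a later set() on that parameter immediately reaches the cache. A toy sketch of that pattern with a hypothetical ParameterSketch type (not the ad_utility::Parameters implementation):

#include <cstddef>
#include <functional>
#include <iostream>
#include <utility>

// Stand-in for a single runtime parameter, just to illustrate the wiring.
template <typename T>
class ParameterSketch {
  T _value;
  std::function<void(const T&)> _onUpdate = [](const T&) {};

 public:
  explicit ParameterSketch(T initial) : _value(std::move(initial)) {}

  // Registering an action immediately invokes it with the current value,
  // mirroring the constructor comment about propagating the values to the cache.
  void setOnUpdateAction(std::function<void(const T&)> action) {
    _onUpdate = std::move(action);
    _onUpdate(_value);
  }

  // Every later set() re-invokes the registered action with the new value.
  void set(T newValue) {
    _value = std::move(newValue);
    _onUpdate(_value);
  }
};

int main() {
  ParameterSketch<std::size_t> cacheMaxNumEntries{1000};
  cacheMaxNumEntries.setOnUpdateAction([](const std::size_t& v) {
    std::cout << "cache limited to " << v << " entries\n";
  });
  cacheMaxNumEntries.set(500);  // e.g. triggered by an HTTP request
}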
6 changes: 4 additions & 2 deletions src/engine/Sort.cpp
@@ -58,16 +58,18 @@ void Sort::computeResult(ResultTable* result) {
double remainingSecs =
static_cast<double>(_timeoutTimer->wlock()->remainingMicroseconds()) /
1'000'000;
auto sortEstimateCancellationFactor =
RuntimeParameters().get<"sort-estimate-cancellation-factor">();
if (getExecutionContext()
->getSortPerformanceEstimator()
.estimatedSortTimeInSeconds(subRes->size(), subRes->width()) >
remainingSecs * SORT_ESTIMATE_CANCELLATION_FACTOR) {
remainingSecs * sortEstimateCancellationFactor) {
// The estimated time for this sort is much larger than the actually
// remaining time, cancel this operation
throw ad_utility::TimeoutException(
"Sort operation was canceled, because time estimate exceeded "
"remaining time by a factor of " +
std::to_string(SORT_ESTIMATE_CANCELLATION_FACTOR));
std::to_string(sortEstimateCancellationFactor));
}

RuntimeInformation& runtimeInfo = getRuntimeInfo();
22 changes: 15 additions & 7 deletions src/global/Constants.h
@@ -6,15 +6,14 @@
#include <stdexcept>
#include <string>

#include "../util/Parameters.h"

static const size_t STXXL_MEMORY_TO_USE = 1024L * 1024L * 1024L * 2L;
static const size_t STXXL_DISK_SIZE_INDEX_BUILDER = 1000 * 1000;
static const size_t STXXL_DISK_SIZE_INDEX_TEST = 10;

static constexpr size_t DEFAULT_MEM_FOR_QUERIES_IN_GB = 4;

static const size_t DEFAULT_CACHE_MAX_NUM_ENTRIES = 1000;
static const size_t DEFAULT_CACHE_MAX_SIZE_GB = 30;
static const size_t DEFAULT_CACHE_MAX_SIZE_GB_SINGLE_ENTRY = 5;
static const size_t MAX_NOF_ROWS_IN_RESULT = 100000;
static const size_t MIN_WORD_PREFIX_SIZE = 4;
static const char PREFIX_CHAR = '*';
@@ -114,10 +113,6 @@ static constexpr size_t NUM_OPERATIONS_BETWEEN_TIMEOUT_CHECKS = 32000;
// operation
static constexpr size_t NUM_OPERATIONS_HASHSET_LOOKUP = 32;

// If the time estimate for a sort operation is larger by more than this factor
// than the remaining time, then the sort is canceled with a timeout exception
static constexpr double SORT_ESTIMATE_CANCELLATION_FACTOR = 3.0;

// When initializing a sort performance estimator, at most this percentage of
// the number of triples in the index is being sorted at once.
static constexpr size_t PERCENTAGE_OF_TRIPLES_FOR_SORT_ESTIMATE = 5;
@@ -126,6 +121,19 @@ static constexpr size_t PERCENTAGE_OF_TRIPLES_FOR_SORT_ESTIMATE = 5;
// times this factor.
static constexpr double MAKE_ROOM_SLACK_FACTOR = 2;

inline auto& RuntimeParameters() {
using ad_utility::detail::parameterShortNames::Double;
using ad_utility::detail::parameterShortNames::SizeT;
static ad_utility::Parameters params{
// If the time estimate for a sort operation is larger by more than this
// factor than the remaining time, then the sort is canceled with a
// timeout exception.
Double<"sort-estimate-cancellation-factor">{3.0},
SizeT<"cache-max-num-entries">{1000}, SizeT<"cache-max-size-gb">{30},
SizeT<"cache-max-size-gb-single-entry">{5}};
return params;
}

#ifdef _PARALLEL_SORT
static constexpr bool USE_PARALLEL_SORT = true;
#include <parallel/algorithm>
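RuntimeParameters() is an inline function with a function-local static, so every translation unit that includes Constants.h shares the same parameter set. A short usage sketch, restricted to the calls that appear elsewhere in this diff (compile-time keys via get<>/set<>, runtime string keys via set(key, value), and toMap() for the JSON settings view); it is a fragment and assumes Constants.h is included:

auto factor = RuntimeParameters().get<"sort-estimate-cancellation-factor">();  // as in Sort.cpp
RuntimeParameters().set<"cache-max-size-gb">(20);                              // as in ServerMain.cpp
RuntimeParameters().set("cache-max-num-entries", "500");   // string key/value, as in Server.cpp
auto settingsJson = RuntimeParameters().toMap();           // returned by the "get-settings" command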
18 changes: 16 additions & 2 deletions src/util/Cache.h
@@ -228,12 +228,26 @@ class FlexibleCache {
return valPtr;
}

//! Set or change the capacity.
//! Set or change the maximum number of entries
void setMaxNumEntries(const size_t maxNumEntries) {
_maxNumEntries = maxNumEntries;
makeRoomIfFits(0);
}

//! Set or change the maximum total size of the cache
void setMaxSize(const size_t maxSize) {
_maxSize = maxSize;
makeRoomIfFits(0);
}

//! Set or change the maximum size of a single Entry
void setMaxSizeSingleEntry(const size_t maxSizeSingleEntry) {
_maxSizeSingleEntry = maxSizeSingleEntry;
// We currently do not delete entries that are now too big
// after the update.
// TODO<joka921>: Implement this functionality.
}

//! Checks if there is an entry with the given key.
bool contains(const Key& key) const {
return _pinnedMap.count(key) > 0 || _accessMap.count(key) > 0;
@@ -456,7 +470,7 @@ class HeapBasedLRUCache
detail::timeUpdater, detail::timeAsScore, ValueSizeGetter>;

public:
explicit HeapBasedLRUCache(size_t capacityNumEls,
explicit HeapBasedLRUCache(size_t capacityNumEls = size_t_max,
size_t capacitySize = size_t_max,
size_t maxSizeSingleEl = size_t_max)
: Base(capacityNumEls, capacitySize, maxSizeSingleEl, std::less<>(),
11 changes: 11 additions & 0 deletions src/util/ConcurrentCache.h
@@ -199,6 +199,17 @@ class ConcurrentCache {
return _cacheAndInProgressMap.wlock()->_cache[k];
}

// These functions set the different capacity/size settings of the cache
void setMaxSize(size_t maxSize) {
_cacheAndInProgressMap.wlock()->_cache.setMaxSize(maxSize);
}
void setMaxNumEntries(size_t maxNumEntries) {
_cacheAndInProgressMap.wlock()->_cache.setMaxNumEntries(maxNumEntries);
}
void setMaxSizeSingleEntry(size_t maxSize) {
_cacheAndInProgressMap.wlock()->_cache.setMaxSizeSingleEntry(maxSize);
}

private:
using ResultInProgress = ConcurrentCacheDetail::ResultInProgress<Value>;
