Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Caching #288

Merged
merged 6 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/SparqlEngineMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,15 @@ int main(int argc, char** argv) {
try {
Engine engine;
Index index;
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
index.setUsePatterns(usePatterns);
index.setOnDiskLiterals(onDiskLiterals);
index.createFromOnDiskIndex(indexName);
if (text) {
index.addTextFromOnDiskIndex();
}

QueryExecutionContext qec(index, engine);
QueryExecutionContext qec(index, engine, &cache);
if (costFactosFileName.size() > 0) {
qec.readCostFactorsFromTSVFile(costFactosFileName);
}
Expand Down
3 changes: 2 additions & 1 deletion src/WriteIndexListsMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ int main(int argc, char** argv) {
index.dumpAsciiLists(lists, decodeGapsAndFrequency);

Engine engine;
QueryExecutionContext qec(index, engine);
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
lehmann-4178656ch marked this conversation as resolved.
Show resolved Hide resolved
QueryExecutionContext qec(index, engine, &cache);
ParsedQuery q;
if (!freebase) {
q = SparqlParser("SELECT ?x WHERE {?x <is-a> <Scientist>}").parse();
Expand Down
99 changes: 89 additions & 10 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,97 @@ vector<size_t> IndexScan::resultSortedOn() const {
// _____________________________________________________________________________
ad_utility::HashMap<string, size_t> IndexScan::getVariableColumns() const {
ad_utility::HashMap<string, size_t> res;
size_t colIdx = 0;
if (_subject[0] == '?') {
res[_subject] = colIdx++;
}
if (_predicate[0] == '?') {
res[_predicate] = colIdx++;
}
size_t col = 0;

switch (_type) {
case SPO_FREE_P:
case FULL_INDEX_SCAN_SPO:
if (_subject[0] == '?') {
res[_subject] = col++;
}
if (_predicate[0] == '?') {
res[_predicate] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}
return res;
case SOP_FREE_O:
case SOP_BOUND_O:
case FULL_INDEX_SCAN_SOP:
if (_subject[0] == '?') {
res[_subject] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}

if (_predicate[0] == '?') {
res[_predicate] = col++;
}
return res;
case PSO_BOUND_S:
case PSO_FREE_S:
case FULL_INDEX_SCAN_PSO:
if (_predicate[0] == '?') {
res[_predicate] = col++;
}
if (_subject[0] == '?') {
res[_subject] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}
return res;
case POS_BOUND_O:
case POS_FREE_O:
case FULL_INDEX_SCAN_POS:
if (_predicate[0] == '?') {
res[_predicate] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}

if (_subject[0] == '?') {
res[_subject] = col++;
}
return res;
case OPS_FREE_P:
case FULL_INDEX_SCAN_OPS:
if (_object[0] == '?') {
res[_object] = col++;
}

if (_object[0] == '?') {
res[_object] = colIdx++;
if (_predicate[0] == '?') {
res[_predicate] = col++;
}

if (_subject[0] == '?') {
res[_subject] = col++;
}
return res;
case OSP_FREE_S:
case FULL_INDEX_SCAN_OSP:
if (_object[0] == '?') {
res[_object] = col++;
}

if (_subject[0] == '?') {
res[_subject] = col++;
}

if (_predicate[0] == '?') {
res[_predicate] = col++;
}
return res;
default:
AD_THROW(ad_semsearch::Exception::CHECK_FAILED, "Should be unreachable.");
}
return res;
}
// _____________________________________________________________________________
void IndexScan::computeResult(ResultTable* result) {
Expand Down
62 changes: 38 additions & 24 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ using std::shared_ptr;
class Operation {
public:
// Default Constructor.
Operation() : _executionContext(NULL), _hasComputedSortColumns(false) {}
Operation() : _executionContext(nullptr), _hasComputedSortColumns(false) {}

// Typical Constructor.
explicit Operation(QueryExecutionContext* executionContext)
Expand All @@ -40,10 +40,14 @@ class Operation {
shared_ptr<const ResultTable> getResult() {
ad_utility::Timer timer;
timer.start();
auto& cache = _executionContext->getQueryTreeCache();
const string cacheKey = asString();
const bool pinResult = _executionContext->pin;
LOG(TRACE) << "Check cache for Operation result" << endl;
LOG(TRACE) << "Using key: \n" << asString() << endl;
auto [newResult, existingResult] =
_executionContext->getQueryTreeCache().tryEmplace(asString());
LOG(TRACE) << "Using key: \n" << cacheKey << endl;
auto [newResult, existingResult] = (pinResult)
? cache.tryEmplacePinned(cacheKey)
: cache.tryEmplace(cacheKey);

if (newResult) {
LOG(TRACE) << "Not in the cache, need to compute result" << endl;
Expand All @@ -52,28 +56,22 @@ class Operation {
try {
computeResult(newResult->_resTable.get());
} catch (const ad_semsearch::AbortException& e) {
newResult->_resTable->abort();
// AbortExceptions have already been printed simply rethrow to
// unwind the callstack until the whole query is aborted
// A child Operation was aborted, abort this Operation
// as well. The child already printed
abort(newResult, false);
// Continue unwinding the stack
throw;
} catch (const std::exception& e) {
// Only print the Operation at the innermost (original) failure
// then "rethrow" as special ad_semsearch::AbortException
LOG(ERROR) << "Failed to compute Operation result for:" << endl;
LOG(ERROR) << asString() << endl;
LOG(ERROR) << e.what() << endl;
newResult->_resTable->abort();
// We are in the innermost level of the exception, so print
abort(newResult, true);
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(e);
} catch (...) {
// Only print the Operation at the innermost (original) failure
// then create not so weird AbortException
LOG(ERROR) << "Failed to compute Operation result for:" << endl;
LOG(ERROR) << asString() << endl;
// We are in the innermost level of the exception, so print
abort(newResult, true);
LOG(ERROR) << "WEIRD_EXCEPTION not inheriting from std::exception"
<< endl;
newResult->_resTable->abort();
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException("WEIRD_EXCEPTION");
Expand All @@ -93,11 +91,12 @@ class Operation {
newResult->_resTable->finish();
return newResult->_resTable;
}

existingResult->_resTable->awaitFinished();
if (existingResult->_resTable->status() == ResultTable::ABORTED) {
LOG(ERROR) << "Result in the cache was aborted" << endl;
LOG(ERROR) << "Operation aborted while awaiting result" << endl;
AD_THROW(ad_semsearch::Exception::BAD_QUERY,
"Operation was found aborted in the cache");
"Operation aborted while awaiting result");
}
timer.stop();
_runtimeInfo = existingResult->_runtimeInfo;
Expand All @@ -114,9 +113,23 @@ class Operation {
return existingResult->_resTable;
}

// Set the QueryExecutionContext for this particular element.
void setQueryExecutionContext(QueryExecutionContext* executionContext) {
_executionContext = executionContext;
/**
* Abort this Operation. Removes the Operation's result from the cache so
* that it can be retried. The result must be owned meaning only the
* computing thread can abort an Operation. Retrying may succeed for example
* when memory pressure was lowered in the meantime. When print is true the
* Operation is printed to the ERROR LOG
*/
void abort(const shared_ptr<CacheValue>& cachedResult, bool print) {
const std::string opString = asString();
if (print) {
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << opString << endl;
}
// Remove Operation from cache so we may retry it later. Anyone with a live
// pointer will be waiting and register the abort.
_executionContext->getQueryTreeCache().erase(opString);
cachedResult->_resTable->abort();
}

/**
Expand Down Expand Up @@ -181,6 +194,7 @@ class Operation {
virtual void computeResult(ResultTable* result) = 0;

vector<size_t> _resultSortedColumns;
bool _hasComputedSortColumns;
RuntimeInformation _runtimeInfo;

bool _hasComputedSortColumns;
};
15 changes: 10 additions & 5 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@ typedef ad_utility::LRUCache<string, CacheValue> SubtreeCache;
// Holds references to index and engine, implements caching.
class QueryExecutionContext {
public:
QueryExecutionContext(const Index& index, const Engine& engine)
: _subtreeCache(NOF_SUBTREES_TO_CACHE),
QueryExecutionContext(const Index& index, const Engine& engine,
SubtreeCache* const cache,
const bool pinSubtrees = false)
: pin(pinSubtrees),
_index(index),
_engine(engine),
_subtreeCache(cache),
_costFactors() {}

SubtreeCache& getQueryTreeCache() { return _subtreeCache; }
SubtreeCache& getQueryTreeCache() { return *_subtreeCache; }

const Engine& getEngine() const { return _engine; }

const Index& getIndex() const { return _index; }

void clearCache() { _subtreeCache.clear(); }
void clearCache() { getQueryTreeCache().clear(); }

void readCostFactorsFromTSVFile(const string& fileName) {
_costFactors.readFromFile(fileName);
Expand All @@ -54,9 +57,11 @@ class QueryExecutionContext {
return _costFactors.getCostFactor(key);
};

const bool pin;

private:
SubtreeCache _subtreeCache;
const Index& _index;
const Engine& _engine;
SubtreeCache* const _subtreeCache;
QueryPlanningCostFactors _costFactors;
};
2 changes: 1 addition & 1 deletion src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
using std::string;

// _____________________________________________________________________________
QueryExecutionTree::QueryExecutionTree(QueryExecutionContext* qec)
QueryExecutionTree::QueryExecutionTree(QueryExecutionContext* const qec)
: _qec(qec),
_variableColumnMap(),
_rootOperation(nullptr),
Expand Down
2 changes: 1 addition & 1 deletion src/engine/QueryExecutionTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ using std::string;
// needed to solve a query.
class QueryExecutionTree {
public:
explicit QueryExecutionTree(QueryExecutionContext* qec);
explicit QueryExecutionTree(QueryExecutionContext* const qec);

enum OperationType {
UNDEFINED = 0,
Expand Down
28 changes: 14 additions & 14 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,16 @@ void Server::run() {
LOG(ERROR) << "Cannot start an uninitialized server!" << std::endl;
exit(1);
}
QueryExecutionContext qec(_index, _engine);
std::vector<std::thread> threads;
for (int i = 0; i < _numThreads; ++i) {
threads.emplace_back(&Server::runAcceptLoop, this, &qec);
threads.emplace_back(&Server::runAcceptLoop, this);
}
for (std::thread& worker : threads) {
worker.join();
}
}
// _____________________________________________________________________________
void Server::runAcceptLoop(QueryExecutionContext* qec) {
void Server::runAcceptLoop() {
// Loop and wait for queries. Run forever, for now.
while (true) {
// Wait for new query
Expand All @@ -85,19 +84,19 @@ void Server::runAcceptLoop(QueryExecutionContext* qec) {
}
client.setKeepAlive(true);
LOG(INFO) << "Incoming connection, processing..." << std::endl;
process(&client, qec);
process(&client);
client.close();
}
}

// _____________________________________________________________________________
void Server::process(Socket* client, QueryExecutionContext* qec) const {
string response;
string query;
void Server::process(Socket* client) {
string contentType;
LOG(DEBUG) << "Waiting for receive call to complete." << endl;
string request;
string response;
string headers;
string query;
client->getHTTPRequest(request, headers);
LOG(DEBUG) << "Got request from client with size: " << request.size()
<< " and headers with total size: " << headers.size() << endl;
Expand Down Expand Up @@ -147,7 +146,7 @@ void Server::process(Socket* client, QueryExecutionContext* qec) const {
}

if (ad_utility::getLowercase(params["cmd"]) == "clearcache") {
qec->clearCache();
_cache.clear();
}
auto it = params.find("send");
size_t maxSend = MAX_NOF_ROWS_IN_RESULT;
Expand All @@ -162,15 +161,16 @@ void Server::process(Socket* client, QueryExecutionContext* qec) const {
exit(0);
}
#endif
const bool pinSubtrees =
ad_utility::getLowercase(params["pinsubtrees"]) == "true";
query = createQueryFromHttpParams(params);
LOG(INFO) << "Query:\n" << query << '\n';
LOG(INFO) << "Query" << ((pinSubtrees) ? " (Cache pinned): " : ": ")
<< query << '\n';
ParsedQuery pq = SparqlParser(query).parse();
pq.expandPrefixes();

// QueryGraph qg(qec);
// qg.createFromParsedQuery(pq);
// const QueryExecutionTree& qet = qg.getExecutionTree();
QueryPlanner qp(qec);
QueryExecutionContext qec(_index, _engine, &_cache, pinSubtrees);
QueryPlanner qp(&qec);
qp.setEnablePatternTrick(_enablePatternTrick);
QueryExecutionTree qet = qp.createExecutionTree(pq);
LOG(TRACE) << qet.asString() << std::endl;
Expand All @@ -183,7 +183,7 @@ void Server::process(Socket* client, QueryExecutionContext* qec) const {
"Content-Disposition: attachment;filename=export.csv";
} else if (ad_utility::getLowercase(params["action"]) == "tsv_export") {
// TSV export
response = composeResponseSepValues(pq, qet, '\t');
string response = composeResponseSepValues(pq, qet, '\t');
contentType =
"text/tsv\r\n"
"Content-Disposition: attachment;filename=export.tsv";
Expand Down