Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Caching #282

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/SparqlEngineMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,15 @@ int main(int argc, char** argv) {
try {
Engine engine;
Index index;
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
index.setUsePatterns(usePatterns);
index.setOnDiskLiterals(onDiskLiterals);
index.createFromOnDiskIndex(indexName);
if (text) {
index.addTextFromOnDiskIndex();
}

QueryExecutionContext qec(index, engine);
QueryExecutionContext qec(index, engine, &cache);
if (costFactosFileName.size() > 0) {
qec.readCostFactorsFromTSVFile(costFactosFileName);
}
Expand Down Expand Up @@ -201,7 +202,7 @@ void processQuery(QueryExecutionContext& qec, const string& query) {
auto qet = qp.createExecutionTree(pq);
timer.stop();
LOG(INFO) << "Time to create Execution Tree: " << timer.msecs() << "ms\n";
LOG(INFO) << "Execution Tree: " << qet.asString() << "ms\n";
LOG(INFO) << "Execution Tree: " << qet->asString() << "ms\n";
size_t limit = MAX_NOF_ROWS_IN_RESULT;
size_t offset = 0;
if (pq._limit.size() > 0) {
Expand All @@ -210,10 +211,10 @@ void processQuery(QueryExecutionContext& qec, const string& query) {
if (pq._offset.size() > 0) {
offset = static_cast<size_t>(atol(pq._offset.c_str()));
}
qet.writeResultToStream(cout, pq._selectedVariables, limit, offset);
qet->writeResultToStream(cout, pq._selectedVariables, limit, offset);
t.stop();
std::cout << "\nDone. Time: " << t.usecs() / 1000.0 << " ms\n";
size_t numMatches = qet.getResult()->size();
size_t numMatches = qet->getResult()->size();
std::cout << "\nNumber of matches (no limit): " << numMatches << "\n";
size_t effectiveLimit =
atoi(pq._limit.c_str()) > 0 ? atoi(pq._limit.c_str()) : numMatches;
Expand Down
5 changes: 3 additions & 2 deletions src/WriteIndexListsMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ int main(int argc, char** argv) {
index.dumpAsciiLists(lists, decodeGapsAndFrequency);

Engine engine;
QueryExecutionContext qec(index, engine);
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
QueryExecutionContext qec(index, engine, &cache);
ParsedQuery q;
if (!freebase) {
q = SparqlParser("SELECT ?x WHERE {?x <is-a> <Scientist>}").parse();
Expand All @@ -100,7 +101,7 @@ int main(int argc, char** argv) {
}
QueryPlanner queryPlanner(&qec);
auto qet = queryPlanner.createExecutionTree(q);
const auto res = qet.getResult();
const auto res = qet->getResult();
AD_CHECK(res->size() > 0);
AD_CHECK(res->_data.cols() == 1);
string personlistFile = indexName + ".list.scientists";
Expand Down
99 changes: 89 additions & 10 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,97 @@ vector<size_t> IndexScan::resultSortedOn() const {
// _____________________________________________________________________________
ad_utility::HashMap<string, size_t> IndexScan::getVariableColumns() const {
ad_utility::HashMap<string, size_t> res;
size_t colIdx = 0;
if (_subject[0] == '?') {
res[_subject] = colIdx++;
}
if (_predicate[0] == '?') {
res[_predicate] = colIdx++;
}
size_t col = 0;

switch (_type) {
case SPO_FREE_P:
case FULL_INDEX_SCAN_SPO:
if (_subject[0] == '?') {
res[_subject] = col++;
}
if (_predicate[0] == '?') {
res[_predicate] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}
return res;
case SOP_FREE_O:
case SOP_BOUND_O:
case FULL_INDEX_SCAN_SOP:
if (_subject[0] == '?') {
res[_subject] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}

if (_predicate[0] == '?') {
res[_predicate] = col++;
}
return res;
case PSO_BOUND_S:
case PSO_FREE_S:
case FULL_INDEX_SCAN_PSO:
if (_predicate[0] == '?') {
res[_predicate] = col++;
}
if (_subject[0] == '?') {
res[_subject] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}
return res;
case POS_BOUND_O:
case POS_FREE_O:
case FULL_INDEX_SCAN_POS:
if (_predicate[0] == '?') {
res[_predicate] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}

if (_subject[0] == '?') {
res[_subject] = col++;
}
return res;
case OPS_FREE_P:
case FULL_INDEX_SCAN_OPS:
if (_object[0] == '?') {
res[_object] = col++;
}

if (_object[0] == '?') {
res[_object] = colIdx++;
if (_predicate[0] == '?') {
res[_predicate] = col++;
}

if (_subject[0] == '?') {
res[_subject] = col++;
}
return res;
case OSP_FREE_S:
case FULL_INDEX_SCAN_OSP:
if (_object[0] == '?') {
res[_object] = col++;
}

if (_subject[0] == '?') {
res[_subject] = col++;
}

if (_predicate[0] == '?') {
res[_predicate] = col++;
}
return res;
default:
AD_THROW(ad_semsearch::Exception::CHECK_FAILED, "Should be unreachable.");
}
return res;
}
// _____________________________________________________________________________
void IndexScan::computeResult(ResultTable* result) {
Expand Down
66 changes: 38 additions & 28 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ using std::shared_ptr;
class Operation {
public:
// Default Constructor.
Operation() : _executionContext(NULL), _hasComputedSortColumns(false) {}
Operation() : _executionContext(nullptr), _hasComputedSortColumns(false) {}

// Typical Constructor.
explicit Operation(QueryExecutionContext* executionContext)
Expand All @@ -40,10 +40,14 @@ class Operation {
shared_ptr<const ResultTable> getResult() {
ad_utility::Timer timer;
timer.start();
auto& cache = _executionContext->getQueryTreeCache();
const string cacheKey = asString();
const bool pinResult = _executionContext->pin;
LOG(TRACE) << "Check cache for Operation result" << endl;
LOG(TRACE) << "Using key: \n" << asString() << endl;
auto [newResult, existingResult] =
_executionContext->getQueryTreeCache().tryEmplace(asString());
LOG(TRACE) << "Using key: \n" << cacheKey << endl;
auto [newResult, existingResult] = (pinResult)
? cache.tryEmplacePinned(cacheKey)
: cache.tryEmplace(cacheKey);

if (newResult) {
LOG(TRACE) << "Not in the cache, need to compute result" << endl;
Expand All @@ -52,28 +56,22 @@ class Operation {
try {
computeResult(newResult->_resTable.get());
} catch (const ad_semsearch::AbortException& e) {
newResult->_resTable->abort();
// AbortExceptions have already been printed simply rethrow to
// unwind the callstack until the whole query is aborted
// A child Operation was aborted, abort this Operation
// as well. The child already printed
abort(newResult, false);
// Continue unwinding the stack
throw;
} catch (const std::exception& e) {
// Only print the Operation at the innermost (original) failure
// then "rethrow" as special ad_semsearch::AbortException
LOG(ERROR) << "Failed to compute Operation result for:" << endl;
LOG(ERROR) << asString() << endl;
LOG(ERROR) << e.what() << endl;
newResult->_resTable->abort();
// We are in the innermost level of the exception, so print
abort(newResult, true);
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(e);
} catch (...) {
// Only print the Operation at the innermost (original) failure
// then create not so weird AbortException
LOG(ERROR) << "Failed to compute Operation result for:" << endl;
LOG(ERROR) << asString() << endl;
// We are in the innermost level of the exception, so print
abort(newResult, true);
LOG(ERROR) << "WEIRD_EXCEPTION not inheriting from std::exception"
<< endl;
newResult->_resTable->abort();
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException("WEIRD_EXCEPTION");
Expand All @@ -93,11 +91,12 @@ class Operation {
newResult->_resTable->finish();
return newResult->_resTable;
}

existingResult->_resTable->awaitFinished();
if (existingResult->_resTable->status() == ResultTable::ABORTED) {
LOG(ERROR) << "Result in the cache was aborted" << endl;
LOG(ERROR) << "Operation aborted while awaiting result" << endl;
AD_THROW(ad_semsearch::Exception::BAD_QUERY,
"Operation was found aborted in the cache");
"Operation aborted while awaiting result");
}
timer.stop();
_runtimeInfo = existingResult->_runtimeInfo;
Expand All @@ -114,9 +113,23 @@ class Operation {
return existingResult->_resTable;
}

// Set the QueryExecutionContext for this particular element.
void setQueryExecutionContext(QueryExecutionContext* executionContext) {
_executionContext = executionContext;
/**
* Abort this Operation. Removes the Operation's result from the cache so
* that it can be retried. The result must be owned meaning only the
* computing thread can abort an Operation. Retrying may succeed for example
* when memory pressure was lowered in the meantime. When print is true the
* Operation is printed to the ERROR LOG
*/
void abort(const shared_ptr<CacheValue>& cachedResult, bool print) {
const std::string opString = asString();
if (print) {
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << opString << endl;
}
// Remove Operation from cache so we may retry it later. Anyone with a live
// pointer will be waiting and register the abort.
_executionContext->getQueryTreeCache().erase(opString);
cachedResult->_resTable->abort();
}

/**
Expand Down Expand Up @@ -152,10 +165,6 @@ class Operation {
RuntimeInformation& getRuntimeInfo() { return _runtimeInfo; }

protected:
QueryExecutionContext* getExecutionContext() const {
return _executionContext;
}

// The QueryExecutionContext for this particular element.
// No ownership.
QueryExecutionContext* _executionContext;
Expand All @@ -181,6 +190,7 @@ class Operation {
virtual void computeResult(ResultTable* result) = 0;

vector<size_t> _resultSortedColumns;
bool _hasComputedSortColumns;
RuntimeInformation _runtimeInfo;

bool _hasComputedSortColumns;
};
15 changes: 10 additions & 5 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@ typedef ad_utility::LRUCache<string, CacheValue> SubtreeCache;
// Holds references to index and engine, implements caching.
class QueryExecutionContext {
public:
QueryExecutionContext(const Index& index, const Engine& engine)
: _subtreeCache(NOF_SUBTREES_TO_CACHE),
QueryExecutionContext(const Index& index, const Engine& engine,
SubtreeCache* const cache,
const bool pinSubtrees = false)
: pin(pinSubtrees),
_index(index),
_engine(engine),
_subtreeCache(cache),
_costFactors() {}

SubtreeCache& getQueryTreeCache() { return _subtreeCache; }
SubtreeCache& getQueryTreeCache() { return *_subtreeCache; }

const Engine& getEngine() const { return _engine; }

const Index& getIndex() const { return _index; }

void clearCache() { _subtreeCache.clear(); }
void clearCache() { getQueryTreeCache().clear(); }

void readCostFactorsFromTSVFile(const string& fileName) {
_costFactors.readFromFile(fileName);
Expand All @@ -54,9 +57,11 @@ class QueryExecutionContext {
return _costFactors.getCostFactor(key);
};

const bool pin;

private:
SubtreeCache _subtreeCache;
const Index& _index;
const Engine& _engine;
SubtreeCache* const _subtreeCache;
QueryPlanningCostFactors _costFactors;
};
11 changes: 6 additions & 5 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
using std::string;

// _____________________________________________________________________________
QueryExecutionTree::QueryExecutionTree(QueryExecutionContext* qec)
QueryExecutionTree::QueryExecutionTree(QueryExecutionContext* const qec)
: _qec(qec),
_variableColumnMap(),
_rootOperation(nullptr),
Expand Down Expand Up @@ -53,9 +53,9 @@ string QueryExecutionTree::asString(size_t indent) {

// _____________________________________________________________________________
void QueryExecutionTree::setOperation(QueryExecutionTree::OperationType type,
std::shared_ptr<Operation> op) {
std::unique_ptr<Operation> op) {
_type = type;
_rootOperation = op;
_rootOperation = std::move(op);
_asString = "";
_sizeEstimate = std::numeric_limits<size_t>::max();
// with setting the operation the initialization is done and we can try to
Expand All @@ -71,11 +71,12 @@ void QueryExecutionTree::setVariableColumn(const string& variable,

// _____________________________________________________________________________
size_t QueryExecutionTree::getVariableColumn(const string& variable) const {
if (_variableColumnMap.count(variable) == 0) {
const auto mapIt = _variableColumnMap.find(variable);
if (mapIt == _variableColumnMap.end()) {
AD_THROW(ad_semsearch::Exception::CHECK_FAILED,
"Variable could not be mapped to result column. Var: " + variable);
}
return _variableColumnMap.find(variable)->second;
return mapIt->second;
}

// _____________________________________________________________________________
Expand Down