Skip to content

Commit

Permalink
Merge 1c85b1f into f2ece56
Browse files Browse the repository at this point in the history
  • Loading branch information
niklas88 committed Oct 1, 2019
2 parents f2ece56 + 1c85b1f commit cb8e8f3
Show file tree
Hide file tree
Showing 19 changed files with 616 additions and 367 deletions.
9 changes: 5 additions & 4 deletions src/SparqlEngineMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,15 @@ int main(int argc, char** argv) {
try {
Engine engine;
Index index;
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
index.setUsePatterns(usePatterns);
index.setOnDiskLiterals(onDiskLiterals);
index.createFromOnDiskIndex(indexName);
if (text) {
index.addTextFromOnDiskIndex();
}

QueryExecutionContext qec(index, engine);
QueryExecutionContext qec(index, engine, &cache);
if (costFactosFileName.size() > 0) {
qec.readCostFactorsFromTSVFile(costFactosFileName);
}
Expand Down Expand Up @@ -201,7 +202,7 @@ void processQuery(QueryExecutionContext& qec, const string& query) {
auto qet = qp.createExecutionTree(pq);
timer.stop();
LOG(INFO) << "Time to create Execution Tree: " << timer.msecs() << "ms\n";
LOG(INFO) << "Execution Tree: " << qet.asString() << "ms\n";
LOG(INFO) << "Execution Tree: " << qet->asString() << "ms\n";
size_t limit = MAX_NOF_ROWS_IN_RESULT;
size_t offset = 0;
if (pq._limit.size() > 0) {
Expand All @@ -210,10 +211,10 @@ void processQuery(QueryExecutionContext& qec, const string& query) {
if (pq._offset.size() > 0) {
offset = static_cast<size_t>(atol(pq._offset.c_str()));
}
qet.writeResultToStream(cout, pq._selectedVariables, limit, offset);
qet->writeResultToStream(cout, pq._selectedVariables, limit, offset);
t.stop();
std::cout << "\nDone. Time: " << t.usecs() / 1000.0 << " ms\n";
size_t numMatches = qet.getResult()->size();
size_t numMatches = qet->getResult()->size();
std::cout << "\nNumber of matches (no limit): " << numMatches << "\n";
size_t effectiveLimit =
atoi(pq._limit.c_str()) > 0 ? atoi(pq._limit.c_str()) : numMatches;
Expand Down
5 changes: 3 additions & 2 deletions src/WriteIndexListsMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ int main(int argc, char** argv) {
index.dumpAsciiLists(lists, decodeGapsAndFrequency);

Engine engine;
QueryExecutionContext qec(index, engine);
SubtreeCache cache(NOF_SUBTREES_TO_CACHE);
QueryExecutionContext qec(index, engine, &cache);
ParsedQuery q;
if (!freebase) {
q = SparqlParser("SELECT ?x WHERE {?x <is-a> <Scientist>}").parse();
Expand All @@ -100,7 +101,7 @@ int main(int argc, char** argv) {
}
QueryPlanner queryPlanner(&qec);
auto qet = queryPlanner.createExecutionTree(q);
const auto res = qet.getResult();
const auto res = qet->getResult();
AD_CHECK(res->size() > 0);
AD_CHECK(res->_data.cols() == 1);
string personlistFile = indexName + ".list.scientists";
Expand Down
99 changes: 89 additions & 10 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,97 @@ vector<size_t> IndexScan::resultSortedOn() const {
// _____________________________________________________________________________
ad_utility::HashMap<string, size_t> IndexScan::getVariableColumns() const {
ad_utility::HashMap<string, size_t> res;
size_t colIdx = 0;
if (_subject[0] == '?') {
res[_subject] = colIdx++;
}
if (_predicate[0] == '?') {
res[_predicate] = colIdx++;
}
size_t col = 0;

switch (_type) {
case SPO_FREE_P:
case FULL_INDEX_SCAN_SPO:
if (_subject[0] == '?') {
res[_subject] = col++;
}
if (_predicate[0] == '?') {
res[_predicate] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}
return res;
case SOP_FREE_O:
case SOP_BOUND_O:
case FULL_INDEX_SCAN_SOP:
if (_subject[0] == '?') {
res[_subject] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}

if (_predicate[0] == '?') {
res[_predicate] = col++;
}
return res;
case PSO_BOUND_S:
case PSO_FREE_S:
case FULL_INDEX_SCAN_PSO:
if (_predicate[0] == '?') {
res[_predicate] = col++;
}
if (_subject[0] == '?') {
res[_subject] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}
return res;
case POS_BOUND_O:
case POS_FREE_O:
case FULL_INDEX_SCAN_POS:
if (_predicate[0] == '?') {
res[_predicate] = col++;
}

if (_object[0] == '?') {
res[_object] = col++;
}

if (_subject[0] == '?') {
res[_subject] = col++;
}
return res;
case OPS_FREE_P:
case FULL_INDEX_SCAN_OPS:
if (_object[0] == '?') {
res[_object] = col++;
}

if (_object[0] == '?') {
res[_object] = colIdx++;
if (_predicate[0] == '?') {
res[_predicate] = col++;
}

if (_subject[0] == '?') {
res[_subject] = col++;
}
return res;
case OSP_FREE_S:
case FULL_INDEX_SCAN_OSP:
if (_object[0] == '?') {
res[_object] = col++;
}

if (_subject[0] == '?') {
res[_subject] = col++;
}

if (_predicate[0] == '?') {
res[_predicate] = col++;
}
return res;
default:
AD_THROW(ad_semsearch::Exception::CHECK_FAILED, "Should be unreachable.");
}
return res;
}
// _____________________________________________________________________________
void IndexScan::computeResult(ResultTable* result) {
Expand Down
66 changes: 38 additions & 28 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ using std::shared_ptr;
class Operation {
public:
// Default Constructor.
Operation() : _executionContext(NULL), _hasComputedSortColumns(false) {}
Operation() : _executionContext(nullptr), _hasComputedSortColumns(false) {}

// Typical Constructor.
explicit Operation(QueryExecutionContext* executionContext)
Expand All @@ -40,10 +40,14 @@ class Operation {
shared_ptr<const ResultTable> getResult() {
ad_utility::Timer timer;
timer.start();
auto& cache = _executionContext->getQueryTreeCache();
const string cacheKey = asString();
const bool pinResult = _executionContext->pin;
LOG(TRACE) << "Check cache for Operation result" << endl;
LOG(TRACE) << "Using key: \n" << asString() << endl;
auto [newResult, existingResult] =
_executionContext->getQueryTreeCache().tryEmplace(asString());
LOG(TRACE) << "Using key: \n" << cacheKey << endl;
auto [newResult, existingResult] = (pinResult)
? cache.tryEmplacePinned(cacheKey)
: cache.tryEmplace(cacheKey);

if (newResult) {
LOG(TRACE) << "Not in the cache, need to compute result" << endl;
Expand All @@ -52,28 +56,22 @@ class Operation {
try {
computeResult(newResult->_resTable.get());
} catch (const ad_semsearch::AbortException& e) {
newResult->_resTable->abort();
// AbortExceptions have already been printed simply rethrow to
// unwind the callstack until the whole query is aborted
// A child Operation was aborted, abort this Operation
// as well. The child already printed
abort(newResult, false);
// Continue unwinding the stack
throw;
} catch (const std::exception& e) {
// Only print the Operation at the innermost (original) failure
// then "rethrow" as special ad_semsearch::AbortException
LOG(ERROR) << "Failed to compute Operation result for:" << endl;
LOG(ERROR) << asString() << endl;
LOG(ERROR) << e.what() << endl;
newResult->_resTable->abort();
// We are in the innermost level of the exception, so print
abort(newResult, true);
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException(e);
} catch (...) {
// Only print the Operation at the innermost (original) failure
// then create not so weird AbortException
LOG(ERROR) << "Failed to compute Operation result for:" << endl;
LOG(ERROR) << asString() << endl;
// We are in the innermost level of the exception, so print
abort(newResult, true);
LOG(ERROR) << "WEIRD_EXCEPTION not inheriting from std::exception"
<< endl;
newResult->_resTable->abort();
// Rethrow as QUERY_ABORTED allowing us to print the Operation
// only at innermost failure of a recursive call
throw ad_semsearch::AbortException("WEIRD_EXCEPTION");
Expand All @@ -93,11 +91,12 @@ class Operation {
newResult->_resTable->finish();
return newResult->_resTable;
}

existingResult->_resTable->awaitFinished();
if (existingResult->_resTable->status() == ResultTable::ABORTED) {
LOG(ERROR) << "Result in the cache was aborted" << endl;
LOG(ERROR) << "Operation aborted while awaiting result" << endl;
AD_THROW(ad_semsearch::Exception::BAD_QUERY,
"Operation was found aborted in the cache");
"Operation aborted while awaiting result");
}
timer.stop();
_runtimeInfo = existingResult->_runtimeInfo;
Expand All @@ -114,9 +113,23 @@ class Operation {
return existingResult->_resTable;
}

// Set the QueryExecutionContext for this particular element.
void setQueryExecutionContext(QueryExecutionContext* executionContext) {
_executionContext = executionContext;
/**
* Abort this Operation. Removes the Operation's result from the cache so
* that it can be retried. The result must be owned meaning only the
* computing thread can abort an Operation. Retrying may succeed for example
* when memory pressure was lowered in the meantime. When print is true the
* Operation is printed to the ERROR LOG
*/
void abort(const shared_ptr<CacheValue>& cachedResult, bool print) {
const std::string opString = asString();
if (print) {
LOG(ERROR) << "Aborted Operation:" << endl;
LOG(ERROR) << opString << endl;
}
// Remove Operation from cache so we may retry it later. Anyone with a live
// pointer will be waiting and register the abort.
_executionContext->getQueryTreeCache().erase(opString);
cachedResult->_resTable->abort();
}

/**
Expand Down Expand Up @@ -152,10 +165,6 @@ class Operation {
RuntimeInformation& getRuntimeInfo() { return _runtimeInfo; }

protected:
QueryExecutionContext* getExecutionContext() const {
return _executionContext;
}

// The QueryExecutionContext for this particular element.
// No ownership.
QueryExecutionContext* _executionContext;
Expand All @@ -181,6 +190,7 @@ class Operation {
virtual void computeResult(ResultTable* result) = 0;

vector<size_t> _resultSortedColumns;
bool _hasComputedSortColumns;
RuntimeInformation _runtimeInfo;

bool _hasComputedSortColumns;
};
15 changes: 10 additions & 5 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@ typedef ad_utility::LRUCache<string, CacheValue> SubtreeCache;
// Holds references to index and engine, implements caching.
class QueryExecutionContext {
public:
QueryExecutionContext(const Index& index, const Engine& engine)
: _subtreeCache(NOF_SUBTREES_TO_CACHE),
QueryExecutionContext(const Index& index, const Engine& engine,
SubtreeCache* const cache,
const bool pinSubtrees = false)
: pin(pinSubtrees),
_index(index),
_engine(engine),
_subtreeCache(cache),
_costFactors() {}

SubtreeCache& getQueryTreeCache() { return _subtreeCache; }
SubtreeCache& getQueryTreeCache() { return *_subtreeCache; }

const Engine& getEngine() const { return _engine; }

const Index& getIndex() const { return _index; }

void clearCache() { _subtreeCache.clear(); }
void clearCache() { getQueryTreeCache().clear(); }

void readCostFactorsFromTSVFile(const string& fileName) {
_costFactors.readFromFile(fileName);
Expand All @@ -54,9 +57,11 @@ class QueryExecutionContext {
return _costFactors.getCostFactor(key);
};

const bool pin;

private:
SubtreeCache _subtreeCache;
const Index& _index;
const Engine& _engine;
SubtreeCache* const _subtreeCache;
QueryPlanningCostFactors _costFactors;
};
11 changes: 6 additions & 5 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
using std::string;

// _____________________________________________________________________________
QueryExecutionTree::QueryExecutionTree(QueryExecutionContext* qec)
QueryExecutionTree::QueryExecutionTree(QueryExecutionContext* const qec)
: _qec(qec),
_variableColumnMap(),
_rootOperation(nullptr),
Expand Down Expand Up @@ -53,9 +53,9 @@ string QueryExecutionTree::asString(size_t indent) {

// _____________________________________________________________________________
void QueryExecutionTree::setOperation(QueryExecutionTree::OperationType type,
std::shared_ptr<Operation> op) {
std::unique_ptr<Operation> op) {
_type = type;
_rootOperation = op;
_rootOperation = std::move(op);
_asString = "";
_sizeEstimate = std::numeric_limits<size_t>::max();
// with setting the operation the initialization is done and we can try to
Expand All @@ -71,11 +71,12 @@ void QueryExecutionTree::setVariableColumn(const string& variable,

// _____________________________________________________________________________
size_t QueryExecutionTree::getVariableColumn(const string& variable) const {
if (_variableColumnMap.count(variable) == 0) {
const auto mapIt = _variableColumnMap.find(variable);
if (mapIt == _variableColumnMap.end()) {
AD_THROW(ad_semsearch::Exception::CHECK_FAILED,
"Variable could not be mapped to result column. Var: " + variable);
}
return _variableColumnMap.find(variable)->second;
return mapIt->second;
}

// _____________________________________________________________________________
Expand Down

0 comments on commit cb8e8f3

Please sign in to comment.