Skip to content

Commit

Permalink
make use of cachecheck in RestAqlHandler, change logging and fix other
Browse files Browse the repository at this point in the history
  • Loading branch information
ObiWahn committed Sep 25, 2017
1 parent 1260768 commit 31ddbc1
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 32 deletions.
2 changes: 1 addition & 1 deletion arangod/Aql/AqlQueryResultCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ using EN = arangodb::aql::ExecutionNode;
std::string fakeQueryString(ExecutionPlan const* subPlan){
std::string result;
bool stringValid = subPlan->root()->fakeQueryString(result);
//LOG_TOPIC(ERR, Logger::FIXME) << "### ### subPlan String: '" << result <<"' " << std::boolalpha << stringValid;
LOG_DEVEL_IF(stringValid) << " #COORDINATOR# subPlan String: '" << result <<"'";
if (!stringValid) {
result.clear();
}
Expand Down
5 changes: 3 additions & 2 deletions arangod/Aql/ExecutionNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1451,8 +1451,9 @@ SubqueryNode::SubqueryNode(ExecutionPlan* plan,
_outVariable(Variable::varFromVPack(plan->getAst(), base, "outVariable")) {}

bool SubqueryNode::fakeQueryStringThisNode(std::string& outString) const {
outString.append("SU",2);
return _subquery->fakeQueryString(outString);
return false;
//outString.append("SU",2);
//return _subquery->fakeQueryString(outString);
}
/// @brief toVelocyPack, for SubqueryNode
void SubqueryNode::toVelocyPackHelper(VPackBuilder& nodes, bool verbose) const {
Expand Down
39 changes: 28 additions & 11 deletions arangod/Aql/Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,13 @@ Query::~Query() {
AqlFeature::unlease();
}

Result Query::resultStart(){
Result Query::resultStart(bool checkCache){
Result rv;

if (checkCache) {
_invaidationCounters = QueryCache::instance()->getInvalidationCounters(_vocbase,this->collectionNames());
}

if(! _resultBuilder){
VPackOptions options = VPackOptions::Defaults;
options.buildUnindexedArrays = true;
Expand Down Expand Up @@ -287,20 +292,24 @@ Result Query::resultAddComplete(AqlItemBlock const& value
return rv;
}

Result Query::cacheStore(uint64_t queryHash){
Result Query::cacheStore(uint64_t queryHash, bool checkCache){
Result rv;
TRI_ASSERT(_resultBuilder != nullptr);
TRI_ASSERT(_resultBuilder->isOpenArray());
_resultBuilder->close();
if (_warnings.empty()) {
// finally store the generated result in the query cache
//LOG_DEVEL << queryHash << " query - store: '" << _queryString; // << "' contents: @@@" << _resultBuilder->slice().toJson() << "@@@";
auto result = QueryCache::instance()->store(
_vocbase, queryHash, _queryString,
_resultBuilder, _trx->state()->collectionNames());

if (result == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
auto* cache = QueryCache::instance();

if(!checkCache || cache->getInvalidationCounters(_vocbase, this->collectionNames()) == _invaidationCounters){
// finally store the generated result in the query cache
//LOG_DEVEL << queryHash << " query - store: '" << _queryString; // << "' contents: @@@" << _resultBuilder->slice().toJson() << "@@@";
auto result = cache->store(
_vocbase, queryHash, _queryString,
_resultBuilder, _trx->state()->collectionNames());

if (result == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
}
}
}
return rv;
Expand Down Expand Up @@ -379,10 +388,18 @@ bool Query::cacheExhausted(){
return ! _cachedResultIterator->valid();
}

void Query::cacheCursorReset(){
Result Query::cacheCursorReset(std::size_t pos){
Result rv;
if (this->cacheEntryAvailable()){
_cachedResultIterator.reset(new VPackArrayIterator(_cachedResultBuilder->slice()));
while (pos--) {
_cachedResultIterator->next();
if (!_cachedResultIterator->valid()){
rv.reset(TRI_ERROR_BAD_PARAMETER, "could not advance cursor to requested position");
}
}
}
return rv;
}


Expand Down
7 changes: 4 additions & 3 deletions arangod/Aql/Query.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ class Query {

//// Cache Operations
/// Create and Finalize Cache
Result resultStart();
Result cacheStore(uint64_t queryHash);
Result resultStart(bool checkCache = false);
Result cacheStore(uint64_t queryHash, bool checkCache = false);

/// Add Items to Cache
//Result cacheAdd(VPackSlice const&); //single doc
Expand All @@ -128,7 +128,7 @@ class Query {
bool cacheBuildingResult() { if (_resultBuilder){return true;} return false;}
Result cacheGetOrSkipSomePart(VPackBuilder& builder, bool skip, std::size_t atLeast, std::size_t atMost);
bool cacheExhausted();
void cacheCursorReset();
Result cacheCursorReset(std::size_t pos = 0);

/// @brief Inject a transaction from outside. Use with care!
void injectTransaction (transaction::Methods* trx) {
Expand Down Expand Up @@ -359,6 +359,7 @@ class Query {
std::shared_ptr<arangodb::velocypack::Builder> _cachedResultBuilder;
std::unique_ptr<velocypack::ArrayIterator> _cachedResultIterator;
std::string _cachedResultIdString;
std::vector<uint64_t> _invaidationCounters;

/// @brief query options
QueryOptions _queryOptions;
Expand Down
39 changes: 24 additions & 15 deletions arangod/Aql/RestAqlHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void RestAqlHandler::createQueryFromVelocyPack() {
if (doCache && !cacheEntryFound && isDBServerInCluster() && query->cacheId()){ // enable caching only for DbServers in Cluster
TRI_ASSERT(cacheId == query->cacheId());
//LOG_DEVEL << cacheId << " starting new cache - isDBServer: " << isDBServerInCluster;
query->resultStart(); //open cache entry
query->resultStart(/*enable cache checking*/ true); //open cache entry
}
query.release();
} catch (...) {
Expand Down Expand Up @@ -899,7 +899,7 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
} else if (operation == "initialize") {
if(query->cacheEntryAvailable()){
answerBuilder.add("error", VPackValue(false));
answerBuilder.add("code", VPackValue(static_cast<double>(200)));
answerBuilder.add("code", VPackValue(0));
query->cacheCursorReset();
} else {
int res;
Expand All @@ -915,18 +915,22 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
return;
}
answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR));
answerBuilder.add("code", VPackValue(static_cast<double>(res)));
answerBuilder.add("code", VPackValue(res));
}
} else if (operation == "initializeCursor") {
auto pos = VelocyPackHelper::getNumericValue<size_t>(querySlice, "pos", 0);
if(query->cacheEntryAvailable()){
if (VelocyPackHelper::getBooleanValue(querySlice, "exhausted", true)) {
query->cacheCursorReset();
} else {
//query->cacheCursorReset(pos);
}
answerBuilder.add("error", VPackValue(true));
answerBuilder.add("code", VPackValue(static_cast<double>(200)));
Result rv;
if (VelocyPackHelper::getBooleanValue(querySlice, "exhausted", true)) {
rv = query->cacheCursorReset();
} else {
LOG_DEVEL << " #### position pos; " << pos;
TRI_ASSERT(false);
rv = query->cacheCursorReset(pos);
}
answerBuilder.add("error", VPackValue(rv.fail()));
answerBuilder.add("code", VPackValue(rv.errorNumber()));
answerBuilder.add("message", VPackValue(rv.errorMessage()));
} else {
std::unique_ptr<AqlItemBlock> items;
int res;
Expand All @@ -936,6 +940,10 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
res = query->engine()->initializeCursor(nullptr, 0);
} else {
items.reset(new AqlItemBlock(query->resourceMonitor(), querySlice.get("items")));
if(items || pos){
//LOG_DEVEL << "dealing with subquery";
query->resultCancel(); //we need to cancel as we are dealing with something that has a subquery
}
res = query->engine()->initializeCursor(items.get(), pos);
}
} catch (arangodb::basics::Exception const& ex) {
Expand All @@ -948,7 +956,7 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
return;
}
answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR));
answerBuilder.add("code", VPackValue(static_cast<double>(res)));
answerBuilder.add("code", VPackValue(res));
}
} else if (operation == "shutdown") {
int res = TRI_ERROR_INTERNAL;
Expand All @@ -959,7 +967,7 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
if(query->cacheBuildingResult()){
TRI_ASSERT(query->cacheId());
//LOG_DEVEL << query->cacheId() << " handler - shutdown store";
query->cacheStore(query->cacheId());
query->cacheStore(query->cacheId(), /*check cache invalidation*/ true);
}

res = query->engine()->shutdown(
Expand Down Expand Up @@ -995,9 +1003,10 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
return;
}
} // answerBuilder guard scope
//LOG_DEVEL_IF( operation == "getSome") << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!";
//LOG_DEVEL_IF( operation == "getSome") << "get some result: " << answerBuilder.slice().toJson();
//LOG_DEVEL_IF( operation == "getSome") << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!";
bool pred = true;
LOG_DEVEL_IF( pred ) << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!";
LOG_DEVEL_IF( pred ) << operation; // << ": " << answerBuilder.slice().toJson();
LOG_DEVEL_IF( pred ) << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!";
sendResponse(rest::ResponseCode::OK, answerBuilder.slice(),
transactionContext.get());
} catch (arangodb::basics::Exception const& e) {
Expand Down

0 comments on commit 31ddbc1

Please sign in to comment.