Skip to content

Commit

Permalink
query result cache: add cacheUse() function to AqlQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
ObiWahn committed Aug 30, 2017
1 parent ad103f1 commit f5c99f7
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 56 deletions.
98 changes: 45 additions & 53 deletions arangod/Aql/Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ Query::~Query() {

Result Query::cacheStart(){
Result rv;
if(! _cacheResultBuilder){
_cacheResultBuilder = std::make_shared<VPackBuilder>();
_cacheResultBuilder->buffer()->reserve( 16 * 1024); // reserve some space in Builder to avoid frequent reallocs
_cacheResultBuilder->openArray();
if(! _resultBuilder){
_resultBuilder = std::make_shared<VPackBuilder>();
_resultBuilder->buffer()->reserve( 16 * 1024); // reserve some space in Builder to avoid frequent reallocs
_resultBuilder->openArray();
} else {
return rv.reset(TRI_ERROR_INTERNAL,"cache entry already open");
}
Expand All @@ -251,7 +251,7 @@ Result Query::cacheAdd(AqlItemBlock const& value){
for (size_t i = 0; i < n; ++i) {
AqlValue const& val = value.getValueReference(i, resultRegister);
if (!val.isEmpty()) {
val.toVelocyPack(_trx, *_cacheResultBuilder, true);
val.toVelocyPack(_trx, *_resultBuilder, true);
}
}
return rv;
Expand All @@ -275,7 +275,7 @@ Result Query::cacheAdd(AqlItemBlock const& value
}

if(canCache) {
val.toVelocyPack(_trx, *_cacheResultBuilder, true);
val.toVelocyPack(_trx, *_resultBuilder, true);
}
}
}
Expand All @@ -284,19 +284,43 @@ Result Query::cacheAdd(AqlItemBlock const& value

Result Query::cacheStore(uint64_t queryHash){
Result rv;
_cacheResultBuilder->close();
_resultBuilder->close();
if (_warnings.empty()) {
// finally store the generated result in the query cache
auto result = QueryCache::instance()->store(
_vocbase, queryHash, _queryString,
_cacheResultBuilder, _trx->state()->collectionNames());
_resultBuilder, _trx->state()->collectionNames());

if (result == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
}
}
return rv;
}

Result Query::cacheUse(uint64_t queryHash){
Result rv;
auto cacheEntry = arangodb::aql::QueryCache::instance()->lookup( _vocbase, queryHash, _queryString);
arangodb::aql::QueryCacheResultEntryGuard guard(cacheEntry);

if (cacheEntry != nullptr) {
// got a result from the query cache
if(ExecContext::CURRENT != nullptr) {
AuthInfo* info = AuthenticationFeature::INSTANCE->authInfo();
for (std::string const& collectionName : cacheEntry->_collections) {
if (info->canUseCollection(ExecContext::CURRENT->user(),
ExecContext::CURRENT->database(),
collectionName) == AuthLevel::NONE) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_FORBIDDEN);
}
}
}
_cachedResultBuilder = cacheEntry->_queryResult;
_cachedResultIterator = std::unique_ptr<VPackArrayIterator>( new VPackArrayIterator(_cachedResultBuilder->slice()));
}
return rv;
}

/// @brief clone a query
/// note: as a side-effect, this will also create and start a transaction for
/// the query
Expand All @@ -305,6 +329,7 @@ Query* Query::clone(QueryPart part, bool withPlan) {
std::make_unique<Query>(false, _vocbase, _queryString,
std::shared_ptr<VPackBuilder>(), _options, part);

//TODO FIXME - does this function need to be updated
clone->_resourceMonitor = _resourceMonitor;
clone->_resourceMonitor.clear();

Expand Down Expand Up @@ -616,32 +641,20 @@ QueryResult Query::execute(QueryRegistry* registry) {

// check the query cache for an existing result
// and use the result to answer

if (canReadFromQueryCache()) {
auto cacheEntry = arangodb::aql::QueryCache::instance()->lookup(
_vocbase, queryHash, _queryString);
arangodb::aql::QueryCacheResultEntryGuard guard(cacheEntry);
cacheUse(queryHash);

if (cacheEntry != nullptr) {
if (_cachedResultBuilder) {
// got a result from the query cache
if(ExecContext::CURRENT != nullptr) {
AuthInfo* info = AuthenticationFeature::INSTANCE->authInfo();
for (std::string const& collectionName : cacheEntry->_collections) {
if (info->canUseCollection(ExecContext::CURRENT->user(),
ExecContext::CURRENT->database(),
collectionName) == AuthLevel::NONE) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_FORBIDDEN);
}
}
}

QueryResult res;
// we don't have yet a transaction when we're here, so let's create
// a mimimal context to build the result
res.context = std::make_shared<transaction::StandaloneContext>(_vocbase);

res.warnings = warningsToVelocyPack();
TRI_ASSERT(cacheEntry->_queryResult != nullptr);
res.result = cacheEntry->_queryResult;
res.result = std::move(_cachedResultBuilder);
res.cached = true;
return res;
}
Expand All @@ -662,15 +675,13 @@ QueryResult Query::execute(QueryRegistry* registry) {

log();


VPackOptions options = VPackOptions::Defaults;
options.buildUnindexedArrays = true;
options.buildUnindexedObjects = true;

TRI_ASSERT(_engine != nullptr);

// this is the RegisterId our results can be found in
std::shared_ptr<VPackBuilder> resultBuilder(nullptr);

AqlItemBlock* value = nullptr; //must be ouside try so it can be deleted by catch
try {
Expand All @@ -688,11 +699,8 @@ QueryResult Query::execute(QueryRegistry* registry) {
if (canWriteToQueryCache()) {
cacheStore(queryHash);
THROW_ARANGO_EXCEPTION_RESULT(rv);
resultBuilder = _cacheResultBuilder;
} else {
_cacheResultBuilder->close(); //close as is not done by store
resultBuilder = std::move(_cacheResultBuilder);
_cacheResultBuilder.reset();
_resultBuilder->close(); //close as is not done by store
}
} catch (...) {
delete value;
Expand Down Expand Up @@ -720,7 +728,8 @@ QueryResult Query::execute(QueryRegistry* registry) {
cleanupPlanAndEngine(TRI_ERROR_NO_ERROR, stats.get());

result.warnings = warningsToVelocyPack();
result.result = std::move(resultBuilder);
result.result = std::move(_resultBuilder);
_resultBuilder.reset();
result.stats = std::move(stats);

// patch stats in place
Expand Down Expand Up @@ -775,38 +784,22 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) {
uint64_t queryHash = hash();

if (canReadFromQueryCache()) {
// check the query cache for an existing result
auto cacheEntry = arangodb::aql::QueryCache::instance()->lookup(
_vocbase, queryHash, _queryString);
arangodb::aql::QueryCacheResultEntryGuard guard(cacheEntry);
cacheUse(queryHash);

if (cacheEntry != nullptr) {
// got a result from the query cache
if(ExecContext::CURRENT != nullptr) {
AuthInfo* info = AuthenticationFeature::INSTANCE->authInfo();
for (std::string const& collectionName : cacheEntry->_collections) {
if (info->canUseCollection(ExecContext::CURRENT->user(),
ExecContext::CURRENT->database(),
collectionName) == AuthLevel::NONE) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_FORBIDDEN);
}
}
}
if (_cachedResultBuilder) {

QueryResultV8 result;
// we don't have yet a transaction when we're here, so let's create
// a mimimal context to build the result
result.context = std::make_shared<transaction::StandaloneContext>(_vocbase);

v8::Handle<v8::Value> values =
TRI_VPackToV8(isolate, cacheEntry->_queryResult->slice(),
result.context->getVPackOptions());
v8::Handle<v8::Value> values = TRI_VPackToV8(isolate, _cachedResultBuilder->slice(), result.context->getVPackOptions());
TRI_ASSERT(values->IsArray());
result.result = v8::Handle<v8::Array>::Cast(values);
result.cached = true;
return result;
}
}
} // useQueryCache

// will throw if it fails
prepare(registry, queryHash);
Expand Down Expand Up @@ -848,8 +841,7 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) {
if (canCache) {
cacheStore(queryHash);
THROW_ARANGO_EXCEPTION_RESULT(rv);
} else {
_cacheResultBuilder.reset();
_resultBuilder.reset();
}
} catch (...) {
LOG_TOPIC(DEBUG, Logger::QUERIES) << TRI_microtime() - _startTime << " "
Expand Down
9 changes: 6 additions & 3 deletions arangod/Aql/Query.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class Query {
Result cacheAdd(AqlItemBlock const&, v8::Isolate* isolate, QueryResultV8&, bool canCache = true);

Result cacheStore(uint64_t queryHash);
Result cacheUse(uint64_t queryHash); //fills _cachedResultBuilder member variable and initalizes / Array Iterator

/// @brief Inject a transaction from outside. Use with care!
void injectTransaction (transaction::Methods* trx) {
Expand Down Expand Up @@ -351,9 +352,11 @@ class Query {
/// @brief query options
std::shared_ptr<arangodb::velocypack::Builder> _options;

/// @brief cacheResultBuilder
std::shared_ptr<arangodb::velocypack::Builder> _cacheResultBuilder;
std::string _cacheResultIdString;
/// @brief cachedResultBuilder
std::shared_ptr<arangodb::velocypack::Builder> _resultBuilder;
std::shared_ptr<arangodb::velocypack::Builder> _cachedResultBuilder;
std::unique_ptr<velocypack::ArrayIterator> _cachedResultIterator;
std::string _cachedResultIdString;

/// @brief query options
QueryOptions _queryOptions;
Expand Down

0 comments on commit f5c99f7

Please sign in to comment.